1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
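/* Rough lifecycle of recent_activity (the timer's handle_tick() in
 * Timer.c does the stepping towards inactivity):
 *   ACTIVITY_YES      -- a thread ran recently (set on the fast path below)
 *   ACTIVITY_MAYBE_NO -- one timer tick passed with nothing running
 *   ACTIVITY_INACTIVE -- a whole timeslice passed with nothing running;
 *                        this is what triggers the idle GC / deadlock
 *                        check in scheduleDetectDeadlock()
 *   ACTIVITY_DONE_GC  -- the idle GC has run and the timer was stopped;
 *                        running a thread flips us back to ACTIVITY_YES
 *                        and restarts the timer.
 */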
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
95 * exists - earlier gccs apparently didn't.
96 * -= chak
97 */
98 StgTSO dummy_tso;
99
100 /*
101 * This mutex protects most of the global scheduler data in
102 * the THREADED_RTS runtime.
103 */
104 #if defined(THREADED_RTS)
105 Mutex sched_mutex;
106 #endif
107
108 #if !defined(mingw32_HOST_OS)
109 #define FORKPROCESS_PRIMOP_SUPPORTED
110 #endif
111
112 // Local stats
113 #ifdef THREADED_RTS
114 static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
115 #endif
116
117 /* -----------------------------------------------------------------------------
118 * static function prototypes
119 * -------------------------------------------------------------------------- */
120
121 static Capability *schedule (Capability *initialCapability, Task *task);
122
123 //
124 // These functions all encapsulate parts of the scheduler loop, and are
125 // abstracted only to make the structure and control flow of the
126 // scheduler clearer.
127 //
128 static void schedulePreLoop (void);
129 static void scheduleFindWork (Capability **pcap);
130 #if defined(THREADED_RTS)
131 static void scheduleYield (Capability **pcap, Task *task);
132 #endif
133 #if defined(THREADED_RTS)
134 static nat requestSync (Capability **pcap, Task *task, nat sync_type);
135 static void acquireAllCapabilities(Capability *cap, Task *task);
136 static void releaseAllCapabilities(nat n, Capability *cap, Task *task);
137 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
138 #endif
139 static void scheduleStartSignalHandlers (Capability *cap);
140 static void scheduleCheckBlockedThreads (Capability *cap);
141 static void scheduleProcessInbox(Capability **cap);
142 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
143 static void schedulePushWork(Capability *cap, Task *task);
144 #if defined(THREADED_RTS)
145 static void scheduleActivateSpark(Capability *cap);
146 #endif
147 static void schedulePostRunThread(Capability *cap, StgTSO *t);
148 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
149 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
150 nat prev_what_next );
151 static void scheduleHandleThreadBlocked( StgTSO *t );
152 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
153 StgTSO *t );
154 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
155 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
156
157 static void deleteThread (Capability *cap, StgTSO *tso);
158 static void deleteAllThreads (Capability *cap);
159
160 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
161 static void deleteThread_(Capability *cap, StgTSO *tso);
162 #endif
163
164 /* ---------------------------------------------------------------------------
165 Main scheduling loop.
166
167 We use round-robin scheduling, each thread returning to the
168 scheduler loop when one of these conditions is detected:
169
170 * out of heap space
171 * timer expires (thread yields)
172 * thread blocks
173 * thread ends
174 * stack overflow
175
176 ------------------------------------------------------------------------ */
177
178 static Capability *
179 schedule (Capability *initialCapability, Task *task)
180 {
181 StgTSO *t;
182 Capability *cap;
183 StgThreadReturnCode ret;
184 nat prev_what_next;
185 rtsBool ready_to_gc;
186 #if defined(THREADED_RTS)
187 rtsBool first = rtsTrue;
188 #endif
189
190 cap = initialCapability;
191
192 // Pre-condition: this task owns initialCapability.
193 // The sched_mutex is *NOT* held
194 // NB. on return, we still hold a capability.
195
196 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
197
198 schedulePreLoop();
199
200 // -----------------------------------------------------------
201 // Scheduler loop starts here:
202
203 while (1) {
204
205 // Check whether we have re-entered the RTS from Haskell without
206 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
207 // call).
208 if (cap->in_haskell) {
209 errorBelch("schedule: re-entered unsafely.\n"
210 " Perhaps a 'foreign import unsafe' should be 'safe'?");
211 stg_exit(EXIT_FAILURE);
212 }
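    // cap->in_haskell is set for the duration of StgRun() below and is
    // only cleared by suspendThread()/resumeThread() (the 'safe' foreign
    // call route) or by returning to the scheduler. So if it is still
    // set here, foreign code entered via an 'unsafe' foreign import has
    // called back into Haskell, which is not supported -- hence the
    // suggestion above to make the import 'safe'.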
213
214 // The interruption / shutdown sequence.
215 //
216 // In order to cleanly shut down the runtime, we want to:
217 // * make sure that all main threads return to their callers
218 // with the state 'Interrupted'.
219 // * clean up all OS threads associated with the runtime
220 // * free all memory etc.
221 //
222 // So the sequence for ^C goes like this:
223 //
224 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
225 // arranges for some Capability to wake up
226 //
227 // * all threads in the system are halted, and the zombies are
228 // placed on the run queue for cleaning up. We acquire all
229 // the capabilities in order to delete the threads, this is
230 // done by scheduleDoGC() for convenience (because GC already
231 // needs to acquire all the capabilities). We can't kill
232 // threads involved in foreign calls.
233 //
234 // * somebody calls shutdownHaskell(), which calls exitScheduler()
235 //
236 // * sched_state := SCHED_SHUTTING_DOWN
237 //
238 // * all workers exit when the run queue on their capability
239 // drains. All main threads will also exit when their TSO
240 // reaches the head of the run queue and they can return.
241 //
242 // * eventually all Capabilities will shut down, and the RTS can
243 // exit.
244 //
245 // * We might be left with threads blocked in foreign calls,
246 // we should really attempt to kill these somehow (TODO).
247
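    // NB. the SCHED_* states are ordered (SCHED_RUNNING <
    // SCHED_INTERRUPTING < SCHED_SHUTTING_DOWN) and sched_state only
    // ever increases, which is why the ">= SCHED_INTERRUPTING" tests
    // elsewhere in this loop are safe without taking a lock.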
248 switch (sched_state) {
249 case SCHED_RUNNING:
250 break;
251 case SCHED_INTERRUPTING:
252 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
253 /* scheduleDoGC() deletes all the threads */
254 scheduleDoGC(&cap,task,rtsFalse);
255
256 // after scheduleDoGC(), we must be shutting down. Either some
257 // other Capability did the final GC, or we did it above,
258 // either way we can fall through to the SCHED_SHUTTING_DOWN
259 // case now.
260 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
261 // fall through
262
263 case SCHED_SHUTTING_DOWN:
264 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
265 // If we are a worker, just exit. If we're a bound thread
266 // then we will exit below when we've removed our TSO from
267 // the run queue.
268 if (!isBoundTask(task) && emptyRunQueue(cap)) {
269 return cap;
270 }
271 break;
272 default:
273 barf("sched_state: %d", sched_state);
274 }
275
276 scheduleFindWork(&cap);
277
278 /* work pushing, currently relevant only for THREADED_RTS:
279 (pushes threads, wakes up idle capabilities for stealing) */
280 schedulePushWork(cap,task);
281
282 scheduleDetectDeadlock(&cap,task);
283
284 // Normally, the only way we can get here with no threads to
285 // run is if a keyboard interrupt was received during
286 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
287 // Additionally, it is not fatal for the
288 // threaded RTS to reach here with no threads to run.
289 //
290 // win32: might be here due to awaitEvent() being abandoned
291 // as a result of a console event having been delivered.
292
293 #if defined(THREADED_RTS)
294 if (first)
295 {
296 // XXX: ToDo
297 // // don't yield the first time, we want a chance to run this
298 // // thread for a bit, even if there are others banging at the
299 // // door.
300 // first = rtsFalse;
301 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
302 }
303
304 scheduleYield(&cap,task);
305
306 if (emptyRunQueue(cap)) continue; // look for work again
307 #endif
308
309 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
310 if ( emptyRunQueue(cap) ) {
311 ASSERT(sched_state >= SCHED_INTERRUPTING);
312 }
313 #endif
314
315 //
316 // Get a thread to run
317 //
318 t = popRunQueue(cap);
319
320 // Sanity check the thread we're about to run. This can be
321 // expensive if there is lots of thread switching going on...
322 IF_DEBUG(sanity,checkTSO(t));
323
324 #if defined(THREADED_RTS)
325 // Check whether we can run this thread in the current task.
326 // If not, we have to pass our capability to the right task.
327 {
328 InCall *bound = t->bound;
329
330 if (bound) {
331 if (bound->task == task) {
332 // yes, the Haskell thread is bound to the current native thread
333 } else {
334 debugTrace(DEBUG_sched,
335 "thread %lu bound to another OS thread",
336 (unsigned long)t->id);
337 // no, bound to a different Haskell thread: pass to that thread
338 pushOnRunQueue(cap,t);
339 continue;
340 }
341 } else {
342 // The thread we want to run is unbound.
343 if (task->incall->tso) {
344 debugTrace(DEBUG_sched,
345 "this OS thread cannot run thread %lu",
346 (unsigned long)t->id);
347 // no, the current native thread is bound to a different
348 // Haskell thread, so pass it to any worker thread
349 pushOnRunQueue(cap,t);
350 continue;
351 }
352 }
353 }
354 #endif
355
356 // If we're shutting down, and this thread has not yet been
357 // killed, kill it now. This sometimes happens when a finalizer
358 // thread is created by the final GC, or a thread previously
359 // in a foreign call returns.
360 if (sched_state >= SCHED_INTERRUPTING &&
361 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
362 deleteThread(cap,t);
363 }
364
365 // If this capability is disabled, migrate the thread away rather
366 // than running it. NB. but not if the thread is bound: it is
367 // really hard for a bound thread to migrate itself. Believe me,
368 // I tried several ways and couldn't find a way to do it.
369 // Instead, when everything is stopped for GC, we migrate all the
370 // threads on the run queue then (see scheduleDoGC()).
371 //
372 // ToDo: what about TSO_LOCKED? Currently we're migrating those
373 // when the number of capabilities drops, but we never migrate
374 // them back if it rises again. Presumably we should, but after
375 // the thread has been migrated we no longer know what capability
376 // it was originally on.
377 #ifdef THREADED_RTS
378 if (cap->disabled && !t->bound) {
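        // The still-enabled capabilities are 0..enabled_capabilities-1
        // (disabling always happens from the top), so the modulus below
        // just picks one of them as the migration target.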
379 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
380 migrateThread(cap, t, dest_cap);
381 continue;
382 }
383 #endif
384
385 /* context switches are initiated by the timer signal, unless
386 * the user specified "context switch as often as possible", with
387 * +RTS -C0
388 */
389 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
390 && !emptyThreadQueues(cap)) {
391 cap->context_switch = 1;
392 }
393
394 run_thread:
395
396 // CurrentTSO is the thread to run. It might be different if we
397 // loop back to run_thread, so make sure to set CurrentTSO after
398 // that.
399 cap->r.rCurrentTSO = t;
400
401 startHeapProfTimer();
402
403 // ----------------------------------------------------------------------
404 // Run the current thread
405
406 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
407 ASSERT(t->cap == cap);
408 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
409
410 prev_what_next = t->what_next;
411
412 errno = t->saved_errno;
413 #if mingw32_HOST_OS
414 SetLastError(t->saved_winerror);
415 #endif
416
417 // reset the interrupt flag before running Haskell code
418 cap->interrupt = 0;
419
420 cap->in_haskell = rtsTrue;
421 cap->idle = 0;
422
423 dirty_TSO(cap,t);
424 dirty_STACK(cap,t->stackobj);
425
426 switch (recent_activity)
427 {
428 case ACTIVITY_DONE_GC: {
429 // ACTIVITY_DONE_GC means we turned off the timer signal to
430 // conserve power (see #1623). Re-enable it here.
431 nat prev;
432 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
433 if (prev == ACTIVITY_DONE_GC) {
434 #ifndef PROFILING
435 startTimer();
436 #endif
437 }
438 break;
439 }
440 case ACTIVITY_INACTIVE:
441 // If we reached ACTIVITY_INACTIVE, then don't reset it until
442 // we've done the GC. The thread running here might just be
443 // the IO manager thread that handle_tick() woke up via
444 // wakeUpRts().
445 break;
446 default:
447 recent_activity = ACTIVITY_YES;
448 }
449
450 traceEventRunThread(cap, t);
451
452 switch (prev_what_next) {
453
454 case ThreadKilled:
455 case ThreadComplete:
456 /* Thread already finished, return to scheduler. */
457 ret = ThreadFinished;
458 break;
459
460 case ThreadRunGHC:
461 {
462 StgRegTable *r;
463 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
464 cap = regTableToCapability(r);
465 ret = r->rRet;
466 break;
467 }
468
469 case ThreadInterpret:
470 cap = interpretBCO(cap);
471 ret = cap->r.rRet;
472 break;
473
474 default:
475 barf("schedule: invalid what_next field");
476 }
477
478 cap->in_haskell = rtsFalse;
479
480 // The TSO might have moved, eg. if it re-entered the RTS and a GC
481 // happened. So find the new location:
482 t = cap->r.rCurrentTSO;
483
484 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
485 // don't want it set when not running a Haskell thread.
486 cap->r.rCurrentTSO = NULL;
487
488 // And save the current errno in this thread.
489 // XXX: possibly bogus for SMP because this thread might already
490 // be running again, see code below.
491 t->saved_errno = errno;
492 #if mingw32_HOST_OS
493 // Similarly for Windows error code
494 t->saved_winerror = GetLastError();
495 #endif
496
497 if (ret == ThreadBlocked) {
498 if (t->why_blocked == BlockedOnBlackHole) {
499 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
500 traceEventStopThread(cap, t, t->why_blocked + 6,
501 owner != NULL ? owner->id : 0);
502 } else {
503 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
504 }
505 } else {
506 traceEventStopThread(cap, t, ret, 0);
507 }
508
509 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
510 ASSERT(t->cap == cap);
511
512 // ----------------------------------------------------------------------
513
514 // Costs for the scheduler are assigned to CCS_SYSTEM
515 stopHeapProfTimer();
516 #if defined(PROFILING)
517 cap->r.rCCCS = CCS_SYSTEM;
518 #endif
519
520 schedulePostRunThread(cap,t);
521
522 ready_to_gc = rtsFalse;
523
524 switch (ret) {
525 case HeapOverflow:
526 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
527 break;
528
529 case StackOverflow:
530 // just adjust the stack for this thread, then pop it back
531 // on the run queue.
532 threadStackOverflow(cap, t);
533 pushOnRunQueue(cap,t);
534 break;
535
536 case ThreadYielding:
537 if (scheduleHandleYield(cap, t, prev_what_next)) {
538 // shortcut for switching between compiler/interpreter:
539 goto run_thread;
540 }
541 break;
542
543 case ThreadBlocked:
544 scheduleHandleThreadBlocked(t);
545 break;
546
547 case ThreadFinished:
548 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
549 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
550 break;
551
552 default:
553 barf("schedule: invalid thread return code %d", (int)ret);
554 }
555
556 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
557 scheduleDoGC(&cap,task,rtsFalse);
558 }
559 } /* end of while() */
560 }
561
562 /* -----------------------------------------------------------------------------
563 * Run queue operations
564 * -------------------------------------------------------------------------- */
565
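/* The run queue is doubly linked: tso->_link points forwards and
 * tso->block_info.prev points backwards, both terminated by
 * END_TSO_QUEUE, so for any queued TSO with a predecessor
 *     tso->block_info.prev->_link == tso
 * removeFromRunQueue() unlinks a TSO from anywhere in the queue in
 * O(1) and clears both of its pointers.
 */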
566 void
567 removeFromRunQueue (Capability *cap, StgTSO *tso)
568 {
569 if (tso->block_info.prev == END_TSO_QUEUE) {
570 ASSERT(cap->run_queue_hd == tso);
571 cap->run_queue_hd = tso->_link;
572 } else {
573 setTSOLink(cap, tso->block_info.prev, tso->_link);
574 }
575 if (tso->_link == END_TSO_QUEUE) {
576 ASSERT(cap->run_queue_tl == tso);
577 cap->run_queue_tl = tso->block_info.prev;
578 } else {
579 setTSOPrev(cap, tso->_link, tso->block_info.prev);
580 }
581 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
582
583 IF_DEBUG(sanity, checkRunQueue(cap));
584 }
585
586 void
587 promoteInRunQueue (Capability *cap, StgTSO *tso)
588 {
589 removeFromRunQueue(cap, tso);
590 pushOnRunQueue(cap, tso);
591 }
592
593 /* ----------------------------------------------------------------------------
594 * Setting up the scheduler loop
595 * ------------------------------------------------------------------------- */
596
597 static void
598 schedulePreLoop(void)
599 {
600 // initialisation for scheduler - what cannot go into initScheduler()
601
602 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
603 win32AllocStack();
604 #endif
605 }
606
607 /* -----------------------------------------------------------------------------
608 * scheduleFindWork()
609 *
610 * Search for work to do, and handle messages from elsewhere.
611 * -------------------------------------------------------------------------- */
612
613 static void
614 scheduleFindWork (Capability **pcap)
615 {
616 scheduleStartSignalHandlers(*pcap);
617
618 scheduleProcessInbox(pcap);
619
620 scheduleCheckBlockedThreads(*pcap);
621
622 #if defined(THREADED_RTS)
623 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
624 #endif
625 }
626
627 #if defined(THREADED_RTS)
628 STATIC_INLINE rtsBool
629 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
630 {
631 // we need to yield this capability to someone else if..
632 // - another thread is initiating a GC, and we didn't just do a GC
633 // (see Note [GC livelock])
634 // - another Task is returning from a foreign call
635 // - the thread at the head of the run queue cannot be run
636 // by this Task (it is bound to another Task, or it is unbound
637 // and this task is bound).
638 //
639 // Note [GC livelock]
640 //
641 // If we are interrupted to do a GC, then we do not immediately do
642 // another one. This avoids a starvation situation where one
643 // Capability keeps forcing a GC and the other Capabilities make no
644 // progress at all.
645
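    // The three disjuncts below correspond, in order, to the three
    // cases listed above: a pending sync, a returning Task, and a
    // head-of-queue thread that this Task cannot run.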
646 return ((pending_sync && !didGcLast) ||
647 cap->returning_tasks_hd != NULL ||
648 (!emptyRunQueue(cap) && (task->incall->tso == NULL
649 ? peekRunQueue(cap)->bound != NULL
650 : peekRunQueue(cap)->bound != task->incall)));
651 }
652
653 // This is the single place where a Task goes to sleep. There are
654 // two reasons it might need to sleep:
655 // - there are no threads to run
656 // - we need to yield this Capability to someone else
657 // (see shouldYieldCapability())
658 //
659 // Careful: the scheduler loop is quite delicate. Make sure you run
660 // the tests in testsuite/concurrent (all ways) after modifying this,
661 // and also check the benchmarks in nofib/parallel for regressions.
662
663 static void
664 scheduleYield (Capability **pcap, Task *task)
665 {
666 Capability *cap = *pcap;
667 int didGcLast = rtsFalse;
668
669 // if we have work, and we don't need to give up the Capability, continue.
670 //
671 if (!shouldYieldCapability(cap,task,rtsFalse) &&
672 (!emptyRunQueue(cap) ||
673 !emptyInbox(cap) ||
674 sched_state >= SCHED_INTERRUPTING)) {
675 return;
676 }
677
678 // otherwise yield (sleep), and keep yielding if necessary.
679 do {
680 didGcLast = yieldCapability(&cap,task, !didGcLast);
681 }
682 while (shouldYieldCapability(cap,task,didGcLast));
683
684 // note there may still be no threads on the run queue at this
685 // point, the caller has to check.
686
687 *pcap = cap;
688 return;
689 }
690 #endif
691
692 /* -----------------------------------------------------------------------------
693 * schedulePushWork()
694 *
695 * Push work to other Capabilities if we have some.
696 * -------------------------------------------------------------------------- */
697
698 static void
699 schedulePushWork(Capability *cap USED_IF_THREADS,
700 Task *task USED_IF_THREADS)
701 {
702 /* The following code is not for PARALLEL_HASKELL. The call is kept
703 general; future GUM versions might use pushing in a distributed setup. */
704 #if defined(THREADED_RTS)
705
706 Capability *free_caps[n_capabilities], *cap0;
707 nat i, n_free_caps;
708
709 // migration can be turned off with +RTS -qm
710 if (!RtsFlags.ParFlags.migrate) return;
711
712 // Check whether we have more threads on our run queue, or sparks
713 // in our pool, that we could hand to another Capability.
714 if (emptyRunQueue(cap)) {
715 if (sparkPoolSizeCap(cap) < 2) return;
716 } else {
717 if (singletonRunQueue(cap) &&
718 sparkPoolSizeCap(cap) < 1) return;
719 }
720
721 // First grab as many free Capabilities as we can.
722 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
723 cap0 = capabilities[i];
724 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
725 if (!emptyRunQueue(cap0)
726 || cap0->returning_tasks_hd != NULL
727 || cap0->inbox != (Message*)END_TSO_QUEUE) {
728 // it already has some work, we just grabbed it at
729 // the wrong moment. Or maybe it's deadlocked!
730 releaseCapability(cap0);
731 } else {
732 free_caps[n_free_caps++] = cap0;
733 }
734 }
735 }
736
737 // we now have n_free_caps free capabilities stashed in
738 // free_caps[]. Share our run queue equally with them. This is
739 // probably the simplest thing we could do; improvements we might
740 // want to do include:
741 //
742 // - giving high priority to moving relatively new threads, on
743 // the grounds that they haven't had time to build up a
744 // working set in the cache on this CPU/Capability.
745 //
746 // - giving low priority to moving long-lived threads
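    // The loop further down deals the runnable threads out to
    // free_caps[] round-robin: i walks through the free capabilities
    // and each time it wraps (i == n_free_caps) we keep one thread for
    // ourselves. Our own bound thread and TSO_LOCKED threads are never
    // moved.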
747
748 if (n_free_caps > 0) {
749 StgTSO *prev, *t, *next;
750 #ifdef SPARK_PUSHING
751 rtsBool pushed_to_all;
752 #endif
753
754 debugTrace(DEBUG_sched,
755 "cap %d: %s and %d free capabilities, sharing...",
756 cap->no,
757 (!emptyRunQueue(cap) && !singletonRunQueue(cap))?
758 "excess threads on run queue":"sparks to share (>=2)",
759 n_free_caps);
760
761 i = 0;
762 #ifdef SPARK_PUSHING
763 pushed_to_all = rtsFalse;
764 #endif
765
766 if (cap->run_queue_hd != END_TSO_QUEUE) {
767 prev = cap->run_queue_hd;
768 t = prev->_link;
769 prev->_link = END_TSO_QUEUE;
770 for (; t != END_TSO_QUEUE; t = next) {
771 next = t->_link;
772 t->_link = END_TSO_QUEUE;
773 if (t->bound == task->incall // don't move my bound thread
774 || tsoLocked(t)) { // don't move a locked thread
775 setTSOLink(cap, prev, t);
776 setTSOPrev(cap, t, prev);
777 prev = t;
778 } else if (i == n_free_caps) {
779 #ifdef SPARK_PUSHING
780 pushed_to_all = rtsTrue;
781 #endif
782 i = 0;
783 // keep one for us
784 setTSOLink(cap, prev, t);
785 setTSOPrev(cap, t, prev);
786 prev = t;
787 } else {
788 appendToRunQueue(free_caps[i],t);
789
790 traceEventMigrateThread (cap, t, free_caps[i]->no);
791
792 if (t->bound) { t->bound->task->cap = free_caps[i]; }
793 t->cap = free_caps[i];
794 i++;
795 }
796 }
797 cap->run_queue_tl = prev;
798
799 IF_DEBUG(sanity, checkRunQueue(cap));
800 }
801
802 #ifdef SPARK_PUSHING
803 /* JB I left this code in place, it would work but is not necessary */
804
805 // If there are some free capabilities that we didn't push any
806 // threads to, then try to push a spark to each one.
807 if (!pushed_to_all) {
808 StgClosure *spark;
809 // i is the next free capability to push to
810 for (; i < n_free_caps; i++) {
811 if (emptySparkPoolCap(free_caps[i])) {
812 spark = tryStealSpark(cap->sparks);
813 if (spark != NULL) {
814 /* TODO: if anyone wants to re-enable this code then
815 * they must consider the fizzledSpark(spark) case
816 * and update the per-cap spark statistics.
817 */
818 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
819
820 traceEventStealSpark(free_caps[i], t, cap->no);
821
822 newSpark(&(free_caps[i]->r), spark);
823 }
824 }
825 }
826 }
827 #endif /* SPARK_PUSHING */
828
829 // release the capabilities
830 for (i = 0; i < n_free_caps; i++) {
831 task->cap = free_caps[i];
832 releaseAndWakeupCapability(free_caps[i]);
833 }
834 }
835 task->cap = cap; // reset to point to our Capability.
836
837 #endif /* THREADED_RTS */
838
839 }
840
841 /* ----------------------------------------------------------------------------
842 * Start any pending signal handlers
843 * ------------------------------------------------------------------------- */
844
845 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
846 static void
847 scheduleStartSignalHandlers(Capability *cap)
848 {
849 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
850 // safe outside the lock
851 startSignalHandlers(cap);
852 }
853 }
854 #else
855 static void
856 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
857 {
858 }
859 #endif
860
861 /* ----------------------------------------------------------------------------
862 * Check for blocked threads that can be woken up.
863 * ------------------------------------------------------------------------- */
864
865 static void
866 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
867 {
868 #if !defined(THREADED_RTS)
869 //
870 // Check whether any waiting threads need to be woken up. If the
871 // run queue is empty, and there are no other tasks running, we
872 // can wait indefinitely for something to happen.
873 //
874 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
875 {
876 awaitEvent (emptyRunQueue(cap));
877 }
878 #endif
879 }
880
881 /* ----------------------------------------------------------------------------
882 * Detect deadlock conditions and attempt to resolve them.
883 * ------------------------------------------------------------------------- */
884
885 static void
886 scheduleDetectDeadlock (Capability **pcap, Task *task)
887 {
888 Capability *cap = *pcap;
889 /*
890 * Detect deadlock: when we have no threads to run, there are no
891 * threads blocked, waiting for I/O, or sleeping, and all the
892 * other tasks are waiting for work, we must have a deadlock of
893 * some description.
894 */
895 if ( emptyThreadQueues(cap) )
896 {
897 #if defined(THREADED_RTS)
898 /*
899 * In the threaded RTS, we only check for deadlock if there
900 * has been no activity in a complete timeslice. This means
901 * we won't eagerly start a full GC just because we don't have
902 * any threads to run currently.
903 */
904 if (recent_activity != ACTIVITY_INACTIVE) return;
905 #endif
906
907 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
908
909 // Garbage collection can release some new threads due to
910 // either (a) finalizers or (b) threads resurrected because
911 // they are unreachable and will therefore be sent an
912 // exception. Any threads thus released will be immediately
913 // runnable.
914 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
915 cap = *pcap;
916 // When force_major == rtsTrue, scheduleDoGC sets
917 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
918 // signal.
919
920 if ( !emptyRunQueue(cap) ) return;
921
922 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
923 /* If we have user-installed signal handlers, then wait
924 * for signals to arrive rather than bombing out with a
925 * deadlock.
926 */
927 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
928 debugTrace(DEBUG_sched,
929 "still deadlocked, waiting for signals...");
930
931 awaitUserSignals();
932
933 if (signals_pending()) {
934 startSignalHandlers(cap);
935 }
936
937 // either we have threads to run, or we were interrupted:
938 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
939
940 return;
941 }
942 #endif
943
944 #if !defined(THREADED_RTS)
945 /* Probably a real deadlock. Send the current main thread the
946 * Deadlock exception.
947 */
948 if (task->incall->tso) {
949 switch (task->incall->tso->why_blocked) {
950 case BlockedOnSTM:
951 case BlockedOnBlackHole:
952 case BlockedOnMsgThrowTo:
953 case BlockedOnMVar:
954 case BlockedOnMVarRead:
955 throwToSingleThreaded(cap, task->incall->tso,
956 (StgClosure *)nonTermination_closure);
957 return;
958 default:
959 barf("deadlock: main thread blocked in a strange way");
960 }
961 }
962 return;
963 #endif
964 }
965 }
966
967
968 /* ----------------------------------------------------------------------------
969 * Send pending messages (PARALLEL_HASKELL only)
970 * ------------------------------------------------------------------------- */
971
972 #if defined(PARALLEL_HASKELL)
973 static void
974 scheduleSendPendingMessages(void)
975 {
976
977 # if defined(PAR) // global Mem.Mgmt., omit for now
978 if (PendingFetches != END_BF_QUEUE) {
979 processFetches();
980 }
981 # endif
982
983 if (RtsFlags.ParFlags.BufferTime) {
984 // if we use message buffering, we must send away all message
985 // packets which have become too old...
986 sendOldBuffers();
987 }
988 }
989 #endif
990
991 /* ----------------------------------------------------------------------------
992 * Process message in the current Capability's inbox
993 * ------------------------------------------------------------------------- */
994
995 static void
996 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
997 {
998 #if defined(THREADED_RTS)
999 Message *m, *next;
1000 int r;
1001 Capability *cap = *pcap;
1002
1003 while (!emptyInbox(cap)) {
1004 if (cap->r.rCurrentNursery->link == NULL ||
1005 g0->n_new_large_words >= large_alloc_lim) {
1006 scheduleDoGC(pcap, cap->running_task, rtsFalse);
1007 cap = *pcap;
1008 }
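        // (Executing a message can allocate, e.g. when it wakes up a
        // blocked thread, which appears to be why we make sure there is
        // nursery space, or GC, before going any further.)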
1009
1010 // don't use a blocking acquire; if the lock is held by
1011 // another thread then just carry on. This seems to avoid
1012 // getting stuck in a message ping-pong situation with other
1013 // processors. We'll check the inbox again later anyway.
1014 //
1015 // We should really use a more efficient queue data structure
1016 // here. The trickiness is that we must ensure a Capability
1017 // never goes idle if the inbox is non-empty, which is why we
1018 // use cap->lock (cap->lock is released as the last thing
1019 // before going idle; see Capability.c:releaseCapability()).
1020 r = TRY_ACQUIRE_LOCK(&cap->lock);
1021 if (r != 0) return;
1022
1023 m = cap->inbox;
1024 cap->inbox = (Message*)END_TSO_QUEUE;
1025
1026 RELEASE_LOCK(&cap->lock);
1027
1028 while (m != (Message*)END_TSO_QUEUE) {
1029 next = m->link;
1030 executeMessage(cap, m);
1031 m = next;
1032 }
1033 }
1034 #endif
1035 }
1036
1037 /* ----------------------------------------------------------------------------
1038 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1039 * ------------------------------------------------------------------------- */
1040
1041 #if defined(THREADED_RTS)
1042 static void
1043 scheduleActivateSpark(Capability *cap)
1044 {
1045 if (anySparks() && !cap->disabled)
1046 {
1047 createSparkThread(cap);
1048 debugTrace(DEBUG_sched, "creating a spark thread");
1049 }
1050 }
1051 #endif // THREADED_RTS
1052
1053 /* ----------------------------------------------------------------------------
1054 * After running a thread...
1055 * ------------------------------------------------------------------------- */
1056
1057 static void
1058 schedulePostRunThread (Capability *cap, StgTSO *t)
1059 {
1060 // We have to be able to catch transactions that are in an
1061 // infinite loop as a result of seeing an inconsistent view of
1062 // memory, e.g.
1063 //
1064 // atomically $ do
1065 // [a,b] <- mapM readTVar [ta,tb]
1066 // when (a == b) loop
1067 //
1068 // and a is never equal to b given a consistent view of memory.
1069 //
1070 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1071 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1072 debugTrace(DEBUG_sched | DEBUG_stm,
1073 "trec %p found wasting its time", t);
1074
1075 // strip the stack back to the
1076 // ATOMICALLY_FRAME, aborting the (nested)
1077 // transaction, and saving the stack of any
1078 // partially-evaluated thunks on the heap.
1079 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1080
1081 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1082 }
1083 }
1084
1085 //
1086 // If the current thread's allocation limit has run out, send it
1087 // the AllocationLimitExceeded exception.
1088
1089 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1090 // Use a throwToSelf rather than a throwToSingleThreaded, because
1091 // it correctly handles the case where the thread is currently
1092 // inside mask. Also the thread might be blocked (e.g. on an
1093 // MVar), and throwToSingleThreaded doesn't unblock it
1094 // correctly in that case.
1095 throwToSelf(cap, t, allocationLimitExceeded_closure);
1096 ASSIGN_Int64((W_*)&(t->alloc_limit),
1097 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
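        // Resetting the limit to allocLimitGrace blocks gives the
        // exception handler some headroom to run before the limit can
        // trip again.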
1098 }
1099
1100 /* some statistics gathering in the parallel case */
1101 }
1102
1103 /* -----------------------------------------------------------------------------
1104 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1105 * -------------------------------------------------------------------------- */
1106
1107 static rtsBool
1108 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1109 {
1110 // did the task ask for a large block?
1111 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1112 // if so, get one and push it on the front of the nursery.
1113 bdescr *bd;
1114 W_ blocks;
1115
1116 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
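        // e.g. with the usual 4Kbyte BLOCK_SIZE, a request of
        // rHpAlloc = 10000 bytes is rounded up to 3 blocks here
        // (BLOCK_ROUND_UP rounds to a block boundary before we divide).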
1117
1118 if (blocks > BLOCKS_PER_MBLOCK) {
1119 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1120 }
1121
1122 debugTrace(DEBUG_sched,
1123 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1124 (long)t->id, what_next_strs[t->what_next], blocks);
1125
1126 // don't do this if the nursery is (nearly) full, we'll GC first.
1127 if (cap->r.rCurrentNursery->link != NULL ||
1128 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1129 // infinite loop if the
1130 // nursery has only one
1131 // block.
1132
1133 bd = allocGroup_lock(blocks);
1134 cap->r.rNursery->n_blocks += blocks;
1135
1136 // link the new group after CurrentNursery
1137 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1138
1139 // initialise it as a nursery block. We initialise the
1140 // step, gen_no, and flags field of *every* sub-block in
1141 // this large block, because this is easier than making
1142 // sure that we always find the block head of a large
1143 // block whenever we call Bdescr() (eg. evacuate() and
1144 // isAlive() in the GC would both have to do this, at
1145 // least).
1146 {
1147 bdescr *x;
1148 for (x = bd; x < bd + blocks; x++) {
1149 initBdescr(x,g0,g0);
1150 x->free = x->start;
1151 x->flags = 0;
1152 }
1153 }
1154
1155 // This assert can be a killer if the app is doing lots
1156 // of large block allocations.
1157 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1158
1159 // now update the nursery to point to the new block
1160 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1161 cap->r.rCurrentNursery = bd;
1162
1163 // we might be unlucky and have another thread get on the
1164 // run queue before us and steal the large block, but in that
1165 // case the thread will just end up requesting another large
1166 // block.
1167 pushOnRunQueue(cap,t);
1168 return rtsFalse; /* not actually GC'ing */
1169 }
1170 }
1171
1172 if (getNewNursery(cap)) {
1173 debugTrace(DEBUG_sched, "thread %ld got a new nursery", t->id);
1174 pushOnRunQueue(cap,t);
1175 return rtsFalse;
1176 }
1177
1178 if (cap->r.rHpLim == NULL || cap->context_switch) {
1179 // Sometimes we miss a context switch, e.g. when calling
1180 // primitives in a tight loop, MAYBE_GC() doesn't check the
1181 // context switch flag, and we end up waiting for a GC.
1182 // See #1984, and concurrent/should_run/1984
1183 cap->context_switch = 0;
1184 appendToRunQueue(cap,t);
1185 } else {
1186 pushOnRunQueue(cap,t);
1187 }
1188 return rtsTrue;
1189 /* actual GC is done at the end of the while loop in schedule() */
1190 }
1191
1192 /* -----------------------------------------------------------------------------
1193 * Handle a thread that returned to the scheduler with ThreadYielding
1194 * -------------------------------------------------------------------------- */
1195
1196 static rtsBool
1197 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1198 {
1199 /* put the thread back on the run queue. Then, if we're ready to
1200 * GC, check whether this is the last task to stop. If so, wake
1201 * up the GC thread. getThread will block during a GC until the
1202 * GC is finished.
1203 */
1204
1205 ASSERT(t->_link == END_TSO_QUEUE);
1206
1207 // Shortcut if we're just switching evaluators: don't bother
1208 // doing stack squeezing (which can be expensive), just run the
1209 // thread.
1210 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1211 debugTrace(DEBUG_sched,
1212 "--<< thread %ld (%s) stopped to switch evaluators",
1213 (long)t->id, what_next_strs[t->what_next]);
1214 return rtsTrue;
1215 }
1216
1217 // Reset the context switch flag. We don't do this just before
1218 // running the thread, because that would mean we would lose ticks
1219 // during GC, which can lead to unfair scheduling (a thread hogs
1220 // the CPU because the tick always arrives during GC). This way
1221 // penalises threads that do a lot of allocation, but that seems
1222 // better than the alternative.
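    // appendToRunQueue puts the thread at the *back* of the queue, so
    // the other threads on this capability get a turn after a genuine
    // context switch; pushOnRunQueue puts it back on the *front*, so it
    // carries on running next time round the scheduler loop.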
1223 if (cap->context_switch != 0) {
1224 cap->context_switch = 0;
1225 appendToRunQueue(cap,t);
1226 } else {
1227 pushOnRunQueue(cap,t);
1228 }
1229
1230 IF_DEBUG(sanity,
1231 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1232 checkTSO(t));
1233
1234 return rtsFalse;
1235 }
1236
1237 /* -----------------------------------------------------------------------------
1238 * Handle a thread that returned to the scheduler with ThreadBlocked
1239 * -------------------------------------------------------------------------- */
1240
1241 static void
1242 scheduleHandleThreadBlocked( StgTSO *t
1243 #if !defined(DEBUG)
1244 STG_UNUSED
1245 #endif
1246 )
1247 {
1248
1249 // We don't need to do anything. The thread is blocked, and it
1250 // has tidied up its stack and placed itself on whatever queue
1251 // it needs to be on.
1252
1253 // ASSERT(t->why_blocked != NotBlocked);
1254 // Not true: for example,
1255 // - the thread may have woken itself up already, because
1256 // threadPaused() might have raised a blocked throwTo
1257 // exception, see maybePerformBlockedException().
1258
1259 #ifdef DEBUG
1260 traceThreadStatus(DEBUG_sched, t);
1261 #endif
1262 }
1263
1264 /* -----------------------------------------------------------------------------
1265 * Handle a thread that returned to the scheduler with ThreadFinished
1266 * -------------------------------------------------------------------------- */
1267
1268 static rtsBool
1269 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1270 {
1271 /* Need to check whether this was a main thread, and if so,
1272 * return with the return value.
1273 *
1274 * We also end up here if the thread kills itself with an
1275 * uncaught exception, see Exception.cmm.
1276 */
1277
1278 // blocked exceptions can now complete, even if the thread was in
1279 // blocked mode (see #2910).
1280 awakenBlockedExceptionQueue (cap, t);
1281
1282 //
1283 // Check whether the thread that just completed was a bound
1284 // thread, and if so return with the result.
1285 //
1286 // There is an assumption here that all thread completion goes
1287 // through this point; we need to make sure that if a thread
1288 // ends up in the ThreadKilled state, that it stays on the run
1289 // queue so it can be dealt with here.
1290 //
1291
1292 if (t->bound) {
1293
1294 if (t->bound != task->incall) {
1295 #if !defined(THREADED_RTS)
1296 // Must be a bound thread that is not the topmost one. Leave
1297 // it on the run queue until the stack has unwound to the
1298 // point where we can deal with this. Leaving it on the run
1299 // queue also ensures that the garbage collector knows about
1300 // this thread and its return value (it gets dropped from the
1301 // step->threads list so there's no other way to find it).
1302 appendToRunQueue(cap,t);
1303 return rtsFalse;
1304 #else
1305 // this cannot happen in the threaded RTS, because a
1306 // bound thread can only be run by the appropriate Task.
1307 barf("finished bound thread that isn't mine");
1308 #endif
1309 }
1310
1311 ASSERT(task->incall->tso == t);
1312
1313 if (t->what_next == ThreadComplete) {
1314 if (task->incall->ret) {
1315 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1316 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1317 }
1318 task->incall->stat = Success;
1319 } else {
1320 if (task->incall->ret) {
1321 *(task->incall->ret) = NULL;
1322 }
1323 if (sched_state >= SCHED_INTERRUPTING) {
1324 if (heap_overflow) {
1325 task->incall->stat = HeapExhausted;
1326 } else {
1327 task->incall->stat = Interrupted;
1328 }
1329 } else {
1330 task->incall->stat = Killed;
1331 }
1332 }
1333 #ifdef DEBUG
1334 removeThreadLabel((StgWord)task->incall->tso->id);
1335 #endif
1336
1337 // We no longer consider this thread and task to be bound to
1338 // each other. The TSO lives on until it is GC'd, but the
1339 // task is about to be released by the caller, and we don't
1340 // want anyone following the pointer from the TSO to the
1341 // defunct task (which might have already been
1342 // re-used). This was a real bug: the GC updated
1343 // tso->bound->tso which led to a deadlock.
1344 t->bound = NULL;
1345 task->incall->tso = NULL;
1346
1347 return rtsTrue; // tells schedule() to return
1348 }
1349
1350 return rtsFalse;
1351 }
1352
1353 /* -----------------------------------------------------------------------------
1354 * Perform a heap census
1355 * -------------------------------------------------------------------------- */
1356
1357 static rtsBool
1358 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1359 {
1360 // When we have +RTS -i0 and we're heap profiling, do a census at
1361 // every GC. This lets us get repeatable runs for debugging.
1362 if (performHeapProfile ||
1363 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1364 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1365 return rtsTrue;
1366 } else {
1367 return rtsFalse;
1368 }
1369 }
1370
1371 /* -----------------------------------------------------------------------------
1372 * Start a synchronisation of all capabilities
1373 * -------------------------------------------------------------------------- */
1374
1375 // Returns:
1376 // 0 if we successfully got a sync
1377 // non-0 if there was another sync request in progress,
1378 // and we yielded to it. The value returned is the
1379 // type of the other sync request.
1380 //
1381 #if defined(THREADED_RTS)
1382 static nat requestSync (Capability **pcap, Task *task, nat sync_type)
1383 {
1384 nat prev_pending_sync;
1385
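    // pending_sync acts as a one-word lock: the cas() returns 0 only if
    // no sync was in progress, in which case we have installed our
    // sync_type and now own the synchronisation. A non-zero result is
    // the type of somebody else's sync, and we yield until it is done.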
1386 prev_pending_sync = cas(&pending_sync, 0, sync_type);
1387
1388 if (prev_pending_sync)
1389 {
1390 do {
1391 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1392 prev_pending_sync);
1393 ASSERT(*pcap);
1394 yieldCapability(pcap,task,rtsTrue);
1395 } while (pending_sync);
1396 return prev_pending_sync; // NOTE: task->cap might have changed now
1397 }
1398 else
1399 {
1400 return 0;
1401 }
1402 }
1403
1404 //
1405 // Grab all the capabilities except the one we already hold. Used
1406 // when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1407 // before a fork (SYNC_OTHER).
1408 //
1409 // Only call this after requestSync(), otherwise a deadlock might
1410 // ensue if another thread is trying to synchronise.
1411 //
1412 static void acquireAllCapabilities(Capability *cap, Task *task)
1413 {
1414 Capability *tmpcap;
1415 nat i;
1416
1417 for (i=0; i < n_capabilities; i++) {
1418 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
1419 tmpcap = capabilities[i];
1420 if (tmpcap != cap) {
1421 // we better hope this task doesn't get migrated to
1422 // another Capability while we're waiting for this one.
1423 // It won't, because load balancing happens while we have
1424 // all the Capabilities, but even so it's a slightly
1425 // unsavoury invariant.
1426 task->cap = tmpcap;
1427 waitForReturnCapability(&tmpcap, task);
1428 if (tmpcap->no != i) {
1429 barf("acquireAllCapabilities: got the wrong capability");
1430 }
1431 }
1432 }
1433 task->cap = cap;
1434 }
1435
1436 static void releaseAllCapabilities(nat n, Capability *cap, Task *task)
1437 {
1438 nat i;
1439
1440 for (i = 0; i < n; i++) {
1441 if (cap->no != i) {
1442 task->cap = capabilities[i];
1443 releaseCapability(capabilities[i]);
1444 }
1445 }
1446 task->cap = cap;
1447 }
1448 #endif
1449
1450 /* -----------------------------------------------------------------------------
1451 * Perform a garbage collection if necessary
1452 * -------------------------------------------------------------------------- */
1453
1454 static void
1455 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1456 rtsBool force_major)
1457 {
1458 Capability *cap = *pcap;
1459 rtsBool heap_census;
1460 nat collect_gen;
1461 #ifdef THREADED_RTS
1462 nat gc_type;
1463 nat i, sync;
1464 StgTSO *tso;
1465 #endif
1466
1467 if (sched_state == SCHED_SHUTTING_DOWN) {
1468 // The final GC has already been done, and the system is
1469 // shutting down. We'll probably deadlock if we try to GC
1470 // now.
1471 return;
1472 }
1473
1474 heap_census = scheduleNeedHeapProfile(rtsTrue);
1475
1476 // Figure out which generation we are collecting, so that we can
1477 // decide whether this is a parallel GC or not.
1478 collect_gen = calcNeeded(force_major || heap_census, NULL);
1479
1480 #ifdef THREADED_RTS
1481 if (sched_state < SCHED_INTERRUPTING
1482 && RtsFlags.ParFlags.parGcEnabled
1483 && collect_gen >= RtsFlags.ParFlags.parGcGen
1484 && ! oldest_gen->mark)
1485 {
1486 gc_type = SYNC_GC_PAR;
1487 } else {
1488 gc_type = SYNC_GC_SEQ;
1489 }
1490
1491 // In order to GC, there must be no threads running Haskell code.
1492 // Therefore, the GC thread needs to hold *all* the capabilities,
1493 // and release them after the GC has completed.
1494 //
1495 // This seems to be the simplest way: previous attempts involved
1496 // making all the threads with capabilities give up their
1497 // capabilities and sleep except for the *last* one, which
1498 // actually did the GC. But it's quite hard to arrange for all
1499 // the other tasks to sleep and stay asleep.
1500 //
1501
1502 /* Other capabilities are prevented from running yet more Haskell
1503 threads if pending_sync is set. Tested inside
1504 yieldCapability() and releaseCapability() in Capability.c */
1505
1506 do {
1507 sync = requestSync(pcap, task, gc_type);
1508 cap = *pcap;
1509 if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
1510 // someone else had a pending sync request for a GC, so
1511 // let's assume GC has been done and we don't need to GC
1512 // again.
1513 return;
1514 }
1515 if (sched_state == SCHED_SHUTTING_DOWN) {
1516 // The scheduler might now be shutting down. We tested
1517 // this above, but it might have become true since then as
1518 // we yielded the capability in requestSync().
1519 return;
1520 }
1521 } while (sync);
1522
1523 // don't declare this until after we have sync'd, because
1524 // n_capabilities may change.
1525 rtsBool idle_cap[n_capabilities];
1526 #ifdef DEBUG
1527 unsigned int old_n_capabilities = n_capabilities;
1528 #endif
1529
1530 interruptAllCapabilities();
1531
1532 // The final shutdown GC is always single-threaded, because it's
1533 // possible that some of the Capabilities have no worker threads.
1534
1535 if (gc_type == SYNC_GC_SEQ)
1536 {
1537 traceEventRequestSeqGc(cap);
1538 }
1539 else
1540 {
1541 traceEventRequestParGc(cap);
1542 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1543 }
1544
1545 if (gc_type == SYNC_GC_SEQ)
1546 {
1547 // single-threaded GC: grab all the capabilities
1548 acquireAllCapabilities(cap,task);
1549 }
1550 else
1551 {
1552 // If we are load-balancing collections in this
1553 // generation, then we require all GC threads to participate
1554 // in the collection. Otherwise, we only require active
1555 // threads to participate, and we set gc_threads[i]->idle for
1556 // any idle capabilities. The rationale here is that waking
1557 // up an idle Capability takes much longer than just doing any
1558 // GC work on its behalf.
1559
1560 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1561 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1562 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
1563 for (i=0; i < n_capabilities; i++) {
1564 if (capabilities[i]->disabled) {
1565 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1566 } else {
1567 idle_cap[i] = rtsFalse;
1568 }
1569 }
1570 } else {
1571 for (i=0; i < n_capabilities; i++) {
1572 if (capabilities[i]->disabled) {
1573 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1574 } else if (i == cap->no ||
1575 capabilities[i]->idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1576 idle_cap[i] = rtsFalse;
1577 } else {
1578 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1579 if (!idle_cap[i]) {
1580 n_failed_trygrab_idles++;
1581 } else {
1582 n_idle_caps++;
1583 }
1584 }
1585 }
1586 }
1587
1588 // We set the gc_thread[i]->idle flag if that
1589 // capability/thread is not participating in this collection.
1590 // We also keep a local record of which capabilities are idle
1591 // in idle_cap[], because scheduleDoGC() is re-entrant:
1592 // another thread might start a GC as soon as we've finished
1593 // this one, and thus the gc_thread[]->idle flags are invalid
1594 // as soon as we release any threads after GC. Getting this
1595 // wrong leads to a rare and hard to debug deadlock!
1596
1597 for (i=0; i < n_capabilities; i++) {
1598 gc_threads[i]->idle = idle_cap[i];
1599 capabilities[i]->idle++;
1600 }
1601
1602 // For all capabilities participating in this GC, wait until
1603 // they have stopped mutating and are standing by for GC.
1604 waitForGcThreads(cap);
1605
1606 #if defined(THREADED_RTS)
1607 // Stable point where we can do a global check on our spark counters
1608 ASSERT(checkSparkCountInvariant());
1609 #endif
1610 }
1611
1612 #endif
1613
1614 IF_DEBUG(scheduler, printAllThreads());
1615
1616 delete_threads_and_gc:
1617 /*
1618 * We now have all the capabilities; if we're in an interrupting
1619 * state, then we should take the opportunity to delete all the
1620 * threads in the system.
1621 */
1622 if (sched_state == SCHED_INTERRUPTING) {
1623 deleteAllThreads(cap);
1624 #if defined(THREADED_RTS)
1625 // Discard all the sparks from every Capability. Why?
1626 // They'll probably be GC'd anyway since we've killed all the
1627 // threads. It just avoids the GC having to do any work to
1628 // figure out that any remaining sparks are garbage.
1629 for (i = 0; i < n_capabilities; i++) {
1630 capabilities[i]->spark_stats.gcd +=
1631 sparkPoolSize(capabilities[i]->sparks);
1632 // No race here since all Caps are stopped.
1633 discardSparksCap(capabilities[i]);
1634 }
1635 #endif
1636 sched_state = SCHED_SHUTTING_DOWN;
1637 }
1638
1639 /*
1640 * When there are disabled capabilities, we want to migrate any
1641 * threads away from them. Normally this happens in the
1642 * scheduler's loop, but only for unbound threads - it's really
1643 * hard for a bound thread to migrate itself. So we have another
1644 * go here.
1645 */
1646 #if defined(THREADED_RTS)
1647 for (i = enabled_capabilities; i < n_capabilities; i++) {
1648 Capability *tmp_cap, *dest_cap;
1649 tmp_cap = capabilities[i];
1650 ASSERT(tmp_cap->disabled);
1651 if (i != cap->no) {
1652 dest_cap = capabilities[i % enabled_capabilities];
1653 while (!emptyRunQueue(tmp_cap)) {
1654 tso = popRunQueue(tmp_cap);
1655 migrateThread(tmp_cap, tso, dest_cap);
1656 if (tso->bound) {
1657 traceTaskMigrate(tso->bound->task,
1658 tso->bound->task->cap,
1659 dest_cap);
1660 tso->bound->task->cap = dest_cap;
1661 }
1662 }
1663 }
1664 }
1665 #endif
1666
1667 #if defined(THREADED_RTS)
1668 // reset pending_sync *before* GC, so that when the GC threads
1669 // emerge they don't immediately re-enter the GC.
1670 pending_sync = 0;
1671 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1672 #else
1673 GarbageCollect(collect_gen, heap_census, 0, cap);
1674 #endif
1675
1676 traceSparkCounters(cap);
1677
1678 switch (recent_activity) {
1679 case ACTIVITY_INACTIVE:
1680 if (force_major) {
1681 // We are doing a GC because the system has been idle for a
1682 // timeslice and we need to check for deadlock. Record the
1683 // fact that we've done a GC and turn off the timer signal;
1684 // it will get re-enabled if we run any threads after the GC.
1685 recent_activity = ACTIVITY_DONE_GC;
1686 #ifndef PROFILING
1687 stopTimer();
1688 #endif
1689 break;
1690 }
1691 // fall through...
1692
1693 case ACTIVITY_MAYBE_NO:
1694 // the GC might have taken long enough for the timer to set
1695 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1696 // but we aren't necessarily deadlocked:
1697 recent_activity = ACTIVITY_YES;
1698 break;
1699
1700 case ACTIVITY_DONE_GC:
1701 // If we are actually active, the scheduler will reset the
1702 // recent_activity flag and re-enable the timer.
1703 break;
1704 }
1705
1706 #if defined(THREADED_RTS)
1707 // Stable point where we can do a global check on our spark counters
1708 ASSERT(checkSparkCountInvariant());
1709 #endif
1710
1711 // The heap census itself is done during GarbageCollect().
1712 if (heap_census) {
1713 performHeapProfile = rtsFalse;
1714 }
1715
1716 #if defined(THREADED_RTS)
1717
1718 // If n_capabilities has changed during GC, we're in trouble.
1719 ASSERT(n_capabilities == old_n_capabilities);
1720
1721 if (gc_type == SYNC_GC_PAR)
1722 {
1723 releaseGCThreads(cap);
1724 for (i = 0; i < n_capabilities; i++) {
1725 if (i != cap->no) {
1726 if (idle_cap[i]) {
1727 ASSERT(capabilities[i]->running_task == task);
1728 task->cap = capabilities[i];
1729 releaseCapability(capabilities[i]);
1730 } else {
1731 ASSERT(capabilities[i]->running_task != task);
1732 }
1733 }
1734 }
1735 task->cap = cap;
1736 }
1737 #endif
1738
1739 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1740 // GC set the heap_overflow flag, so we should proceed with
1741 // an orderly shutdown now. Ultimately we want the main
1742 // thread to return to its caller with HeapExhausted, at which
1743 // point the caller should call hs_exit(). The first step is
1744 // to delete all the threads.
1745 //
1746 // Another way to do this would be to raise an exception in
1747 // the main thread, which we really should do because it gives
1748 // the program a chance to clean up. But how do we find the
1749 // main thread? It should presumably be the same one that
1750 // gets ^C exceptions, but that's all done on the Haskell side
1751 // (GHC.TopHandler).
1752 sched_state = SCHED_INTERRUPTING;
1753 goto delete_threads_and_gc;
1754 }
1755
1756 #ifdef SPARKBALANCE
1757 /* JB
1758 Once we are all together... this would be the place to balance all
1759 spark pools. No concurrent stealing or adding of new sparks can
1760 occur. Should be defined in Sparks.c. */
1761 balanceSparkPoolsCaps(n_capabilities, capabilities);
1762 #endif
1763
1764 #if defined(THREADED_RTS)
1765 if (gc_type == SYNC_GC_SEQ) {
1766 // release our stash of capabilities.
1767 releaseAllCapabilities(n_capabilities, cap, task);
1768 }
1769 #endif
1770
1771 return;
1772 }
1773
1774 /* ---------------------------------------------------------------------------
1775 * Singleton fork(). Do not copy any running threads.
1776 * ------------------------------------------------------------------------- */
1777
1778 pid_t
1779 forkProcess(HsStablePtr *entry
1780 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1781 STG_UNUSED
1782 #endif
1783 )
1784 {
1785 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1786 pid_t pid;
1787 StgTSO* t,*next;
1788 Capability *cap;
1789 nat g;
1790 Task *task = NULL;
1791 nat i;
1792 #ifdef THREADED_RTS
1793 nat sync;
1794 #endif
1795
1796 debugTrace(DEBUG_sched, "forking!");
1797
1798 task = newBoundTask();
1799
1800 cap = NULL;
1801 waitForReturnCapability(&cap, task);
1802
1803 #ifdef THREADED_RTS
1804 do {
1805 sync = requestSync(&cap, task, SYNC_OTHER);
1806 } while (sync);
1807
1808 acquireAllCapabilities(cap,task);
1809
1810 pending_sync = 0;
1811 #endif
1812
1813 // no funny business: hold locks while we fork, otherwise if some
1814 // other thread is holding a lock when the fork happens, the data
1815 // structure protected by the lock will forever be in an
1816 // inconsistent state in the child. See also #1391.
1817 ACQUIRE_LOCK(&sched_mutex);
1818 ACQUIRE_LOCK(&sm_mutex);
1819 ACQUIRE_LOCK(&stable_mutex);
1820 ACQUIRE_LOCK(&task->lock);
1821
1822 for (i=0; i < n_capabilities; i++) {
1823 ACQUIRE_LOCK(&capabilities[i]->lock);
1824 }
1825
1826 #ifdef THREADED_RTS
1827 ACQUIRE_LOCK(&all_tasks_mutex);
1828 #endif
1829
1830 stopTimer(); // See #4074
1831
1832 #if defined(TRACING)
1833 flushEventLog(); // so that child won't inherit dirty file buffers
1834 #endif
1835
1836 pid = fork();
1837
1838 if (pid) { // parent
1839
1840 startTimer(); // #4074
1841
1842 RELEASE_LOCK(&sched_mutex);
1843 RELEASE_LOCK(&sm_mutex);
1844 RELEASE_LOCK(&stable_mutex);
1845 RELEASE_LOCK(&task->lock);
1846
1847 for (i=0; i < n_capabilities; i++) {
1848 releaseCapability_(capabilities[i],rtsFalse);
1849 RELEASE_LOCK(&capabilities[i]->lock);
1850 }
1851
1852 #ifdef THREADED_RTS
1853 RELEASE_LOCK(&all_tasks_mutex);
1854 #endif
1855
1856 boundTaskExiting(task);
1857
1858 // just return the pid
1859 return pid;
1860
1861 } else { // child
1862
1863 #if defined(THREADED_RTS)
1864 initMutex(&sched_mutex);
1865 initMutex(&sm_mutex);
1866 initMutex(&stable_mutex);
1867 initMutex(&task->lock);
1868
1869 for (i=0; i < n_capabilities; i++) {
1870 initMutex(&capabilities[i]->lock);
1871 }
1872
1873 initMutex(&all_tasks_mutex);
1874 #endif
1875
1876 #ifdef TRACING
1877 resetTracing();
1878 #endif
1879
1880 // Now, all OS threads except the thread that forked are
1881 // stopped. We need to stop all Haskell threads, including
1882 // those involved in foreign calls. Also we need to delete
1883 // all Tasks, because they correspond to OS threads that are
1884 // now gone.
1885
1886 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1887 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1888 next = t->global_link;
1889 // don't allow threads to catch the ThreadKilled
1890 // exception, but we do want to raiseAsync() because these
1891 // threads may be evaluating thunks that we need later.
1892 deleteThread_(t->cap,t);
1893
1894 // stop the GC from updating the InCall to point to
1895 // the TSO. This is only necessary because the
1896 // OSThread bound to the TSO has been killed, and
1897 // won't get a chance to exit in the usual way (see
1898 // also scheduleHandleThreadFinished).
1899 t->bound = NULL;
1900 }
1901 }
1902
1903 discardTasksExcept(task);
1904
1905 for (i=0; i < n_capabilities; i++) {
1906 cap = capabilities[i];
1907
1908 // Empty the run queue. It seems tempting to let all the
1909 // killed threads stay on the run queue as zombies to be
1910 // cleaned up later, but some of them may correspond to
1911 // bound threads for which the corresponding Task does not
1912 // exist.
1913 truncateRunQueue(cap);
1914
1915 // Any suspended C-calling Tasks are no more; their OS threads
1916 // don't exist now:
1917 cap->suspended_ccalls = NULL;
1918
1919 #if defined(THREADED_RTS)
1920 // Wipe our spare workers list, they no longer exist. New
1921 // workers will be created if necessary.
1922 cap->spare_workers = NULL;
1923 cap->n_spare_workers = 0;
1924 cap->returning_tasks_hd = NULL;
1925 cap->returning_tasks_tl = NULL;
1926 #endif
1927
1928 // Release all caps except 0; we'll use that for starting
1929 // the IO manager and running the client action below.
1930 if (cap->no != 0) {
1931 task->cap = cap;
1932 releaseCapability(cap);
1933 }
1934 }
1935 cap = capabilities[0];
1936 task->cap = cap;
1937
1938 // Empty the threads lists. Otherwise, the garbage
1939 // collector may attempt to resurrect some of these threads.
1940 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1941 generations[g].threads = END_TSO_QUEUE;
1942 }
1943
1944 // On Unix, all timers are reset in the child, so we need to start
1945 // the timer again.
1946 initTimer();
1947 startTimer();
1948
1949 // TODO: need to trace various other things in the child
1950 // like startup event, capabilities, process info etc
1951 traceTaskCreate(task, cap);
1952
1953 #if defined(THREADED_RTS)
1954 ioManagerStartCap(&cap);
1955 #endif
1956
1957 rts_evalStableIO(&cap, entry, NULL); // run the action
1958 rts_checkSchedStatus("forkProcess",cap);
1959
1960 rts_unlock(cap);
1961 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
1962 }
1963 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1964 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1965 #endif
1966 }
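/* Usage sketch (illustrative only, not part of the RTS API contract):
 * the caller passes a stable pointer to an IO action.  In the parent
 * the child's pid is returned and execution continues normally; in the
 * child the action is run on Capability 0 and the process then exits:
 *
 *     pid_t pid = forkProcess(&entry);   // entry: HsStablePtr to an IO action
 *     if (pid != 0) {
 *         // parent: Capability locks released, timer restarted
 *     }
 *     // the child never reaches here; it runs 'entry' via
 *     // rts_evalStableIO() and then calls shutdownHaskellAndExit()
 */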
1967
1968 /* ---------------------------------------------------------------------------
1969 * Changing the number of Capabilities
1970 *
1971 * Changing the number of Capabilities is very tricky! We can only do
1972 * it with the system fully stopped, so we do a full sync with
1973 * requestSync(SYNC_OTHER) and grab all the capabilities.
1974 *
1975 * Then we resize the appropriate data structures, and update all
1976 * references to the old data structures which have now moved.
1977 * Finally we release the Capabilities we are holding, and start
1978 * worker Tasks on the new Capabilities we created.
1979 *
1980 * ------------------------------------------------------------------------- */
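/* A minimal usage sketch (illustrative): a program embedding the RTS can
 * resize the Capability pool at run time.  The call acquires a Capability
 * itself via rts_lock(), so it is intended to be made from outside
 * Haskell, or from Haskell via a safe foreign call:
 *
 *     hs_init(&argc, &argv);
 *     setNumCapabilities(4);    // grow the pool (or mark extras as
 *                               // disabled when shrinking)
 *     ...
 *     hs_exit();
 */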
1981
1982 void
1983 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
1984 {
1985 #if !defined(THREADED_RTS)
1986 if (new_n_capabilities != 1) {
1987 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
1988 }
1989 return;
1990 #elif defined(NOSMP)
1991 if (new_n_capabilities != 1) {
1992 errorBelch("setNumCapabilities: not supported on this platform");
1993 }
1994 return;
1995 #else
1996 Task *task;
1997 Capability *cap;
1998 nat sync;
1999 nat n;
2000 Capability *old_capabilities = NULL;
2001 nat old_n_capabilities = n_capabilities;
2002
2003 if (new_n_capabilities == enabled_capabilities) return;
2004
2005 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2006 enabled_capabilities, new_n_capabilities);
2007
2008 cap = rts_lock();
2009 task = cap->running_task;
2010
2011 do {
2012 sync = requestSync(&cap, task, SYNC_OTHER);
2013 } while (sync);
2014
2015 acquireAllCapabilities(cap,task);
2016
2017 pending_sync = 0;
2018
2019 if (new_n_capabilities < enabled_capabilities)
2020 {
2021 // Reducing the number of capabilities: we do not actually
2022 // remove the extra capabilities, we just mark them as
2023 // "disabled". This has the following effects:
2024 //
2025 // - threads on a disabled capability are migrated away by the
2026 // scheduler loop
2027 //
2028 // - disabled capabilities do not participate in GC
2029 // (see scheduleDoGC())
2030 //
2031 // - No spark threads are created on this capability
2032 // (see scheduleActivateSpark())
2033 //
2034 // - We do not attempt to migrate threads *to* a disabled
2035 // capability (see schedulePushWork()).
2036 //
2037 // but in other respects, a disabled capability remains
2038 // alive. Threads may be woken up on a disabled capability,
2039 // but they will be immediately migrated away.
2040 //
2041 // This approach is much easier than trying to actually remove
2042 // the capability; we don't have to worry about GC data
2043 // structures, the nursery, etc.
2044 //
2045 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2046 capabilities[n]->disabled = rtsTrue;
2047 traceCapDisable(capabilities[n]);
2048 }
2049 enabled_capabilities = new_n_capabilities;
2050 }
2051 else
2052 {
2053 // Increasing the number of enabled capabilities.
2054 //
2055 // enable any disabled capabilities, up to the required number
2056 for (n = enabled_capabilities;
2057 n < new_n_capabilities && n < n_capabilities; n++) {
2058 capabilities[n]->disabled = rtsFalse;
2059 traceCapEnable(capabilities[n]);
2060 }
2061 enabled_capabilities = n;
2062
2063 if (new_n_capabilities > n_capabilities) {
2064 #if defined(TRACING)
2065 // Allocate eventlog buffers for the new capabilities. Note this
2066 // must be done before calling moreCapabilities(), because that
2067 // will emit events about creating the new capabilities and adding
2068 // them to existing capsets.
2069 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2070 #endif
2071
2072 // Resize the capabilities array
2073 // NB. after this, capabilities points somewhere new. Any pointers
2074 // of type (Capability *) are now invalid.
2075 moreCapabilities(n_capabilities, new_n_capabilities);
2076
2077 // Resize and update storage manager data structures
2078 storageAddCapabilities(n_capabilities, new_n_capabilities);
2079 }
2080 }
2081
2082 // update n_capabilities before things start running
2083 if (new_n_capabilities > n_capabilities) {
2084 n_capabilities = enabled_capabilities = new_n_capabilities;
2085 }
2086
2087 // Start worker tasks on the new Capabilities
2088 startWorkerTasks(old_n_capabilities, new_n_capabilities);
2089
2090 // We're done: release the original Capabilities
2091 releaseAllCapabilities(old_n_capabilities, cap,task);
2092
2093 // We can't free the old array until now, because we access it
2094 // while updating pointers in updateCapabilityRefs().
2095 if (old_capabilities) {
2096 stgFree(old_capabilities);
2097 }
2098
2099 // Notify IO manager that the number of capabilities has changed.
2100 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2101
2102 rts_unlock(cap);
2103
2104 #endif // THREADED_RTS
2105 }
2106
2107
2108
2109 /* ---------------------------------------------------------------------------
2110 * Delete all the threads in the system
2111 * ------------------------------------------------------------------------- */
2112
2113 static void
2114 deleteAllThreads ( Capability *cap )
2115 {
2116 // NOTE: only safe to call if we own all capabilities.
2117
2118 StgTSO* t, *next;
2119 nat g;
2120
2121 debugTrace(DEBUG_sched,"deleting all threads");
2122 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2123 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2124 next = t->global_link;
2125 deleteThread(cap,t);
2126 }
2127 }
2128
2129 // The run queue now contains a bunch of ThreadKilled threads. We
2130 // must not throw these away: the main thread(s) will be in there
2131 // somewhere, and the main scheduler loop has to deal with it.
2132 // Also, the run queue is the only thing keeping these threads from
2133 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2134
2135 #if !defined(THREADED_RTS)
2136 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2137 ASSERT(sleeping_queue == END_TSO_QUEUE);
2138 #endif
2139 }
2140
2141 /* -----------------------------------------------------------------------------
2142 Managing the suspended_ccalls list.
2143 Locks required: the caller must own the Capability
2144 -------------------------------------------------------------------------- */
2145
2146 STATIC_INLINE void
2147 suspendTask (Capability *cap, Task *task)
2148 {
2149 InCall *incall;
2150
2151 incall = task->incall;
2152 ASSERT(incall->next == NULL && incall->prev == NULL);
2153 incall->next = cap->suspended_ccalls;
2154 incall->prev = NULL;
2155 if (cap->suspended_ccalls) {
2156 cap->suspended_ccalls->prev = incall;
2157 }
2158 cap->suspended_ccalls = incall;
2159 }
2160
2161 STATIC_INLINE void
2162 recoverSuspendedTask (Capability *cap, Task *task)
2163 {
2164 InCall *incall;
2165
2166 incall = task->incall;
2167 if (incall->prev) {
2168 incall->prev->next = incall->next;
2169 } else {
2170 ASSERT(cap->suspended_ccalls == incall);
2171 cap->suspended_ccalls = incall->next;
2172 }
2173 if (incall->next) {
2174 incall->next->prev = incall->prev;
2175 }
2176 incall->next = incall->prev = NULL;
2177 }
2178
2179 /* ---------------------------------------------------------------------------
2180 * Suspending & resuming Haskell threads.
2181 *
2182 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2183 * its capability before calling the C function. This allows another
2184 * task to pick up the capability and carry on running Haskell
2185 * threads. It also means that if the C call blocks, it won't lock
2186 * the whole system.
2187 *
2188 * The Haskell thread making the C call is put to sleep for the
2189 * duration of the call, on the Capability's suspended_ccalls list. We
2190 * give out a token to the task, which it can use to resume the thread
2191 * on return from the C function.
2192 *
2193 * If this is an interruptible C call, the FFI call may be
2194 * unceremoniously terminated, and so it should be scheduled on an
2195 * unbound worker thread.
2196 * ------------------------------------------------------------------------- */
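/* Sketch of the calling convention (illustrative; in reality this code is
 * generated by the compiler around every "safe" foreign call, and
 * 'foreign_fn' here is a hypothetical example):
 *
 *     void *tok = suspendThread(BaseReg, interruptible);
 *     r = foreign_fn(args);        // may block; other Capabilities keep
 *                                  // running Haskell threads meanwhile
 *     BaseReg = resumeThread(tok); // re-acquire a Capability and resume
 *                                  // the suspended Haskell thread
 */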
2197
2198 void *
2199 suspendThread (StgRegTable *reg, rtsBool interruptible)
2200 {
2201 Capability *cap;
2202 int saved_errno;
2203 StgTSO *tso;
2204 Task *task;
2205 #if mingw32_HOST_OS
2206 StgWord32 saved_winerror;
2207 #endif
2208
2209 saved_errno = errno;
2210 #if mingw32_HOST_OS
2211 saved_winerror = GetLastError();
2212 #endif
2213
2214 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2215 */
2216 cap = regTableToCapability(reg);
2217
2218 task = cap->running_task;
2219 tso = cap->r.rCurrentTSO;
2220
2221 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2222
2223 // XXX this might not be necessary --SDM
2224 tso->what_next = ThreadRunGHC;
2225
2226 threadPaused(cap,tso);
2227
2228 if (interruptible) {
2229 tso->why_blocked = BlockedOnCCall_Interruptible;
2230 } else {
2231 tso->why_blocked = BlockedOnCCall;
2232 }
2233
2234 // Hand back capability
2235 task->incall->suspended_tso = tso;
2236 task->incall->suspended_cap = cap;
2237
2238 // Otherwise allocate() will write to invalid memory.
2239 cap->r.rCurrentTSO = NULL;
2240
2241 ACQUIRE_LOCK(&cap->lock);
2242
2243 suspendTask(cap,task);
2244 cap->in_haskell = rtsFalse;
2245 releaseCapability_(cap,rtsFalse);
2246
2247 RELEASE_LOCK(&cap->lock);
2248
2249 errno = saved_errno;
2250 #if mingw32_HOST_OS
2251 SetLastError(saved_winerror);
2252 #endif
2253 return task;
2254 }
2255
2256 StgRegTable *
2257 resumeThread (void *task_)
2258 {
2259 StgTSO *tso;
2260 InCall *incall;
2261 Capability *cap;
2262 Task *task = task_;
2263 int saved_errno;
2264 #if mingw32_HOST_OS
2265 StgWord32 saved_winerror;
2266 #endif
2267
2268 saved_errno = errno;
2269 #if mingw32_HOST_OS
2270 saved_winerror = GetLastError();
2271 #endif
2272
2273 incall = task->incall;
2274 cap = incall->suspended_cap;
2275 task->cap = cap;
2276
2277 // Wait for permission to re-enter the RTS with the result.
2278 waitForReturnCapability(&cap,task);
2279 // we might be on a different capability now... but if so, our
2280 // entry on the suspended_ccalls list will also have been
2281 // migrated.
2282
2283 // Remove the thread from the suspended list
2284 recoverSuspendedTask(cap,task);
2285
2286 tso = incall->suspended_tso;
2287 incall->suspended_tso = NULL;
2288 incall->suspended_cap = NULL;
2289 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2290
2291 traceEventRunThread(cap, tso);
2292
2293 /* Reset blocking status */
2294 tso->why_blocked = NotBlocked;
2295
2296 if ((tso->flags & TSO_BLOCKEX) == 0) {
2297 // avoid locking the TSO if we don't have to
2298 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2299 maybePerformBlockedException(cap,tso);
2300 }
2301 }
2302
2303 cap->r.rCurrentTSO = tso;
2304 cap->in_haskell = rtsTrue;
2305 errno = saved_errno;
2306 #if mingw32_HOST_OS
2307 SetLastError(saved_winerror);
2308 #endif
2309
2310 /* We might have GC'd, mark the TSO dirty again */
2311 dirty_TSO(cap,tso);
2312 dirty_STACK(cap,tso->stackobj);
2313
2314 IF_DEBUG(sanity, checkTSO(tso));
2315
2316 return &cap->r;
2317 }
2318
2319 /* ---------------------------------------------------------------------------
2320 * scheduleThread()
2321 *
2322 * scheduleThread puts a thread on the end of the runnable queue.
2323 * This will usually be done immediately after a thread is created.
2324 * The caller of scheduleThread must create the thread using e.g.
2325 * createThread and push an appropriate closure
2326 * on this thread's stack before the scheduler is invoked.
2327 * ------------------------------------------------------------------------ */
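/* For example (a sketch; 'action' is a hypothetical closure for an IO
 * computation):
 *
 *     // create a thread that will run 'action', then hand it to the
 *     // scheduler as an ordinary unbound thread:
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  action);
 *     scheduleThread(cap, tso);
 *
 * For a bound thread that the caller waits on, see scheduleWaitThread()
 * below.
 */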
2328
2329 void
2330 scheduleThread(Capability *cap, StgTSO *tso)
2331 {
2332 // The thread goes at the *end* of the run-queue, to avoid possible
2333 // starvation of any threads already on the queue.
2334 appendToRunQueue(cap,tso);
2335 }
2336
2337 void
2338 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2339 {
2340 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2341 // move this thread from now on.
2342 #if defined(THREADED_RTS)
2343 cpu %= enabled_capabilities;
2344 if (cpu == cap->no) {
2345 appendToRunQueue(cap,tso);
2346 } else {
2347 migrateThread(cap, tso, capabilities[cpu]);
2348 }
2349 #else
2350 appendToRunQueue(cap,tso);
2351 #endif
2352 }
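/* A sketch of how this is reached (illustrative): GHC.Conc.forkOn ends up
 * here via the forkOn# primop, roughly
 *
 *     scheduleThreadOn(cap, (StgWord)cpu, tso);
 *
 * Any cpu number is accepted, since it is reduced modulo
 * enabled_capabilities above.
 */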
2353
2354 void
2355 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2356 {
2357 Task *task;
2358 DEBUG_ONLY( StgThreadID id );
2359 Capability *cap;
2360
2361 cap = *pcap;
2362
2363 // We already created/initialised the Task
2364 task = cap->running_task;
2365
2366 // This TSO is now a bound thread; make the Task and TSO
2367 // point to each other.
2368 tso->bound = task->incall;
2369 tso->cap = cap;
2370
2371 task->incall->tso = tso;
2372 task->incall->ret = ret;
2373 task->incall->stat = NoStatus;
2374
2375 appendToRunQueue(cap,tso);
2376
2377 DEBUG_ONLY( id = tso->id );
2378 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2379
2380 cap = schedule(cap,task);
2381
2382 ASSERT(task->incall->stat != NoStatus);
2383 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2384
2385 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2386 *pcap = cap;
2387 }
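/* This is the workhorse behind the rts_eval* entry points in RtsAPI.c;
 * for instance, rts_evalIO() is roughly (sketch):
 *
 *     void rts_evalIO (Capability **cap, HaskellObj p, HaskellObj *ret)
 *     {
 *         StgTSO *tso = createStrictIOThread(*cap,
 *                           RtsFlags.GcFlags.initialStkSize, p);
 *         scheduleWaitThread(tso, ret, cap);
 *     }
 */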
2388
2389 /* ----------------------------------------------------------------------------
2390 * Starting Tasks
2391 * ------------------------------------------------------------------------- */
2392
2393 #if defined(THREADED_RTS)
2394 void scheduleWorker (Capability *cap, Task *task)
2395 {
2396 // schedule() runs without a lock.
2397 cap = schedule(cap,task);
2398
2399 // On exit from schedule(), we have a Capability, but possibly not
2400 // the same one we started with.
2401
2402 // During shutdown, the requirement is that by the time all the
2403 // Capabilities have been shut down, every worker that is shutting
2404 // down has finished workerTaskStop(). This is why we hold on to
2405 // cap->lock until we've finished workerTaskStop() below.
2406 //
2407 // There may be workers still involved in foreign calls; those
2408 // will just block in waitForReturnCapability() because the
2409 // Capability has been shut down.
2410 //
2411 ACQUIRE_LOCK(&cap->lock);
2412 releaseCapability_(cap,rtsFalse);
2413 workerTaskStop(task);
2414 RELEASE_LOCK(&cap->lock);
2415 }
2416 #endif
2417
2418 /* ---------------------------------------------------------------------------
2419 * Start new worker tasks on Capabilities from--to
2420 * -------------------------------------------------------------------------- */
2421
2422 static void
2423 startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
2424 {
2425 #if defined(THREADED_RTS)
2426 nat i;
2427 Capability *cap;
2428
2429 for (i = from; i < to; i++) {
2430 cap = capabilities[i];
2431 ACQUIRE_LOCK(&cap->lock);
2432 startWorkerTask(cap);
2433 RELEASE_LOCK(&cap->lock);
2434 }
2435 #endif
2436 }
2437
2438 /* ---------------------------------------------------------------------------
2439 * initScheduler()
2440 *
2441 * Initialise the scheduler. This resets all the queues - if the
2442 * queues contained any threads, they'll be garbage collected at the
2443 * next pass.
2444 *
2445 * ------------------------------------------------------------------------ */
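/* For context, a minimal C program embedding the RTS exercises this
 * startup (and the matching shutdown) path roughly as follows (sketch):
 *
 *     int main (int argc, char *argv[])
 *     {
 *         hs_init(&argc, &argv);   // RTS startup calls initScheduler()
 *         // ... run Haskell code via foreign exports or rts_evalIO ...
 *         hs_exit();               // shutdown calls exitScheduler()
 *         return 0;
 *     }
 */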
2446
2447 void
2448 initScheduler(void)
2449 {
2450 #if !defined(THREADED_RTS)
2451 blocked_queue_hd = END_TSO_QUEUE;
2452 blocked_queue_tl = END_TSO_QUEUE;
2453 sleeping_queue = END_TSO_QUEUE;
2454 #endif
2455
2456 sched_state = SCHED_RUNNING;
2457 recent_activity = ACTIVITY_YES;
2458
2459 #if defined(THREADED_RTS)
2460 /* Initialise the mutex and condition variables used by
2461 * the scheduler. */
2462 initMutex(&sched_mutex);
2463 #endif
2464
2465 ACQUIRE_LOCK(&sched_mutex);
2466
2467 /* A capability holds the state a native thread needs in
2468 * order to execute STG code. At least one capability is
2469 * floating around (only THREADED_RTS builds have more than one).
2470 */
2471 initCapabilities();
2472
2473 initTaskManager();
2474
2475 /*
2476 * Eagerly start one worker to run each Capability, except for
2477 * Capability 0. The idea is that we're probably going to start a
2478 * bound thread on Capability 0 pretty soon, so we don't want a
2479 * worker task hogging it.
2480 */
2481 startWorkerTasks(1, n_capabilities);
2482
2483 RELEASE_LOCK(&sched_mutex);
2484
2485 }
2486
2487 void
2488 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2489 /* see Capability.c, shutdownCapability() */
2490 {
2491 Task *task = NULL;
2492
2493 task = newBoundTask();
2494
2495 // If we haven't killed all the threads yet, do it now.
2496 if (sched_state < SCHED_SHUTTING_DOWN) {
2497 sched_state = SCHED_INTERRUPTING;
2498 Capability *cap = task->cap;
2499 waitForReturnCapability(&cap,task);
2500 scheduleDoGC(&cap,task,rtsTrue);
2501 ASSERT(task->incall->tso == NULL);
2502 releaseCapability(cap);
2503 }
2504 sched_state = SCHED_SHUTTING_DOWN;
2505
2506 shutdownCapabilities(task, wait_foreign);
2507
2508 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2509 // n_failed_trygrab_idles, n_idle_caps);
2510
2511 boundTaskExiting(task);
2512 }
2513
2514 void
2515 freeScheduler( void )
2516 {
2517 nat still_running;
2518
2519 ACQUIRE_LOCK(&sched_mutex);
2520 still_running = freeTaskManager();
2521 // We can only free the Capabilities if there are no Tasks still
2522 // running. We might have a Task about to return from a foreign
2523 // call into waitForReturnCapability(), for example (actually,
2524 // this should be the *only* thing that a still-running Task can
2525 // do at this point, and it will block waiting for the
2526 // Capability).
2527 if (still_running == 0) {
2528 freeCapabilities();
2529 }
2530 RELEASE_LOCK(&sched_mutex);
2531 #if defined(THREADED_RTS)
2532 closeMutex(&sched_mutex);
2533 #endif
2534 }
2535
2536 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2537 void *user USED_IF_NOT_THREADS)
2538 {
2539 #if !defined(THREADED_RTS)
2540 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2541 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2542 evac(user, (StgClosure **)(void *)&sleeping_queue);
2543 #endif
2544 }
2545
2546 /* -----------------------------------------------------------------------------
2547 performGC
2548
2549 This is the interface to the garbage collector from Haskell land.
2550 We provide this so that external C code can allocate and garbage
2551 collect when called from Haskell via a safe foreign call (_ccall_GC).
2552 -------------------------------------------------------------------------- */
2553
2554 static void
2555 performGC_(rtsBool force_major)
2556 {
2557 Task *task;
2558 Capability *cap = NULL;
2559
2560 // We must grab a new Task here, because the existing Task may be
2561 // associated with a particular Capability, and chained onto the
2562 // suspended_ccalls queue.
2563 task = newBoundTask();
2564
2565 // TODO: do we need to traceTask*() here?
2566
2567 waitForReturnCapability(&cap,task);
2568 scheduleDoGC(&cap,task,force_major);
2569 releaseCapability(cap);
2570 boundTaskExiting(task);
2571 }
2572
2573 void
2574 performGC(void)
2575 {
2576 performGC_(rtsFalse);
2577 }
2578
2579 void
2580 performMajorGC(void)
2581 {
2582 performGC_(rtsTrue);
2583 }
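/* Usage sketch (illustrative): C code entered from Haskell through a
 * safe foreign call may request a collection directly, e.g.
 *
 *     void my_c_helper(void)       // hypothetical helper called via FFI
 *     {
 *         // ... allocate and release a lot of memory ...
 *         performMajorGC();        // force a full collection now
 *     }
 *
 * The Haskell-level System.Mem.performGC reaches the same entry point
 * via a foreign import.
 */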
2584
2585 /* ---------------------------------------------------------------------------
2586 Interrupt execution
2587 - usually called inside a signal handler so it mustn't do anything fancy.
2588 ------------------------------------------------------------------------ */
2589
2590 void
2591 interruptStgRts(void)
2592 {
2593 sched_state = SCHED_INTERRUPTING;
2594 interruptAllCapabilities();
2595 #if defined(THREADED_RTS)
2596 wakeUpRts();
2597 #endif
2598 }
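/* Sketch (illustrative): this is essentially what the RTS's console/^C
 * signal handler does on the first interrupt:
 *
 *     static void sigint_handler (int sig STG_UNUSED)
 *     {
 *         interruptStgRts();   // only sets flags and pokes the
 *                              // scheduler: safe inside a handler
 *     }
 */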
2599
2600 /* -----------------------------------------------------------------------------
2601 Wake up the RTS
2602
2603 This function causes at least one OS thread to wake up and run the
2604 scheduler loop. It is invoked when the RTS might be deadlocked, or
2605 an external event has arrived that may need servicing (eg. a
2606 keyboard interrupt).
2607
2608 In the single-threaded RTS we don't do anything here; we only have
2609 one thread anyway, and the event that caused us to want to wake up
2610 will have interrupted any blocking system call in progress anyway.
2611 -------------------------------------------------------------------------- */
2612
2613 #if defined(THREADED_RTS)
2614 void wakeUpRts(void)
2615 {
2616 // This forces the IO Manager thread to wake up, which will
2617 // in turn ensure that some OS thread wakes up and runs the
2618 // scheduler loop, which will cause a GC and deadlock check.
2619 ioManagerWakeup();
2620 }
2621 #endif
2622
2623 /* -----------------------------------------------------------------------------
2624 Deleting threads
2625
2626 This is used for interruption (^C) and forking, and corresponds to
2627 raising an exception but without letting the thread catch the
2628 exception.
2629 -------------------------------------------------------------------------- */
2630
2631 static void
2632 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2633 {
2634 // NOTE: must only be called on a TSO that we have exclusive
2635 // access to, because we will call throwToSingleThreaded() below.
2636 // The TSO must be on the run queue of the Capability we own, or
2637 // we must own all Capabilities.
2638
2639 if (tso->why_blocked != BlockedOnCCall &&
2640 tso->why_blocked != BlockedOnCCall_Interruptible) {
2641 throwToSingleThreaded(tso->cap,tso,NULL);
2642 }
2643 }
2644
2645 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2646 static void
2647 deleteThread_(Capability *cap, StgTSO *tso)
2648 { // for forkProcess only:
2649 // like deleteThread(), but we delete threads in foreign calls, too.
2650
2651 if (tso->why_blocked == BlockedOnCCall ||
2652 tso->why_blocked == BlockedOnCCall_Interruptible) {
2653 tso->what_next = ThreadKilled;
2654 appendToRunQueue(tso->cap, tso);
2655 } else {
2656 deleteThread(cap,tso);
2657 }
2658 }
2659 #endif
2660
2661 /* -----------------------------------------------------------------------------
2662 raiseExceptionHelper
2663
2664 This function is called by the raise# primitive, just so that we can
2665 move some of the tricky bits of raising an exception from C-- into
2666 C. Who knows, it might be a useful re-usable thing here too.
2667 -------------------------------------------------------------------------- */
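/* The caller (stg_raisezh in Exception.cmm) dispatches on the frame type
 * we return; in C-like pseudocode the shape is roughly (sketch):
 *
 *     switch (raiseExceptionHelper(reg, tso, exception)) {
 *     case CATCH_FRAME:      // enter the handler with the exception
 *     case CATCH_STM_FRAME:  // likewise, inside an STM catch
 *     case ATOMICALLY_FRAME: // exception escaped the transaction
 *     case STOP_FRAME:       // no handler: the thread is killed
 *     }
 */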
2668
2669 StgWord
2670 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2671 {
2672 Capability *cap = regTableToCapability(reg);
2673 StgThunk *raise_closure = NULL;
2674 StgPtr p, next;
2675 StgRetInfoTable *info;
2676 //
2677 // This closure represents the expression 'raise# E' where E
2678 // is the exception being raised. It is used to overwrite all the
2679 // thunks which are currently under evaluation.
2680 //
2681
2682 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2683 // LDV profiling: stg_raise_info has THUNK as its closure
2684 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2685 // payload, MIN_UPD_SIZE is more approprate than 1. It seems that
2686 // 1 does not cause any problem unless profiling is performed.
2687 // However, when LDV profiling goes on, we need to linearly scan
2688 // small object pool, where raise_closure is stored, so we should
2689 // use MIN_UPD_SIZE.
2690 //
2691 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2692 // sizeofW(StgClosure)+1);
2693 //
2694
2695 //
2696 // Walk up the stack, looking for the catch frame. On the way,
2697 // we update any closures pointed to from update frames with the
2698 // raise closure that we just built.
2699 //
2700 p = tso->stackobj->sp;
2701 while(1) {
2702 info = get_ret_itbl((StgClosure *)p);
2703 next = p + stack_frame_sizeW((StgClosure *)p);
2704 switch (info->i.type) {
2705
2706 case UPDATE_FRAME:
2707 // Only create raise_closure if we need to.
2708 if (raise_closure == NULL) {
2709 raise_closure =
2710 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2711 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2712 raise_closure->payload[0] = exception;
2713 }
2714 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2715 (StgClosure *)raise_closure);
2716 p = next;
2717 continue;
2718
2719 case ATOMICALLY_FRAME:
2720 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2721 tso->stackobj->sp = p;
2722 return ATOMICALLY_FRAME;
2723
2724 case CATCH_FRAME:
2725 tso->stackobj->sp = p;
2726 return CATCH_FRAME;
2727
2728 case CATCH_STM_FRAME:
2729 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2730 tso->stackobj->sp = p;
2731 return CATCH_STM_FRAME;
2732
2733 case UNDERFLOW_FRAME:
2734 tso->stackobj->sp = p;
2735 threadStackUnderflow(cap,tso);
2736 p = tso->stackobj->sp;
2737 continue;
2738
2739 case STOP_FRAME:
2740 tso->stackobj->sp = p;
2741 return STOP_FRAME;
2742
2743 case CATCH_RETRY_FRAME: {
2744 StgTRecHeader *trec = tso -> trec;
2745 StgTRecHeader *outer = trec -> enclosing_trec;
2746 debugTrace(DEBUG_stm,
2747 "found CATCH_RETRY_FRAME at %p during raise", p);
2748 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2749 stmAbortTransaction(cap, trec);
2750 stmFreeAbortedTRec(cap, trec);
2751 tso -> trec = outer;
2752 p = next;
2753 continue;
2754 }
2755
2756 default:
2757 p = next;
2758 continue;
2759 }
2760 }
2761 }
2762
2763
2764 /* -----------------------------------------------------------------------------
2765 findRetryFrameHelper
2766
2767 This function is called by the retry# primitive. It traverses the stack
2768 leaving tso->sp referring to the frame which should handle the retry.
2769
2770 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2771 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2772
2773 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested transactions they
2774 create) because retries are not considered to be exceptions, despite the
2775 similar implementation.
2776
2777 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2778 not be created within memory transactions.
2779 -------------------------------------------------------------------------- */
2780
2781 StgWord
2782 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2783 {
2784 StgPtr p, next;
2785 StgRetInfoTable *info;
2786
2787 p = tso->stackobj->sp;
2788 while (1) {
2789 info = get_ret_itbl((StgClosure *)p);
2790 next = p + stack_frame_sizeW((StgClosure *)p);
2791 switch (info->i.type) {
2792
2793 case ATOMICALLY_FRAME:
2794 debugTrace(DEBUG_stm,
2795 "found ATOMICALLY_FRAME at %p during retry", p);
2796 tso->stackobj->sp = p;
2797 return ATOMICALLY_FRAME;
2798
2799 case CATCH_RETRY_FRAME:
2800 debugTrace(DEBUG_stm,
2801 "found CATCH_RETRY_FRAME at %p during retry", p);
2802 tso->stackobj->sp = p;
2803 return CATCH_RETRY_FRAME;
2804
2805 case CATCH_STM_FRAME: {
2806 StgTRecHeader *trec = tso -> trec;
2807 StgTRecHeader *outer = trec -> enclosing_trec;
2808 debugTrace(DEBUG_stm,
2809 "found CATCH_STM_FRAME at %p during retry", p);
2810 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2811 stmAbortTransaction(cap, trec);
2812 stmFreeAbortedTRec(cap, trec);
2813 tso -> trec = outer;
2814 p = next;
2815 continue;
2816 }
2817
2818 case UNDERFLOW_FRAME:
2819 tso->stackobj->sp = p;
2820 threadStackUnderflow(cap,tso);
2821 p = tso->stackobj->sp;
2822 continue;
2823
2824 default:
2825 ASSERT(info->i.type != CATCH_FRAME);
2826 ASSERT(info->i.type != STOP_FRAME);
2827 p = next;
2828 continue;
2829 }
2830 }
2831 }
2832
2833 /* -----------------------------------------------------------------------------
2834 resurrectThreads is called after garbage collection on the list of
2835 threads found to be garbage. Each of these threads will be woken
2836 up and sent an exception: BlockedIndefinitelyOnMVar if it was
2837 blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked in
2838 an STM transaction, or NonTermination if it was blocked on a Black Hole.
2839
2840 Locks: assumes we hold *all* the capabilities.
2841 -------------------------------------------------------------------------- */
2842
2843 void
2844 resurrectThreads (StgTSO *threads)
2845 {
2846 StgTSO *tso, *next;
2847 Capability *cap;
2848 generation *gen;
2849
2850 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2851 next = tso->global_link;
2852
2853 gen = Bdescr((P_)tso)->gen;
2854 tso->global_link = gen->threads;
2855 gen->threads = tso;
2856
2857 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2858
2859 // Wake up the thread on the Capability it was last on
2860 cap = tso->cap;
2861
2862 switch (tso->why_blocked) {
2863 case BlockedOnMVar:
2864 case BlockedOnMVarRead:
2865 /* Called by GC - sched_mutex lock is currently held. */
2866 throwToSingleThreaded(cap, tso,
2867 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2868 break;
2869 case BlockedOnBlackHole:
2870 throwToSingleThreaded(cap, tso,
2871 (StgClosure *)nonTermination_closure);
2872 break;
2873 case BlockedOnSTM:
2874 throwToSingleThreaded(cap, tso,
2875 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2876 break;
2877 case NotBlocked:
2878 /* This might happen if the thread was blocked on a black hole
2879 * belonging to a thread that we've just woken up (raiseAsync
2880 * can wake up threads, remember...).
2881 */
2882 continue;
2883 case BlockedOnMsgThrowTo:
2884 // This can happen if the target is masking, blocks on a
2885 // black hole, and then is found to be unreachable. In
2886 // this case, we want to let the target wake up and carry
2887 // on, and do nothing to this thread.
2888 continue;
2889 default:
2890 barf("resurrectThreads: thread blocked in a strange way: %d",
2891 tso->why_blocked);
2892 }
2893 }
2894 }