1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /*
95 * This mutex protects most of the global scheduler data in
96 * the THREADED_RTS runtime.
97 */
98 #if defined(THREADED_RTS)
99 Mutex sched_mutex;
100 #endif
101
102 #if !defined(mingw32_HOST_OS)
103 #define FORKPROCESS_PRIMOP_SUPPORTED
104 #endif
105
106 // Local stats
107 #ifdef THREADED_RTS
108 static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
109 #endif
110
111 /* -----------------------------------------------------------------------------
112 * static function prototypes
113 * -------------------------------------------------------------------------- */
114
115 static Capability *schedule (Capability *initialCapability, Task *task);
116
117 //
118 // These functions all encapsulate parts of the scheduler loop, and are
119 // abstracted only to make the structure and control flow of the
120 // scheduler clearer.
121 //
122 static void schedulePreLoop (void);
123 static void scheduleFindWork (Capability **pcap);
124 #if defined(THREADED_RTS)
125 static void scheduleYield (Capability **pcap, Task *task);
126 #endif
127 #if defined(THREADED_RTS)
128 static nat requestSync (Capability **pcap, Task *task, nat sync_type);
129 static void acquireAllCapabilities(Capability *cap, Task *task);
130 static void releaseAllCapabilities(nat n, Capability *cap, Task *task);
131 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
132 #endif
133 static void scheduleStartSignalHandlers (Capability *cap);
134 static void scheduleCheckBlockedThreads (Capability *cap);
135 static void scheduleProcessInbox(Capability **cap);
136 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
137 static void schedulePushWork(Capability *cap, Task *task);
138 #if defined(THREADED_RTS)
139 static void scheduleActivateSpark(Capability *cap);
140 #endif
141 static void schedulePostRunThread(Capability *cap, StgTSO *t);
142 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
143 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
144 nat prev_what_next );
145 static void scheduleHandleThreadBlocked( StgTSO *t );
146 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
147 StgTSO *t );
148 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
149 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
150
151 static void deleteThread (Capability *cap, StgTSO *tso);
152 static void deleteAllThreads (Capability *cap);
153
154 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
155 static void deleteThread_(Capability *cap, StgTSO *tso);
156 #endif
157
158 /* ---------------------------------------------------------------------------
159 Main scheduling loop.
160
161 We use round-robin scheduling, each thread returning to the
162 scheduler loop when one of these conditions is detected:
163
164 * out of heap space
165 * timer expires (thread yields)
166 * thread blocks
167 * thread ends
168 * stack overflow
169
170 ------------------------------------------------------------------------ */
171
172 static Capability *
173 schedule (Capability *initialCapability, Task *task)
174 {
175 StgTSO *t;
176 Capability *cap;
177 StgThreadReturnCode ret;
178 nat prev_what_next;
179 rtsBool ready_to_gc;
180 #if defined(THREADED_RTS)
181 rtsBool first = rtsTrue;
182 #endif
183
184 cap = initialCapability;
185
186 // Pre-condition: this task owns initialCapability.
187 // The sched_mutex is *NOT* held
188 // NB. on return, we still hold a capability.
189
190 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
191
192 schedulePreLoop();
193
194 // -----------------------------------------------------------
195 // Scheduler loop starts here:
196
197 while (1) {
198
199 // Check whether we have re-entered the RTS from Haskell without
200 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
201 // call).
202 if (cap->in_haskell) {
203 errorBelch("schedule: re-entered unsafely.\n"
204 " Perhaps a 'foreign import unsafe' should be 'safe'?");
205 stg_exit(EXIT_FAILURE);
206 }
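// A minimal illustration of how this error arises (hypothetical Haskell
// names): an 'unsafe' foreign import whose C implementation calls back
// into Haskell re-enters the RTS without suspendThread()/resumeThread():
//
//   foreign import ccall unsafe "run_callback"
//       runCallback :: FunPtr (IO ()) -> IO ()
//
// Declaring the import 'safe' instead makes the RTS suspend the calling
// thread around the call, so the callback can run normally.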
207
208 // The interruption / shutdown sequence.
209 //
210 // In order to cleanly shut down the runtime, we want to:
211 // * make sure that all main threads return to their callers
212 // with the state 'Interrupted'.
213 // * clean up all OS threads associated with the runtime
214 // * free all memory etc.
215 //
216 // So the sequence for ^C goes like this:
217 //
218 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
219 // arranges for some Capability to wake up
220 //
221 // * all threads in the system are halted, and the zombies are
222 // placed on the run queue for cleaning up. We acquire all
223 // the capabilities in order to delete the threads, this is
224 // done by scheduleDoGC() for convenience (because GC already
225 // needs to acquire all the capabilities). We can't kill
226 // threads involved in foreign calls.
227 //
228 // * somebody calls shutdownHaskell(), which calls exitScheduler()
229 //
230 // * sched_state := SCHED_SHUTTING_DOWN
231 //
232 // * all workers exit when the run queue on their capability
233 // drains. All main threads will also exit when their TSO
234 // reaches the head of the run queue and they can return.
235 //
236 // * eventually all Capabilities will shut down, and the RTS can
237 // exit.
238 //
239 // * We might be left with threads blocked in foreign calls,
240 // we should really attempt to kill these somehow (TODO);
241
242 switch (sched_state) {
243 case SCHED_RUNNING:
244 break;
245 case SCHED_INTERRUPTING:
246 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
247 /* scheduleDoGC() deletes all the threads */
248 scheduleDoGC(&cap,task,rtsTrue);
249
250 // after scheduleDoGC(), we must be shutting down. Either some
251 // other Capability did the final GC, or we did it above; either
252 // way we can fall through to the SCHED_SHUTTING_DOWN
253 // case now.
254 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
255 // fall through
256
257 case SCHED_SHUTTING_DOWN:
258 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
259 // If we are a worker, just exit. If we're a bound thread
260 // then we will exit below when we've removed our TSO from
261 // the run queue.
262 if (!isBoundTask(task) && emptyRunQueue(cap)) {
263 return cap;
264 }
265 break;
266 default:
267 barf("sched_state: %d", sched_state);
268 }
269
270 scheduleFindWork(&cap);
271
272 /* work pushing, currently relevant only for THREADED_RTS:
273 (pushes threads, wakes up idle capabilities for stealing) */
274 schedulePushWork(cap,task);
275
276 scheduleDetectDeadlock(&cap,task);
277
278 // Normally, the only way we can get here with no threads to
279 // run is if a keyboard interrupt was received during
280 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
281 // Additionally, it is not fatal for the
282 // threaded RTS to reach here with no threads to run.
283 //
284 // win32: might be here due to awaitEvent() being abandoned
285 // as a result of a console event having been delivered.
286
287 #if defined(THREADED_RTS)
288 if (first)
289 {
290 // XXX: ToDo
291 // // don't yield the first time, we want a chance to run this
292 // // thread for a bit, even if there are others banging at the
293 // // door.
294 // first = rtsFalse;
295 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
296 }
297
298 scheduleYield(&cap,task);
299
300 if (emptyRunQueue(cap)) continue; // look for work again
301 #endif
302
303 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
304 if ( emptyRunQueue(cap) ) {
305 ASSERT(sched_state >= SCHED_INTERRUPTING);
306 }
307 #endif
308
309 //
310 // Get a thread to run
311 //
312 t = popRunQueue(cap);
313
314 // Sanity check the thread we're about to run. This can be
315 // expensive if there is lots of thread switching going on...
316 IF_DEBUG(sanity,checkTSO(t));
317
318 #if defined(THREADED_RTS)
319 // Check whether we can run this thread in the current task.
320 // If not, we have to pass our capability to the right task.
321 {
322 InCall *bound = t->bound;
323
324 if (bound) {
325 if (bound->task == task) {
326 // yes, the Haskell thread is bound to the current native thread
327 } else {
328 debugTrace(DEBUG_sched,
329 "thread %lu bound to another OS thread",
330 (unsigned long)t->id);
331 // no, bound to a different Haskell thread: pass to that thread
332 pushOnRunQueue(cap,t);
333 continue;
334 }
335 } else {
336 // The thread we want to run is unbound.
337 if (task->incall->tso) {
338 debugTrace(DEBUG_sched,
339 "this OS thread cannot run thread %lu",
340 (unsigned long)t->id);
341 // no, the current native thread is bound to a different
342 // Haskell thread, so pass it to any worker thread
343 pushOnRunQueue(cap,t);
344 continue;
345 }
346 }
347 }
348 #endif
349
350 // If we're shutting down, and this thread has not yet been
351 // killed, kill it now. This sometimes happens when a finalizer
352 // thread is created by the final GC, or a thread previously
353 // in a foreign call returns.
354 if (sched_state >= SCHED_INTERRUPTING &&
355 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
356 deleteThread(cap,t);
357 }
358
359 // If this capability is disabled, migrate the thread away rather
360 // than running it. NB. but not if the thread is bound: it is
361 // really hard for a bound thread to migrate itself. Believe me,
362 // I tried several ways and couldn't find a way to do it.
363 // Instead, when everything is stopped for GC, we migrate all the
364 // threads on the run queue then (see scheduleDoGC()).
365 //
366 // ToDo: what about TSO_LOCKED? Currently we're migrating those
367 // when the number of capabilities drops, but we never migrate
368 // them back if it rises again. Presumably we should, but after
369 // the thread has been migrated we no longer know what capability
370 // it was originally on.
371 #ifdef THREADED_RTS
372 if (cap->disabled && !t->bound) {
373 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
374 migrateThread(cap, t, dest_cap);
375 continue;
376 }
377 #endif
378
379 /* context switches are initiated by the timer signal, unless
380 * the user specified "context switch as often as possible", with
381 * +RTS -C0
382 */
383 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
384 && !emptyThreadQueues(cap)) {
385 cap->context_switch = 1;
386 }
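// For example, "+RTS -C0 -RTS" sets ctxtSwitchTicks to 0, so whenever
// other runnable work is queued the switch flag is raised here before
// every run, and the thread gives up the Capability again at its next
// yield point.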
387
388 run_thread:
389
390 // CurrentTSO is the thread to run. It might be different if we
391 // loop back to run_thread, so make sure to set CurrentTSO after
392 // that.
393 cap->r.rCurrentTSO = t;
394
395 startHeapProfTimer();
396
397 // ----------------------------------------------------------------------
398 // Run the current thread
399
400 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
401 ASSERT(t->cap == cap);
402 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
403
404 prev_what_next = t->what_next;
405
406 errno = t->saved_errno;
407 #if mingw32_HOST_OS
408 SetLastError(t->saved_winerror);
409 #endif
410
411 // reset the interrupt flag before running Haskell code
412 cap->interrupt = 0;
413
414 cap->in_haskell = rtsTrue;
415 cap->idle = 0;
416
417 dirty_TSO(cap,t);
418 dirty_STACK(cap,t->stackobj);
419
420 switch (recent_activity)
421 {
422 case ACTIVITY_DONE_GC: {
423 // ACTIVITY_DONE_GC means we turned off the timer signal to
424 // conserve power (see #1623). Re-enable it here.
425 nat prev;
426 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
427 if (prev == ACTIVITY_DONE_GC) {
428 #ifndef PROFILING
429 startTimer();
430 #endif
431 }
432 break;
433 }
434 case ACTIVITY_INACTIVE:
435 // If we reached ACTIVITY_INACTIVE, then don't reset it until
436 // we've done the GC. The thread running here might just be
437 // the IO manager thread that handle_tick() woke up via
438 // wakeUpRts().
439 break;
440 default:
441 recent_activity = ACTIVITY_YES;
442 }
443
444 traceEventRunThread(cap, t);
445
446 switch (prev_what_next) {
447
448 case ThreadKilled:
449 case ThreadComplete:
450 /* Thread already finished, return to scheduler. */
451 ret = ThreadFinished;
452 break;
453
454 case ThreadRunGHC:
455 {
456 StgRegTable *r;
457 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
458 cap = regTableToCapability(r);
459 ret = r->rRet;
460 break;
461 }
462
463 case ThreadInterpret:
464 cap = interpretBCO(cap);
465 ret = cap->r.rRet;
466 break;
467
468 default:
469 barf("schedule: invalid what_next field");
470 }
471
472 cap->in_haskell = rtsFalse;
473
474 // The TSO might have moved, eg. if it re-entered the RTS and a GC
475 // happened. So find the new location:
476 t = cap->r.rCurrentTSO;
477
478 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
479 // don't want it set when not running a Haskell thread.
480 cap->r.rCurrentTSO = NULL;
481
482 // And save the current errno in this thread.
483 // XXX: possibly bogus for SMP because this thread might already
484 // be running again, see code below.
485 t->saved_errno = errno;
486 #if mingw32_HOST_OS
487 // Similarly for Windows error code
488 t->saved_winerror = GetLastError();
489 #endif
490
491 if (ret == ThreadBlocked) {
492 if (t->why_blocked == BlockedOnBlackHole) {
493 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
494 traceEventStopThread(cap, t, t->why_blocked + 6,
495 owner != NULL ? owner->id : 0);
496 } else {
497 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
498 }
499 } else {
500 traceEventStopThread(cap, t, ret, 0);
501 }
502
503 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
504 ASSERT(t->cap == cap);
505
506 // ----------------------------------------------------------------------
507
508 // Costs for the scheduler are assigned to CCS_SYSTEM
509 stopHeapProfTimer();
510 #if defined(PROFILING)
511 cap->r.rCCCS = CCS_SYSTEM;
512 #endif
513
514 schedulePostRunThread(cap,t);
515
516 ready_to_gc = rtsFalse;
517
518 switch (ret) {
519 case HeapOverflow:
520 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
521 break;
522
523 case StackOverflow:
524 // just adjust the stack for this thread, then pop it back
525 // on the run queue.
526 threadStackOverflow(cap, t);
527 pushOnRunQueue(cap,t);
528 break;
529
530 case ThreadYielding:
531 if (scheduleHandleYield(cap, t, prev_what_next)) {
532 // shortcut for switching between compiler/interpreter:
533 goto run_thread;
534 }
535 break;
536
537 case ThreadBlocked:
538 scheduleHandleThreadBlocked(t);
539 break;
540
541 case ThreadFinished:
542 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
543 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
544 break;
545
546 default:
547 barf("schedule: invalid thread return code %d", (int)ret);
548 }
549
550 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
551 scheduleDoGC(&cap,task,rtsFalse);
552 }
553 } /* end of while() */
554 }
555
556 /* -----------------------------------------------------------------------------
557 * Run queue operations
558 * -------------------------------------------------------------------------- */
559
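// Unlink a thread from the middle of the run queue. The run queue is
// doubly linked: tso->_link points forwards and tso->block_info.prev
// points backwards, so the neighbours (or the queue head/tail) on both
// sides have to be patched up.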
560 static void
561 removeFromRunQueue (Capability *cap, StgTSO *tso)
562 {
563 if (tso->block_info.prev == END_TSO_QUEUE) {
564 ASSERT(cap->run_queue_hd == tso);
565 cap->run_queue_hd = tso->_link;
566 } else {
567 setTSOLink(cap, tso->block_info.prev, tso->_link);
568 }
569 if (tso->_link == END_TSO_QUEUE) {
570 ASSERT(cap->run_queue_tl == tso);
571 cap->run_queue_tl = tso->block_info.prev;
572 } else {
573 setTSOPrev(cap, tso->_link, tso->block_info.prev);
574 }
575 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
576
577 IF_DEBUG(sanity, checkRunQueue(cap));
578 }
579
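// Move a thread to the front of the run queue, so it is the next one
// picked up by the scheduler (implemented as remove + push).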
580 void
581 promoteInRunQueue (Capability *cap, StgTSO *tso)
582 {
583 removeFromRunQueue(cap, tso);
584 pushOnRunQueue(cap, tso);
585 }
586
587 /* ----------------------------------------------------------------------------
588 * Setting up the scheduler loop
589 * ------------------------------------------------------------------------- */
590
591 static void
592 schedulePreLoop(void)
593 {
594 // initialisation for scheduler - what cannot go into initScheduler()
595
596 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
597 win32AllocStack();
598 #endif
599 }
600
601 /* -----------------------------------------------------------------------------
602 * scheduleFindWork()
603 *
604 * Search for work to do, and handle messages from elsewhere.
605 * -------------------------------------------------------------------------- */
606
607 static void
608 scheduleFindWork (Capability **pcap)
609 {
610 scheduleStartSignalHandlers(*pcap);
611
612 scheduleProcessInbox(pcap);
613
614 scheduleCheckBlockedThreads(*pcap);
615
616 #if defined(THREADED_RTS)
617 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
618 #endif
619 }
620
621 #if defined(THREADED_RTS)
622 STATIC_INLINE rtsBool
623 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
624 {
625 // we need to yield this capability to someone else if..
626 // - another thread is initiating a GC, and we didn't just do a GC
627 // (see Note [GC livelock])
628 // - another Task is returning from a foreign call
629 // - the thread at the head of the run queue cannot be run
630 // by this Task (it is bound to another Task, or it is unbound
631 // and this task is bound).
632 //
633 // Note [GC livelock]
634 //
635 // If we are interrupted to do a GC, then we do not immediately do
636 // another one. This avoids a starvation situation where one
637 // Capability keeps forcing a GC and the other Capabilities make no
638 // progress at all.
639
640 return ((pending_sync && !didGcLast) ||
641 cap->returning_tasks_hd != NULL ||
642 (!emptyRunQueue(cap) && (task->incall->tso == NULL
643 ? peekRunQueue(cap)->bound != NULL
644 : peekRunQueue(cap)->bound != task->incall)));
645 }
646
647 // This is the single place where a Task goes to sleep. There are
648 // two reasons it might need to sleep:
649 // - there are no threads to run
650 // - we need to yield this Capability to someone else
651 // (see shouldYieldCapability())
652 //
653 // Careful: the scheduler loop is quite delicate. Make sure you run
654 // the tests in testsuite/concurrent (all ways) after modifying this,
655 // and also check the benchmarks in nofib/parallel for regressions.
656
657 static void
658 scheduleYield (Capability **pcap, Task *task)
659 {
660 Capability *cap = *pcap;
661 int didGcLast = rtsFalse;
662
663 // if we have work, and we don't need to give up the Capability, continue.
664 //
665 if (!shouldYieldCapability(cap,task,rtsFalse) &&
666 (!emptyRunQueue(cap) ||
667 !emptyInbox(cap) ||
668 sched_state >= SCHED_INTERRUPTING)) {
669 return;
670 }
671
672 // otherwise yield (sleep), and keep yielding if necessary.
673 do {
674 didGcLast = yieldCapability(&cap,task, !didGcLast);
675 }
676 while (shouldYieldCapability(cap,task,didGcLast));
677
678 // note there may still be no threads on the run queue at this
679 // point; the caller has to check.
680
681 *pcap = cap;
682 return;
683 }
684 #endif
685
686 /* -----------------------------------------------------------------------------
687 * schedulePushWork()
688 *
689 * Push work to other Capabilities if we have some.
690 * -------------------------------------------------------------------------- */
691
692 static void
693 schedulePushWork(Capability *cap USED_IF_THREADS,
694 Task *task USED_IF_THREADS)
695 {
696 /* The following code is not for PARALLEL_HASKELL. I kept the call general,
697 as future GUM versions might use pushing in a distributed setup. */
698 #if defined(THREADED_RTS)
699
700 Capability *free_caps[n_capabilities], *cap0;
701 nat i, n_free_caps;
702
703 // migration can be turned off with +RTS -qm
704 if (!RtsFlags.ParFlags.migrate) return;
705
706 // Check whether we have more threads on our run queue, or sparks
707 // in our pool, that we could hand to another Capability.
708 if (emptyRunQueue(cap)) {
709 if (sparkPoolSizeCap(cap) < 2) return;
710 } else {
711 if (singletonRunQueue(cap) &&
712 sparkPoolSizeCap(cap) < 1) return;
713 }
714
715 // First grab as many free Capabilities as we can.
716 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
717 cap0 = capabilities[i];
718 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
719 if (!emptyRunQueue(cap0)
720 || cap0->returning_tasks_hd != NULL
721 || cap0->inbox != (Message*)END_TSO_QUEUE) {
722 // it already has some work, we just grabbed it at
723 // the wrong moment. Or maybe it's deadlocked!
724 releaseCapability(cap0);
725 } else {
726 free_caps[n_free_caps++] = cap0;
727 }
728 }
729 }
730
731 // we now have n_free_caps free capabilities stashed in
732 // free_caps[]. Share our run queue equally with them. This is
733 // probably the simplest thing we could do; improvements we might
734 // want to do include:
735 //
736 // - giving high priority to moving relatively new threads, on
737 // the grounds that they haven't had time to build up a
738 // working set in the cache on this CPU/Capability.
739 //
740 // - giving low priority to moving long-lived threads
741
742 if (n_free_caps > 0) {
743 StgTSO *prev, *t, *next;
744 #ifdef SPARK_PUSHING
745 rtsBool pushed_to_all;
746 #endif
747
748 debugTrace(DEBUG_sched,
749 "cap %d: %s and %d free capabilities, sharing...",
750 cap->no,
751 (!emptyRunQueue(cap) && !singletonRunQueue(cap))?
752 "excess threads on run queue":"sparks to share (>=2)",
753 n_free_caps);
754
755 i = 0;
756 #ifdef SPARK_PUSHING
757 pushed_to_all = rtsFalse;
758 #endif
759
760 if (cap->run_queue_hd != END_TSO_QUEUE) {
761 prev = cap->run_queue_hd;
762 t = prev->_link;
763 prev->_link = END_TSO_QUEUE;
764 for (; t != END_TSO_QUEUE; t = next) {
765 next = t->_link;
766 t->_link = END_TSO_QUEUE;
767 if (t->bound == task->incall // don't move my bound thread
768 || tsoLocked(t)) { // don't move a locked thread
769 setTSOLink(cap, prev, t);
770 setTSOPrev(cap, t, prev);
771 prev = t;
772 } else if (i == n_free_caps) {
773 #ifdef SPARK_PUSHING
774 pushed_to_all = rtsTrue;
775 #endif
776 i = 0;
777 // keep one for us
778 setTSOLink(cap, prev, t);
779 setTSOPrev(cap, t, prev);
780 prev = t;
781 } else {
782 appendToRunQueue(free_caps[i],t);
783
784 traceEventMigrateThread (cap, t, free_caps[i]->no);
785
786 if (t->bound) { t->bound->task->cap = free_caps[i]; }
787 t->cap = free_caps[i];
788 i++;
789 }
790 }
791 cap->run_queue_tl = prev;
792
793 IF_DEBUG(sanity, checkRunQueue(cap));
794 }
795
796 #ifdef SPARK_PUSHING
797 /* JB I left this code in place, it would work but is not necessary */
798
799 // If there are some free capabilities that we didn't push any
800 // threads to, then try to push a spark to each one.
801 if (!pushed_to_all) {
802 StgClosure *spark;
803 // i is the next free capability to push to
804 for (; i < n_free_caps; i++) {
805 if (emptySparkPoolCap(free_caps[i])) {
806 spark = tryStealSpark(cap->sparks);
807 if (spark != NULL) {
808 /* TODO: if anyone wants to re-enable this code then
809 * they must consider the fizzledSpark(spark) case
810 * and update the per-cap spark statistics.
811 */
812 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
813
814 traceEventStealSpark(free_caps[i], t, cap->no);
815
816 newSpark(&(free_caps[i]->r), spark);
817 }
818 }
819 }
820 }
821 #endif /* SPARK_PUSHING */
822
823 // release the capabilities
824 for (i = 0; i < n_free_caps; i++) {
825 task->cap = free_caps[i];
826 releaseAndWakeupCapability(free_caps[i]);
827 }
828 }
829 task->cap = cap; // reset to point to our Capability.
830
831 #endif /* THREADED_RTS */
832
833 }
834
835 /* ----------------------------------------------------------------------------
836 * Start any pending signal handlers
837 * ------------------------------------------------------------------------- */
838
839 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
840 static void
841 scheduleStartSignalHandlers(Capability *cap)
842 {
843 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
844 // safe outside the lock
845 startSignalHandlers(cap);
846 }
847 }
848 #else
849 static void
850 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
851 {
852 }
853 #endif
854
855 /* ----------------------------------------------------------------------------
856 * Check for blocked threads that can be woken up.
857 * ------------------------------------------------------------------------- */
858
859 static void
860 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
861 {
862 #if !defined(THREADED_RTS)
863 //
864 // Check whether any waiting threads need to be woken up. If the
865 // run queue is empty, and there are no other tasks running, we
866 // can wait indefinitely for something to happen.
867 //
868 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
869 {
870 awaitEvent (emptyRunQueue(cap));
871 }
872 #endif
873 }
874
875 /* ----------------------------------------------------------------------------
876 * Detect deadlock conditions and attempt to resolve them.
877 * ------------------------------------------------------------------------- */
878
879 static void
880 scheduleDetectDeadlock (Capability **pcap, Task *task)
881 {
882 Capability *cap = *pcap;
883 /*
884 * Detect deadlock: when we have no threads to run, there are no
885 * threads blocked, waiting for I/O, or sleeping, and all the
886 * other tasks are waiting for work, we must have a deadlock of
887 * some description.
888 */
889 if ( emptyThreadQueues(cap) )
890 {
891 #if defined(THREADED_RTS)
892 /*
893 * In the threaded RTS, we only check for deadlock if there
894 * has been no activity in a complete timeslice. This means
895 * we won't eagerly start a full GC just because we don't have
896 * any threads to run currently.
897 */
898 if (recent_activity != ACTIVITY_INACTIVE) return;
899 #endif
900
901 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
902
903 // Garbage collection can release some new threads due to
904 // either (a) finalizers or (b) threads resurrected because
905 // they are unreachable and will therefore be sent an
906 // exception. Any threads thus released will be immediately
907 // runnable.
908 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
909 cap = *pcap;
910 // when force_major == rtsTrue, scheduleDoGC sets
911 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
912 // signal.
913
914 if ( !emptyRunQueue(cap) ) return;
915
916 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
917 /* If we have user-installed signal handlers, then wait
918 * for signals to arrive rather than bombing out with a
919 * deadlock.
920 */
921 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
922 debugTrace(DEBUG_sched,
923 "still deadlocked, waiting for signals...");
924
925 awaitUserSignals();
926
927 if (signals_pending()) {
928 startSignalHandlers(cap);
929 }
930
931 // either we have threads to run, or we were interrupted:
932 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
933
934 return;
935 }
936 #endif
937
938 #if !defined(THREADED_RTS)
939 /* Probably a real deadlock. Send the current main thread a
940 * NonTermination exception (the closure thrown below).
941 */
942 if (task->incall->tso) {
943 switch (task->incall->tso->why_blocked) {
944 case BlockedOnSTM:
945 case BlockedOnBlackHole:
946 case BlockedOnMsgThrowTo:
947 case BlockedOnMVar:
948 case BlockedOnMVarRead:
949 throwToSingleThreaded(cap, task->incall->tso,
950 (StgClosure *)nonTermination_closure);
951 return;
952 default:
953 barf("deadlock: main thread blocked in a strange way");
954 }
955 }
956 return;
957 #endif
958 }
959 }
960
961
962 /* ----------------------------------------------------------------------------
963 * Process message in the current Capability's inbox
964 * ------------------------------------------------------------------------- */
965
966 static void
967 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
968 {
969 #if defined(THREADED_RTS)
970 Message *m, *next;
971 int r;
972 Capability *cap = *pcap;
973
974 while (!emptyInbox(cap)) {
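// executing messages may allocate, so if the nursery is exhausted (or
// we have allocated a lot of large objects) do a GC first.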
975 if (cap->r.rCurrentNursery->link == NULL ||
976 g0->n_new_large_words >= large_alloc_lim) {
977 scheduleDoGC(pcap, cap->running_task, rtsFalse);
978 cap = *pcap;
979 }
980
981 // don't use a blocking acquire; if the lock is held by
982 // another thread then just carry on. This seems to avoid
983 // getting stuck in a message ping-pong situation with other
984 // processors. We'll check the inbox again later anyway.
985 //
986 // We should really use a more efficient queue data structure
987 // here. The trickiness is that we must ensure a Capability
988 // never goes idle if the inbox is non-empty, which is why we
989 // use cap->lock (cap->lock is released as the last thing
990 // before going idle; see Capability.c:releaseCapability()).
991 r = TRY_ACQUIRE_LOCK(&cap->lock);
992 if (r != 0) return;
993
994 m = cap->inbox;
995 cap->inbox = (Message*)END_TSO_QUEUE;
996
997 RELEASE_LOCK(&cap->lock);
998
999 while (m != (Message*)END_TSO_QUEUE) {
1000 next = m->link;
1001 executeMessage(cap, m);
1002 m = next;
1003 }
1004 }
1005 #endif
1006 }
1007
1008 /* ----------------------------------------------------------------------------
1009 * Activate spark threads (THREADED_RTS)
1010 * ------------------------------------------------------------------------- */
1011
1012 #if defined(THREADED_RTS)
1013 static void
1014 scheduleActivateSpark(Capability *cap)
1015 {
1016 if (anySparks() && !cap->disabled)
1017 {
1018 createSparkThread(cap);
1019 debugTrace(DEBUG_sched, "creating a spark thread");
1020 }
1021 }
1022 #endif // THREADED_RTS
1023
1024 /* ----------------------------------------------------------------------------
1025 * After running a thread...
1026 * ------------------------------------------------------------------------- */
1027
1028 static void
1029 schedulePostRunThread (Capability *cap, StgTSO *t)
1030 {
1031 // We have to be able to catch transactions that are in an
1032 // infinite loop as a result of seeing an inconsistent view of
1033 // memory, e.g.
1034 //
1035 // atomically $ do
1036 // [a,b] <- mapM readTVar [ta,tb]
1037 // when (a == b) loop
1038 //
1039 // and a is never equal to b given a consistent view of memory.
1040 //
1041 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1042 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1043 debugTrace(DEBUG_sched | DEBUG_stm,
1044 "trec %p found wasting its time", t);
1045
1046 // strip the stack back to the
1047 // ATOMICALLY_FRAME, aborting the (nested)
1048 // transaction, and saving the stack of any
1049 // partially-evaluated thunks on the heap.
1050 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1051
1052 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1053 }
1054 }
1055
1056 //
1057 // If the current thread's allocation limit has run out, send it
1058 // the AllocationLimitExceeded exception.
1059
1060 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1061 // Use a throwToSelf rather than a throwToSingleThreaded, because
1062 // it correctly handles the case where the thread is currently
1063 // inside mask. Also the thread might be blocked (e.g. on an
1064 // MVar), and throwToSingleThreaded doesn't unblock it
1065 // correctly in that case.
1066 throwToSelf(cap, t, allocationLimitExceeded_closure);
1067 ASSIGN_Int64((W_*)&(t->alloc_limit),
1068 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1069 }
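// (On the Haskell side this counter is, at the time of writing, exposed
// via GHC.Conc's setAllocationCounter/enableAllocationLimit; mentioned
// here only as a cross-reference, the authoritative API lives in base.)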
1070
1071 /* some statistics gathering in the parallel case */
1072 }
1073
1074 /* -----------------------------------------------------------------------------
1075 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1076 * -------------------------------------------------------------------------- */
1077
1078 static rtsBool
1079 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1080 {
1081 if (cap->r.rHpLim == NULL || cap->context_switch) {
1082 // Sometimes we miss a context switch, e.g. when calling
1083 // primitives in a tight loop, MAYBE_GC() doesn't check the
1084 // context switch flag, and we end up waiting for a GC.
1085 // See #1984, and concurrent/should_run/1984
1086 cap->context_switch = 0;
1087 appendToRunQueue(cap,t);
1088 } else {
1089 pushOnRunQueue(cap,t);
1090 }
1091
1092 // did the task ask for a large block?
1093 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1094 // if so, get one and push it on the front of the nursery.
1095 bdescr *bd;
1096 W_ blocks;
1097
1098 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
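// e.g. with the default 4Kbyte BLOCK_SIZE, an rHpAlloc of 10000 bytes
// rounds up to 12288 and gives blocks == 3 (figures illustrative only;
// BLOCK_SIZE is a build-time constant).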
1099
1100 if (blocks > BLOCKS_PER_MBLOCK) {
1101 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1102 }
1103
1104 debugTrace(DEBUG_sched,
1105 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1106 (long)t->id, what_next_strs[t->what_next], blocks);
1107
1108 // don't do this if the nursery is (nearly) full, we'll GC first.
1109 if (cap->r.rCurrentNursery->link != NULL ||
1110 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1111 // infinite loop if the
1112 // nursery has only one
1113 // block.
1114
1115 bd = allocGroup_lock(blocks);
1116 cap->r.rNursery->n_blocks += blocks;
1117
1118 // link the new group after CurrentNursery
1119 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1120
1121 // initialise it as a nursery block. We initialise the
1122 // step, gen_no, and flags field of *every* sub-block in
1123 // this large block, because this is easier than making
1124 // sure that we always find the block head of a large
1125 // block whenever we call Bdescr() (eg. evacuate() and
1126 // isAlive() in the GC would both have to do this, at
1127 // least).
1128 {
1129 bdescr *x;
1130 for (x = bd; x < bd + blocks; x++) {
1131 initBdescr(x,g0,g0);
1132 x->free = x->start;
1133 x->flags = 0;
1134 }
1135 }
1136
1137 // This assert can be a killer if the app is doing lots
1138 // of large block allocations.
1139 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1140
1141 // now update the nursery to point to the new block
1142 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1143 cap->r.rCurrentNursery = bd;
1144
1145 // we might be unlucky and have another thread get on the
1146 // run queue before us and steal the large block, but in that
1147 // case the thread will just end up requesting another large
1148 // block.
1149 return rtsFalse; /* not actually GC'ing */
1150 }
1151 }
1152
1153 // if we got here because we exceeded large_alloc_lim, then
1154 // proceed straight to GC.
1155 if (g0->n_new_large_words >= large_alloc_lim) {
1156 return rtsTrue;
1157 }
1158
1159 // Otherwise, we just ran out of space in the current nursery.
1160 // Grab another nursery if we can.
1161 if (getNewNursery(cap)) {
1162 debugTrace(DEBUG_sched, "thread %ld got a new nursery", t->id);
1163 return rtsFalse;
1164 }
1165
1166 return rtsTrue;
1167 /* actual GC is done at the end of the while loop in schedule() */
1168 }
1169
1170 /* -----------------------------------------------------------------------------
1171 * Handle a thread that returned to the scheduler with ThreadYielding
1172 * -------------------------------------------------------------------------- */
1173
1174 static rtsBool
1175 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1176 {
1177 /* put the thread back on the run queue. Then, if we're ready to
1178 * GC, check whether this is the last task to stop. If so, wake
1179 * up the GC thread. getThread will block during a GC until the
1180 * GC is finished.
1181 */
1182
1183 ASSERT(t->_link == END_TSO_QUEUE);
1184
1185 // Shortcut if we're just switching evaluators: don't bother
1186 // doing stack squeezing (which can be expensive), just run the
1187 // thread.
1188 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1189 debugTrace(DEBUG_sched,
1190 "--<< thread %ld (%s) stopped to switch evaluators",
1191 (long)t->id, what_next_strs[t->what_next]);
1192 return rtsTrue;
1193 }
1194
1195 // Reset the context switch flag. We don't do this just before
1196 // running the thread, because that would mean we would lose ticks
1197 // during GC, which can lead to unfair scheduling (a thread hogs
1198 // the CPU because the tick always arrives during GC). This way
1199 // penalises threads that do a lot of allocation, but that seems
1200 // better than the alternative.
1201 if (cap->context_switch != 0) {
1202 cap->context_switch = 0;
1203 appendToRunQueue(cap,t);
1204 } else {
1205 pushOnRunQueue(cap,t);
1206 }
1207
1208 IF_DEBUG(sanity,
1209 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1210 checkTSO(t));
1211
1212 return rtsFalse;
1213 }
1214
1215 /* -----------------------------------------------------------------------------
1216 * Handle a thread that returned to the scheduler with ThreadBlocked
1217 * -------------------------------------------------------------------------- */
1218
1219 static void
1220 scheduleHandleThreadBlocked( StgTSO *t
1221 #if !defined(DEBUG)
1222 STG_UNUSED
1223 #endif
1224 )
1225 {
1226
1227 // We don't need to do anything. The thread is blocked, and it
1228 // has tidied up its stack and placed itself on whatever queue
1229 // it needs to be on.
1230
1231 // ASSERT(t->why_blocked != NotBlocked);
1232 // Not true: for example,
1233 // - the thread may have woken itself up already, because
1234 // threadPaused() might have raised a blocked throwTo
1235 // exception, see maybePerformBlockedException().
1236
1237 #ifdef DEBUG
1238 traceThreadStatus(DEBUG_sched, t);
1239 #endif
1240 }
1241
1242 /* -----------------------------------------------------------------------------
1243 * Handle a thread that returned to the scheduler with ThreadFinished
1244 * -------------------------------------------------------------------------- */
1245
1246 static rtsBool
1247 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1248 {
1249 /* Need to check whether this was a main thread, and if so,
1250 * return with the return value.
1251 *
1252 * We also end up here if the thread kills itself with an
1253 * uncaught exception, see Exception.cmm.
1254 */
1255
1256 // blocked exceptions can now complete, even if the thread was in
1257 // blocked mode (see #2910).
1258 awakenBlockedExceptionQueue (cap, t);
1259
1260 //
1261 // Check whether the thread that just completed was a bound
1262 // thread, and if so return with the result.
1263 //
1264 // There is an assumption here that all thread completion goes
1265 // through this point; we need to make sure that if a thread
1266 // ends up in the ThreadKilled state, that it stays on the run
1267 // queue so it can be dealt with here.
1268 //
1269
1270 if (t->bound) {
1271
1272 if (t->bound != task->incall) {
1273 #if !defined(THREADED_RTS)
1274 // Must be a bound thread that is not the topmost one. Leave
1275 // it on the run queue until the stack has unwound to the
1276 // point where we can deal with this. Leaving it on the run
1277 // queue also ensures that the garbage collector knows about
1278 // this thread and its return value (it gets dropped from the
1279 // step->threads list so there's no other way to find it).
1280 appendToRunQueue(cap,t);
1281 return rtsFalse;
1282 #else
1283 // this cannot happen in the threaded RTS, because a
1284 // bound thread can only be run by the appropriate Task.
1285 barf("finished bound thread that isn't mine");
1286 #endif
1287 }
1288
1289 ASSERT(task->incall->tso == t);
1290
1291 if (t->what_next == ThreadComplete) {
1292 if (task->incall->ret) {
1293 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1294 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1295 }
1296 task->incall->rstat = Success;
1297 } else {
1298 if (task->incall->ret) {
1299 *(task->incall->ret) = NULL;
1300 }
1301 if (sched_state >= SCHED_INTERRUPTING) {
1302 if (heap_overflow) {
1303 task->incall->rstat = HeapExhausted;
1304 } else {
1305 task->incall->rstat = Interrupted;
1306 }
1307 } else {
1308 task->incall->rstat = Killed;
1309 }
1310 }
1311 #ifdef DEBUG
1312 removeThreadLabel((StgWord)task->incall->tso->id);
1313 #endif
1314
1315 // We no longer consider this thread and task to be bound to
1316 // each other. The TSO lives on until it is GC'd, but the
1317 // task is about to be released by the caller, and we don't
1318 // want anyone following the pointer from the TSO to the
1319 // defunct task (which might have already been
1320 // re-used). This was a real bug: the GC updated
1321 // tso->bound->tso which led to a deadlock.
1322 t->bound = NULL;
1323 task->incall->tso = NULL;
1324
1325 return rtsTrue; // tells schedule() to return
1326 }
1327
1328 return rtsFalse;
1329 }
1330
1331 /* -----------------------------------------------------------------------------
1332 * Perform a heap census
1333 * -------------------------------------------------------------------------- */
1334
1335 static rtsBool
1336 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1337 {
1338 // When we have +RTS -i0 and we're heap profiling, do a census at
1339 // every GC. This lets us get repeatable runs for debugging.
1340 if (performHeapProfile ||
1341 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1342 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1343 return rtsTrue;
1344 } else {
1345 return rtsFalse;
1346 }
1347 }
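// For example, a profiled program run with "+RTS -hc -i0" (heap profile
// by cost centre, zero census interval) takes this branch and gets a
// census at every GC.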
1348
1349 /* -----------------------------------------------------------------------------
1350 * Start a synchronisation of all capabilities
1351 * -------------------------------------------------------------------------- */
1352
1353 // Returns:
1354 // 0 if we successfully got a sync
1355 // non-0 if there was another sync request in progress,
1356 // and we yielded to it. The value returned is the
1357 // type of the other sync request.
1358 //
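// Callers retry in a loop until their own sync request wins, e.g.
//     do { sync = requestSync(pcap, task, sync_type); } while (sync);
// as done in scheduleDoGC() and forkProcess() below.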
1359 #if defined(THREADED_RTS)
1360 static nat requestSync (Capability **pcap, Task *task, nat sync_type)
1361 {
1362 nat prev_pending_sync;
1363
1364 prev_pending_sync = cas(&pending_sync, 0, sync_type);
1365
1366 if (prev_pending_sync)
1367 {
1368 do {
1369 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1370 prev_pending_sync);
1371 ASSERT(*pcap);
1372 yieldCapability(pcap,task,rtsTrue);
1373 } while (pending_sync);
1374 return prev_pending_sync; // NOTE: task->cap might have changed now
1375 }
1376 else
1377 {
1378 return 0;
1379 }
1380 }
1381
1382 //
1383 // Grab all the capabilities except the one we already hold. Used
1384 // when synchronising before a single-threaded GC (SYNC_GC_SEQ), and
1385 // before a fork (SYNC_OTHER).
1386 //
1387 // Only call this after requestSync(), otherwise a deadlock might
1388 // ensue if another thread is trying to synchronise.
1389 //
1390 static void acquireAllCapabilities(Capability *cap, Task *task)
1391 {
1392 Capability *tmpcap;
1393 nat i;
1394
1395 for (i=0; i < n_capabilities; i++) {
1396 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
1397 tmpcap = capabilities[i];
1398 if (tmpcap != cap) {
1399 // we better hope this task doesn't get migrated to
1400 // another Capability while we're waiting for this one.
1401 // It won't, because load balancing happens while we have
1402 // all the Capabilities, but even so it's a slightly
1403 // unsavoury invariant.
1404 task->cap = tmpcap;
1405 waitForCapability(&tmpcap, task);
1406 if (tmpcap->no != i) {
1407 barf("acquireAllCapabilities: got the wrong capability");
1408 }
1409 }
1410 }
1411 task->cap = cap;
1412 }
1413
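// Release the first n Capabilities again, except for the one (cap) that
// this Task keeps hold of.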
1414 static void releaseAllCapabilities(nat n, Capability *cap, Task *task)
1415 {
1416 nat i;
1417
1418 for (i = 0; i < n; i++) {
1419 if (cap->no != i) {
1420 task->cap = capabilities[i];
1421 releaseCapability(capabilities[i]);
1422 }
1423 }
1424 task->cap = cap;
1425 }
1426 #endif
1427
1428 /* -----------------------------------------------------------------------------
1429 * Perform a garbage collection if necessary
1430 * -------------------------------------------------------------------------- */
1431
1432 static void
1433 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1434 rtsBool force_major)
1435 {
1436 Capability *cap = *pcap;
1437 rtsBool heap_census;
1438 nat collect_gen;
1439 rtsBool major_gc;
1440 #ifdef THREADED_RTS
1441 nat gc_type;
1442 nat i, sync;
1443 StgTSO *tso;
1444 #endif
1445
1446 if (sched_state == SCHED_SHUTTING_DOWN) {
1447 // The final GC has already been done, and the system is
1448 // shutting down. We'll probably deadlock if we try to GC
1449 // now.
1450 return;
1451 }
1452
1453 heap_census = scheduleNeedHeapProfile(rtsTrue);
1454
1455 // Figure out which generation we are collecting, so that we can
1456 // decide whether this is a parallel GC or not.
1457 collect_gen = calcNeeded(force_major || heap_census, NULL);
1458 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1459
1460 #ifdef THREADED_RTS
1461 if (sched_state < SCHED_INTERRUPTING
1462 && RtsFlags.ParFlags.parGcEnabled
1463 && collect_gen >= RtsFlags.ParFlags.parGcGen
1464 && ! oldest_gen->mark)
1465 {
1466 gc_type = SYNC_GC_PAR;
1467 } else {
1468 gc_type = SYNC_GC_SEQ;
1469 }
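// i.e. SYNC_GC_PAR is chosen only when parallel GC is enabled
// (+RTS -qg), the generation being collected is at least parGcGen, we
// are not shutting down, and the oldest generation is not using a
// mark (compacting/sweeping) collector.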
1470
1471 // In order to GC, there must be no threads running Haskell code.
1472 // Therefore, the GC thread needs to hold *all* the capabilities,
1473 // and release them after the GC has completed.
1474 //
1475 // This seems to be the simplest way: previous attempts involved
1476 // making all the threads with capabilities give up their
1477 // capabilities and sleep except for the *last* one, which
1478 // actually did the GC. But it's quite hard to arrange for all
1479 // the other tasks to sleep and stay asleep.
1480 //
1481
1482 /* Other capabilities are prevented from running yet more Haskell
1483 threads if pending_sync is set. Tested inside
1484 yieldCapability() and releaseCapability() in Capability.c */
1485
1486 do {
1487 sync = requestSync(pcap, task, gc_type);
1488 cap = *pcap;
1489 if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
1490 // someone else had a pending sync request for a GC, so
1491 // let's assume GC has been done and we don't need to GC
1492 // again.
1493 return;
1494 }
1495 if (sched_state == SCHED_SHUTTING_DOWN) {
1496 // The scheduler might now be shutting down. We tested
1497 // this above, but it might have become true since then as
1498 // we yielded the capability in requestSync().
1499 return;
1500 }
1501 } while (sync);
1502
1503 // don't declare this until after we have sync'd, because
1504 // n_capabilities may change.
1505 rtsBool idle_cap[n_capabilities];
1506 #ifdef DEBUG
1507 unsigned int old_n_capabilities = n_capabilities;
1508 #endif
1509
1510 interruptAllCapabilities();
1511
1512 // The final shutdown GC is always single-threaded, because it's
1513 // possible that some of the Capabilities have no worker threads.
1514
1515 if (gc_type == SYNC_GC_SEQ)
1516 {
1517 traceEventRequestSeqGc(cap);
1518 }
1519 else
1520 {
1521 traceEventRequestParGc(cap);
1522 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1523 }
1524
1525 if (gc_type == SYNC_GC_SEQ)
1526 {
1527 // single-threaded GC: grab all the capabilities
1528 acquireAllCapabilities(cap,task);
1529 }
1530 else
1531 {
1532 // If we are load-balancing collections in this
1533 // generation, then we require all GC threads to participate
1534 // in the collection. Otherwise, we only require active
1535 // threads to participate, and we set gc_threads[i]->idle for
1536 // any idle capabilities. The rationale here is that waking
1537 // up an idle Capability takes much longer than just doing any
1538 // GC work on its behalf.
1539
1540 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1541 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1542 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
1543 for (i=0; i < n_capabilities; i++) {
1544 if (capabilities[i]->disabled) {
1545 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1546 } else {
1547 idle_cap[i] = rtsFalse;
1548 }
1549 }
1550 } else {
1551 for (i=0; i < n_capabilities; i++) {
1552 if (capabilities[i]->disabled) {
1553 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1554 } else if (i == cap->no ||
1555 capabilities[i]->idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1556 idle_cap[i] = rtsFalse;
1557 } else {
1558 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1559 if (!idle_cap[i]) {
1560 n_failed_trygrab_idles++;
1561 } else {
1562 n_idle_caps++;
1563 }
1564 }
1565 }
1566 }
1567
1568 // We set the gc_thread[i]->idle flag if that
1569 // capability/thread is not participating in this collection.
1570 // We also keep a local record of which capabilities are idle
1571 // in idle_cap[], because scheduleDoGC() is re-entrant:
1572 // another thread might start a GC as soon as we've finished
1573 // this one, and thus the gc_thread[]->idle flags are invalid
1574 // as soon as we release any threads after GC. Getting this
1575 // wrong leads to a rare and hard to debug deadlock!
1576
1577 for (i=0; i < n_capabilities; i++) {
1578 gc_threads[i]->idle = idle_cap[i];
1579 capabilities[i]->idle++;
1580 }
1581
1582 // For all capabilities participating in this GC, wait until
1583 // they have stopped mutating and are standing by for GC.
1584 waitForGcThreads(cap);
1585
1586 #if defined(THREADED_RTS)
1587 // Stable point where we can do a global check on our spark counters
1588 ASSERT(checkSparkCountInvariant());
1589 #endif
1590 }
1591
1592 #endif
1593
1594 IF_DEBUG(scheduler, printAllThreads());
1595
1596 delete_threads_and_gc:
1597 /*
1598 * We now have all the capabilities; if we're in an interrupting
1599 * state, then we should take the opportunity to delete all the
1600 * threads in the system.
1601 * Checking for major_gc ensures that the last GC is major.
1602 */
1603 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1604 deleteAllThreads(cap);
1605 #if defined(THREADED_RTS)
1606 // Discard all the sparks from every Capability. Why?
1607 // They'll probably be GC'd anyway since we've killed all the
1608 // threads. It just avoids the GC having to do any work to
1609 // figure out that any remaining sparks are garbage.
1610 for (i = 0; i < n_capabilities; i++) {
1611 capabilities[i]->spark_stats.gcd +=
1612 sparkPoolSize(capabilities[i]->sparks);
1613 // No race here since all Caps are stopped.
1614 discardSparksCap(capabilities[i]);
1615 }
1616 #endif
1617 sched_state = SCHED_SHUTTING_DOWN;
1618 }
1619
1620 /*
1621 * When there are disabled capabilities, we want to migrate any
1622 * threads away from them. Normally this happens in the
1623 * scheduler's loop, but only for unbound threads - it's really
1624 * hard for a bound thread to migrate itself. So we have another
1625 * go here.
1626 */
1627 #if defined(THREADED_RTS)
1628 for (i = enabled_capabilities; i < n_capabilities; i++) {
1629 Capability *tmp_cap, *dest_cap;
1630 tmp_cap = capabilities[i];
1631 ASSERT(tmp_cap->disabled);
1632 if (i != cap->no) {
1633 dest_cap = capabilities[i % enabled_capabilities];
1634 while (!emptyRunQueue(tmp_cap)) {
1635 tso = popRunQueue(tmp_cap);
1636 migrateThread(tmp_cap, tso, dest_cap);
1637 if (tso->bound) {
1638 traceTaskMigrate(tso->bound->task,
1639 tso->bound->task->cap,
1640 dest_cap);
1641 tso->bound->task->cap = dest_cap;
1642 }
1643 }
1644 }
1645 }
1646 #endif
1647
1648 #if defined(THREADED_RTS)
1649 // reset pending_sync *before* GC, so that when the GC threads
1650 // emerge they don't immediately re-enter the GC.
1651 pending_sync = 0;
1652 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1653 #else
1654 GarbageCollect(collect_gen, heap_census, 0, cap);
1655 #endif
1656
1657 traceSparkCounters(cap);
1658
1659 switch (recent_activity) {
1660 case ACTIVITY_INACTIVE:
1661 if (force_major) {
1662 // We are doing a GC because the system has been idle for a
1663 // timeslice and we need to check for deadlock. Record the
1664 // fact that we've done a GC and turn off the timer signal;
1665 // it will get re-enabled if we run any threads after the GC.
1666 recent_activity = ACTIVITY_DONE_GC;
1667 #ifndef PROFILING
1668 stopTimer();
1669 #endif
1670 break;
1671 }
1672 // fall through...
1673
1674 case ACTIVITY_MAYBE_NO:
1675 // the GC might have taken long enough for the timer to set
1676 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1677 // but we aren't necessarily deadlocked:
1678 recent_activity = ACTIVITY_YES;
1679 break;
1680
1681 case ACTIVITY_DONE_GC:
1682 // If we are actually active, the scheduler will reset the
1683 // recent_activity flag and re-enable the timer.
1684 break;
1685 }
1686
1687 #if defined(THREADED_RTS)
1688 // Stable point where we can do a global check on our spark counters
1689 ASSERT(checkSparkCountInvariant());
1690 #endif
1691
1692 // The heap census itself is done during GarbageCollect().
1693 if (heap_census) {
1694 performHeapProfile = rtsFalse;
1695 }
1696
1697 #if defined(THREADED_RTS)
1698
1699 // If n_capabilities has changed during GC, we're in trouble.
1700 ASSERT(n_capabilities == old_n_capabilities);
1701
1702 if (gc_type == SYNC_GC_PAR)
1703 {
1704 releaseGCThreads(cap);
1705 for (i = 0; i < n_capabilities; i++) {
1706 if (i != cap->no) {
1707 if (idle_cap[i]) {
1708 ASSERT(capabilities[i]->running_task == task);
1709 task->cap = capabilities[i];
1710 releaseCapability(capabilities[i]);
1711 } else {
1712 ASSERT(capabilities[i]->running_task != task);
1713 }
1714 }
1715 }
1716 task->cap = cap;
1717 }
1718 #endif
1719
1720 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1721 // GC set the heap_overflow flag, so we should proceed with
1722 // an orderly shutdown now. Ultimately we want the main
1723 // thread to return to its caller with HeapExhausted, at which
1724 // point the caller should call hs_exit(). The first step is
1725 // to delete all the threads.
1726 //
1727 // Another way to do this would be to raise an exception in
1728 // the main thread, which we really should do because it gives
1729 // the program a chance to clean up. But how do we find the
1730 // main thread? It should presumably be the same one that
1731 // gets ^C exceptions, but that's all done on the Haskell side
1732 // (GHC.TopHandler).
1733 sched_state = SCHED_INTERRUPTING;
1734 goto delete_threads_and_gc;
1735 }
1736
1737 #ifdef SPARKBALANCE
1738 /* JB
1739 Once we are all together... this would be the place to balance all
1740 spark pools. No concurrent stealing or adding of new sparks can
1741 occur. Should be defined in Sparks.c. */
1742 balanceSparkPoolsCaps(n_capabilities, capabilities);
1743 #endif
1744
1745 #if defined(THREADED_RTS)
1746 if (gc_type == SYNC_GC_SEQ) {
1747 // release our stash of capabilities.
1748 releaseAllCapabilities(n_capabilities, cap, task);
1749 }
1750 #endif
1751
1752 return;
1753 }
1754
1755 /* ---------------------------------------------------------------------------
1756 * Singleton fork(). Do not copy any running threads.
1757 * ------------------------------------------------------------------------- */
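/* (Reached via the forkProcess# primop: the Haskell caller -- for example
 * System.Posix.Process.forkProcess in the unix package -- passes a StablePtr
 * to the IO action that the child process should run; the child runs it via
 * rts_evalStableIO() below and then exits.)
 */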
1758
1759 pid_t
1760 forkProcess(HsStablePtr *entry
1761 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1762 STG_UNUSED
1763 #endif
1764 )
1765 {
1766 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1767 pid_t pid;
1768 StgTSO* t,*next;
1769 Capability *cap;
1770 nat g;
1771 Task *task = NULL;
1772 nat i;
1773 #ifdef THREADED_RTS
1774 nat sync;
1775 #endif
1776
1777 debugTrace(DEBUG_sched, "forking!");
1778
1779 task = newBoundTask();
1780
1781 cap = NULL;
1782 waitForCapability(&cap, task);
1783
1784 #ifdef THREADED_RTS
1785 do {
1786 sync = requestSync(&cap, task, SYNC_OTHER);
1787 } while (sync);
1788
1789 acquireAllCapabilities(cap,task);
1790
1791 pending_sync = 0;
1792 #endif
1793
1794 // no funny business: hold locks while we fork, otherwise if some
1795 // other thread is holding a lock when the fork happens, the data
1796 // structure protected by the lock will forever be in an
1797 // inconsistent state in the child. See also #1391.
1798 ACQUIRE_LOCK(&sched_mutex);
1799 ACQUIRE_LOCK(&sm_mutex);
1800 ACQUIRE_LOCK(&stable_mutex);
1801 ACQUIRE_LOCK(&task->lock);
1802
1803 for (i=0; i < n_capabilities; i++) {
1804 ACQUIRE_LOCK(&capabilities[i]->lock);
1805 }
1806
1807 #ifdef THREADED_RTS
1808 ACQUIRE_LOCK(&all_tasks_mutex);
1809 #endif
1810
1811 stopTimer(); // See #4074
1812
1813 #if defined(TRACING)
1814 flushEventLog(); // so that child won't inherit dirty file buffers
1815 #endif
1816
1817 pid = fork();
1818
1819 if (pid) { // parent
1820
1821 startTimer(); // #4074
1822
1823 RELEASE_LOCK(&sched_mutex);
1824 RELEASE_LOCK(&sm_mutex);
1825 RELEASE_LOCK(&stable_mutex);
1826 RELEASE_LOCK(&task->lock);
1827
1828 for (i=0; i < n_capabilities; i++) {
1829 releaseCapability_(capabilities[i],rtsFalse);
1830 RELEASE_LOCK(&capabilities[i]->lock);
1831 }
1832
1833 #ifdef THREADED_RTS
1834 RELEASE_LOCK(&all_tasks_mutex);
1835 #endif
1836
1837 boundTaskExiting(task);
1838
1839 // just return the pid
1840 return pid;
1841
1842 } else { // child
1843
1844 #if defined(THREADED_RTS)
1845 initMutex(&sched_mutex);
1846 initMutex(&sm_mutex);
1847 initMutex(&stable_mutex);
1848 initMutex(&task->lock);
1849
1850 for (i=0; i < n_capabilities; i++) {
1851 initMutex(&capabilities[i]->lock);
1852 }
1853
1854 initMutex(&all_tasks_mutex);
1855 #endif
1856
1857 #ifdef TRACING
1858 resetTracing();
1859 #endif
1860
1861 // Now, all OS threads except the thread that forked are
1862 // stopped. We need to stop all Haskell threads, including
1863 // those involved in foreign calls. Also we need to delete
1864 // all Tasks, because they correspond to OS threads that are
1865 // now gone.
1866
1867 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1868 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1869 next = t->global_link;
1870 // don't allow threads to catch the ThreadKilled
1871 // exception, but we do want to raiseAsync() because these
1872 // threads may be evaluating thunks that we need later.
1873 deleteThread_(t->cap,t);
1874
1875 // stop the GC from updating the InCall to point to
1876 // the TSO. This is only necessary because the
1877 // OSThread bound to the TSO has been killed, and
1878 // won't get a chance to exit in the usual way (see
1879 // also scheduleHandleThreadFinished).
1880 t->bound = NULL;
1881 }
1882 }
1883
1884 discardTasksExcept(task);
1885
1886 for (i=0; i < n_capabilities; i++) {
1887 cap = capabilities[i];
1888
1889 // Empty the run queue. It seems tempting to let all the
1890 // killed threads stay on the run queue as zombies to be
1891 // cleaned up later, but some of them may correspond to
1892 // bound threads for which the corresponding Task does not
1893 // exist.
1894 truncateRunQueue(cap);
1895
1896             // Any suspended C-calling Tasks are gone; their OS threads
1897             // no longer exist:
1898 cap->suspended_ccalls = NULL;
1899
1900 #if defined(THREADED_RTS)
1901 // Wipe our spare workers list, they no longer exist. New
1902 // workers will be created if necessary.
1903 cap->spare_workers = NULL;
1904 cap->n_spare_workers = 0;
1905 cap->returning_tasks_hd = NULL;
1906 cap->returning_tasks_tl = NULL;
1907 #endif
1908
1909 // Release all caps except 0, we'll use that for starting
1910 // the IO manager and running the client action below.
1911 if (cap->no != 0) {
1912 task->cap = cap;
1913 releaseCapability(cap);
1914 }
1915 }
1916 cap = capabilities[0];
1917 task->cap = cap;
1918
1919 // Empty the threads lists. Otherwise, the garbage
1920 // collector may attempt to resurrect some of these threads.
1921 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1922 generations[g].threads = END_TSO_QUEUE;
1923 }
1924
1925 // On Unix, all timers are reset in the child, so we need to start
1926 // the timer again.
1927 initTimer();
1928 startTimer();
1929
1930 // TODO: need to trace various other things in the child
1931 // like startup event, capabilities, process info etc
1932 traceTaskCreate(task, cap);
1933
1934 #if defined(THREADED_RTS)
1935 ioManagerStartCap(&cap);
1936 #endif
1937
1938 rts_evalStableIO(&cap, entry, NULL); // run the action
1939 rts_checkSchedStatus("forkProcess",cap);
1940
1941 rts_unlock(cap);
1942 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
1943 }
1944 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1945 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1946 #endif
1947 }
1948
1949 /* ---------------------------------------------------------------------------
1950 * Changing the number of Capabilities
1951 *
1952 * Changing the number of Capabilities is very tricky! We can only do
1953 * it with the system fully stopped, so we do a full sync with
1954 * requestSync(SYNC_OTHER) and grab all the capabilities.
1955 *
1956 * Then we resize the appropriate data structures, and update all
1957 * references to the old data structures which have now moved.
1958 * Finally we release the Capabilities we are holding, and start
1959 * worker Tasks on the new Capabilities we created.
1960 *
1961 * ------------------------------------------------------------------------- */
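/* An illustrative usage sketch (not part of the RTS itself): a C program
 * embedding the threaded RTS could grow and later shrink the capability
 * count at run time; the Haskell-level Control.Concurrent.setNumCapabilities
 * also ends up in this function.
 *
 *     #include "Rts.h"
 *     // ... after hs_init() ...
 *     setNumCapabilities(4);   // enable (creating if necessary) 4 capabilities
 *     // ... run some parallel work ...
 *     setNumCapabilities(2);   // later, mark two of them as disabled again
 */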
1962
1963 void
1964 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
1965 {
1966 #if !defined(THREADED_RTS)
1967 if (new_n_capabilities != 1) {
1968 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
1969 }
1970 return;
1971 #elif defined(NOSMP)
1972 if (new_n_capabilities != 1) {
1973 errorBelch("setNumCapabilities: not supported on this platform");
1974 }
1975 return;
1976 #else
1977 Task *task;
1978 Capability *cap;
1979 nat sync;
1980 nat n;
1981 Capability *old_capabilities = NULL;
1982 nat old_n_capabilities = n_capabilities;
1983
1984 if (new_n_capabilities == enabled_capabilities) return;
1985
1986 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
1987 enabled_capabilities, new_n_capabilities);
1988
1989 cap = rts_lock();
1990 task = cap->running_task;
1991
1992 do {
1993 sync = requestSync(&cap, task, SYNC_OTHER);
1994 } while (sync);
1995
1996 acquireAllCapabilities(cap,task);
1997
1998 pending_sync = 0;
1999
2000 if (new_n_capabilities < enabled_capabilities)
2001 {
2002 // Reducing the number of capabilities: we do not actually
2003 // remove the extra capabilities, we just mark them as
2004 // "disabled". This has the following effects:
2005 //
2006 // - threads on a disabled capability are migrated away by the
2007 // scheduler loop
2008 //
2009 // - disabled capabilities do not participate in GC
2010 // (see scheduleDoGC())
2011 //
2012 // - No spark threads are created on this capability
2013 // (see scheduleActivateSpark())
2014 //
2015 // - We do not attempt to migrate threads *to* a disabled
2016 // capability (see schedulePushWork()).
2017 //
2018 // but in other respects, a disabled capability remains
2019 // alive. Threads may be woken up on a disabled capability,
2020 // but they will be immediately migrated away.
2021 //
2022 // This approach is much easier than trying to actually remove
2023 // the capability; we don't have to worry about GC data
2024 // structures, the nursery, etc.
2025 //
2026 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2027 capabilities[n]->disabled = rtsTrue;
2028 traceCapDisable(capabilities[n]);
2029 }
2030 enabled_capabilities = new_n_capabilities;
2031 }
2032 else
2033 {
2034 // Increasing the number of enabled capabilities.
2035 //
2036 // enable any disabled capabilities, up to the required number
2037 for (n = enabled_capabilities;
2038 n < new_n_capabilities && n < n_capabilities; n++) {
2039 capabilities[n]->disabled = rtsFalse;
2040 traceCapEnable(capabilities[n]);
2041 }
2042 enabled_capabilities = n;
2043
2044 if (new_n_capabilities > n_capabilities) {
2045 #if defined(TRACING)
2046 // Allocate eventlog buffers for the new capabilities. Note this
2047 // must be done before calling moreCapabilities(), because that
2048 // will emit events about creating the new capabilities and adding
2049 // them to existing capsets.
2050 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2051 #endif
2052
2053 // Resize the capabilities array
2054 // NB. after this, capabilities points somewhere new. Any pointers
2055 // of type (Capability *) are now invalid.
2056 moreCapabilities(n_capabilities, new_n_capabilities);
2057
2058 // Resize and update storage manager data structures
2059 storageAddCapabilities(n_capabilities, new_n_capabilities);
2060 }
2061 }
2062
2063 // update n_capabilities before things start running
2064 if (new_n_capabilities > n_capabilities) {
2065 n_capabilities = enabled_capabilities = new_n_capabilities;
2066 }
2067
2068 // Start worker tasks on the new Capabilities
2069 startWorkerTasks(old_n_capabilities, new_n_capabilities);
2070
2071 // We're done: release the original Capabilities
2072 releaseAllCapabilities(old_n_capabilities, cap,task);
2073
2074 // We can't free the old array until now, because we access it
2075 // while updating pointers in updateCapabilityRefs().
2076 if (old_capabilities) {
2077 stgFree(old_capabilities);
2078 }
2079
2080 // Notify IO manager that the number of capabilities has changed.
2081 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2082
2083 rts_unlock(cap);
2084
2085 #endif // THREADED_RTS
2086 }
2087
2088
2089
2090 /* ---------------------------------------------------------------------------
2091 * Delete all the threads in the system
2092 * ------------------------------------------------------------------------- */
2093
2094 static void
2095 deleteAllThreads ( Capability *cap )
2096 {
2097 // NOTE: only safe to call if we own all capabilities.
2098
2099 StgTSO* t, *next;
2100 nat g;
2101
2102 debugTrace(DEBUG_sched,"deleting all threads");
2103 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2104 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2105 next = t->global_link;
2106 deleteThread(cap,t);
2107 }
2108 }
2109
2110 // The run queue now contains a bunch of ThreadKilled threads. We
2111 // must not throw these away: the main thread(s) will be in there
2112 // somewhere, and the main scheduler loop has to deal with it.
2113 // Also, the run queue is the only thing keeping these threads from
2114 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2115
2116 #if !defined(THREADED_RTS)
2117 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2118 ASSERT(sleeping_queue == END_TSO_QUEUE);
2119 #endif
2120 }
2121
2122 /* -----------------------------------------------------------------------------
2123 Managing the suspended_ccalls list.
2124 Locks required: sched_mutex
2125 -------------------------------------------------------------------------- */
2126
2127 STATIC_INLINE void
2128 suspendTask (Capability *cap, Task *task)
2129 {
2130 InCall *incall;
2131
2132 incall = task->incall;
2133 ASSERT(incall->next == NULL && incall->prev == NULL);
2134 incall->next = cap->suspended_ccalls;
2135 incall->prev = NULL;
2136 if (cap->suspended_ccalls) {
2137 cap->suspended_ccalls->prev = incall;
2138 }
2139 cap->suspended_ccalls = incall;
2140 }
2141
2142 STATIC_INLINE void
2143 recoverSuspendedTask (Capability *cap, Task *task)
2144 {
2145 InCall *incall;
2146
2147 incall = task->incall;
2148 if (incall->prev) {
2149 incall->prev->next = incall->next;
2150 } else {
2151 ASSERT(cap->suspended_ccalls == incall);
2152 cap->suspended_ccalls = incall->next;
2153 }
2154 if (incall->next) {
2155 incall->next->prev = incall->prev;
2156 }
2157 incall->next = incall->prev = NULL;
2158 }
2159
2160 /* ---------------------------------------------------------------------------
2161 * Suspending & resuming Haskell threads.
2162 *
2163 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2164 * its capability before calling the C function. This allows another
2165 * task to pick up the capability and carry on running Haskell
2166 * threads. It also means that if the C call blocks, it won't lock
2167 * the whole system.
2168 *
2169 * The Haskell thread making the C call is put to sleep for the
2170 * duration of the call, on the suspended_ccalling_threads queue. We
2171 * give out a token to the task, which it can use to resume the thread
2172 * on return from the C function.
2173 *
2174 * If this is an interruptible C call, this means that the FFI call may be
2175 * unceremoniously terminated and should be scheduled on an
2176 * unbound worker thread.
2177 * ------------------------------------------------------------------------- */
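/* A minimal sketch (for exposition only) of the sequence that compiled code
 * performs around a "safe" foreign call; foreign_function stands for an
 * arbitrary imported C function.  The token returned by suspendThread() is
 * the Task, and is handed back to resumeThread(), which may return on a
 * different Capability:
 *
 *     void *token = suspendThread(BaseReg, interruptible);
 *     r = foreign_function(args);     // may block, or call back into Haskell
 *     BaseReg = resumeThread(token);
 */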
2178
2179 void *
2180 suspendThread (StgRegTable *reg, rtsBool interruptible)
2181 {
2182 Capability *cap;
2183 int saved_errno;
2184 StgTSO *tso;
2185 Task *task;
2186 #if mingw32_HOST_OS
2187 StgWord32 saved_winerror;
2188 #endif
2189
2190 saved_errno = errno;
2191 #if mingw32_HOST_OS
2192 saved_winerror = GetLastError();
2193 #endif
2194
2195 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2196 */
2197 cap = regTableToCapability(reg);
2198
2199 task = cap->running_task;
2200 tso = cap->r.rCurrentTSO;
2201
2202 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2203
2204 // XXX this might not be necessary --SDM
2205 tso->what_next = ThreadRunGHC;
2206
2207 threadPaused(cap,tso);
2208
2209 if (interruptible) {
2210 tso->why_blocked = BlockedOnCCall_Interruptible;
2211 } else {
2212 tso->why_blocked = BlockedOnCCall;
2213 }
2214
2215 // Hand back capability
2216 task->incall->suspended_tso = tso;
2217 task->incall->suspended_cap = cap;
2218
2219 // Otherwise allocate() will write to invalid memory.
2220 cap->r.rCurrentTSO = NULL;
2221
2222 ACQUIRE_LOCK(&cap->lock);
2223
2224 suspendTask(cap,task);
2225 cap->in_haskell = rtsFalse;
2226 releaseCapability_(cap,rtsFalse);
2227
2228 RELEASE_LOCK(&cap->lock);
2229
2230 errno = saved_errno;
2231 #if mingw32_HOST_OS
2232 SetLastError(saved_winerror);
2233 #endif
2234 return task;
2235 }
2236
2237 StgRegTable *
2238 resumeThread (void *task_)
2239 {
2240 StgTSO *tso;
2241 InCall *incall;
2242 Capability *cap;
2243 Task *task = task_;
2244 int saved_errno;
2245 #if mingw32_HOST_OS
2246 StgWord32 saved_winerror;
2247 #endif
2248
2249 saved_errno = errno;
2250 #if mingw32_HOST_OS
2251 saved_winerror = GetLastError();
2252 #endif
2253
2254 incall = task->incall;
2255 cap = incall->suspended_cap;
2256 task->cap = cap;
2257
2258 // Wait for permission to re-enter the RTS with the result.
2259 waitForCapability(&cap,task);
2260 // we might be on a different capability now... but if so, our
2261 // entry on the suspended_ccalls list will also have been
2262 // migrated.
2263
2264 // Remove the thread from the suspended list
2265 recoverSuspendedTask(cap,task);
2266
2267 tso = incall->suspended_tso;
2268 incall->suspended_tso = NULL;
2269 incall->suspended_cap = NULL;
2270 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2271
2272 traceEventRunThread(cap, tso);
2273
2274 /* Reset blocking status */
2275 tso->why_blocked = NotBlocked;
2276
2277 if ((tso->flags & TSO_BLOCKEX) == 0) {
2278 // avoid locking the TSO if we don't have to
2279 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2280 maybePerformBlockedException(cap,tso);
2281 }
2282 }
2283
2284 cap->r.rCurrentTSO = tso;
2285 cap->in_haskell = rtsTrue;
2286 errno = saved_errno;
2287 #if mingw32_HOST_OS
2288 SetLastError(saved_winerror);
2289 #endif
2290
2291 /* We might have GC'd, mark the TSO dirty again */
2292 dirty_TSO(cap,tso);
2293 dirty_STACK(cap,tso->stackobj);
2294
2295 IF_DEBUG(sanity, checkTSO(tso));
2296
2297 return &cap->r;
2298 }
2299
2300 /* ---------------------------------------------------------------------------
2301 * scheduleThread()
2302 *
2303 * scheduleThread puts a thread on the end of the runnable queue.
2304 * This will usually be done immediately after a thread is created.
2305 * The caller of scheduleThread must create the thread using e.g.
2306 * createThread and push an appropriate closure
2307 * on this thread's stack before the scheduler is invoked.
2308 * ------------------------------------------------------------------------ */
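/* An illustrative sketch of the usual create-then-schedule pattern, assuming
 * the createIOThread() helper from RtsAPI.c and a hypothetical closure
 * someIOClosure of type IO ():
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  (StgClosure *)someIOClosure);
 *     scheduleThread(cap, tso);    // append to this capability's run queue
 */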
2309
2310 void
2311 scheduleThread(Capability *cap, StgTSO *tso)
2312 {
2313 // The thread goes at the *end* of the run-queue, to avoid possible
2314 // starvation of any threads already on the queue.
2315 appendToRunQueue(cap,tso);
2316 }
2317
2318 void
2319 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2320 {
2321 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2322 // move this thread from now on.
2323 #if defined(THREADED_RTS)
2324 cpu %= enabled_capabilities;
2325 if (cpu == cap->no) {
2326 appendToRunQueue(cap,tso);
2327 } else {
2328 migrateThread(cap, tso, capabilities[cpu]);
2329 }
2330 #else
2331 appendToRunQueue(cap,tso);
2332 #endif
2333 }
2334
2335 void
2336 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2337 {
2338 Task *task;
2339 DEBUG_ONLY( StgThreadID id );
2340 Capability *cap;
2341
2342 cap = *pcap;
2343
2344 // We already created/initialised the Task
2345 task = cap->running_task;
2346
2347 // This TSO is now a bound thread; make the Task and TSO
2348 // point to each other.
2349 tso->bound = task->incall;
2350 tso->cap = cap;
2351
2352 task->incall->tso = tso;
2353 task->incall->ret = ret;
2354 task->incall->rstat = NoStatus;
2355
2356 appendToRunQueue(cap,tso);
2357
2358 DEBUG_ONLY( id = tso->id );
2359 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2360
2361 cap = schedule(cap,task);
2362
2363 ASSERT(task->incall->rstat != NoStatus);
2364 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2365
2366 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2367 *pcap = cap;
2368 }
2369
2370 /* ----------------------------------------------------------------------------
2371 * Starting Tasks
2372 * ------------------------------------------------------------------------- */
2373
2374 #if defined(THREADED_RTS)
2375 void scheduleWorker (Capability *cap, Task *task)
2376 {
2377 // schedule() runs without a lock.
2378 cap = schedule(cap,task);
2379
2380 // On exit from schedule(), we have a Capability, but possibly not
2381 // the same one we started with.
2382
2383 // During shutdown, the requirement is that after all the
2384 // Capabilities are shut down, all workers that are shutting down
2385 // have finished workerTaskStop(). This is why we hold on to
2386 // cap->lock until we've finished workerTaskStop() below.
2387 //
2388 // There may be workers still involved in foreign calls; those
2389 // will just block in waitForCapability() because the
2390 // Capability has been shut down.
2391 //
2392 ACQUIRE_LOCK(&cap->lock);
2393 releaseCapability_(cap,rtsFalse);
2394 workerTaskStop(task);
2395 RELEASE_LOCK(&cap->lock);
2396 }
2397 #endif
2398
2399 /* ---------------------------------------------------------------------------
2400  * Start new worker tasks on Capabilities from..to-1 (i.e. [from, to))
2401 * -------------------------------------------------------------------------- */
2402
2403 static void
2404 startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
2405 {
2406 #if defined(THREADED_RTS)
2407 nat i;
2408 Capability *cap;
2409
2410 for (i = from; i < to; i++) {
2411 cap = capabilities[i];
2412 ACQUIRE_LOCK(&cap->lock);
2413 startWorkerTask(cap);
2414 RELEASE_LOCK(&cap->lock);
2415 }
2416 #endif
2417 }
2418
2419 /* ---------------------------------------------------------------------------
2420 * initScheduler()
2421 *
2422 * Initialise the scheduler. This resets all the queues - if the
2423 * queues contained any threads, they'll be garbage collected at the
2424 * next pass.
2425 *
2426 * ------------------------------------------------------------------------ */
2427
2428 void
2429 initScheduler(void)
2430 {
2431 #if !defined(THREADED_RTS)
2432 blocked_queue_hd = END_TSO_QUEUE;
2433 blocked_queue_tl = END_TSO_QUEUE;
2434 sleeping_queue = END_TSO_QUEUE;
2435 #endif
2436
2437 sched_state = SCHED_RUNNING;
2438 recent_activity = ACTIVITY_YES;
2439
2440 #if defined(THREADED_RTS)
2441     /* Initialise the mutex used by the scheduler
2442      * (only the THREADED_RTS has one). */
2443 initMutex(&sched_mutex);
2444 #endif
2445
2446 ACQUIRE_LOCK(&sched_mutex);
2447
2448 /* A capability holds the state a native thread needs in
2449 * order to execute STG code. At least one capability is
2450 * floating around (only THREADED_RTS builds have more than one).
2451 */
2452 initCapabilities();
2453
2454 initTaskManager();
2455
2456 /*
2457 * Eagerly start one worker to run each Capability, except for
2458 * Capability 0. The idea is that we're probably going to start a
2459 * bound thread on Capability 0 pretty soon, so we don't want a
2460 * worker task hogging it.
2461 */
2462 startWorkerTasks(1, n_capabilities);
2463
2464 RELEASE_LOCK(&sched_mutex);
2465
2466 }
2467
2468 void
2469 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2470 /* see Capability.c, shutdownCapability() */
2471 {
2472 Task *task = NULL;
2473
2474 task = newBoundTask();
2475
2476 // If we haven't killed all the threads yet, do it now.
2477 if (sched_state < SCHED_SHUTTING_DOWN) {
2478 sched_state = SCHED_INTERRUPTING;
2479 Capability *cap = task->cap;
2480 waitForCapability(&cap,task);
2481 scheduleDoGC(&cap,task,rtsTrue);
2482 ASSERT(task->incall->tso == NULL);
2483 releaseCapability(cap);
2484 }
2485 sched_state = SCHED_SHUTTING_DOWN;
2486
2487 shutdownCapabilities(task, wait_foreign);
2488
2489 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2490 // n_failed_trygrab_idles, n_idle_caps);
2491
2492 boundTaskExiting(task);
2493 }
2494
2495 void
2496 freeScheduler( void )
2497 {
2498 nat still_running;
2499
2500 ACQUIRE_LOCK(&sched_mutex);
2501 still_running = freeTaskManager();
2502 // We can only free the Capabilities if there are no Tasks still
2503 // running. We might have a Task about to return from a foreign
2504 // call into waitForCapability(), for example (actually,
2505 // this should be the *only* thing that a still-running Task can
2506 // do at this point, and it will block waiting for the
2507 // Capability).
2508 if (still_running == 0) {
2509 freeCapabilities();
2510 }
2511 RELEASE_LOCK(&sched_mutex);
2512 #if defined(THREADED_RTS)
2513 closeMutex(&sched_mutex);
2514 #endif
2515 }
2516
2517 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2518 void *user USED_IF_NOT_THREADS)
2519 {
2520 #if !defined(THREADED_RTS)
2521 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2522 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2523 evac(user, (StgClosure **)(void *)&sleeping_queue);
2524 #endif
2525 }
2526
2527 /* -----------------------------------------------------------------------------
2528 performGC
2529
2530 This is the interface to the garbage collector from Haskell land.
2531 We provide this so that external C code can allocate and garbage
2532 collect when called from Haskell via _ccall_GC.
2533 -------------------------------------------------------------------------- */
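/* For example (illustrative only), a foreign C function called from Haskell
 * can force a full collection before doing something memory-hungry:
 *
 *     void my_c_helper(void)      // hypothetical "safe" foreign import
 *     {
 *         performMajorGC();       // or performGC() if a major collection
 *                                 // need not be forced
 *         // ... proceed with the allocation-heavy work ...
 *     }
 */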
2534
2535 static void
2536 performGC_(rtsBool force_major)
2537 {
2538 Task *task;
2539 Capability *cap = NULL;
2540
2541 // We must grab a new Task here, because the existing Task may be
2542 // associated with a particular Capability, and chained onto the
2543 // suspended_ccalls queue.
2544 task = newBoundTask();
2545
2546 // TODO: do we need to traceTask*() here?
2547
2548 waitForCapability(&cap,task);
2549 scheduleDoGC(&cap,task,force_major);
2550 releaseCapability(cap);
2551 boundTaskExiting(task);
2552 }
2553
2554 void
2555 performGC(void)
2556 {
2557 performGC_(rtsFalse);
2558 }
2559
2560 void
2561 performMajorGC(void)
2562 {
2563 performGC_(rtsTrue);
2564 }
2565
2566 /* ---------------------------------------------------------------------------
2567 Interrupt execution
2568 - usually called inside a signal handler so it mustn't do anything fancy.
2569 ------------------------------------------------------------------------ */
2570
2571 void
2572 interruptStgRts(void)
2573 {
2574 sched_state = SCHED_INTERRUPTING;
2575 interruptAllCapabilities();
2576 #if defined(THREADED_RTS)
2577 wakeUpRts();
2578 #endif
2579 }
2580
2581 /* -----------------------------------------------------------------------------
2582 Wake up the RTS
2583
2584 This function causes at least one OS thread to wake up and run the
2585 scheduler loop. It is invoked when the RTS might be deadlocked, or
2586 an external event has arrived that may need servicing (eg. a
2587 keyboard interrupt).
2588
2589 In the single-threaded RTS we don't do anything here; we only have
2590 one thread anyway, and the event that caused us to want to wake up
2591 will have interrupted any blocking system call in progress anyway.
2592 -------------------------------------------------------------------------- */
2593
2594 #if defined(THREADED_RTS)
2595 void wakeUpRts(void)
2596 {
2597 // This forces the IO Manager thread to wakeup, which will
2598 // in turn ensure that some OS thread wakes up and runs the
2599 // scheduler loop, which will cause a GC and deadlock check.
2600 ioManagerWakeup();
2601 }
2602 #endif
2603
2604 /* -----------------------------------------------------------------------------
2605 Deleting threads
2606
2607 This is used for interruption (^C) and forking, and corresponds to
2608 raising an exception but without letting the thread catch the
2609 exception.
2610 -------------------------------------------------------------------------- */
2611
2612 static void
2613 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2614 {
2615 // NOTE: must only be called on a TSO that we have exclusive
2616 // access to, because we will call throwToSingleThreaded() below.
2617 // The TSO must be on the run queue of the Capability we own, or
2618 // we must own all Capabilities.
2619
2620 if (tso->why_blocked != BlockedOnCCall &&
2621 tso->why_blocked != BlockedOnCCall_Interruptible) {
2622 throwToSingleThreaded(tso->cap,tso,NULL);
2623 }
2624 }
2625
2626 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2627 static void
2628 deleteThread_(Capability *cap, StgTSO *tso)
2629 { // for forkProcess only:
2630 // like deleteThread(), but we delete threads in foreign calls, too.
2631
2632 if (tso->why_blocked == BlockedOnCCall ||
2633 tso->why_blocked == BlockedOnCCall_Interruptible) {
2634 tso->what_next = ThreadKilled;
2635 appendToRunQueue(tso->cap, tso);
2636 } else {
2637 deleteThread(cap,tso);
2638 }
2639 }
2640 #endif
2641
2642 /* -----------------------------------------------------------------------------
2643 raiseExceptionHelper
2644
2645     This function is called by the raise# primitive, just so that we can
2646     move some of the tricky bits of raising an exception from C-- into
2647     C. Who knows, it might be a useful reusable thing here too.
2648 -------------------------------------------------------------------------- */
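/* (The return value is the type of the frame we stopped at -- CATCH_FRAME,
 *  CATCH_STM_FRAME, ATOMICALLY_FRAME or STOP_FRAME -- and tso->stackobj->sp
 *  is left pointing at that frame, so the C-- caller (stg_raisezh in
 *  Exception.cmm) can dispatch on it.)
 */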
2649
2650 StgWord
2651 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2652 {
2653 Capability *cap = regTableToCapability(reg);
2654 StgThunk *raise_closure = NULL;
2655 StgPtr p, next;
2656 StgRetInfoTable *info;
2657 //
2658     // This closure represents the expression 'raise# E' where E
2659     // is the exception being raised. It is used to overwrite all the
2660     // thunks which are currently under evaluation.
2661 //
2662
2663 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2664 // LDV profiling: stg_raise_info has THUNK as its closure
2665 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2666     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2667 // 1 does not cause any problem unless profiling is performed.
2668 // However, when LDV profiling goes on, we need to linearly scan
2669 // small object pool, where raise_closure is stored, so we should
2670 // use MIN_UPD_SIZE.
2671 //
2672 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2673 // sizeofW(StgClosure)+1);
2674 //
2675
2676 //
2677 // Walk up the stack, looking for the catch frame. On the way,
2678 // we update any closures pointed to from update frames with the
2679 // raise closure that we just built.
2680 //
2681 p = tso->stackobj->sp;
2682 while(1) {
2683 info = get_ret_itbl((StgClosure *)p);
2684 next = p + stack_frame_sizeW((StgClosure *)p);
2685 switch (info->i.type) {
2686
2687 case UPDATE_FRAME:
2688 // Only create raise_closure if we need to.
2689 if (raise_closure == NULL) {
2690 raise_closure =
2691 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2692 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2693 raise_closure->payload[0] = exception;
2694 }
2695 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2696 (StgClosure *)raise_closure);
2697 p = next;
2698 continue;
2699
2700 case ATOMICALLY_FRAME:
2701 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2702 tso->stackobj->sp = p;
2703 return ATOMICALLY_FRAME;
2704
2705 case CATCH_FRAME:
2706 tso->stackobj->sp = p;
2707 return CATCH_FRAME;
2708
2709 case CATCH_STM_FRAME:
2710 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2711 tso->stackobj->sp = p;
2712 return CATCH_STM_FRAME;
2713
2714 case UNDERFLOW_FRAME:
2715 tso->stackobj->sp = p;
2716 threadStackUnderflow(cap,tso);
2717 p = tso->stackobj->sp;
2718 continue;
2719
2720 case STOP_FRAME:
2721 tso->stackobj->sp = p;
2722 return STOP_FRAME;
2723
2724 case CATCH_RETRY_FRAME: {
2725 StgTRecHeader *trec = tso -> trec;
2726 StgTRecHeader *outer = trec -> enclosing_trec;
2727 debugTrace(DEBUG_stm,
2728 "found CATCH_RETRY_FRAME at %p during raise", p);
2729 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2730 stmAbortTransaction(cap, trec);
2731 stmFreeAbortedTRec(cap, trec);
2732 tso -> trec = outer;
2733 p = next;
2734 continue;
2735 }
2736
2737 default:
2738 p = next;
2739 continue;
2740 }
2741 }
2742 }
2743
2744
2745 /* -----------------------------------------------------------------------------
2746 findRetryFrameHelper
2747
2748 This function is called by the retry# primitive. It traverses the stack
2749 leaving tso->sp referring to the frame which should handle the retry.
2750
2751 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2752     or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2753
2754 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2755 create) because retries are not considered to be exceptions, despite the
2756 similar implementation.
2757
2758 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2759 not be created within memory transactions.
2760 -------------------------------------------------------------------------- */
2761
2762 StgWord
2763 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2764 {
2765 StgPtr p, next;
2766 StgRetInfoTable *info;
2767
2768 p = tso->stackobj->sp;
2769 while (1) {
2770 info = get_ret_itbl((StgClosure *)p);
2771 next = p + stack_frame_sizeW((StgClosure *)p);
2772 switch (info->i.type) {
2773
2774 case ATOMICALLY_FRAME:
2775 debugTrace(DEBUG_stm,
2776 "found ATOMICALLY_FRAME at %p during retry", p);
2777 tso->stackobj->sp = p;
2778 return ATOMICALLY_FRAME;
2779
2780 case CATCH_RETRY_FRAME:
2781 debugTrace(DEBUG_stm,
2782 "found CATCH_RETRY_FRAME at %p during retry", p);
2783 tso->stackobj->sp = p;
2784 return CATCH_RETRY_FRAME;
2785
2786 case CATCH_STM_FRAME: {
2787 StgTRecHeader *trec = tso -> trec;
2788 StgTRecHeader *outer = trec -> enclosing_trec;
2789 debugTrace(DEBUG_stm,
2790 "found CATCH_STM_FRAME at %p during retry", p);
2791 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2792 stmAbortTransaction(cap, trec);
2793 stmFreeAbortedTRec(cap, trec);
2794 tso -> trec = outer;
2795 p = next;
2796 continue;
2797 }
2798
2799 case UNDERFLOW_FRAME:
2800 tso->stackobj->sp = p;
2801 threadStackUnderflow(cap,tso);
2802 p = tso->stackobj->sp;
2803 continue;
2804
2805 default:
2806 ASSERT(info->i.type != CATCH_FRAME);
2807 ASSERT(info->i.type != STOP_FRAME);
2808 p = next;
2809 continue;
2810 }
2811 }
2812 }
2813
2814 /* -----------------------------------------------------------------------------
2815 resurrectThreads is called after garbage collection on the list of
2816    threads found to be garbage.  Each of these threads will be woken
2817    up and sent an exception: BlockedIndefinitelyOnMVar if the thread was
2818    blocked on an MVar (or an MVar read), BlockedIndefinitelyOnSTM if it was
2819    blocked on STM, or NonTermination if it was blocked on a Black Hole.
2820
2821 Locks: assumes we hold *all* the capabilities.
2822 -------------------------------------------------------------------------- */
2823
2824 void
2825 resurrectThreads (StgTSO *threads)
2826 {
2827 StgTSO *tso, *next;
2828 Capability *cap;
2829 generation *gen;
2830
2831 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2832 next = tso->global_link;
2833
2834 gen = Bdescr((P_)tso)->gen;
2835 tso->global_link = gen->threads;
2836 gen->threads = tso;
2837
2838 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2839
2840 // Wake up the thread on the Capability it was last on
2841 cap = tso->cap;
2842
2843 switch (tso->why_blocked) {
2844 case BlockedOnMVar:
2845 case BlockedOnMVarRead:
2846 /* Called by GC - sched_mutex lock is currently held. */
2847 throwToSingleThreaded(cap, tso,
2848 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2849 break;
2850 case BlockedOnBlackHole:
2851 throwToSingleThreaded(cap, tso,
2852 (StgClosure *)nonTermination_closure);
2853 break;
2854 case BlockedOnSTM:
2855 throwToSingleThreaded(cap, tso,
2856 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2857 break;
2858 case NotBlocked:
2859 /* This might happen if the thread was blocked on a black hole
2860 * belonging to a thread that we've just woken up (raiseAsync
2861 * can wake up threads, remember...).
2862 */
2863 continue;
2864 case BlockedOnMsgThrowTo:
2865 // This can happen if the target is masking, blocks on a
2866 // black hole, and then is found to be unreachable. In
2867 // this case, we want to let the target wake up and carry
2868 // on, and do nothing to this thread.
2869 continue;
2870 default:
2871 barf("resurrectThreads: thread blocked in a strange way: %d",
2872 tso->why_blocked);
2873 }
2874 }
2875 }