Fix segfault with STM; fixes #8035. Patch from errge.
[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
95 * exists - earlier gccs apparently didn't.
96 * -= chak
97 */
98 StgTSO dummy_tso;
99
100 /*
101 * This mutex protects most of the global scheduler data in
102 * the THREADED_RTS runtime.
103 */
104 #if defined(THREADED_RTS)
105 Mutex sched_mutex;
106 #endif
107
108 #if !defined(mingw32_HOST_OS)
109 #define FORKPROCESS_PRIMOP_SUPPORTED
110 #endif
111
112 // Local stats
113 #ifdef THREADED_RTS
114 static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
115 #endif
116
117 /* -----------------------------------------------------------------------------
118 * static function prototypes
119 * -------------------------------------------------------------------------- */
120
121 static Capability *schedule (Capability *initialCapability, Task *task);
122
123 //
124 // These functions all encapsulate parts of the scheduler loop, and are
125 // abstracted only to make the structure and control flow of the
126 // scheduler clearer.
127 //
128 static void schedulePreLoop (void);
129 static void scheduleFindWork (Capability **pcap);
130 #if defined(THREADED_RTS)
131 static void scheduleYield (Capability **pcap, Task *task);
132 #endif
133 #if defined(THREADED_RTS)
134 static nat requestSync (Capability **pcap, Task *task, nat sync_type);
135 static void acquireAllCapabilities(Capability *cap, Task *task);
136 static void releaseAllCapabilities(nat n, Capability *cap, Task *task);
137 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
138 #endif
139 static void scheduleStartSignalHandlers (Capability *cap);
140 static void scheduleCheckBlockedThreads (Capability *cap);
141 static void scheduleProcessInbox(Capability **cap);
142 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
143 static void schedulePushWork(Capability *cap, Task *task);
144 #if defined(THREADED_RTS)
145 static void scheduleActivateSpark(Capability *cap);
146 #endif
147 static void schedulePostRunThread(Capability *cap, StgTSO *t);
148 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
149 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
150 nat prev_what_next );
151 static void scheduleHandleThreadBlocked( StgTSO *t );
152 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
153 StgTSO *t );
154 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
155 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
156
157 static void deleteThread (Capability *cap, StgTSO *tso);
158 static void deleteAllThreads (Capability *cap);
159
160 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
161 static void deleteThread_(Capability *cap, StgTSO *tso);
162 #endif
163
164 /* ---------------------------------------------------------------------------
165 Main scheduling loop.
166
167 We use round-robin scheduling, each thread returning to the
168 scheduler loop when one of these conditions is detected:
169
170 * out of heap space
171 * timer expires (thread yields)
172 * thread blocks
173 * thread ends
174 * stack overflow
175
176 ------------------------------------------------------------------------ */
177
178 static Capability *
179 schedule (Capability *initialCapability, Task *task)
180 {
181 StgTSO *t;
182 Capability *cap;
183 StgThreadReturnCode ret;
184 nat prev_what_next;
185 rtsBool ready_to_gc;
186 #if defined(THREADED_RTS)
187 rtsBool first = rtsTrue;
188 #endif
189
190 cap = initialCapability;
191
192 // Pre-condition: this task owns initialCapability.
193 // The sched_mutex is *NOT* held
194 // NB. on return, we still hold a capability.
195
196 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
197
198 schedulePreLoop();
199
200 // -----------------------------------------------------------
201 // Scheduler loop starts here:
202
203 while (1) {
204
205 // Check whether we have re-entered the RTS from Haskell without
206 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
207 // call).
208 if (cap->in_haskell) {
209 errorBelch("schedule: re-entered unsafely.\n"
210 " Perhaps a 'foreign import unsafe' should be 'safe'?");
211 stg_exit(EXIT_FAILURE);
212 }
213
214 // The interruption / shutdown sequence.
215 //
216 // In order to cleanly shut down the runtime, we want to:
217 // * make sure that all main threads return to their callers
218 // with the state 'Interrupted'.
219 // * clean up all OS threads associated with the runtime
220 // * free all memory etc.
221 //
222 // So the sequence for ^C goes like this:
223 //
224 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
225 // arranges for some Capability to wake up
226 //
227 // * all threads in the system are halted, and the zombies are
228 // placed on the run queue for cleaning up. We acquire all
229 // the capabilities in order to delete the threads, this is
230 // done by scheduleDoGC() for convenience (because GC already
231 // needs to acquire all the capabilities). We can't kill
232 // threads involved in foreign calls.
233 //
234 // * somebody calls shutdownHaskell(), which calls exitScheduler()
235 //
236 // * sched_state := SCHED_SHUTTING_DOWN
237 //
238 // * all workers exit when the run queue on their capability
239 // drains. All main threads will also exit when their TSO
240 // reaches the head of the run queue and they can return.
241 //
242 // * eventually all Capabilities will shut down, and the RTS can
243 // exit.
244 //
245 // * We might be left with threads blocked in foreign calls,
246 // we should really attempt to kill these somehow (TODO);
247
248 switch (sched_state) {
249 case SCHED_RUNNING:
250 break;
251 case SCHED_INTERRUPTING:
252 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
253 /* scheduleDoGC() deletes all the threads */
254 scheduleDoGC(&cap,task,rtsFalse);
255
256 // after scheduleDoGC(), we must be shutting down. Either some
257 // other Capability did the final GC, or we did it above,
258 // either way we can fall through to the SCHED_SHUTTING_DOWN
259 // case now.
260 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
261 // fall through
262
263 case SCHED_SHUTTING_DOWN:
264 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
265 // If we are a worker, just exit. If we're a bound thread
266 // then we will exit below when we've removed our TSO from
267 // the run queue.
268 if (!isBoundTask(task) && emptyRunQueue(cap)) {
269 return cap;
270 }
271 break;
272 default:
273 barf("sched_state: %d", sched_state);
274 }
275
276 scheduleFindWork(&cap);
277
278 /* work pushing, currently relevant only for THREADED_RTS:
279 (pushes threads, wakes up idle capabilities for stealing) */
280 schedulePushWork(cap,task);
281
282 scheduleDetectDeadlock(&cap,task);
283
284 // Normally, the only way we can get here with no threads to
285 // run is if a keyboard interrupt was received during
286 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
287 // Additionally, it is not fatal for the
288 // threaded RTS to reach here with no threads to run.
289 //
290 // win32: might be here due to awaitEvent() being abandoned
291 // as a result of a console event having been delivered.
292
293 #if defined(THREADED_RTS)
294 if (first)
295 {
296 // XXX: ToDo
297 // // don't yield the first time, we want a chance to run this
298 // // thread for a bit, even if there are others banging at the
299 // // door.
300 // first = rtsFalse;
301 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
302 }
303
304 scheduleYield(&cap,task);
305
306 if (emptyRunQueue(cap)) continue; // look for work again
307 #endif
308
309 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
310 if ( emptyRunQueue(cap) ) {
311 ASSERT(sched_state >= SCHED_INTERRUPTING);
312 }
313 #endif
314
315 //
316 // Get a thread to run
317 //
318 t = popRunQueue(cap);
319
320 // Sanity check the thread we're about to run. This can be
321 // expensive if there is lots of thread switching going on...
322 IF_DEBUG(sanity,checkTSO(t));
323
324 #if defined(THREADED_RTS)
325 // Check whether we can run this thread in the current task.
326 // If not, we have to pass our capability to the right task.
327 {
328 InCall *bound = t->bound;
329
330 if (bound) {
331 if (bound->task == task) {
332 // yes, the Haskell thread is bound to the current native thread
333 } else {
334 debugTrace(DEBUG_sched,
335 "thread %lu bound to another OS thread",
336 (unsigned long)t->id);
337 // no, bound to a different Haskell thread: pass to that thread
338 pushOnRunQueue(cap,t);
339 continue;
340 }
341 } else {
342 // The thread we want to run is unbound.
343 if (task->incall->tso) {
344 debugTrace(DEBUG_sched,
345 "this OS thread cannot run thread %lu",
346 (unsigned long)t->id);
347 // no, the current native thread is bound to a different
348 // Haskell thread, so pass it to any worker thread
349 pushOnRunQueue(cap,t);
350 continue;
351 }
352 }
353 }
354 #endif
355
356 // If we're shutting down, and this thread has not yet been
357 // killed, kill it now. This sometimes happens when a finalizer
358 // thread is created by the final GC, or a thread previously
359 // in a foreign call returns.
360 if (sched_state >= SCHED_INTERRUPTING &&
361 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
362 deleteThread(cap,t);
363 }
364
365 // If this capability is disabled, migrate the thread away rather
366 // than running it. NB. but not if the thread is bound: it is
367 // really hard for a bound thread to migrate itself. Believe me,
368 // I tried several ways and couldn't find a way to do it.
369 // Instead, when everything is stopped for GC, we migrate all the
370 // threads on the run queue then (see scheduleDoGC()).
371 //
372 // ToDo: what about TSO_LOCKED? Currently we're migrating those
373 // when the number of capabilities drops, but we never migrate
374 // them back if it rises again. Presumably we should, but after
375 // the thread has been migrated we no longer know what capability
376 // it was originally on.
377 #ifdef THREADED_RTS
378 if (cap->disabled && !t->bound) {
379 Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
380 migrateThread(cap, t, dest_cap);
381 continue;
382 }
383 #endif
384
385 /* context switches are initiated by the timer signal, unless
386 * the user specified "context switch as often as possible", with
387 * +RTS -C0
388 */
389 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
390 && !emptyThreadQueues(cap)) {
391 cap->context_switch = 1;
392 }
393
394 run_thread:
395
396 // CurrentTSO is the thread to run. t might be different if we
397 // loop back to run_thread, so make sure to set CurrentTSO after
398 // that.
399 cap->r.rCurrentTSO = t;
400
401 startHeapProfTimer();
402
403 // ----------------------------------------------------------------------
404 // Run the current thread
405
406 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
407 ASSERT(t->cap == cap);
408 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
409
410 prev_what_next = t->what_next;
411
412 errno = t->saved_errno;
413 #if mingw32_HOST_OS
414 SetLastError(t->saved_winerror);
415 #endif
416
417 // reset the interrupt flag before running Haskell code
418 cap->interrupt = 0;
419
420 cap->in_haskell = rtsTrue;
421 cap->idle = 0;
422
423 dirty_TSO(cap,t);
424 dirty_STACK(cap,t->stackobj);
425
426 switch (recent_activity)
427 {
428 case ACTIVITY_DONE_GC: {
429 // ACTIVITY_DONE_GC means we turned off the timer signal to
430 // conserve power (see #1623). Re-enable it here.
431 nat prev;
432 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
433 if (prev == ACTIVITY_DONE_GC) {
434 #ifndef PROFILING
435 startTimer();
436 #endif
437 }
438 break;
439 }
440 case ACTIVITY_INACTIVE:
441 // If we reached ACTIVITY_INACTIVE, then don't reset it until
442 // we've done the GC. The thread running here might just be
443 // the IO manager thread that handle_tick() woke up via
444 // wakeUpRts().
445 break;
446 default:
447 recent_activity = ACTIVITY_YES;
448 }
449
450 traceEventRunThread(cap, t);
451
452 switch (prev_what_next) {
453
454 case ThreadKilled:
455 case ThreadComplete:
456 /* Thread already finished, return to scheduler. */
457 ret = ThreadFinished;
458 break;
459
460 case ThreadRunGHC:
461 {
462 StgRegTable *r;
463 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
464 cap = regTableToCapability(r);
465 ret = r->rRet;
466 break;
467 }
468
469 case ThreadInterpret:
470 cap = interpretBCO(cap);
471 ret = cap->r.rRet;
472 break;
473
474 default:
475 barf("schedule: invalid what_next field");
476 }
477
478 cap->in_haskell = rtsFalse;
479
480 // The TSO might have moved, eg. if it re-entered the RTS and a GC
481 // happened. So find the new location:
482 t = cap->r.rCurrentTSO;
483
484 // And save the current errno in this thread.
485 // XXX: possibly bogus for SMP because this thread might already
486 // be running again, see code below.
487 t->saved_errno = errno;
488 #if mingw32_HOST_OS
489 // Similarly for Windows error code
490 t->saved_winerror = GetLastError();
491 #endif
492
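// Record why the thread stopped in the eventlog. For a blocked thread
// we add 6 to why_blocked, which shifts it past the basic thread-stop
// status codes so that the event carries the specific blocking reason.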
493 if (ret == ThreadBlocked) {
494 if (t->why_blocked == BlockedOnBlackHole) {
495 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
496 traceEventStopThread(cap, t, t->why_blocked + 6,
497 owner != NULL ? owner->id : 0);
498 } else {
499 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
500 }
501 } else {
502 traceEventStopThread(cap, t, ret, 0);
503 }
504
505 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
506 ASSERT(t->cap == cap);
507
508 // ----------------------------------------------------------------------
509
510 // Costs for the scheduler are assigned to CCS_SYSTEM
511 stopHeapProfTimer();
512 #if defined(PROFILING)
513 cap->r.rCCCS = CCS_SYSTEM;
514 #endif
515
516 schedulePostRunThread(cap,t);
517
518 ready_to_gc = rtsFalse;
519
520 switch (ret) {
521 case HeapOverflow:
522 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
523 break;
524
525 case StackOverflow:
526 // just adjust the stack for this thread, then pop it back
527 // on the run queue.
528 threadStackOverflow(cap, t);
529 pushOnRunQueue(cap,t);
530 break;
531
532 case ThreadYielding:
533 if (scheduleHandleYield(cap, t, prev_what_next)) {
534 // shortcut for switching between compiler/interpreter:
535 goto run_thread;
536 }
537 break;
538
539 case ThreadBlocked:
540 scheduleHandleThreadBlocked(t);
541 break;
542
543 case ThreadFinished:
544 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
545 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
546 break;
547
548 default:
549 barf("schedule: invalid thread return code %d", (int)ret);
550 }
551
552 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
553 scheduleDoGC(&cap,task,rtsFalse);
554 }
555 } /* end of while() */
556 }
557
558 /* -----------------------------------------------------------------------------
559 * Run queue operations
560 * -------------------------------------------------------------------------- */
561
562 void
563 removeFromRunQueue (Capability *cap, StgTSO *tso)
564 {
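// The run queue is doubly linked: tso->_link points forwards and
// tso->block_info.prev points backwards, so a TSO can be unlinked from
// the middle of the queue in constant time.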
565 if (tso->block_info.prev == END_TSO_QUEUE) {
566 ASSERT(cap->run_queue_hd == tso);
567 cap->run_queue_hd = tso->_link;
568 } else {
569 setTSOLink(cap, tso->block_info.prev, tso->_link);
570 }
571 if (tso->_link == END_TSO_QUEUE) {
572 ASSERT(cap->run_queue_tl == tso);
573 cap->run_queue_tl = tso->block_info.prev;
574 } else {
575 setTSOPrev(cap, tso->_link, tso->block_info.prev);
576 }
577 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
578
579 IF_DEBUG(sanity, checkRunQueue(cap));
580 }
581
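// Move a thread to the front of its Capability's run queue;
// pushOnRunQueue() adds at the head, so this thread will be the next
// one popped by the scheduler.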
582 void
583 promoteInRunQueue (Capability *cap, StgTSO *tso)
584 {
585 removeFromRunQueue(cap, tso);
586 pushOnRunQueue(cap, tso);
587 }
588
589 /* ----------------------------------------------------------------------------
590 * Setting up the scheduler loop
591 * ------------------------------------------------------------------------- */
592
593 static void
594 schedulePreLoop(void)
595 {
596 // initialisation for scheduler - what cannot go into initScheduler()
597
598 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
599 win32AllocStack();
600 #endif
601 }
602
603 /* -----------------------------------------------------------------------------
604 * scheduleFindWork()
605 *
606 * Search for work to do, and handle messages from elsewhere.
607 * -------------------------------------------------------------------------- */
608
609 static void
610 scheduleFindWork (Capability **pcap)
611 {
612 scheduleStartSignalHandlers(*pcap);
613
614 scheduleProcessInbox(pcap);
615
616 scheduleCheckBlockedThreads(*pcap);
617
618 #if defined(THREADED_RTS)
619 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
620 #endif
621 }
622
623 #if defined(THREADED_RTS)
624 STATIC_INLINE rtsBool
625 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
626 {
627 // we need to yield this capability to someone else if..
628 // - another thread is initiating a GC, and we didn't just do a GC
629 // (see Note [GC livelock])
630 // - another Task is returning from a foreign call
631 // - the thread at the head of the run queue cannot be run
632 // by this Task (it is bound to another Task, or it is unbound
633 // and this task is bound).
634 //
635 // Note [GC livelock]
636 //
637 // If we are interrupted to do a GC, then we do not immediately do
638 // another one. This avoids a starvation situation where one
639 // Capability keeps forcing a GC and the other Capabilities make no
640 // progress at all.
641
642 return ((pending_sync && !didGcLast) ||
643 cap->returning_tasks_hd != NULL ||
644 (!emptyRunQueue(cap) && (task->incall->tso == NULL
645 ? peekRunQueue(cap)->bound != NULL
646 : peekRunQueue(cap)->bound != task->incall)));
647 }
648
649 // This is the single place where a Task goes to sleep. There are
650 // two reasons it might need to sleep:
651 // - there are no threads to run
652 // - we need to yield this Capability to someone else
653 // (see shouldYieldCapability())
654 //
655 // Careful: the scheduler loop is quite delicate. Make sure you run
656 // the tests in testsuite/concurrent (all ways) after modifying this,
657 // and also check the benchmarks in nofib/parallel for regressions.
658
659 static void
660 scheduleYield (Capability **pcap, Task *task)
661 {
662 Capability *cap = *pcap;
663 int didGcLast = rtsFalse;
664
665 // if we have work, and we don't need to give up the Capability, continue.
666 //
667 if (!shouldYieldCapability(cap,task,rtsFalse) &&
668 (!emptyRunQueue(cap) ||
669 !emptyInbox(cap) ||
670 sched_state >= SCHED_INTERRUPTING)) {
671 return;
672 }
673
674 // otherwise yield (sleep), and keep yielding if necessary.
675 do {
676 didGcLast = yieldCapability(&cap,task, !didGcLast);
677 }
678 while (shouldYieldCapability(cap,task,didGcLast));
679
680 // note there may still be no threads on the run queue at this
681 // point, the caller has to check.
682
683 *pcap = cap;
684 return;
685 }
686 #endif
687
688 /* -----------------------------------------------------------------------------
689 * schedulePushWork()
690 *
691 * Push work to other Capabilities if we have some.
692 * -------------------------------------------------------------------------- */
693
694 static void
695 schedulePushWork(Capability *cap USED_IF_THREADS,
696 Task *task USED_IF_THREADS)
697 {
698 /* The following code is not for PARALLEL_HASKELL. I kept the call general;
699 future GUM versions might use pushing in a distributed setup. */
700 #if defined(THREADED_RTS)
701
702 Capability *free_caps[n_capabilities], *cap0;
703 nat i, n_free_caps;
704
705 // migration can be turned off with +RTS -qm
706 if (!RtsFlags.ParFlags.migrate) return;
707
708 // Check whether we have more threads on our run queue, or sparks
709 // in our pool, that we could hand to another Capability.
710 if (emptyRunQueue(cap)) {
711 if (sparkPoolSizeCap(cap) < 2) return;
712 } else {
713 if (singletonRunQueue(cap) &&
714 sparkPoolSizeCap(cap) < 1) return;
715 }
716
717 // First grab as many free Capabilities as we can.
718 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
719 cap0 = &capabilities[i];
720 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
721 if (!emptyRunQueue(cap0)
722 || cap0->returning_tasks_hd != NULL
723 || cap0->inbox != (Message*)END_TSO_QUEUE) {
724 // it already has some work, we just grabbed it at
725 // the wrong moment. Or maybe it's deadlocked!
726 releaseCapability(cap0);
727 } else {
728 free_caps[n_free_caps++] = cap0;
729 }
730 }
731 }
732
733 // we now have n_free_caps free capabilities stashed in
734 // free_caps[]. Share our run queue equally with them. This is
735 // probably the simplest thing we could do; improvements we might
736 // want to do include:
737 //
738 // - giving high priority to moving relatively new threads, on
739 // the grounds that they haven't had time to build up a
740 // working set in the cache on this CPU/Capability.
741 //
742 // - giving low priority to moving long-lived threads
743
744 if (n_free_caps > 0) {
745 StgTSO *prev, *t, *next;
746 #ifdef SPARK_PUSHING
747 rtsBool pushed_to_all;
748 #endif
749
750 debugTrace(DEBUG_sched,
751 "cap %d: %s and %d free capabilities, sharing...",
752 cap->no,
753 (!emptyRunQueue(cap) && !singletonRunQueue(cap))?
754 "excess threads on run queue":"sparks to share (>=2)",
755 n_free_caps);
756
757 i = 0;
758 #ifdef SPARK_PUSHING
759 pushed_to_all = rtsFalse;
760 #endif
761
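// Walk our run queue, keeping bound and locked threads for ourselves
// and handing the rest out in turn to the free capabilities (keeping
// one thread back for this capability on each pass).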
762 if (cap->run_queue_hd != END_TSO_QUEUE) {
763 prev = cap->run_queue_hd;
764 t = prev->_link;
765 prev->_link = END_TSO_QUEUE;
766 for (; t != END_TSO_QUEUE; t = next) {
767 next = t->_link;
768 t->_link = END_TSO_QUEUE;
769 if (t->bound == task->incall // don't move my bound thread
770 || tsoLocked(t)) { // don't move a locked thread
771 setTSOLink(cap, prev, t);
772 setTSOPrev(cap, t, prev);
773 prev = t;
774 } else if (i == n_free_caps) {
775 #ifdef SPARK_PUSHING
776 pushed_to_all = rtsTrue;
777 #endif
778 i = 0;
779 // keep one for us
780 setTSOLink(cap, prev, t);
781 setTSOPrev(cap, t, prev);
782 prev = t;
783 } else {
784 appendToRunQueue(free_caps[i],t);
785
786 traceEventMigrateThread (cap, t, free_caps[i]->no);
787
788 if (t->bound) { t->bound->task->cap = free_caps[i]; }
789 t->cap = free_caps[i];
790 i++;
791 }
792 }
793 cap->run_queue_tl = prev;
794
795 IF_DEBUG(sanity, checkRunQueue(cap));
796 }
797
798 #ifdef SPARK_PUSHING
799 /* JB I left this code in place, it would work but is not necessary */
800
801 // If there are some free capabilities that we didn't push any
802 // threads to, then try to push a spark to each one.
803 if (!pushed_to_all) {
804 StgClosure *spark;
805 // i is the next free capability to push to
806 for (; i < n_free_caps; i++) {
807 if (emptySparkPoolCap(free_caps[i])) {
808 spark = tryStealSpark(cap->sparks);
809 if (spark != NULL) {
810 /* TODO: if anyone wants to re-enable this code then
811 * they must consider the fizzledSpark(spark) case
812 * and update the per-cap spark statistics.
813 */
814 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
815
816 traceEventStealSpark(free_caps[i], t, cap->no);
817
818 newSpark(&(free_caps[i]->r), spark);
819 }
820 }
821 }
822 }
823 #endif /* SPARK_PUSHING */
824
825 // release the capabilities
826 for (i = 0; i < n_free_caps; i++) {
827 task->cap = free_caps[i];
828 releaseAndWakeupCapability(free_caps[i]);
829 }
830 }
831 task->cap = cap; // reset to point to our Capability.
832
833 #endif /* THREADED_RTS */
834
835 }
836
837 /* ----------------------------------------------------------------------------
838 * Start any pending signal handlers
839 * ------------------------------------------------------------------------- */
840
841 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
842 static void
843 scheduleStartSignalHandlers(Capability *cap)
844 {
845 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
846 // safe outside the lock
847 startSignalHandlers(cap);
848 }
849 }
850 #else
851 static void
852 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
853 {
854 }
855 #endif
856
857 /* ----------------------------------------------------------------------------
858 * Check for blocked threads that can be woken up.
859 * ------------------------------------------------------------------------- */
860
861 static void
862 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
863 {
864 #if !defined(THREADED_RTS)
865 //
866 // Check whether any waiting threads need to be woken up. If the
867 // run queue is empty, and there are no other tasks running, we
868 // can wait indefinitely for something to happen.
869 //
870 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
871 {
872 awaitEvent (emptyRunQueue(cap));
873 }
874 #endif
875 }
876
877 /* ----------------------------------------------------------------------------
878 * Detect deadlock conditions and attempt to resolve them.
879 * ------------------------------------------------------------------------- */
880
881 static void
882 scheduleDetectDeadlock (Capability **pcap, Task *task)
883 {
884 Capability *cap = *pcap;
885 /*
886 * Detect deadlock: when we have no threads to run, there are no
887 * threads blocked, waiting for I/O, or sleeping, and all the
888 * other tasks are waiting for work, we must have a deadlock of
889 * some description.
890 */
891 if ( emptyThreadQueues(cap) )
892 {
893 #if defined(THREADED_RTS)
894 /*
895 * In the threaded RTS, we only check for deadlock if there
896 * has been no activity in a complete timeslice. This means
897 * we won't eagerly start a full GC just because we don't have
898 * any threads to run currently.
899 */
900 if (recent_activity != ACTIVITY_INACTIVE) return;
901 #endif
902
903 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
904
905 // Garbage collection can release some new threads due to
906 // either (a) finalizers or (b) threads resurrected because
907 // they are unreachable and will therefore be sent an
908 // exception. Any threads thus released will be immediately
909 // runnable.
910 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
911 cap = *pcap;
912 // when force_major == rtsTrue, scheduleDoGC sets
913 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
914 // signal.
915
916 if ( !emptyRunQueue(cap) ) return;
917
918 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
919 /* If we have user-installed signal handlers, then wait
920 * for signals to arrive rather than bombing out with a
921 * deadlock.
922 */
923 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
924 debugTrace(DEBUG_sched,
925 "still deadlocked, waiting for signals...");
926
927 awaitUserSignals();
928
929 if (signals_pending()) {
930 startSignalHandlers(cap);
931 }
932
933 // either we have threads to run, or we were interrupted:
934 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
935
936 return;
937 }
938 #endif
939
940 #if !defined(THREADED_RTS)
941 /* Probably a real deadlock. Send the current main thread the
942 * Deadlock exception.
943 */
944 if (task->incall->tso) {
945 switch (task->incall->tso->why_blocked) {
946 case BlockedOnSTM:
947 case BlockedOnBlackHole:
948 case BlockedOnMsgThrowTo:
949 case BlockedOnMVar:
950 throwToSingleThreaded(cap, task->incall->tso,
951 (StgClosure *)nonTermination_closure);
952 return;
953 default:
954 barf("deadlock: main thread blocked in a strange way");
955 }
956 }
957 return;
958 #endif
959 }
960 }
961
962
963 /* ----------------------------------------------------------------------------
964 * Send pending messages (PARALLEL_HASKELL only)
965 * ------------------------------------------------------------------------- */
966
967 #if defined(PARALLEL_HASKELL)
968 static void
969 scheduleSendPendingMessages(void)
970 {
971
972 # if defined(PAR) // global Mem.Mgmt., omit for now
973 if (PendingFetches != END_BF_QUEUE) {
974 processFetches();
975 }
976 # endif
977
978 if (RtsFlags.ParFlags.BufferTime) {
979 // if we use message buffering, we must send away all message
980 // packets which have become too old...
981 sendOldBuffers();
982 }
983 }
984 #endif
985
986 /* ----------------------------------------------------------------------------
987 * Process message in the current Capability's inbox
988 * ------------------------------------------------------------------------- */
989
990 static void
991 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
992 {
993 #if defined(THREADED_RTS)
994 Message *m, *next;
995 int r;
996 Capability *cap = *pcap;
997
998 while (!emptyInbox(cap)) {
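// If the nursery is exhausted or we have hit the large-object
// allocation limit, do a GC before processing messages
// (executeMessage() may need to allocate).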
999 if (cap->r.rCurrentNursery->link == NULL ||
1000 g0->n_new_large_words >= large_alloc_lim) {
1001 scheduleDoGC(pcap, cap->running_task, rtsFalse);
1002 cap = *pcap;
1003 }
1004
1005 // don't use a blocking acquire; if the lock is held by
1006 // another thread then just carry on. This seems to avoid
1007 // getting stuck in a message ping-pong situation with other
1008 // processors. We'll check the inbox again later anyway.
1009 //
1010 // We should really use a more efficient queue data structure
1011 // here. The trickiness is that we must ensure a Capability
1012 // never goes idle if the inbox is non-empty, which is why we
1013 // use cap->lock (cap->lock is released as the last thing
1014 // before going idle; see Capability.c:releaseCapability()).
1015 r = TRY_ACQUIRE_LOCK(&cap->lock);
1016 if (r != 0) return;
1017
1018 m = cap->inbox;
1019 cap->inbox = (Message*)END_TSO_QUEUE;
1020
1021 RELEASE_LOCK(&cap->lock);
1022
1023 while (m != (Message*)END_TSO_QUEUE) {
1024 next = m->link;
1025 executeMessage(cap, m);
1026 m = next;
1027 }
1028 }
1029 #endif
1030 }
1031
1032 /* ----------------------------------------------------------------------------
1033 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1034 * ------------------------------------------------------------------------- */
1035
1036 #if defined(THREADED_RTS)
1037 static void
1038 scheduleActivateSpark(Capability *cap)
1039 {
1040 if (anySparks() && !cap->disabled)
1041 {
1042 createSparkThread(cap);
1043 debugTrace(DEBUG_sched, "creating a spark thread");
1044 }
1045 }
1046 #endif // THREADED_RTS
1047
1048 /* ----------------------------------------------------------------------------
1049 * After running a thread...
1050 * ------------------------------------------------------------------------- */
1051
1052 static void
1053 schedulePostRunThread (Capability *cap, StgTSO *t)
1054 {
1055 // We have to be able to catch transactions that are in an
1056 // infinite loop as a result of seeing an inconsistent view of
1057 // memory, e.g.
1058 //
1059 // atomically $ do
1060 // [a,b] <- mapM readTVar [ta,tb]
1061 // when (a == b) loop
1062 //
1063 // and a is never equal to b given a consistent view of memory.
1064 //
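// Note that stmValidateNestOfTransactions() is given the Capability,
// so the STM machinery can use it during validation (see #8035).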
1065 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1066 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1067 debugTrace(DEBUG_sched | DEBUG_stm,
1068 "trec %p found wasting its time", t);
1069
1070 // strip the stack back to the
1071 // ATOMICALLY_FRAME, aborting the (nested)
1072 // transaction, and saving the stack of any
1073 // partially-evaluated thunks on the heap.
1074 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1075
1076 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1077 }
1078 }
1079
1080 /* some statistics gathering in the parallel case */
1081 }
1082
1083 /* -----------------------------------------------------------------------------
1084 * Handle a thread that returned to the scheduler with HeapOverflow
1085 * -------------------------------------------------------------------------- */
1086
1087 static rtsBool
1088 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1089 {
1090 // did the task ask for a large block?
1091 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1092 // if so, get one and push it on the front of the nursery.
1093 bdescr *bd;
1094 W_ blocks;
1095
1096 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1097
1098 if (blocks > BLOCKS_PER_MBLOCK) {
1099 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1100 }
1101
1102 debugTrace(DEBUG_sched,
1103 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1104 (long)t->id, what_next_strs[t->what_next], blocks);
1105
1106 // don't do this if the nursery is (nearly) full, we'll GC first.
1107 if (cap->r.rCurrentNursery->link != NULL ||
1108 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1109 // if the nursery has only one block.
1110
1111 bd = allocGroup_lock(blocks);
1112 cap->r.rNursery->n_blocks += blocks;
1113
1114 // link the new group into the list
1115 bd->link = cap->r.rCurrentNursery;
1116 bd->u.back = cap->r.rCurrentNursery->u.back;
1117 if (cap->r.rCurrentNursery->u.back != NULL) {
1118 cap->r.rCurrentNursery->u.back->link = bd;
1119 } else {
1120 cap->r.rNursery->blocks = bd;
1121 }
1122 cap->r.rCurrentNursery->u.back = bd;
1123
1124 // initialise it as a nursery block. We initialise the
1125 // step, gen_no, and flags field of *every* sub-block in
1126 // this large block, because this is easier than making
1127 // sure that we always find the block head of a large
1128 // block whenever we call Bdescr() (eg. evacuate() and
1129 // isAlive() in the GC would both have to do this, at
1130 // least).
1131 {
1132 bdescr *x;
1133 for (x = bd; x < bd + blocks; x++) {
1134 initBdescr(x,g0,g0);
1135 x->free = x->start;
1136 x->flags = 0;
1137 }
1138 }
1139
1140 // This assert can be a killer if the app is doing lots
1141 // of large block allocations.
1142 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1143
1144 // now update the nursery to point to the new block
1145 cap->r.rCurrentNursery = bd;
1146
1147 // we might be unlucky and have another thread get on the
1148 // run queue before us and steal the large block, but in that
1149 // case the thread will just end up requesting another large
1150 // block.
1151 pushOnRunQueue(cap,t);
1152 return rtsFalse; /* not actually GC'ing */
1153 }
1154 }
1155
1156 if (cap->r.rHpLim == NULL || cap->context_switch) {
1157 // Sometimes we miss a context switch, e.g. when calling
1158 // primitives in a tight loop, MAYBE_GC() doesn't check the
1159 // context switch flag, and we end up waiting for a GC.
1160 // See #1984, and concurrent/should_run/1984
1161 cap->context_switch = 0;
1162 appendToRunQueue(cap,t);
1163 } else {
1164 pushOnRunQueue(cap,t);
1165 }
1166 return rtsTrue;
1167 /* actual GC is done at the end of the while loop in schedule() */
1168 }
1169
1170 /* -----------------------------------------------------------------------------
1171 * Handle a thread that returned to the scheduler with ThreadYielding
1172 * -------------------------------------------------------------------------- */
1173
1174 static rtsBool
1175 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1176 {
1177 /* put the thread back on the run queue. Then, if we're ready to
1178 * GC, check whether this is the last task to stop. If so, wake
1179 * up the GC thread. getThread will block during a GC until the
1180 * GC is finished.
1181 */
1182
1183 ASSERT(t->_link == END_TSO_QUEUE);
1184
1185 // Shortcut if we're just switching evaluators: don't bother
1186 // doing stack squeezing (which can be expensive), just run the
1187 // thread.
1188 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1189 debugTrace(DEBUG_sched,
1190 "--<< thread %ld (%s) stopped to switch evaluators",
1191 (long)t->id, what_next_strs[t->what_next]);
1192 return rtsTrue;
1193 }
1194
1195 // Reset the context switch flag. We don't do this just before
1196 // running the thread, because that would mean we would lose ticks
1197 // during GC, which can lead to unfair scheduling (a thread hogs
1198 // the CPU because the tick always arrives during GC). This way
1199 // penalises threads that do a lot of allocation, but that seems
1200 // better than the alternative.
1201 if (cap->context_switch != 0) {
1202 cap->context_switch = 0;
1203 appendToRunQueue(cap,t);
1204 } else {
1205 pushOnRunQueue(cap,t);
1206 }
1207
1208 IF_DEBUG(sanity,
1209 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1210 checkTSO(t));
1211
1212 return rtsFalse;
1213 }
1214
1215 /* -----------------------------------------------------------------------------
1216 * Handle a thread that returned to the scheduler with ThreadBlocked
1217 * -------------------------------------------------------------------------- */
1218
1219 static void
1220 scheduleHandleThreadBlocked( StgTSO *t
1221 #if !defined(DEBUG)
1222 STG_UNUSED
1223 #endif
1224 )
1225 {
1226
1227 // We don't need to do anything. The thread is blocked, and it
1228 // has tidied up its stack and placed itself on whatever queue
1229 // it needs to be on.
1230
1231 // ASSERT(t->why_blocked != NotBlocked);
1232 // Not true: for example,
1233 // - the thread may have woken itself up already, because
1234 // threadPaused() might have raised a blocked throwTo
1235 // exception, see maybePerformBlockedException().
1236
1237 #ifdef DEBUG
1238 traceThreadStatus(DEBUG_sched, t);
1239 #endif
1240 }
1241
1242 /* -----------------------------------------------------------------------------
1243 * Handle a thread that returned to the scheduler with ThreadFinished
1244 * -------------------------------------------------------------------------- */
1245
1246 static rtsBool
1247 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1248 {
1249 /* Need to check whether this was a main thread, and if so,
1250 * return with the return value.
1251 *
1252 * We also end up here if the thread kills itself with an
1253 * uncaught exception, see Exception.cmm.
1254 */
1255
1256 // blocked exceptions can now complete, even if the thread was in
1257 // blocked mode (see #2910).
1258 awakenBlockedExceptionQueue (cap, t);
1259
1260 //
1261 // Check whether the thread that just completed was a bound
1262 // thread, and if so return with the result.
1263 //
1264 // There is an assumption here that all thread completion goes
1265 // through this point; we need to make sure that if a thread
1266 // ends up in the ThreadKilled state, that it stays on the run
1267 // queue so it can be dealt with here.
1268 //
1269
1270 if (t->bound) {
1271
1272 if (t->bound != task->incall) {
1273 #if !defined(THREADED_RTS)
1274 // Must be a bound thread that is not the topmost one. Leave
1275 // it on the run queue until the stack has unwound to the
1276 // point where we can deal with this. Leaving it on the run
1277 // queue also ensures that the garbage collector knows about
1278 // this thread and its return value (it gets dropped from the
1279 // step->threads list so there's no other way to find it).
1280 appendToRunQueue(cap,t);
1281 return rtsFalse;
1282 #else
1283 // this cannot happen in the threaded RTS, because a
1284 // bound thread can only be run by the appropriate Task.
1285 barf("finished bound thread that isn't mine");
1286 #endif
1287 }
1288
1289 ASSERT(task->incall->tso == t);
1290
1291 if (t->what_next == ThreadComplete) {
1292 if (task->incall->ret) {
1293 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1294 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1295 }
1296 task->incall->stat = Success;
1297 } else {
1298 if (task->incall->ret) {
1299 *(task->incall->ret) = NULL;
1300 }
1301 if (sched_state >= SCHED_INTERRUPTING) {
1302 if (heap_overflow) {
1303 task->incall->stat = HeapExhausted;
1304 } else {
1305 task->incall->stat = Interrupted;
1306 }
1307 } else {
1308 task->incall->stat = Killed;
1309 }
1310 }
1311 #ifdef DEBUG
1312 removeThreadLabel((StgWord)task->incall->tso->id);
1313 #endif
1314
1315 // We no longer consider this thread and task to be bound to
1316 // each other. The TSO lives on until it is GC'd, but the
1317 // task is about to be released by the caller, and we don't
1318 // want anyone following the pointer from the TSO to the
1319 // defunct task (which might have already been
1320 // re-used). This was a real bug: the GC updated
1321 // tso->bound->tso, which led to a deadlock.
1322 t->bound = NULL;
1323 task->incall->tso = NULL;
1324
1325 return rtsTrue; // tells schedule() to return
1326 }
1327
1328 return rtsFalse;
1329 }
1330
1331 /* -----------------------------------------------------------------------------
1332 * Perform a heap census
1333 * -------------------------------------------------------------------------- */
1334
1335 static rtsBool
1336 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1337 {
1338 // When we have +RTS -i0 and we're heap profiling, do a census at
1339 // every GC. This lets us get repeatable runs for debugging.
1340 if (performHeapProfile ||
1341 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1342 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1343 return rtsTrue;
1344 } else {
1345 return rtsFalse;
1346 }
1347 }
1348
1349 /* -----------------------------------------------------------------------------
1350 * Start a synchronisation of all capabilities
1351 * -------------------------------------------------------------------------- */
1352
1353 // Returns:
1354 // 0 if we successfully got a sync
1355 // non-0 if there was another sync request in progress,
1356 // and we yielded to it. The value returned is the
1357 // type of the other sync request.
1358 //
1359 #if defined(THREADED_RTS)
1360 static nat requestSync (Capability **pcap, Task *task, nat sync_type)
1361 {
1362 nat prev_pending_sync;
1363
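// Try to claim the sync atomically: cas() returns the previous value
// of pending_sync, so 0 means we installed sync_type and own the sync;
// anything else means another sync is already in progress.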
1364 prev_pending_sync = cas(&pending_sync, 0, sync_type);
1365
1366 if (prev_pending_sync)
1367 {
1368 do {
1369 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1370 prev_pending_sync);
1371 ASSERT(*pcap);
1372 yieldCapability(pcap,task,rtsTrue);
1373 } while (pending_sync);
1374 return prev_pending_sync; // NOTE: task->cap might have changed now
1375 }
1376 else
1377 {
1378 return 0;
1379 }
1380 }
1381
1382 //
1383 // Grab all the capabilities except the one we already hold. Used
1384 // when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1385 // before a fork (SYNC_OTHER).
1386 //
1387 // Only call this after requestSync(), otherwise a deadlock might
1388 // ensue if another thread is trying to synchronise.
1389 //
1390 static void acquireAllCapabilities(Capability *cap, Task *task)
1391 {
1392 Capability *tmpcap;
1393 nat i;
1394
1395 for (i=0; i < n_capabilities; i++) {
1396 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
1397 tmpcap = &capabilities[i];
1398 if (tmpcap != cap) {
1399 // we better hope this task doesn't get migrated to
1400 // another Capability while we're waiting for this one.
1401 // It won't, because load balancing happens while we have
1402 // all the Capabilities, but even so it's a slightly
1403 // unsavoury invariant.
1404 task->cap = tmpcap;
1405 waitForReturnCapability(&tmpcap, task);
1406 if (tmpcap->no != i) {
1407 barf("acquireAllCapabilities: got the wrong capability");
1408 }
1409 }
1410 }
1411 task->cap = cap;
1412 }
1413
1414 static void releaseAllCapabilities(nat n, Capability *cap, Task *task)
1415 {
1416 nat i;
1417
1418 for (i = 0; i < n; i++) {
1419 if (cap->no != i) {
1420 task->cap = &capabilities[i];
1421 releaseCapability(&capabilities[i]);
1422 }
1423 }
1424 task->cap = cap;
1425 }
1426 #endif
1427
1428 /* -----------------------------------------------------------------------------
1429 * Perform a garbage collection if necessary
1430 * -------------------------------------------------------------------------- */
1431
1432 static void
1433 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1434 rtsBool force_major)
1435 {
1436 Capability *cap = *pcap;
1437 rtsBool heap_census;
1438 nat collect_gen;
1439 #ifdef THREADED_RTS
1440 StgWord8 gc_type;
1441 nat i, sync;
1442 StgTSO *tso;
1443 #endif
1444
1445 if (sched_state == SCHED_SHUTTING_DOWN) {
1446 // The final GC has already been done, and the system is
1447 // shutting down. We'll probably deadlock if we try to GC
1448 // now.
1449 return;
1450 }
1451
1452 heap_census = scheduleNeedHeapProfile(rtsTrue);
1453
1454 // Figure out which generation we are collecting, so that we can
1455 // decide whether this is a parallel GC or not.
1456 collect_gen = calcNeeded(force_major || heap_census, NULL);
1457
1458 #ifdef THREADED_RTS
1459 if (sched_state < SCHED_INTERRUPTING
1460 && RtsFlags.ParFlags.parGcEnabled
1461 && collect_gen >= RtsFlags.ParFlags.parGcGen
1462 && ! oldest_gen->mark)
1463 {
1464 gc_type = SYNC_GC_PAR;
1465 } else {
1466 gc_type = SYNC_GC_SEQ;
1467 }
1468
1469 // In order to GC, there must be no threads running Haskell code.
1470 // Therefore, the GC thread needs to hold *all* the capabilities,
1471 // and release them after the GC has completed.
1472 //
1473 // This seems to be the simplest way: previous attempts involved
1474 // making all the threads with capabilities give up their
1475 // capabilities and sleep except for the *last* one, which
1476 // actually did the GC. But it's quite hard to arrange for all
1477 // the other tasks to sleep and stay asleep.
1478 //
1479
1480 /* Other capabilities are prevented from running yet more Haskell
1481 threads if pending_sync is set. Tested inside
1482 yieldCapability() and releaseCapability() in Capability.c */
1483
1484 do {
1485 sync = requestSync(pcap, task, gc_type);
1486 cap = *pcap;
1487 if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
1488 // someone else had a pending sync request for a GC, so
1489 // let's assume GC has been done and we don't need to GC
1490 // again.
1491 return;
1492 }
1493 if (sched_state == SCHED_SHUTTING_DOWN) {
1494 // The scheduler might now be shutting down. We tested
1495 // this above, but it might have become true since then as
1496 // we yielded the capability in requestSync().
1497 return;
1498 }
1499 } while (sync);
1500
1501 // don't declare this until after we have sync'd, because
1502 // n_capabilities may change.
1503 rtsBool idle_cap[n_capabilities];
1504 #ifdef DEBUG
1505 unsigned int old_n_capabilities = n_capabilities;
1506 #endif
1507
1508 interruptAllCapabilities();
1509
1510 // The final shutdown GC is always single-threaded, because it's
1511 // possible that some of the Capabilities have no worker threads.
1512
1513 if (gc_type == SYNC_GC_SEQ)
1514 {
1515 traceEventRequestSeqGc(cap);
1516 }
1517 else
1518 {
1519 traceEventRequestParGc(cap);
1520 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1521 }
1522
1523 if (gc_type == SYNC_GC_SEQ)
1524 {
1525 // single-threaded GC: grab all the capabilities
1526 acquireAllCapabilities(cap,task);
1527 }
1528 else
1529 {
1530 // If we are load-balancing collections in this
1531 // generation, then we require all GC threads to participate
1532 // in the collection. Otherwise, we only require active
1533 // threads to participate, and we set gc_threads[i]->idle for
1534 // any idle capabilities. The rationale here is that waking
1535 // up an idle Capability takes much longer than just doing any
1536 // GC work on its behalf.
1537
1538 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1539 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1540 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
1541 for (i=0; i < n_capabilities; i++) {
1542 if (capabilities[i].disabled) {
1543 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1544 } else {
1545 idle_cap[i] = rtsFalse;
1546 }
1547 }
1548 } else {
1549 for (i=0; i < n_capabilities; i++) {
1550 if (capabilities[i].disabled) {
1551 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1552 } else if (i == cap->no ||
1553 capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1554 idle_cap[i] = rtsFalse;
1555 } else {
1556 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1557 if (!idle_cap[i]) {
1558 n_failed_trygrab_idles++;
1559 } else {
1560 n_idle_caps++;
1561 }
1562 }
1563 }
1564 }
1565
1566 // We set the gc_thread[i]->idle flag if that
1567 // capability/thread is not participating in this collection.
1568 // We also keep a local record of which capabilities are idle
1569 // in idle_cap[], because scheduleDoGC() is re-entrant:
1570 // another thread might start a GC as soon as we've finished
1571 // this one, and thus the gc_thread[]->idle flags are invalid
1572 // as soon as we release any threads after GC. Getting this
1573 // wrong leads to a rare and hard to debug deadlock!
1574
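// capabilities[i].idle counts the number of GCs since this capability
// last ran a thread; it is reset in schedule() (cap->idle = 0) and
// compared against parGcNoSyncWithIdle above.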
1575 for (i=0; i < n_capabilities; i++) {
1576 gc_threads[i]->idle = idle_cap[i];
1577 capabilities[i].idle++;
1578 }
1579
1580 // For all capabilities participating in this GC, wait until
1581 // they have stopped mutating and are standing by for GC.
1582 waitForGcThreads(cap);
1583
1584 #if defined(THREADED_RTS)
1585 // Stable point where we can do a global check on our spark counters
1586 ASSERT(checkSparkCountInvariant());
1587 #endif
1588 }
1589
1590 #endif
1591
1592 IF_DEBUG(scheduler, printAllThreads());
1593
1594 delete_threads_and_gc:
1595 /*
1596 * We now have all the capabilities; if we're in an interrupting
1597 * state, then we should take the opportunity to delete all the
1598 * threads in the system.
1599 */
1600 if (sched_state == SCHED_INTERRUPTING) {
1601 deleteAllThreads(cap);
1602 #if defined(THREADED_RTS)
1603 // Discard all the sparks from every Capability. Why?
1604 // They'll probably be GC'd anyway since we've killed all the
1605 // threads. It just avoids the GC having to do any work to
1606 // figure out that any remaining sparks are garbage.
1607 for (i = 0; i < n_capabilities; i++) {
1608 capabilities[i].spark_stats.gcd +=
1609 sparkPoolSize(capabilities[i].sparks);
1610 // No race here since all Caps are stopped.
1611 discardSparksCap(&capabilities[i]);
1612 }
1613 #endif
1614 sched_state = SCHED_SHUTTING_DOWN;
1615 }
1616
1617 /*
1618 * When there are disabled capabilities, we want to migrate any
1619 * threads away from them. Normally this happens in the
1620 * scheduler's loop, but only for unbound threads - it's really
1621 * hard for a bound thread to migrate itself. So we have another
1622 * go here.
1623 */
1624 #if defined(THREADED_RTS)
1625 for (i = enabled_capabilities; i < n_capabilities; i++) {
1626 Capability *tmp_cap, *dest_cap;
1627 tmp_cap = &capabilities[i];
1628 ASSERT(tmp_cap->disabled);
1629 if (i != cap->no) {
1630 dest_cap = &capabilities[i % enabled_capabilities];
1631 while (!emptyRunQueue(tmp_cap)) {
1632 tso = popRunQueue(tmp_cap);
1633 migrateThread(tmp_cap, tso, dest_cap);
1634 if (tso->bound) {
1635 traceTaskMigrate(tso->bound->task,
1636 tso->bound->task->cap,
1637 dest_cap);
1638 tso->bound->task->cap = dest_cap;
1639 }
1640 }
1641 }
1642 }
1643 #endif
1644
1645 #if defined(THREADED_RTS)
1646 // reset pending_sync *before* GC, so that when the GC threads
1647 // emerge they don't immediately re-enter the GC.
1648 pending_sync = 0;
1649 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1650 #else
1651 GarbageCollect(collect_gen, heap_census, 0, cap);
1652 #endif
1653
1654 traceSparkCounters(cap);
1655
1656 switch (recent_activity) {
1657 case ACTIVITY_INACTIVE:
1658 if (force_major) {
1659 // We are doing a GC because the system has been idle for a
1660 // timeslice and we need to check for deadlock. Record the
1661 // fact that we've done a GC and turn off the timer signal;
1662 // it will get re-enabled if we run any threads after the GC.
1663 recent_activity = ACTIVITY_DONE_GC;
1664 #ifndef PROFILING
1665 stopTimer();
1666 #endif
1667 break;
1668 }
1669 // fall through...
1670
1671 case ACTIVITY_MAYBE_NO:
1672 // the GC might have taken long enough for the timer to set
1673 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1674 // but we aren't necessarily deadlocked:
1675 recent_activity = ACTIVITY_YES;
1676 break;
1677
1678 case ACTIVITY_DONE_GC:
1679 // If we are actually active, the scheduler will reset the
1680 // recent_activity flag and re-enable the timer.
1681 break;
1682 }
1683
1684 #if defined(THREADED_RTS)
1685 // Stable point where we can do a global check on our spark counters
1686 ASSERT(checkSparkCountInvariant());
1687 #endif
1688
1689 // The heap census itself is done during GarbageCollect().
1690 if (heap_census) {
1691 performHeapProfile = rtsFalse;
1692 }
1693
1694 #if defined(THREADED_RTS)
1695
1696 // If n_capabilities has changed during GC, we're in trouble.
1697 ASSERT(n_capabilities == old_n_capabilities);
1698
1699 if (gc_type == SYNC_GC_PAR)
1700 {
1701 releaseGCThreads(cap);
1702 for (i = 0; i < n_capabilities; i++) {
1703 if (i != cap->no) {
1704 if (idle_cap[i]) {
1705 ASSERT(capabilities[i].running_task == task);
1706 task->cap = &capabilities[i];
1707 releaseCapability(&capabilities[i]);
1708 } else {
1709 ASSERT(capabilities[i].running_task != task);
1710 }
1711 }
1712 }
1713 task->cap = cap;
1714 }
1715 #endif
1716
1717 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1718 // GC set the heap_overflow flag, so we should proceed with
1719 // an orderly shutdown now. Ultimately we want the main
1720 // thread to return to its caller with HeapExhausted, at which
1721 // point the caller should call hs_exit(). The first step is
1722 // to delete all the threads.
1723 //
1724 // Another way to do this would be to raise an exception in
1725 // the main thread, which we really should do because it gives
1726 // the program a chance to clean up. But how do we find the
1727 // main thread? It should presumably be the same one that
1728 // gets ^C exceptions, but that's all done on the Haskell side
1729 // (GHC.TopHandler).
1730 sched_state = SCHED_INTERRUPTING;
1731 goto delete_threads_and_gc;
1732 }
1733
1734 #ifdef SPARKBALANCE
1735 /* JB
1736 Once we are all together... this would be the place to balance all
1737 spark pools. No concurrent stealing or adding of new sparks can
1738 occur. Should be defined in Sparks.c. */
1739 balanceSparkPoolsCaps(n_capabilities, capabilities);
1740 #endif
1741
1742 #if defined(THREADED_RTS)
1743 if (gc_type == SYNC_GC_SEQ) {
1744 // release our stash of capabilities.
1745 releaseAllCapabilities(n_capabilities, cap, task);
1746 }
1747 #endif
1748
1749 return;
1750 }
1751
1752 /* ---------------------------------------------------------------------------
1753 * Singleton fork(). Do not copy any running threads.
1754 * ------------------------------------------------------------------------- */
1755
1756 pid_t
1757 forkProcess(HsStablePtr *entry
1758 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1759 STG_UNUSED
1760 #endif
1761 )
1762 {
1763 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1764 pid_t pid;
1765 StgTSO* t,*next;
1766 Capability *cap;
1767 nat g;
1768 Task *task = NULL;
1769 nat i;
1770 #ifdef THREADED_RTS
1771 nat sync;
1772 #endif
1773
1774 debugTrace(DEBUG_sched, "forking!");
1775
1776 task = newBoundTask();
1777
1778 cap = NULL;
1779 waitForReturnCapability(&cap, task);
1780
1781 #ifdef THREADED_RTS
1782 do {
1783 sync = requestSync(&cap, task, SYNC_OTHER);
1784 } while (sync);
1785
1786 acquireAllCapabilities(cap,task);
1787
1788 pending_sync = 0;
1789 #endif
1790
1791 // no funny business: hold locks while we fork, otherwise if some
1792 // other thread is holding a lock when the fork happens, the data
1793 // structure protected by the lock will forever be in an
1794 // inconsistent state in the child. See also #1391.
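    //
    // A concrete illustration of the hazard (not specific to any one
    // lock): if some other OS thread owned sm_mutex at the instant of
    // fork(), the child would inherit a locked mutex whose owner no
    // longer exists, and the child's first attempt to take it would
    // block forever. Acquiring all of the locks below before fork()
    // guarantees nobody else holds them at that point.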
1795 ACQUIRE_LOCK(&sched_mutex);
1796 ACQUIRE_LOCK(&sm_mutex);
1797 ACQUIRE_LOCK(&stable_mutex);
1798 ACQUIRE_LOCK(&task->lock);
1799
1800 for (i=0; i < n_capabilities; i++) {
1801 ACQUIRE_LOCK(&capabilities[i].lock);
1802 }
1803
1804 stopTimer(); // See #4074
1805
1806 #if defined(TRACING)
1807 flushEventLog(); // so that child won't inherit dirty file buffers
1808 #endif
1809
1810 pid = fork();
1811
1812 if (pid) { // parent
1813
1814 startTimer(); // #4074
1815
1816 RELEASE_LOCK(&sched_mutex);
1817 RELEASE_LOCK(&sm_mutex);
1818 RELEASE_LOCK(&stable_mutex);
1819 RELEASE_LOCK(&task->lock);
1820
1821 for (i=0; i < n_capabilities; i++) {
1822 releaseCapability_(&capabilities[i],rtsFalse);
1823 RELEASE_LOCK(&capabilities[i].lock);
1824 }
1825 boundTaskExiting(task);
1826
1827 // just return the pid
1828 return pid;
1829
1830 } else { // child
1831
1832 #if defined(THREADED_RTS)
1833 initMutex(&sched_mutex);
1834 initMutex(&sm_mutex);
1835 initMutex(&stable_mutex);
1836 initMutex(&task->lock);
1837
1838 for (i=0; i < n_capabilities; i++) {
1839 initMutex(&capabilities[i].lock);
1840 }
1841 #endif
1842
1843 #ifdef TRACING
1844 resetTracing();
1845 #endif
1846
1847 // Now, all OS threads except the thread that forked are
1848 // stopped. We need to stop all Haskell threads, including
1849 // those involved in foreign calls. Also we need to delete
1850 // all Tasks, because they correspond to OS threads that are
1851 // now gone.
1852
1853 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1854 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1855 next = t->global_link;
1856 // don't allow threads to catch the ThreadKilled
1857 // exception, but we do want to raiseAsync() because these
1858 // threads may be evaluating thunks that we need later.
1859 deleteThread_(t->cap,t);
1860
1861 // stop the GC from updating the InCall to point to
1862 // the TSO. This is only necessary because the
1863 // OSThread bound to the TSO has been killed, and
1864 // won't get a chance to exit in the usual way (see
1865 // also scheduleHandleThreadFinished).
1866 t->bound = NULL;
1867 }
1868 }
1869
1870 discardTasksExcept(task);
1871
1872 for (i=0; i < n_capabilities; i++) {
1873 cap = &capabilities[i];
1874
1875 // Empty the run queue. It seems tempting to let all the
1876 // killed threads stay on the run queue as zombies to be
1877 // cleaned up later, but some of them may correspond to
1878 // bound threads for which the corresponding Task does not
1879 // exist.
1880 truncateRunQueue(cap);
1881
1882 // Any suspended C-calling Tasks are no more, their OS threads
1883 // don't exist now:
1884 cap->suspended_ccalls = NULL;
1885
1886 #if defined(THREADED_RTS)
1887 // Wipe our spare workers list, they no longer exist. New
1888 // workers will be created if necessary.
1889 cap->spare_workers = NULL;
1890 cap->n_spare_workers = 0;
1891 cap->returning_tasks_hd = NULL;
1892 cap->returning_tasks_tl = NULL;
1893 #endif
1894
1895 // Release all caps except 0, we'll use that for starting
1896 // the IO manager and running the client action below.
1897 if (cap->no != 0) {
1898 task->cap = cap;
1899 releaseCapability(cap);
1900 }
1901 }
1902 cap = &capabilities[0];
1903 task->cap = cap;
1904
1905 // Empty the threads lists. Otherwise, the garbage
1906 // collector may attempt to resurrect some of these threads.
1907 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1908 generations[g].threads = END_TSO_QUEUE;
1909 }
1910
1911 // On Unix, all timers are reset in the child, so we need to start
1912 // the timer again.
1913 initTimer();
1914 startTimer();
1915
1916 // TODO: need to trace various other things in the child
1917 // like startup event, capabilities, process info etc
1918 traceTaskCreate(task, cap);
1919
1920 #if defined(THREADED_RTS)
1921 ioManagerStartCap(&cap);
1922 #endif
1923
1924 rts_evalStableIO(&cap, entry, NULL); // run the action
1925 rts_checkSchedStatus("forkProcess",cap);
1926
1927 rts_unlock(cap);
1928 hs_exit(); // clean up and exit
1929 stg_exit(EXIT_SUCCESS);
1930 }
1931 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1932 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1933 #endif
1934 }
1935
1936 /* ---------------------------------------------------------------------------
1937 * Changing the number of Capabilities
1938 *
1939 * Changing the number of Capabilities is very tricky! We can only do
1940 * it with the system fully stopped, so we do a full sync with
1941 * requestSync(SYNC_OTHER) and grab all the capabilities.
1942 *
1943 * Then we resize the appropriate data structures, and update all
1944 * references to the old data structures which have now moved.
1945 * Finally we release the Capabilities we are holding, and start
1946 * worker Tasks on the new Capabilities we created.
1947 *
1948 * ------------------------------------------------------------------------- */
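/* Illustrative usage sketch (not part of this file's job): from Haskell
 * this function is reached via Control.Concurrent.setNumCapabilities; a
 * C program embedding the RTS could equally call it directly once the
 * RTS is initialised, e.g.
 *
 *     hs_init(&argc, &argv);
 *     ...
 *     setNumCapabilities(4);    // grow or shrink the enabled set
 *
 * hs_init() is the public RTS entry point; the surrounding code is only
 * a sketch.
 */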
1949
1950 void
1951 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
1952 {
1953 #if !defined(THREADED_RTS)
1954 if (new_n_capabilities != 1) {
1955 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
1956 }
1957 return;
1958 #elif defined(NOSMP)
1959 if (new_n_capabilities != 1) {
1960 errorBelch("setNumCapabilities: not supported on this platform");
1961 }
1962 return;
1963 #else
1964 Task *task;
1965 Capability *cap;
1966 nat sync;
1967 StgTSO* t;
1968 nat g, n;
1969 Capability *old_capabilities = NULL;
1970 nat old_n_capabilities = n_capabilities;
1971
1972 if (new_n_capabilities == enabled_capabilities) return;
1973
1974 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
1975 enabled_capabilities, new_n_capabilities);
1976
1977 cap = rts_lock();
1978 task = cap->running_task;
1979
1980 do {
1981 sync = requestSync(&cap, task, SYNC_OTHER);
1982 } while (sync);
1983
1984 acquireAllCapabilities(cap,task);
1985
1986 pending_sync = 0;
1987
1988 if (new_n_capabilities < enabled_capabilities)
1989 {
1990 // Reducing the number of capabilities: we do not actually
1991 // remove the extra capabilities, we just mark them as
1992 // "disabled". This has the following effects:
1993 //
1994 // - threads on a disabled capability are migrated away by the
1995 // scheduler loop
1996 //
1997 // - disabled capabilities do not participate in GC
1998 // (see scheduleDoGC())
1999 //
2000 // - No spark threads are created on this capability
2001 // (see scheduleActivateSpark())
2002 //
2003 // - We do not attempt to migrate threads *to* a disabled
2004 // capability (see schedulePushWork()).
2005 //
2006 // but in other respects, a disabled capability remains
2007 // alive. Threads may be woken up on a disabled capability,
2008 // but they will be immediately migrated away.
2009 //
2010 // This approach is much easier than trying to actually remove
2011 // the capability; we don't have to worry about GC data
2012 // structures, the nursery, etc.
2013 //
2014 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2015 capabilities[n].disabled = rtsTrue;
2016 traceCapDisable(&capabilities[n]);
2017 }
2018 enabled_capabilities = new_n_capabilities;
2019 }
2020 else
2021 {
2022 // Increasing the number of enabled capabilities.
2023 //
2024 // enable any disabled capabilities, up to the required number
2025 for (n = enabled_capabilities;
2026 n < new_n_capabilities && n < n_capabilities; n++) {
2027 capabilities[n].disabled = rtsFalse;
2028 traceCapEnable(&capabilities[n]);
2029 }
2030 enabled_capabilities = n;
2031
2032 if (new_n_capabilities > n_capabilities) {
2033 #if defined(TRACING)
2034 // Allocate eventlog buffers for the new capabilities. Note this
2035 // must be done before calling moreCapabilities(), because that
2036 // will emit events about creating the new capabilities and adding
2037 // them to existing capsets.
2038 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2039 #endif
2040
2041 // Resize the capabilities array
2042 // NB. after this, capabilities points somewhere new. Any pointers
2043 // of type (Capability *) are now invalid.
2044 old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
2045
2046 // update our own cap pointer
2047 cap = &capabilities[cap->no];
2048
2049 // Resize and update storage manager data structures
2050 storageAddCapabilities(n_capabilities, new_n_capabilities);
2051
2052 // Update (Capability *) refs in the Task manager.
2053 updateCapabilityRefs();
2054
2055 // Update (Capability *) refs from TSOs
2056 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2057 for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
2058 t->cap = &capabilities[t->cap->no];
2059 }
2060 }
2061 }
2062 }
2063
2064 // update n_capabilities before things start running
2065 if (new_n_capabilities > n_capabilities) {
2066 n_capabilities = enabled_capabilities = new_n_capabilities;
2067 }
2068
2069 // Start worker tasks on the new Capabilities
2070 startWorkerTasks(old_n_capabilities, new_n_capabilities);
2071
2072 // We're done: release the original Capabilities
2073 releaseAllCapabilities(old_n_capabilities, cap,task);
2074
2075 // We can't free the old array until now, because we access it
2076 // while updating pointers in updateCapabilityRefs().
2077 if (old_capabilities) {
2078 stgFree(old_capabilities);
2079 }
2080
2081 // Notify IO manager that the number of capabilities has changed.
2082 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2083
2084 rts_unlock(cap);
2085
2086 #endif // THREADED_RTS
2087 }
2088
2089
2090
2091 /* ---------------------------------------------------------------------------
2092 * Delete all the threads in the system
2093 * ------------------------------------------------------------------------- */
2094
2095 static void
2096 deleteAllThreads ( Capability *cap )
2097 {
2098 // NOTE: only safe to call if we own all capabilities.
2099
2100 StgTSO* t, *next;
2101 nat g;
2102
2103 debugTrace(DEBUG_sched,"deleting all threads");
2104 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2105 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2106 next = t->global_link;
2107 deleteThread(cap,t);
2108 }
2109 }
2110
2111 // The run queue now contains a bunch of ThreadKilled threads. We
2112 // must not throw these away: the main thread(s) will be in there
2113 // somewhere, and the main scheduler loop has to deal with it.
2114 // Also, the run queue is the only thing keeping these threads from
2115 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2116
2117 #if !defined(THREADED_RTS)
2118 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2119 ASSERT(sleeping_queue == END_TSO_QUEUE);
2120 #endif
2121 }
2122
2123 /* -----------------------------------------------------------------------------
2124 Managing the suspended_ccalls list.
2125 Locks required: sched_mutex
2126 -------------------------------------------------------------------------- */
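/* Concretely: cap->suspended_ccalls is the head of a NULL-terminated,
 * doubly-linked list of InCall structures, threaded through
 * incall->prev and incall->next. suspendTask() pushes the current
 * Task's InCall onto the front; recoverSuspendedTask() unlinks that
 * InCall again, wherever it now sits in the list.
 */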
2127
2128 STATIC_INLINE void
2129 suspendTask (Capability *cap, Task *task)
2130 {
2131 InCall *incall;
2132
2133 incall = task->incall;
2134 ASSERT(incall->next == NULL && incall->prev == NULL);
2135 incall->next = cap->suspended_ccalls;
2136 incall->prev = NULL;
2137 if (cap->suspended_ccalls) {
2138 cap->suspended_ccalls->prev = incall;
2139 }
2140 cap->suspended_ccalls = incall;
2141 }
2142
2143 STATIC_INLINE void
2144 recoverSuspendedTask (Capability *cap, Task *task)
2145 {
2146 InCall *incall;
2147
2148 incall = task->incall;
2149 if (incall->prev) {
2150 incall->prev->next = incall->next;
2151 } else {
2152 ASSERT(cap->suspended_ccalls == incall);
2153 cap->suspended_ccalls = incall->next;
2154 }
2155 if (incall->next) {
2156 incall->next->prev = incall->prev;
2157 }
2158 incall->next = incall->prev = NULL;
2159 }
2160
2161 /* ---------------------------------------------------------------------------
2162 * Suspending & resuming Haskell threads.
2163 *
2164 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2165 * its capability before calling the C function. This allows another
2166 * task to pick up the capability and carry on running Haskell
2167 * threads. It also means that if the C call blocks, it won't lock
2168 * the whole system.
2169 *
2170 * The Haskell thread making the C call is put to sleep for the
2171 * duration of the call, on the cap->suspended_ccalls list. We
2172 * give out a token to the task, which it can use to resume the thread
2173 * on return from the C function.
2174 *
2175 * If this is an interruptible C call, this means that the FFI call may be
2176 * unceremoniously terminated and should be scheduled on an
2177 * unbound worker thread.
2178 * ------------------------------------------------------------------------- */
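/* The code generated for a safe foreign call uses this pair roughly as
 * follows (a sketch only; the real sequence is emitted by the code
 * generator and also saves and restores registers):
 *
 *     token = suspendThread(BaseReg, interruptible);
 *     r = foreign_fn(args);              // the actual C call
 *     BaseReg = resumeThread(token);
 *
 * i.e. the Capability is given up for the duration of the call, and the
 * opaque token (really the Task) is all that is needed to get back in.
 */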
2179
2180 void *
2181 suspendThread (StgRegTable *reg, rtsBool interruptible)
2182 {
2183 Capability *cap;
2184 int saved_errno;
2185 StgTSO *tso;
2186 Task *task;
2187 #if mingw32_HOST_OS
2188 StgWord32 saved_winerror;
2189 #endif
2190
2191 saved_errno = errno;
2192 #if mingw32_HOST_OS
2193 saved_winerror = GetLastError();
2194 #endif
2195
2196 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2197 */
2198 cap = regTableToCapability(reg);
2199
2200 task = cap->running_task;
2201 tso = cap->r.rCurrentTSO;
2202
2203 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2204
2205 // XXX this might not be necessary --SDM
2206 tso->what_next = ThreadRunGHC;
2207
2208 threadPaused(cap,tso);
2209
2210 if (interruptible) {
2211 tso->why_blocked = BlockedOnCCall_Interruptible;
2212 } else {
2213 tso->why_blocked = BlockedOnCCall;
2214 }
2215
2216 // Hand back capability
2217 task->incall->suspended_tso = tso;
2218 task->incall->suspended_cap = cap;
2219
2220 ACQUIRE_LOCK(&cap->lock);
2221
2222 suspendTask(cap,task);
2223 cap->in_haskell = rtsFalse;
2224 releaseCapability_(cap,rtsFalse);
2225
2226 RELEASE_LOCK(&cap->lock);
2227
2228 errno = saved_errno;
2229 #if mingw32_HOST_OS
2230 SetLastError(saved_winerror);
2231 #endif
2232 return task;
2233 }
2234
2235 StgRegTable *
2236 resumeThread (void *task_)
2237 {
2238 StgTSO *tso;
2239 InCall *incall;
2240 Capability *cap;
2241 Task *task = task_;
2242 int saved_errno;
2243 #if mingw32_HOST_OS
2244 StgWord32 saved_winerror;
2245 #endif
2246
2247 saved_errno = errno;
2248 #if mingw32_HOST_OS
2249 saved_winerror = GetLastError();
2250 #endif
2251
2252 incall = task->incall;
2253 cap = incall->suspended_cap;
2254 task->cap = cap;
2255
2256 // Wait for permission to re-enter the RTS with the result.
2257 waitForReturnCapability(&cap,task);
2258 // we might be on a different capability now... but if so, our
2259 // entry on the suspended_ccalls list will also have been
2260 // migrated.
2261
2262 // Remove the thread from the suspended list
2263 recoverSuspendedTask(cap,task);
2264
2265 tso = incall->suspended_tso;
2266 incall->suspended_tso = NULL;
2267 incall->suspended_cap = NULL;
2268 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2269
2270 traceEventRunThread(cap, tso);
2271
2272 /* Reset blocking status */
2273 tso->why_blocked = NotBlocked;
2274
2275 if ((tso->flags & TSO_BLOCKEX) == 0) {
2276 // avoid locking the TSO if we don't have to
2277 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2278 maybePerformBlockedException(cap,tso);
2279 }
2280 }
2281
2282 cap->r.rCurrentTSO = tso;
2283 cap->in_haskell = rtsTrue;
2284 errno = saved_errno;
2285 #if mingw32_HOST_OS
2286 SetLastError(saved_winerror);
2287 #endif
2288
2289 /* We might have GC'd, mark the TSO dirty again */
2290 dirty_TSO(cap,tso);
2291 dirty_STACK(cap,tso->stackobj);
2292
2293 IF_DEBUG(sanity, checkTSO(tso));
2294
2295 return &cap->r;
2296 }
2297
2298 /* ---------------------------------------------------------------------------
2299 * scheduleThread()
2300 *
2301 * scheduleThread puts a thread on the end of the runnable queue.
2302 * This will usually be done immediately after a thread is created.
2303 * The caller of scheduleThread must create the thread using e.g.
2304 * createThread and push an appropriate closure
2305 * on this thread's stack before the scheduler is invoked.
2306 * ------------------------------------------------------------------------ */
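/* A typical pairing, sketched after what rts_evalIO() and friends in
 * RtsAPI.c do (they use scheduleWaitThread(), but the thread-creation
 * step is the same):
 *
 *     tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, closure);
 *     scheduleThread(cap, tso);
 *
 * where 'closure' is the action to run; createIOThread() pushes the
 * closure and a suitable application frame onto the new thread's stack.
 */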
2307
2308 void
2309 scheduleThread(Capability *cap, StgTSO *tso)
2310 {
2311 // The thread goes at the *end* of the run-queue, to avoid possible
2312 // starvation of any threads already on the queue.
2313 appendToRunQueue(cap,tso);
2314 }
2315
2316 void
2317 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2318 {
2319 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2320 // move this thread from now on.
2321 #if defined(THREADED_RTS)
2322 cpu %= enabled_capabilities;
2323 if (cpu == cap->no) {
2324 appendToRunQueue(cap,tso);
2325 } else {
2326 migrateThread(cap, tso, &capabilities[cpu]);
2327 }
2328 #else
2329 appendToRunQueue(cap,tso);
2330 #endif
2331 }
2332
2333 void
2334 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2335 {
2336 Task *task;
2337 DEBUG_ONLY( StgThreadID id );
2338 Capability *cap;
2339
2340 cap = *pcap;
2341
2342 // We already created/initialised the Task
2343 task = cap->running_task;
2344
2345 // This TSO is now a bound thread; make the Task and TSO
2346 // point to each other.
2347 tso->bound = task->incall;
2348 tso->cap = cap;
2349
2350 task->incall->tso = tso;
2351 task->incall->ret = ret;
2352 task->incall->stat = NoStatus;
2353
2354 appendToRunQueue(cap,tso);
2355
2356 DEBUG_ONLY( id = tso->id );
2357 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2358
2359 cap = schedule(cap,task);
2360
2361 ASSERT(task->incall->stat != NoStatus);
2362 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2363
2364 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2365 *pcap = cap;
2366 }
2367
2368 /* ----------------------------------------------------------------------------
2369 * Starting Tasks
2370 * ------------------------------------------------------------------------- */
2371
2372 #if defined(THREADED_RTS)
2373 void scheduleWorker (Capability *cap, Task *task)
2374 {
2375 // schedule() runs without a lock.
2376 cap = schedule(cap,task);
2377
2378 // On exit from schedule(), we have a Capability, but possibly not
2379 // the same one we started with.
2380
2381 // During shutdown, the requirement is that after all the
2382 // Capabilities are shut down, all workers that are shutting down
2383 // have finished workerTaskStop(). This is why we hold on to
2384 // cap->lock until we've finished workerTaskStop() below.
2385 //
2386 // There may be workers still involved in foreign calls; those
2387 // will just block in waitForReturnCapability() because the
2388 // Capability has been shut down.
2389 //
2390 ACQUIRE_LOCK(&cap->lock);
2391 releaseCapability_(cap,rtsFalse);
2392 workerTaskStop(task);
2393 RELEASE_LOCK(&cap->lock);
2394 }
2395 #endif
2396
2397 /* ---------------------------------------------------------------------------
2398 * Start new worker tasks on Capabilities from--to
2399 * -------------------------------------------------------------------------- */
2400
2401 static void
2402 startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
2403 {
2404 #if defined(THREADED_RTS)
2405 nat i;
2406 Capability *cap;
2407
2408 for (i = from; i < to; i++) {
2409 cap = &capabilities[i];
2410 ACQUIRE_LOCK(&cap->lock);
2411 startWorkerTask(cap);
2412 RELEASE_LOCK(&cap->lock);
2413 }
2414 #endif
2415 }
2416
2417 /* ---------------------------------------------------------------------------
2418 * initScheduler()
2419 *
2420 * Initialise the scheduler. This resets all the queues - if the
2421 * queues contained any threads, they'll be garbage collected at the
2422 * next pass.
2423 *
2424 * ------------------------------------------------------------------------ */
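/* initScheduler() is called once during RTS startup (from RtsStartup.c),
 * before any Haskell code runs; freeScheduler() below is its
 * counterpart at shutdown.
 */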
2425
2426 void
2427 initScheduler(void)
2428 {
2429 #if !defined(THREADED_RTS)
2430 blocked_queue_hd = END_TSO_QUEUE;
2431 blocked_queue_tl = END_TSO_QUEUE;
2432 sleeping_queue = END_TSO_QUEUE;
2433 #endif
2434
2435 sched_state = SCHED_RUNNING;
2436 recent_activity = ACTIVITY_YES;
2437
2438 #if defined(THREADED_RTS)
2439 /* Initialise the mutex and condition variables used by
2440 * the scheduler. */
2441 initMutex(&sched_mutex);
2442 #endif
2443
2444 ACQUIRE_LOCK(&sched_mutex);
2445
2446 /* A capability holds the state a native thread needs in
2447 * order to execute STG code. At least one capability is
2448 * floating around (only THREADED_RTS builds have more than one).
2449 */
2450 initCapabilities();
2451
2452 initTaskManager();
2453
2454 /*
2455 * Eagerly start one worker to run each Capability, except for
2456 * Capability 0. The idea is that we're probably going to start a
2457 * bound thread on Capability 0 pretty soon, so we don't want a
2458 * worker task hogging it.
2459 */
2460 startWorkerTasks(1, n_capabilities);
2461
2462 RELEASE_LOCK(&sched_mutex);
2463
2464 }
2465
2466 void
2467 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2468 /* see Capability.c, shutdownCapability() */
2469 {
2470 Task *task = NULL;
2471
2472 task = newBoundTask();
2473
2474 // If we haven't killed all the threads yet, do it now.
2475 if (sched_state < SCHED_SHUTTING_DOWN) {
2476 sched_state = SCHED_INTERRUPTING;
2477 Capability *cap = task->cap;
2478 waitForReturnCapability(&cap,task);
2479 scheduleDoGC(&cap,task,rtsTrue);
2480 ASSERT(task->incall->tso == NULL);
2481 releaseCapability(cap);
2482 }
2483 sched_state = SCHED_SHUTTING_DOWN;
2484
2485 shutdownCapabilities(task, wait_foreign);
2486
2487 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2488 // n_failed_trygrab_idles, n_idle_caps);
2489
2490 boundTaskExiting(task);
2491 }
2492
2493 void
2494 freeScheduler( void )
2495 {
2496 nat still_running;
2497
2498 ACQUIRE_LOCK(&sched_mutex);
2499 still_running = freeTaskManager();
2500 // We can only free the Capabilities if there are no Tasks still
2501 // running. We might have a Task about to return from a foreign
2502 // call into waitForReturnCapability(), for example (actually,
2503 // this should be the *only* thing that a still-running Task can
2504 // do at this point, and it will block waiting for the
2505 // Capability).
2506 if (still_running == 0) {
2507 freeCapabilities();
2508 if (n_capabilities != 1) {
2509 stgFree(capabilities);
2510 }
2511 }
2512 RELEASE_LOCK(&sched_mutex);
2513 #if defined(THREADED_RTS)
2514 closeMutex(&sched_mutex);
2515 #endif
2516 }
2517
2518 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2519 void *user USED_IF_NOT_THREADS)
2520 {
2521 #if !defined(THREADED_RTS)
2522 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2523 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2524 evac(user, (StgClosure **)(void *)&sleeping_queue);
2525 #endif
2526 }
2527
2528 /* -----------------------------------------------------------------------------
2529 performGC
2530
2531 This is the interface to the garbage collector from Haskell land.
2532 We provide this so that external C code can allocate and garbage
2533 collect when called from Haskell via _ccall_GC.
2534 -------------------------------------------------------------------------- */
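/* For example (a sketch; the names are hypothetical application code):
 * a C function invoked from Haskell through a safe foreign import can
 * force a collection like this:
 *
 *     void myFlushAndCollect(void)
 *     {
 *         myFlushCaches();     // hypothetical application work
 *         performGC();         // or performMajorGC() for a major GC
 *     }
 *
 * Both entry points grab a fresh bound Task and a Capability internally
 * (see performGC_() below), so the caller need not hold one.
 */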
2535
2536 static void
2537 performGC_(rtsBool force_major)
2538 {
2539 Task *task;
2540 Capability *cap = NULL;
2541
2542 // We must grab a new Task here, because the existing Task may be
2543 // associated with a particular Capability, and chained onto the
2544 // suspended_ccalls queue.
2545 task = newBoundTask();
2546
2547 // TODO: do we need to traceTask*() here?
2548
2549 waitForReturnCapability(&cap,task);
2550 scheduleDoGC(&cap,task,force_major);
2551 releaseCapability(cap);
2552 boundTaskExiting(task);
2553 }
2554
2555 void
2556 performGC(void)
2557 {
2558 performGC_(rtsFalse);
2559 }
2560
2561 void
2562 performMajorGC(void)
2563 {
2564 performGC_(rtsTrue);
2565 }
2566
2567 /* ---------------------------------------------------------------------------
2568 Interrupt execution
2569 - usually called inside a signal handler so it mustn't do anything fancy.
2570 ------------------------------------------------------------------------ */
2571
2572 void
2573 interruptStgRts(void)
2574 {
2575 sched_state = SCHED_INTERRUPTING;
2576 interruptAllCapabilities();
2577 #if defined(THREADED_RTS)
2578 wakeUpRts();
2579 #endif
2580 }
2581
2582 /* -----------------------------------------------------------------------------
2583 Wake up the RTS
2584
2585 This function causes at least one OS thread to wake up and run the
2586 scheduler loop. It is invoked when the RTS might be deadlocked, or
2587 an external event has arrived that may need servicing (e.g. a
2588 keyboard interrupt).
2589
2590 In the single-threaded RTS we don't do anything here; we only have
2591 one thread anyway, and the event that caused us to want to wake up
2592 will have interrupted any blocking system call in progress anyway.
2593 -------------------------------------------------------------------------- */
2594
2595 #if defined(THREADED_RTS)
2596 void wakeUpRts(void)
2597 {
2598 // This forces the IO Manager thread to wakeup, which will
2599 // in turn ensure that some OS thread wakes up and runs the
2600 // scheduler loop, which will cause a GC and deadlock check.
2601 ioManagerWakeup();
2602 }
2603 #endif
2604
2605 /* -----------------------------------------------------------------------------
2606 Deleting threads
2607
2608 This is used for interruption (^C) and forking, and corresponds to
2609 raising an exception but without letting the thread catch the
2610 exception.
2611 -------------------------------------------------------------------------- */
2612
2613 static void
2614 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2615 {
2616 // NOTE: must only be called on a TSO that we have exclusive
2617 // access to, because we will call throwToSingleThreaded() below.
2618 // The TSO must be on the run queue of the Capability we own, or
2619 // we must own all Capabilities.
2620
2621 if (tso->why_blocked != BlockedOnCCall &&
2622 tso->why_blocked != BlockedOnCCall_Interruptible) {
2623 throwToSingleThreaded(tso->cap,tso,NULL);
2624 }
2625 }
2626
2627 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2628 static void
2629 deleteThread_(Capability *cap, StgTSO *tso)
2630 { // for forkProcess only:
2631 // like deleteThread(), but we delete threads in foreign calls, too.
2632
2633 if (tso->why_blocked == BlockedOnCCall ||
2634 tso->why_blocked == BlockedOnCCall_Interruptible) {
2635 tso->what_next = ThreadKilled;
2636 appendToRunQueue(tso->cap, tso);
2637 } else {
2638 deleteThread(cap,tso);
2639 }
2640 }
2641 #endif
2642
2643 /* -----------------------------------------------------------------------------
2644 raiseExceptionHelper
2645
2646 This function is called by the raise# primitive, just so that we can
2647 move some of the tricky bits of raising an exception from C-- into
2648 C. Who knows, it might be a useful re-usable thing here too.
2649 -------------------------------------------------------------------------- */
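/* The returned frame type tells the caller of raise# (stg_raisezh in
 * Exception.cmm) what to do next: run a handler (CATCH_FRAME,
 * CATCH_STM_FRAME), deal with the enclosing transaction first
 * (ATOMICALLY_FRAME), or kill the thread because the exception is
 * unhandled (STOP_FRAME).
 */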
2650
2651 StgWord
2652 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2653 {
2654 Capability *cap = regTableToCapability(reg);
2655 StgThunk *raise_closure = NULL;
2656 StgPtr p, next;
2657 StgRetInfoTable *info;
2658 //
2659 // This closure represents the expression 'raise# E' where E
2660 // is the exception to raise. It is used to overwrite all the
2661 // thunks which are currently under evaluation.
2662 //
2663
2664 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2665 // LDV profiling: stg_raise_info has THUNK as its closure
2666 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2667 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2668 // 1 does not cause any problem unless profiling is performed.
2669 // However, when LDV profiling goes on, we need to linearly scan
2670 // the small object pool, where raise_closure is stored, so we should
2671 // use MIN_UPD_SIZE.
2672 //
2673 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2674 // sizeofW(StgClosure)+1);
2675 //
2676
2677 //
2678 // Walk up the stack, looking for the catch frame. On the way,
2679 // we update any closures pointed to from update frames with the
2680 // raise closure that we just built.
2681 //
2682 p = tso->stackobj->sp;
2683 while(1) {
2684 info = get_ret_itbl((StgClosure *)p);
2685 next = p + stack_frame_sizeW((StgClosure *)p);
2686 switch (info->i.type) {
2687
2688 case UPDATE_FRAME:
2689 // Only create raise_closure if we need to.
2690 if (raise_closure == NULL) {
2691 raise_closure =
2692 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2693 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2694 raise_closure->payload[0] = exception;
2695 }
2696 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2697 (StgClosure *)raise_closure);
2698 p = next;
2699 continue;
2700
2701 case ATOMICALLY_FRAME:
2702 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2703 tso->stackobj->sp = p;
2704 return ATOMICALLY_FRAME;
2705
2706 case CATCH_FRAME:
2707 tso->stackobj->sp = p;
2708 return CATCH_FRAME;
2709
2710 case CATCH_STM_FRAME:
2711 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2712 tso->stackobj->sp = p;
2713 return CATCH_STM_FRAME;
2714
2715 case UNDERFLOW_FRAME:
2716 tso->stackobj->sp = p;
2717 threadStackUnderflow(cap,tso);
2718 p = tso->stackobj->sp;
2719 continue;
2720
2721 case STOP_FRAME:
2722 tso->stackobj->sp = p;
2723 return STOP_FRAME;
2724
2725 case CATCH_RETRY_FRAME: {
2726 StgTRecHeader *trec = tso -> trec;
2727 StgTRecHeader *outer = trec -> enclosing_trec;
2728 debugTrace(DEBUG_stm,
2729 "found CATCH_RETRY_FRAME at %p during raise", p);
2730 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2731 stmAbortTransaction(cap, trec);
2732 stmFreeAbortedTRec(cap, trec);
2733 tso -> trec = outer;
2734 p = next;
2735 continue;
2736 }
2737
2738 default:
2739 p = next;
2740 continue;
2741 }
2742 }
2743 }
2744
2745
2746 /* -----------------------------------------------------------------------------
2747 findRetryFrameHelper
2748
2749 This function is called by the retry# primitive. It traverses the stack
2750 leaving tso->stackobj->sp referring to the frame which should handle the retry.
2751
2752 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2753 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2754
2755 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2756 create) because retries are not considered to be exceptions, despite the
2757 similar implementation.
2758
2759 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2760 not be created within memory transactions.
2761 -------------------------------------------------------------------------- */
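/* For orientation, a retry# executed inside the first branch of an
 * orElse# sees a stack shaped roughly like this (innermost first):
 *
 *     ... frames of the current branch ...
 *     CATCH_RETRY_FRAME       (pushed by orElse#)
 *     ...
 *     ATOMICALLY_FRAME        (pushed by atomically#)
 *
 * so the loop below hits the CATCH_RETRY_FRAME first and the other
 * branch gets a chance to run; a retry# with no enclosing orElse# walks
 * all the way out to the ATOMICALLY_FRAME, where the transaction blocks
 * waiting for one of the TVars it read to change.
 */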
2762
2763 StgWord
2764 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2765 {
2766 StgPtr p, next;
2767 StgRetInfoTable *info;
2768
2769 p = tso->stackobj->sp;
2770 while (1) {
2771 info = get_ret_itbl((StgClosure *)p);
2772 next = p + stack_frame_sizeW((StgClosure *)p);
2773 switch (info->i.type) {
2774
2775 case ATOMICALLY_FRAME:
2776 debugTrace(DEBUG_stm,
2777 "found ATOMICALLY_FRAME at %p during retry", p);
2778 tso->stackobj->sp = p;
2779 return ATOMICALLY_FRAME;
2780
2781 case CATCH_RETRY_FRAME:
2782 debugTrace(DEBUG_stm,
2783 "found CATCH_RETRY_FRAME at %p during retry", p);
2784 tso->stackobj->sp = p;
2785 return CATCH_RETRY_FRAME;
2786
2787 case CATCH_STM_FRAME: {
2788 StgTRecHeader *trec = tso -> trec;
2789 StgTRecHeader *outer = trec -> enclosing_trec;
2790 debugTrace(DEBUG_stm,
2791 "found CATCH_STM_FRAME at %p during retry", p);
2792 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2793 stmAbortTransaction(cap, trec);
2794 stmFreeAbortedTRec(cap, trec);
2795 tso -> trec = outer;
2796 p = next;
2797 continue;
2798 }
2799
2800 case UNDERFLOW_FRAME:
2801 tso->stackobj->sp = p;
2802 threadStackUnderflow(cap,tso);
2803 p = tso->stackobj->sp;
2804 continue;
2805
2806 default:
2807 ASSERT(info->i.type != CATCH_FRAME);
2808 ASSERT(info->i.type != STOP_FRAME);
2809 p = next;
2810 continue;
2811 }
2812 }
2813 }
2814
2815 /* -----------------------------------------------------------------------------
2816 resurrectThreads is called after garbage collection on the list of
2817 threads found to be garbage. Each of these threads will be woken
2818 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2819 on an MVar, or NonTermination if the thread was blocked on a Black
2820 Hole.
2821
2822 Locks: assumes we hold *all* the capabilities.
2823 -------------------------------------------------------------------------- */
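/* For example: the main thread of a program that blocks forever on an
 * MVar which no other live thread can ever fill is found to be garbage,
 * resurrected here, and sent BlockedIndefinitelyOnMVar rather than
 * being dropped silently.
 */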
2824
2825 void
2826 resurrectThreads (StgTSO *threads)
2827 {
2828 StgTSO *tso, *next;
2829 Capability *cap;
2830 generation *gen;
2831
2832 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2833 next = tso->global_link;
2834
2835 gen = Bdescr((P_)tso)->gen;
2836 tso->global_link = gen->threads;
2837 gen->threads = tso;
2838
2839 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2840
2841 // Wake up the thread on the Capability it was last on
2842 cap = tso->cap;
2843
2844 switch (tso->why_blocked) {
2845 case BlockedOnMVar:
2846 /* Called by GC - sched_mutex lock is currently held. */
2847 throwToSingleThreaded(cap, tso,
2848 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2849 break;
2850 case BlockedOnBlackHole:
2851 throwToSingleThreaded(cap, tso,
2852 (StgClosure *)nonTermination_closure);
2853 break;
2854 case BlockedOnSTM:
2855 throwToSingleThreaded(cap, tso,
2856 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2857 break;
2858 case NotBlocked:
2859 /* This might happen if the thread was blocked on a black hole
2860 * belonging to a thread that we've just woken up (raiseAsync
2861 * can wake up threads, remember...).
2862 */
2863 continue;
2864 case BlockedOnMsgThrowTo:
2865 // This can happen if the target is masking, blocks on a
2866 // black hole, and then is found to be unreachable. In
2867 // this case, we want to let the target wake up and carry
2868 // on, and do nothing to this thread.
2869 continue;
2870 default:
2871 barf("resurrectThreads: thread blocked in a strange way: %d",
2872 tso->why_blocked);
2873 }
2874 }
2875 }