[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
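 *
 * Possible values: ACTIVITY_YES (a thread has run recently),
 * ACTIVITY_INACTIVE (a whole timeslice passed with no activity), and
 * ACTIVITY_DONE_GC (the deadlock-detection GC has run and the timer
 * signal has been turned off until there is activity again).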
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
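 *
 * Values (only ever increasing): SCHED_RUNNING, then SCHED_INTERRUPTING
 * once a shutdown has been requested, then SCHED_SHUTTING_DOWN once all
 * the threads have been deleted (see the shutdown sequence described in
 * schedule() below).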
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
95 * exists - earlier gccs apparently didn't.
96 * -= chak
97 */
98 StgTSO dummy_tso;
99
100 /*
101 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
102 * in an MT setting, needed to signal that a worker thread shouldn't hang around
103 * in the scheduler when it is out of work.
104 */
105 rtsBool shutting_down_scheduler = rtsFalse;
106
107 /*
108 * This mutex protects most of the global scheduler data in
109 * the THREADED_RTS runtime.
110 */
111 #if defined(THREADED_RTS)
112 Mutex sched_mutex;
113 #endif
114
115 #if !defined(mingw32_HOST_OS)
116 #define FORKPROCESS_PRIMOP_SUPPORTED
117 #endif
118
119 // Local stats
120 #ifdef THREADED_RTS
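// n_failed_trygrab_idles counts failed tryGrabCapability() calls on idle
// Capabilities when synchronising for GC, and n_idle_caps counts the
// Capabilities found idle at that point (see scheduleDoGC()).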
121 static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
122 #endif
123
124 /* -----------------------------------------------------------------------------
125 * static function prototypes
126 * -------------------------------------------------------------------------- */
127
128 static Capability *schedule (Capability *initialCapability, Task *task);
129
130 //
131 // These functions all encapsulate parts of the scheduler loop, and are
132 // abstracted only to make the structure and control flow of the
133 // scheduler clearer.
134 //
135 static void schedulePreLoop (void);
136 static void scheduleFindWork (Capability **pcap);
137 #if defined(THREADED_RTS)
138 static void scheduleYield (Capability **pcap, Task *task);
139 #endif
140 #if defined(THREADED_RTS)
141 static nat requestSync (Capability **pcap, Task *task, nat sync_type);
142 static void acquireAllCapabilities(Capability *cap, Task *task);
143 static void releaseAllCapabilities(Capability *cap, Task *task);
144 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
145 #endif
146 static void scheduleStartSignalHandlers (Capability *cap);
147 static void scheduleCheckBlockedThreads (Capability *cap);
148 static void scheduleProcessInbox(Capability **cap);
149 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
150 static void schedulePushWork(Capability *cap, Task *task);
151 #if defined(THREADED_RTS)
152 static void scheduleActivateSpark(Capability *cap);
153 #endif
154 static void schedulePostRunThread(Capability *cap, StgTSO *t);
155 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
156 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
157 nat prev_what_next );
158 static void scheduleHandleThreadBlocked( StgTSO *t );
159 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
160 StgTSO *t );
161 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
162 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
163
164 static void deleteThread (Capability *cap, StgTSO *tso);
165 static void deleteAllThreads (Capability *cap);
166
167 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
168 static void deleteThread_(Capability *cap, StgTSO *tso);
169 #endif
170
171 /* ---------------------------------------------------------------------------
172 Main scheduling loop.
173
174 We use round-robin scheduling, each thread returning to the
175 scheduler loop when one of these conditions is detected:
176
177 * out of heap space
178 * timer expires (thread yields)
179 * thread blocks
180 * thread ends
181 * stack overflow
182
183 GRAN version:
184 In a GranSim setup this loop iterates over the global event queue.
185 This revolves around the global event queue, which determines what
186 to do next. Therefore, it's more complicated than either the
187 concurrent or the parallel (GUM) setup.
188 This version has been entirely removed (JB 2008/08).
189
190 GUM version:
191 GUM iterates over incoming messages.
192 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
193 and sends out a fish whenever it has nothing to do; in-between
194 doing the actual reductions (shared code below) it processes the
195 incoming messages and deals with delayed operations
196 (see PendingFetches).
197 This is not the ugliest code you could imagine, but it's bloody close.
198
199 (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
200 now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
201 as well as future GUM versions. This file has been refurbished to
202 only contain valid code, which is however incomplete, refers to
203 invalid includes etc.
204
205 ------------------------------------------------------------------------ */
206
207 static Capability *
208 schedule (Capability *initialCapability, Task *task)
209 {
210 StgTSO *t;
211 Capability *cap;
212 StgThreadReturnCode ret;
213 nat prev_what_next;
214 rtsBool ready_to_gc;
215 #if defined(THREADED_RTS)
216 rtsBool first = rtsTrue;
217 #endif
218
219 cap = initialCapability;
220
221 // Pre-condition: this task owns initialCapability.
222 // The sched_mutex is *NOT* held
223 // NB. on return, we still hold a capability.
224
225 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
226
227 schedulePreLoop();
228
229 // -----------------------------------------------------------
230 // Scheduler loop starts here:
231
232 while (1) {
233
234 // Check whether we have re-entered the RTS from Haskell without
235 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
236 // call).
237 if (cap->in_haskell) {
238 errorBelch("schedule: re-entered unsafely.\n"
239 " Perhaps a 'foreign import unsafe' should be 'safe'?");
240 stg_exit(EXIT_FAILURE);
241 }
242
243 // The interruption / shutdown sequence.
244 //
245 // In order to cleanly shut down the runtime, we want to:
246 // * make sure that all main threads return to their callers
247 // with the state 'Interrupted'.
248 //    * clean up all OS threads associated with the runtime
249 // * free all memory etc.
250 //
251 // So the sequence for ^C goes like this:
252 //
253 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
254 // arranges for some Capability to wake up
255 //
256 // * all threads in the system are halted, and the zombies are
257 // placed on the run queue for cleaning up. We acquire all
258 // the capabilities in order to delete the threads, this is
259 // done by scheduleDoGC() for convenience (because GC already
260 // needs to acquire all the capabilities). We can't kill
261 // threads involved in foreign calls.
262 //
263 // * somebody calls shutdownHaskell(), which calls exitScheduler()
264 //
265 // * sched_state := SCHED_SHUTTING_DOWN
266 //
267 // * all workers exit when the run queue on their capability
268 // drains. All main threads will also exit when their TSO
269 // reaches the head of the run queue and they can return.
270 //
271 // * eventually all Capabilities will shut down, and the RTS can
272 // exit.
273 //
274 //    * We might be left with threads blocked in foreign calls;
275 //      we should really attempt to kill these somehow (TODO).
276
277 switch (sched_state) {
278 case SCHED_RUNNING:
279 break;
280 case SCHED_INTERRUPTING:
281 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
282 /* scheduleDoGC() deletes all the threads */
283 scheduleDoGC(&cap,task,rtsFalse);
284
285 // after scheduleDoGC(), we must be shutting down. Either some
286 // other Capability did the final GC, or we did it above,
287 // either way we can fall through to the SCHED_SHUTTING_DOWN
288 // case now.
289 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
290 // fall through
291
292 case SCHED_SHUTTING_DOWN:
293 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
294 // If we are a worker, just exit. If we're a bound thread
295 // then we will exit below when we've removed our TSO from
296 // the run queue.
297 if (!isBoundTask(task) && emptyRunQueue(cap)) {
298 return cap;
299 }
300 break;
301 default:
302 barf("sched_state: %d", sched_state);
303 }
304
305 scheduleFindWork(&cap);
306
307 /* work pushing, currently relevant only for THREADED_RTS:
308 (pushes threads, wakes up idle capabilities for stealing) */
309 schedulePushWork(cap,task);
310
311 scheduleDetectDeadlock(&cap,task);
312
313 // Normally, the only way we can get here with no threads to
314 // run is if a keyboard interrupt was received during
315 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
316 // Additionally, it is not fatal for the
317 // threaded RTS to reach here with no threads to run.
318 //
319 // win32: might be here due to awaitEvent() being abandoned
320 // as a result of a console event having been delivered.
321
322 #if defined(THREADED_RTS)
323 if (first)
324 {
325 // XXX: ToDo
326 // // don't yield the first time, we want a chance to run this
327 // // thread for a bit, even if there are others banging at the
328 // // door.
329 // first = rtsFalse;
330 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
331 }
332
333 scheduleYield(&cap,task);
334
335 if (emptyRunQueue(cap)) continue; // look for work again
336 #endif
337
338 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
339 if ( emptyRunQueue(cap) ) {
340 ASSERT(sched_state >= SCHED_INTERRUPTING);
341 }
342 #endif
343
344 //
345 // Get a thread to run
346 //
347 t = popRunQueue(cap);
348
349 // Sanity check the thread we're about to run. This can be
350 // expensive if there is lots of thread switching going on...
351 IF_DEBUG(sanity,checkTSO(t));
352
353 #if defined(THREADED_RTS)
354 // Check whether we can run this thread in the current task.
355 // If not, we have to pass our capability to the right task.
356 {
357 InCall *bound = t->bound;
358
359 if (bound) {
360 if (bound->task == task) {
361 // yes, the Haskell thread is bound to the current native thread
362 } else {
363 debugTrace(DEBUG_sched,
364 "thread %lu bound to another OS thread",
365 (unsigned long)t->id);
366 // no, bound to a different Haskell thread: pass to that thread
367 pushOnRunQueue(cap,t);
368 continue;
369 }
370 } else {
371 // The thread we want to run is unbound.
372 if (task->incall->tso) {
373 debugTrace(DEBUG_sched,
374 "this OS thread cannot run thread %lu",
375 (unsigned long)t->id);
376 // no, the current native thread is bound to a different
377 // Haskell thread, so pass it to any worker thread
378 pushOnRunQueue(cap,t);
379 continue;
380 }
381 }
382 }
383 #endif
384
385 // If we're shutting down, and this thread has not yet been
386 // killed, kill it now. This sometimes happens when a finalizer
387 // thread is created by the final GC, or a thread previously
388 // in a foreign call returns.
389 if (sched_state >= SCHED_INTERRUPTING &&
390 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
391 deleteThread(cap,t);
392 }
393
394 // If this capability is disabled, migrate the thread away rather
395 // than running it. NB. but not if the thread is bound: it is
396 // really hard for a bound thread to migrate itself. Believe me,
397 // I tried several ways and couldn't find a way to do it.
398 // Instead, when everything is stopped for GC, we migrate all the
399 // threads on the run queue then (see scheduleDoGC()).
400 //
401 // ToDo: what about TSO_LOCKED? Currently we're migrating those
402 // when the number of capabilities drops, but we never migrate
403 // them back if it rises again. Presumably we should, but after
404 // the thread has been migrated we no longer know what capability
405 // it was originally on.
406 #ifdef THREADED_RTS
407 if (cap->disabled && !t->bound) {
408 Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
409 migrateThread(cap, t, dest_cap);
410 continue;
411 }
412 #endif
413
414 /* context switches are initiated by the timer signal, unless
415 * the user specified "context switch as often as possible", with
416 * +RTS -C0
417 */
418 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
419 && !emptyThreadQueues(cap)) {
420 cap->context_switch = 1;
421 }
422
423 run_thread:
424
425 // CurrentTSO is the thread to run. t might be different if we
426 // loop back to run_thread, so make sure to set CurrentTSO after
427 // that.
428 cap->r.rCurrentTSO = t;
429
430 startHeapProfTimer();
431
432 // ----------------------------------------------------------------------
433 // Run the current thread
434
435 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
436 ASSERT(t->cap == cap);
437 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
438
439 prev_what_next = t->what_next;
440
441 errno = t->saved_errno;
442 #if mingw32_HOST_OS
443 SetLastError(t->saved_winerror);
444 #endif
445
446 // reset the interrupt flag before running Haskell code
447 cap->interrupt = 0;
448
449 cap->in_haskell = rtsTrue;
450 cap->idle = 0;
451
452 dirty_TSO(cap,t);
453 dirty_STACK(cap,t->stackobj);
454
455 #if defined(THREADED_RTS)
456 if (recent_activity == ACTIVITY_DONE_GC) {
457 // ACTIVITY_DONE_GC means we turned off the timer signal to
458 // conserve power (see #1623). Re-enable it here.
459 nat prev;
460 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
461 if (prev == ACTIVITY_DONE_GC) {
462 startTimer();
463 }
464 } else if (recent_activity != ACTIVITY_INACTIVE) {
465 // If we reached ACTIVITY_INACTIVE, then don't reset it until
466 // we've done the GC. The thread running here might just be
467 // the IO manager thread that handle_tick() woke up via
468 // wakeUpRts().
469 recent_activity = ACTIVITY_YES;
470 }
471 #endif
472
473 traceEventRunThread(cap, t);
474
475 switch (prev_what_next) {
476
477 case ThreadKilled:
478 case ThreadComplete:
479 /* Thread already finished, return to scheduler. */
480 ret = ThreadFinished;
481 break;
482
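        // Compiled code: jump into the STG machine; StgRun() returns the
        // register table once the thread has stopped for one of the
        // reasons listed at the top of this file (heap/stack overflow,
        // yield, block, finish).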
483 case ThreadRunGHC:
484 {
485 StgRegTable *r;
486 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
487 cap = regTableToCapability(r);
488 ret = r->rRet;
489 break;
490 }
491
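        // Byte-code (GHCi): run the interpreter instead.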
492 case ThreadInterpret:
493 cap = interpretBCO(cap);
494 ret = cap->r.rRet;
495 break;
496
497 default:
498 barf("schedule: invalid what_next field");
499 }
500
501 cap->in_haskell = rtsFalse;
502
503 // The TSO might have moved, eg. if it re-entered the RTS and a GC
504 // happened. So find the new location:
505 t = cap->r.rCurrentTSO;
506
507 // And save the current errno in this thread.
508 // XXX: possibly bogus for SMP because this thread might already
509 // be running again, see code below.
510 t->saved_errno = errno;
511 #if mingw32_HOST_OS
512 // Similarly for Windows error code
513 t->saved_winerror = GetLastError();
514 #endif
515
516 if (ret == ThreadBlocked) {
517 if (t->why_blocked == BlockedOnBlackHole) {
518 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
519 traceEventStopThread(cap, t, t->why_blocked + 6,
520 owner != NULL ? owner->id : 0);
521 } else {
522 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
523 }
524 } else {
525 traceEventStopThread(cap, t, ret, 0);
526 }
527
528 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
529 ASSERT(t->cap == cap);
530
531 // ----------------------------------------------------------------------
532
533 // Costs for the scheduler are assigned to CCS_SYSTEM
534 stopHeapProfTimer();
535 #if defined(PROFILING)
536 cap->r.rCCCS = CCS_SYSTEM;
537 #endif
538
539 schedulePostRunThread(cap,t);
540
541 ready_to_gc = rtsFalse;
542
543 switch (ret) {
544 case HeapOverflow:
545 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
546 break;
547
548 case StackOverflow:
549 // just adjust the stack for this thread, then pop it back
550 // on the run queue.
551 threadStackOverflow(cap, t);
552 pushOnRunQueue(cap,t);
553 break;
554
555 case ThreadYielding:
556 if (scheduleHandleYield(cap, t, prev_what_next)) {
557 // shortcut for switching between compiler/interpreter:
558 goto run_thread;
559 }
560 break;
561
562 case ThreadBlocked:
563 scheduleHandleThreadBlocked(t);
564 break;
565
566 case ThreadFinished:
567 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
568 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
569 break;
570
571 default:
572 barf("schedule: invalid thread return code %d", (int)ret);
573 }
574
575 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
576 scheduleDoGC(&cap,task,rtsFalse);
577 }
578 } /* end of while() */
579 }
580
581 /* -----------------------------------------------------------------------------
582 * Run queue operations
583 * -------------------------------------------------------------------------- */
584
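// Unlink a TSO from the middle of a Capability's run queue. The run queue
// is doubly linked: tso->_link points forwards and tso->block_info.prev
// points backwards, with END_TSO_QUEUE marking both ends.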
585 void
586 removeFromRunQueue (Capability *cap, StgTSO *tso)
587 {
588 if (tso->block_info.prev == END_TSO_QUEUE) {
589 ASSERT(cap->run_queue_hd == tso);
590 cap->run_queue_hd = tso->_link;
591 } else {
592 setTSOLink(cap, tso->block_info.prev, tso->_link);
593 }
594 if (tso->_link == END_TSO_QUEUE) {
595 ASSERT(cap->run_queue_tl == tso);
596 cap->run_queue_tl = tso->block_info.prev;
597 } else {
598 setTSOPrev(cap, tso->_link, tso->block_info.prev);
599 }
600 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
601
602 IF_DEBUG(sanity, checkRunQueue(cap));
603 }
604
605 /* ----------------------------------------------------------------------------
606 * Setting up the scheduler loop
607 * ------------------------------------------------------------------------- */
608
609 static void
610 schedulePreLoop(void)
611 {
612 // initialisation for scheduler - what cannot go into initScheduler()
613
614 #if defined(mingw32_HOST_OS) && !defined(GhcUnregisterised)
615 win32AllocStack();
616 #endif
617 }
618
619 /* -----------------------------------------------------------------------------
620 * scheduleFindWork()
621 *
622 * Search for work to do, and handle messages from elsewhere.
623 * -------------------------------------------------------------------------- */
624
625 static void
626 scheduleFindWork (Capability **pcap)
627 {
628 scheduleStartSignalHandlers(*pcap);
629
630 scheduleProcessInbox(pcap);
631
632 scheduleCheckBlockedThreads(*pcap);
633
634 #if defined(THREADED_RTS)
635 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
636 #endif
637 }
638
639 #if defined(THREADED_RTS)
640 STATIC_INLINE rtsBool
641 shouldYieldCapability (Capability *cap, Task *task)
642 {
643 // we need to yield this capability to someone else if..
644 // - another thread is initiating a GC
645 // - another Task is returning from a foreign call
646 // - the thread at the head of the run queue cannot be run
647 // by this Task (it is bound to another Task, or it is unbound
648 //     and this Task is bound).
649 return (pending_sync ||
650 cap->returning_tasks_hd != NULL ||
651 (!emptyRunQueue(cap) && (task->incall->tso == NULL
652 ? cap->run_queue_hd->bound != NULL
653 : cap->run_queue_hd->bound != task->incall)));
654 }
655
656 // This is the single place where a Task goes to sleep. There are
657 // two reasons it might need to sleep:
658 // - there are no threads to run
659 // - we need to yield this Capability to someone else
660 // (see shouldYieldCapability())
661 //
662 // Careful: the scheduler loop is quite delicate. Make sure you run
663 // the tests in testsuite/concurrent (all ways) after modifying this,
664 // and also check the benchmarks in nofib/parallel for regressions.
665
666 static void
667 scheduleYield (Capability **pcap, Task *task)
668 {
669 Capability *cap = *pcap;
670
671 // if we have work, and we don't need to give up the Capability, continue.
672 //
673 if (!shouldYieldCapability(cap,task) &&
674 (!emptyRunQueue(cap) ||
675 !emptyInbox(cap) ||
676 sched_state >= SCHED_INTERRUPTING))
677 return;
678
679 // otherwise yield (sleep), and keep yielding if necessary.
680 do {
681 yieldCapability(&cap,task);
682 }
683 while (shouldYieldCapability(cap,task));
684
685 // note there may still be no threads on the run queue at this
686 // point, the caller has to check.
687
688 *pcap = cap;
689 return;
690 }
691 #endif
692
693 /* -----------------------------------------------------------------------------
694 * schedulePushWork()
695 *
696 * Push work to other Capabilities if we have some.
697 * -------------------------------------------------------------------------- */
698
699 static void
700 schedulePushWork(Capability *cap USED_IF_THREADS,
701 Task *task USED_IF_THREADS)
702 {
703 /* following code not for PARALLEL_HASKELL. I kept the call general,
704 future GUM versions might use pushing in a distributed setup */
705 #if defined(THREADED_RTS)
706
707 Capability *free_caps[n_capabilities], *cap0;
708 nat i, n_free_caps;
709
710 // migration can be turned off with +RTS -qm
711 if (!RtsFlags.ParFlags.migrate) return;
712
713 // Check whether we have more threads on our run queue, or sparks
714 // in our pool, that we could hand to another Capability.
715 if (cap->run_queue_hd == END_TSO_QUEUE) {
716 if (sparkPoolSizeCap(cap) < 2) return;
717 } else {
718 if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
719 sparkPoolSizeCap(cap) < 1) return;
720 }
721
722 // First grab as many free Capabilities as we can.
723 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
724 cap0 = &capabilities[i];
725 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
726 if (!emptyRunQueue(cap0)
727 || cap0->returning_tasks_hd != NULL
728 || cap0->inbox != (Message*)END_TSO_QUEUE) {
729 // it already has some work, we just grabbed it at
730 // the wrong moment. Or maybe it's deadlocked!
731 releaseCapability(cap0);
732 } else {
733 free_caps[n_free_caps++] = cap0;
734 }
735 }
736 }
737
738 // we now have n_free_caps free capabilities stashed in
739 // free_caps[]. Share our run queue equally with them. This is
740 // probably the simplest thing we could do; improvements we might
741 // want to do include:
742 //
743 // - giving high priority to moving relatively new threads, on
744 //     the grounds that they haven't had time to build up a
745 // working set in the cache on this CPU/Capability.
746 //
747 // - giving low priority to moving long-lived threads
748
749 if (n_free_caps > 0) {
750 StgTSO *prev, *t, *next;
751 #ifdef SPARK_PUSHING
752 rtsBool pushed_to_all;
753 #endif
754
755 debugTrace(DEBUG_sched,
756 "cap %d: %s and %d free capabilities, sharing...",
757 cap->no,
758 (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
759 "excess threads on run queue":"sparks to share (>=2)",
760 n_free_caps);
761
762 i = 0;
763 #ifdef SPARK_PUSHING
764 pushed_to_all = rtsFalse;
765 #endif
766
767 if (cap->run_queue_hd != END_TSO_QUEUE) {
768 prev = cap->run_queue_hd;
769 t = prev->_link;
770 prev->_link = END_TSO_QUEUE;
771 for (; t != END_TSO_QUEUE; t = next) {
772 next = t->_link;
773 t->_link = END_TSO_QUEUE;
774 if (t->bound == task->incall // don't move my bound thread
775 || tsoLocked(t)) { // don't move a locked thread
776 setTSOLink(cap, prev, t);
777 setTSOPrev(cap, t, prev);
778 prev = t;
779 } else if (i == n_free_caps) {
780 #ifdef SPARK_PUSHING
781 pushed_to_all = rtsTrue;
782 #endif
783 i = 0;
784 // keep one for us
785 setTSOLink(cap, prev, t);
786 setTSOPrev(cap, t, prev);
787 prev = t;
788 } else {
789 appendToRunQueue(free_caps[i],t);
790
791 traceEventMigrateThread (cap, t, free_caps[i]->no);
792
793 if (t->bound) { t->bound->task->cap = free_caps[i]; }
794 t->cap = free_caps[i];
795 i++;
796 }
797 }
798 cap->run_queue_tl = prev;
799
800 IF_DEBUG(sanity, checkRunQueue(cap));
801 }
802
803 #ifdef SPARK_PUSHING
804 /* JB I left this code in place, it would work but is not necessary */
805
806 // If there are some free capabilities that we didn't push any
807 // threads to, then try to push a spark to each one.
808 if (!pushed_to_all) {
809 StgClosure *spark;
810 // i is the next free capability to push to
811 for (; i < n_free_caps; i++) {
812 if (emptySparkPoolCap(free_caps[i])) {
813 spark = tryStealSpark(cap->sparks);
814 if (spark != NULL) {
815 /* TODO: if anyone wants to re-enable this code then
816 * they must consider the fizzledSpark(spark) case
817 * and update the per-cap spark statistics.
818 */
819 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
820
821 traceEventStealSpark(free_caps[i], t, cap->no);
822
823 newSpark(&(free_caps[i]->r), spark);
824 }
825 }
826 }
827 }
828 #endif /* SPARK_PUSHING */
829
830 // release the capabilities
831 for (i = 0; i < n_free_caps; i++) {
832 task->cap = free_caps[i];
833 releaseAndWakeupCapability(free_caps[i]);
834 }
835 }
836 task->cap = cap; // reset to point to our Capability.
837
838 #endif /* THREADED_RTS */
839
840 }
841
842 /* ----------------------------------------------------------------------------
843 * Start any pending signal handlers
844 * ------------------------------------------------------------------------- */
845
846 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
847 static void
848 scheduleStartSignalHandlers(Capability *cap)
849 {
850 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
851 // safe outside the lock
852 startSignalHandlers(cap);
853 }
854 }
855 #else
856 static void
857 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
858 {
859 }
860 #endif
861
862 /* ----------------------------------------------------------------------------
863 * Check for blocked threads that can be woken up.
864 * ------------------------------------------------------------------------- */
865
866 static void
867 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
868 {
869 #if !defined(THREADED_RTS)
870 //
871 // Check whether any waiting threads need to be woken up. If the
872 // run queue is empty, and there are no other tasks running, we
873 // can wait indefinitely for something to happen.
874 //
875 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
876 {
877 awaitEvent (emptyRunQueue(cap));
878 }
879 #endif
880 }
881
882 /* ----------------------------------------------------------------------------
883 * Detect deadlock conditions and attempt to resolve them.
884 * ------------------------------------------------------------------------- */
885
886 static void
887 scheduleDetectDeadlock (Capability **pcap, Task *task)
888 {
889 Capability *cap = *pcap;
890 /*
891 * Detect deadlock: when we have no threads to run, there are no
892 * threads blocked, waiting for I/O, or sleeping, and all the
893 * other tasks are waiting for work, we must have a deadlock of
894 * some description.
895 */
896 if ( emptyThreadQueues(cap) )
897 {
898 #if defined(THREADED_RTS)
899 /*
900 * In the threaded RTS, we only check for deadlock if there
901 * has been no activity in a complete timeslice. This means
902 * we won't eagerly start a full GC just because we don't have
903 * any threads to run currently.
904 */
905 if (recent_activity != ACTIVITY_INACTIVE) return;
906 #endif
907
908 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
909
910 // Garbage collection can release some new threads due to
911 // either (a) finalizers or (b) threads resurrected because
912 // they are unreachable and will therefore be sent an
913 // exception. Any threads thus released will be immediately
914 // runnable.
915 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
916 cap = *pcap;
917 // When force_major == rtsTrue, scheduleDoGC sets
918 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
919 // signal.
920
921 if ( !emptyRunQueue(cap) ) return;
922
923 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
924 /* If we have user-installed signal handlers, then wait
925 * for signals to arrive rather than bombing out with a
926 * deadlock.
927 */
928 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
929 debugTrace(DEBUG_sched,
930 "still deadlocked, waiting for signals...");
931
932 awaitUserSignals();
933
934 if (signals_pending()) {
935 startSignalHandlers(cap);
936 }
937
938 // either we have threads to run, or we were interrupted:
939 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
940
941 return;
942 }
943 #endif
944
945 #if !defined(THREADED_RTS)
946 /* Probably a real deadlock. Send the current main thread the
947 * Deadlock exception.
948 */
949 if (task->incall->tso) {
950 switch (task->incall->tso->why_blocked) {
951 case BlockedOnSTM:
952 case BlockedOnBlackHole:
953 case BlockedOnMsgThrowTo:
954 case BlockedOnMVar:
955 throwToSingleThreaded(cap, task->incall->tso,
956 (StgClosure *)nonTermination_closure);
957 return;
958 default:
959 barf("deadlock: main thread blocked in a strange way");
960 }
961 }
962 return;
963 #endif
964 }
965 }
966
967
968 /* ----------------------------------------------------------------------------
969 * Send pending messages (PARALLEL_HASKELL only)
970 * ------------------------------------------------------------------------- */
971
972 #if defined(PARALLEL_HASKELL)
973 static void
974 scheduleSendPendingMessages(void)
975 {
976
977 # if defined(PAR) // global Mem.Mgmt., omit for now
978 if (PendingFetches != END_BF_QUEUE) {
979 processFetches();
980 }
981 # endif
982
983 if (RtsFlags.ParFlags.BufferTime) {
984 // if we use message buffering, we must send away all message
985 // packets which have become too old...
986 sendOldBuffers();
987 }
988 }
989 #endif
990
991 /* ----------------------------------------------------------------------------
992 * Process message in the current Capability's inbox
993 * ------------------------------------------------------------------------- */
994
995 static void
996 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
997 {
998 #if defined(THREADED_RTS)
999 Message *m, *next;
1000 int r;
1001 Capability *cap = *pcap;
1002
1003 while (!emptyInbox(cap)) {
1004 if (cap->r.rCurrentNursery->link == NULL ||
1005 g0->n_new_large_words >= large_alloc_lim) {
1006 scheduleDoGC(pcap, cap->running_task, rtsFalse);
1007 cap = *pcap;
1008 }
1009
1010 // don't use a blocking acquire; if the lock is held by
1011 // another thread then just carry on. This seems to avoid
1012 // getting stuck in a message ping-pong situation with other
1013 // processors. We'll check the inbox again later anyway.
1014 //
1015 // We should really use a more efficient queue data structure
1016 // here. The trickiness is that we must ensure a Capability
1017 // never goes idle if the inbox is non-empty, which is why we
1018 // use cap->lock (cap->lock is released as the last thing
1019 // before going idle; see Capability.c:releaseCapability()).
1020 r = TRY_ACQUIRE_LOCK(&cap->lock);
1021 if (r != 0) return;
1022
1023 m = cap->inbox;
1024 cap->inbox = (Message*)END_TSO_QUEUE;
1025
1026 RELEASE_LOCK(&cap->lock);
1027
1028 while (m != (Message*)END_TSO_QUEUE) {
1029 next = m->link;
1030 executeMessage(cap, m);
1031 m = next;
1032 }
1033 }
1034 #endif
1035 }
1036
1037 /* ----------------------------------------------------------------------------
1038 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1039 * ------------------------------------------------------------------------- */
1040
1041 #if defined(THREADED_RTS)
1042 static void
1043 scheduleActivateSpark(Capability *cap)
1044 {
1045 if (anySparks() && !cap->disabled)
1046 {
1047 createSparkThread(cap);
1048 debugTrace(DEBUG_sched, "creating a spark thread");
1049 }
1050 }
1051 #endif // THREADED_RTS
1052
1053 /* ----------------------------------------------------------------------------
1054 * After running a thread...
1055 * ------------------------------------------------------------------------- */
1056
1057 static void
1058 schedulePostRunThread (Capability *cap, StgTSO *t)
1059 {
1060 // We have to be able to catch transactions that are in an
1061 // infinite loop as a result of seeing an inconsistent view of
1062 // memory, e.g.
1063 //
1064 // atomically $ do
1065 // [a,b] <- mapM readTVar [ta,tb]
1066 // when (a == b) loop
1067 //
1068 // and a is never equal to b given a consistent view of memory.
1069 //
1070 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1071 if (!stmValidateNestOfTransactions (t -> trec)) {
1072 debugTrace(DEBUG_sched | DEBUG_stm,
1073 "trec %p found wasting its time", t);
1074
1075 // strip the stack back to the
1076 // ATOMICALLY_FRAME, aborting the (nested)
1077 // transaction, and saving the stack of any
1078 // partially-evaluated thunks on the heap.
1079 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1080
1081 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1082 }
1083 }
1084
1085 /* some statistics gathering in the parallel case */
1086 }
1087
1088 /* -----------------------------------------------------------------------------
1089 * Handle a thread that returned to the scheduler with HeapOverflow
1090 * -------------------------------------------------------------------------- */
1091
1092 static rtsBool
1093 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1094 {
1095 // did the task ask for a large block?
1096 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1097 // if so, get one and push it on the front of the nursery.
1098 bdescr *bd;
1099 lnat blocks;
1100
1101 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1102
1103 if (blocks > BLOCKS_PER_MBLOCK) {
1104 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1105 }
1106
1107 debugTrace(DEBUG_sched,
1108 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1109 (long)t->id, what_next_strs[t->what_next], blocks);
1110
1111 // don't do this if the nursery is (nearly) full, we'll GC first.
1112 if (cap->r.rCurrentNursery->link != NULL ||
1113 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1114 // if the nursery has only one block.
1115
1116 bd = allocGroup_lock(blocks);
1117 cap->r.rNursery->n_blocks += blocks;
1118
1119 // link the new group into the list
1120 bd->link = cap->r.rCurrentNursery;
1121 bd->u.back = cap->r.rCurrentNursery->u.back;
1122 if (cap->r.rCurrentNursery->u.back != NULL) {
1123 cap->r.rCurrentNursery->u.back->link = bd;
1124 } else {
1125 cap->r.rNursery->blocks = bd;
1126 }
1127 cap->r.rCurrentNursery->u.back = bd;
1128
1129 // initialise it as a nursery block. We initialise the
1130 // step, gen_no, and flags field of *every* sub-block in
1131 // this large block, because this is easier than making
1132 // sure that we always find the block head of a large
1133 // block whenever we call Bdescr() (eg. evacuate() and
1134 // isAlive() in the GC would both have to do this, at
1135 // least).
1136 {
1137 bdescr *x;
1138 for (x = bd; x < bd + blocks; x++) {
1139 initBdescr(x,g0,g0);
1140 x->free = x->start;
1141 x->flags = 0;
1142 }
1143 }
1144
1145 // This assert can be a killer if the app is doing lots
1146 // of large block allocations.
1147 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1148
1149 // now update the nursery to point to the new block
1150 cap->r.rCurrentNursery = bd;
1151
1152 // we might be unlucky and have another thread get on the
1153 // run queue before us and steal the large block, but in that
1154 // case the thread will just end up requesting another large
1155 // block.
1156 pushOnRunQueue(cap,t);
1157 return rtsFalse; /* not actually GC'ing */
1158 }
1159 }
1160
1161 if (cap->r.rHpLim == NULL || cap->context_switch) {
1162 // Sometimes we miss a context switch, e.g. when calling
1163 // primitives in a tight loop, MAYBE_GC() doesn't check the
1164 // context switch flag, and we end up waiting for a GC.
1165 // See #1984, and concurrent/should_run/1984
1166 cap->context_switch = 0;
1167 appendToRunQueue(cap,t);
1168 } else {
1169 pushOnRunQueue(cap,t);
1170 }
1171 return rtsTrue;
1172 /* actual GC is done at the end of the while loop in schedule() */
1173 }
1174
1175 /* -----------------------------------------------------------------------------
1176 * Handle a thread that returned to the scheduler with ThreadYielding
1177 * -------------------------------------------------------------------------- */
1178
1179 static rtsBool
1180 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1181 {
1182 /* put the thread back on the run queue. Then, if we're ready to
1183 * GC, check whether this is the last task to stop. If so, wake
1184 * up the GC thread. getThread will block during a GC until the
1185 * GC is finished.
1186 */
1187
1188 ASSERT(t->_link == END_TSO_QUEUE);
1189
1190 // Shortcut if we're just switching evaluators: don't bother
1191 // doing stack squeezing (which can be expensive), just run the
1192 // thread.
1193 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1194 debugTrace(DEBUG_sched,
1195 "--<< thread %ld (%s) stopped to switch evaluators",
1196 (long)t->id, what_next_strs[t->what_next]);
1197 return rtsTrue;
1198 }
1199
1200 // Reset the context switch flag. We don't do this just before
1201 // running the thread, because that would mean we would lose ticks
1202 // during GC, which can lead to unfair scheduling (a thread hogs
1203 // the CPU because the tick always arrives during GC). This way
1204 // penalises threads that do a lot of allocation, but that seems
1205 // better than the alternative.
1206 if (cap->context_switch != 0) {
1207 cap->context_switch = 0;
1208 appendToRunQueue(cap,t);
1209 } else {
1210 pushOnRunQueue(cap,t);
1211 }
1212
1213 IF_DEBUG(sanity,
1214 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1215 checkTSO(t));
1216
1217 return rtsFalse;
1218 }
1219
1220 /* -----------------------------------------------------------------------------
1221 * Handle a thread that returned to the scheduler with ThreadBlocked
1222 * -------------------------------------------------------------------------- */
1223
1224 static void
1225 scheduleHandleThreadBlocked( StgTSO *t
1226 #if !defined(DEBUG)
1227 STG_UNUSED
1228 #endif
1229 )
1230 {
1231
1232 // We don't need to do anything. The thread is blocked, and it
1233 // has tidied up its stack and placed itself on whatever queue
1234 // it needs to be on.
1235
1236 // ASSERT(t->why_blocked != NotBlocked);
1237 // Not true: for example,
1238 // - the thread may have woken itself up already, because
1239 // threadPaused() might have raised a blocked throwTo
1240 // exception, see maybePerformBlockedException().
1241
1242 #ifdef DEBUG
1243 traceThreadStatus(DEBUG_sched, t);
1244 #endif
1245 }
1246
1247 /* -----------------------------------------------------------------------------
1248 * Handle a thread that returned to the scheduler with ThreadFinished
1249 * -------------------------------------------------------------------------- */
1250
1251 static rtsBool
1252 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1253 {
1254 /* Need to check whether this was a main thread, and if so,
1255 * return with the return value.
1256 *
1257 * We also end up here if the thread kills itself with an
1258 * uncaught exception, see Exception.cmm.
1259 */
1260
1261 // blocked exceptions can now complete, even if the thread was in
1262 // blocked mode (see #2910).
1263 awakenBlockedExceptionQueue (cap, t);
1264
1265 //
1266 // Check whether the thread that just completed was a bound
1267 // thread, and if so return with the result.
1268 //
1269 // There is an assumption here that all thread completion goes
1270 // through this point; we need to make sure that if a thread
1271 // ends up in the ThreadKilled state, that it stays on the run
1272 // queue so it can be dealt with here.
1273 //
1274
1275 if (t->bound) {
1276
1277 if (t->bound != task->incall) {
1278 #if !defined(THREADED_RTS)
1279 // Must be a bound thread that is not the topmost one. Leave
1280 // it on the run queue until the stack has unwound to the
1281 // point where we can deal with this. Leaving it on the run
1282 // queue also ensures that the garbage collector knows about
1283 // this thread and its return value (it gets dropped from the
1284 // step->threads list so there's no other way to find it).
1285 appendToRunQueue(cap,t);
1286 return rtsFalse;
1287 #else
1288 // this cannot happen in the threaded RTS, because a
1289 // bound thread can only be run by the appropriate Task.
1290 barf("finished bound thread that isn't mine");
1291 #endif
1292 }
1293
1294 ASSERT(task->incall->tso == t);
1295
1296 if (t->what_next == ThreadComplete) {
1297 if (task->incall->ret) {
1298 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1299 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1300 }
1301 task->incall->stat = Success;
1302 } else {
1303 if (task->incall->ret) {
1304 *(task->incall->ret) = NULL;
1305 }
1306 if (sched_state >= SCHED_INTERRUPTING) {
1307 if (heap_overflow) {
1308 task->incall->stat = HeapExhausted;
1309 } else {
1310 task->incall->stat = Interrupted;
1311 }
1312 } else {
1313 task->incall->stat = Killed;
1314 }
1315 }
1316 #ifdef DEBUG
1317 removeThreadLabel((StgWord)task->incall->tso->id);
1318 #endif
1319
1320 // We no longer consider this thread and task to be bound to
1321 // each other. The TSO lives on until it is GC'd, but the
1322 // task is about to be released by the caller, and we don't
1323 // want anyone following the pointer from the TSO to the
1324 // defunct task (which might have already been
1325 // re-used). This was a real bug: the GC updated
1326 // tso->bound->tso which led to a deadlock.
1327 t->bound = NULL;
1328 task->incall->tso = NULL;
1329
1330 return rtsTrue; // tells schedule() to return
1331 }
1332
1333 return rtsFalse;
1334 }
1335
1336 /* -----------------------------------------------------------------------------
1337 * Decide whether to perform a heap census
1338 * -------------------------------------------------------------------------- */
1339
1340 static rtsBool
1341 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1342 {
1343 // When we have +RTS -i0 and we're heap profiling, do a census at
1344 // every GC. This lets us get repeatable runs for debugging.
1345 if (performHeapProfile ||
1346 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1347 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1348 return rtsTrue;
1349 } else {
1350 return rtsFalse;
1351 }
1352 }
1353
1354 /* -----------------------------------------------------------------------------
1355 * Start a synchronisation of all capabilities
1356 * -------------------------------------------------------------------------- */
1357
1358 // Returns:
1359 // 0 if we successfully got a sync
1360 // non-0 if there was another sync request in progress,
1361 // and we yielded to it. The value returned is the
1362 // type of the other sync request.
1363 //
1364 #if defined(THREADED_RTS)
1365 static nat requestSync (Capability **pcap, Task *task, nat sync_type)
1366 {
1367 nat prev_pending_sync;
1368
1369 prev_pending_sync = cas(&pending_sync, 0, sync_type);
1370
1371 if (prev_pending_sync)
1372 {
1373 do {
1374 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1375 prev_pending_sync);
1376 ASSERT(*pcap);
1377 yieldCapability(pcap,task);
1378 } while (pending_sync);
1379 return prev_pending_sync; // NOTE: task->cap might have changed now
1380 }
1381 else
1382 {
1383 return 0;
1384 }
1385 }
1386
1387 //
1388 // Grab all the capabilities except the one we already hold. Used
1389 // when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1390 // before a fork (SYNC_OTHER).
1391 //
1392 // Only call this after requestSync(), otherwise a deadlock might
1393 // ensue if another thread is trying to synchronise.
1394 //
1395 static void acquireAllCapabilities(Capability *cap, Task *task)
1396 {
1397 Capability *tmpcap;
1398 nat i;
1399
1400 for (i=0; i < n_capabilities; i++) {
1401 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
1402 tmpcap = &capabilities[i];
1403 if (tmpcap != cap) {
1404 // we better hope this task doesn't get migrated to
1405 // another Capability while we're waiting for this one.
1406 // It won't, because load balancing happens while we have
1407 // all the Capabilities, but even so it's a slightly
1408 // unsavoury invariant.
1409 task->cap = tmpcap;
1410 waitForReturnCapability(&tmpcap, task);
1411 if (tmpcap->no != i) {
1412 barf("acquireAllCapabilities: got the wrong capability");
1413 }
1414 }
1415 }
1416 task->cap = cap;
1417 }
1418
1419 static void releaseAllCapabilities(Capability *cap, Task *task)
1420 {
1421 nat i;
1422
1423 for (i = 0; i < n_capabilities; i++) {
1424 if (cap->no != i) {
1425 task->cap = &capabilities[i];
1426 releaseCapability(&capabilities[i]);
1427 }
1428 }
1429 task->cap = cap;
1430 }
1431 #endif
1432
1433 /* -----------------------------------------------------------------------------
1434 * Perform a garbage collection if necessary
1435 * -------------------------------------------------------------------------- */
1436
1437 static void
1438 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1439 rtsBool force_major)
1440 {
1441 Capability *cap = *pcap;
1442 rtsBool heap_census;
1443 #ifdef THREADED_RTS
1444 rtsBool idle_cap[n_capabilities];
1445 rtsBool gc_type;
1446 nat i, sync;
1447 StgTSO *tso;
1448 #endif
1449
1450 if (sched_state == SCHED_SHUTTING_DOWN) {
1451 // The final GC has already been done, and the system is
1452 // shutting down. We'll probably deadlock if we try to GC
1453 // now.
1454 return;
1455 }
1456
1457 #ifdef THREADED_RTS
1458 if (sched_state < SCHED_INTERRUPTING
1459 && RtsFlags.ParFlags.parGcEnabled
1460 && N >= RtsFlags.ParFlags.parGcGen
1461 && ! oldest_gen->mark)
1462 {
1463 gc_type = SYNC_GC_PAR;
1464 } else {
1465 gc_type = SYNC_GC_SEQ;
1466 }
1467
1468 // In order to GC, there must be no threads running Haskell code.
1469 // Therefore, the GC thread needs to hold *all* the capabilities,
1470 // and release them after the GC has completed.
1471 //
1472 // This seems to be the simplest way: previous attempts involved
1473 // making all the threads with capabilities give up their
1474 // capabilities and sleep except for the *last* one, which
1475 // actually did the GC. But it's quite hard to arrange for all
1476 // the other tasks to sleep and stay asleep.
1477 //
1478
1479 /* Other capabilities are prevented from running yet more Haskell
1480 threads if pending_sync is set. Tested inside
1481 yieldCapability() and releaseCapability() in Capability.c */
1482
1483 do {
1484 sync = requestSync(pcap, task, gc_type);
1485 cap = *pcap;
1486 if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
1487 // someone else had a pending sync request for a GC, so
1488 // let's assume GC has been done and we don't need to GC
1489 // again.
1490 return;
1491 }
1492 if (sched_state == SCHED_SHUTTING_DOWN) {
1493 // The scheduler might now be shutting down. We tested
1494 // this above, but it might have become true since then as
1495 // we yielded the capability in requestSync().
1496 return;
1497 }
1498 } while (sync);
1499
1500 interruptAllCapabilities();
1501
1502 // The final shutdown GC is always single-threaded, because it's
1503 // possible that some of the Capabilities have no worker threads.
1504
1505 if (gc_type == SYNC_GC_SEQ)
1506 {
1507 traceEventRequestSeqGc(cap);
1508 }
1509 else
1510 {
1511 traceEventRequestParGc(cap);
1512 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1513 }
1514
1515 if (gc_type == SYNC_GC_SEQ)
1516 {
1517 // single-threaded GC: grab all the capabilities
1518 acquireAllCapabilities(cap,task);
1519 }
1520 else
1521 {
1522 // If we are load-balancing collections in this
1523 // generation, then we require all GC threads to participate
1524 // in the collection. Otherwise, we only require active
1525 // threads to participate, and we set gc_threads[i]->idle for
1526 // any idle capabilities. The rationale here is that waking
1527 // up an idle Capability takes much longer than just doing any
1528 // GC work on its behalf.
1529
1530 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1531 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1532 N >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
1533 for (i=0; i < n_capabilities; i++) {
1534 if (capabilities[i].disabled) {
1535 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1536 } else {
1537 idle_cap[i] = rtsFalse;
1538 }
1539 }
1540 } else {
1541 for (i=0; i < n_capabilities; i++) {
1542 if (capabilities[i].disabled) {
1543 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1544 } else if (i == cap->no ||
1545 capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1546 idle_cap[i] = rtsFalse;
1547 } else {
1548 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1549 if (!idle_cap[i]) {
1550 n_failed_trygrab_idles++;
1551 } else {
1552 n_idle_caps++;
1553 }
1554 }
1555 }
1556 }
1557
1558 // We set the gc_thread[i]->idle flag if that
1559 // capability/thread is not participating in this collection.
1560 // We also keep a local record of which capabilities are idle
1561 // in idle_cap[], because scheduleDoGC() is re-entrant:
1562 // another thread might start a GC as soon as we've finished
1563 // this one, and thus the gc_thread[]->idle flags are invalid
1564 // as soon as we release any threads after GC. Getting this
1565 // wrong leads to a rare and hard to debug deadlock!
1566
1567 for (i=0; i < n_capabilities; i++) {
1568 gc_threads[i]->idle = idle_cap[i];
1569 capabilities[i].idle++;
1570 }
1571
1572 // For all capabilities participating in this GC, wait until
1573 // they have stopped mutating and are standing by for GC.
1574 waitForGcThreads(cap);
1575
1576 #if defined(THREADED_RTS)
1577 // Stable point where we can do a global check on our spark counters
1578 ASSERT(checkSparkCountInvariant());
1579 #endif
1580 }
1581
1582 #endif
1583
1584 IF_DEBUG(scheduler, printAllThreads());
1585
1586 delete_threads_and_gc:
1587 /*
1588 * We now have all the capabilities; if we're in an interrupting
1589 * state, then we should take the opportunity to delete all the
1590 * threads in the system.
1591 */
1592 if (sched_state == SCHED_INTERRUPTING) {
1593 deleteAllThreads(cap);
1594 #if defined(THREADED_RTS)
1595 // Discard all the sparks from every Capability. Why?
1596 // They'll probably be GC'd anyway since we've killed all the
1597 // threads. It just avoids the GC having to do any work to
1598 // figure out that any remaining sparks are garbage.
1599 for (i = 0; i < n_capabilities; i++) {
1600 capabilities[i].spark_stats.gcd +=
1601 sparkPoolSize(capabilities[i].sparks);
1602 // No race here since all Caps are stopped.
1603 discardSparksCap(&capabilities[i]);
1604 }
1605 #endif
1606 sched_state = SCHED_SHUTTING_DOWN;
1607 }
1608
1609 /*
1610 * When there are disabled capabilities, we want to migrate any
1611 * threads away from them. Normally this happens in the
1612 * scheduler's loop, but only for unbound threads - it's really
1613 * hard for a bound thread to migrate itself. So we have another
1614 * go here.
1615 */
1616 #if defined(THREADED_RTS)
1617 for (i = enabled_capabilities; i < n_capabilities; i++) {
1618 Capability *tmp_cap, *dest_cap;
1619 tmp_cap = &capabilities[i];
1620 ASSERT(tmp_cap->disabled);
1621 if (i != cap->no) {
1622 dest_cap = &capabilities[i % enabled_capabilities];
1623 while (!emptyRunQueue(tmp_cap)) {
1624 tso = popRunQueue(tmp_cap);
1625 migrateThread(tmp_cap, tso, dest_cap);
1626 if (tso->bound) { tso->bound->task->cap = dest_cap; }
1627 }
1628 }
1629 }
1630 #endif
1631
1632 heap_census = scheduleNeedHeapProfile(rtsTrue);
1633
1634 #if defined(THREADED_RTS)
1635 // reset pending_sync *before* GC, so that when the GC threads
1636 // emerge they don't immediately re-enter the GC.
1637 pending_sync = 0;
1638 GarbageCollect(force_major || heap_census, heap_census, gc_type, cap);
1639 #else
1640 GarbageCollect(force_major || heap_census, heap_census, 0, cap);
1641 #endif
1642
1643 traceSparkCounters(cap);
1644
1645 if (recent_activity == ACTIVITY_INACTIVE && force_major)
1646 {
1647 // We are doing a GC because the system has been idle for a
1648 // timeslice and we need to check for deadlock. Record the
1649 // fact that we've done a GC and turn off the timer signal;
1650 // it will get re-enabled if we run any threads after the GC.
1651 recent_activity = ACTIVITY_DONE_GC;
1652 stopTimer();
1653 }
1654 else
1655 {
1656 // the GC might have taken long enough for the timer to set
1657 // recent_activity = ACTIVITY_INACTIVE, but we aren't
1658 // necessarily deadlocked:
1659 recent_activity = ACTIVITY_YES;
1660 }
1661
1662 #if defined(THREADED_RTS)
1663 // Stable point where we can do a global check on our spark counters
1664 ASSERT(checkSparkCountInvariant());
1665 #endif
1666
1667 // The heap census itself is done during GarbageCollect().
1668 if (heap_census) {
1669 performHeapProfile = rtsFalse;
1670 }
1671
1672 #if defined(THREADED_RTS)
1673 if (gc_type == SYNC_GC_PAR)
1674 {
1675 releaseGCThreads(cap);
1676 for (i = 0; i < n_capabilities; i++) {
1677 if (i != cap->no) {
1678 if (idle_cap[i]) {
1679 ASSERT(capabilities[i].running_task == task);
1680 task->cap = &capabilities[i];
1681 releaseCapability(&capabilities[i]);
1682 } else {
1683 ASSERT(capabilities[i].running_task != task);
1684 }
1685 }
1686 }
1687 task->cap = cap;
1688 }
1689 #endif
1690
1691 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1692 // GC set the heap_overflow flag, so we should proceed with
1693 // an orderly shutdown now. Ultimately we want the main
1694 // thread to return to its caller with HeapExhausted, at which
1695 // point the caller should call hs_exit(). The first step is
1696 // to delete all the threads.
1697 //
1698 // Another way to do this would be to raise an exception in
1699 // the main thread, which we really should do because it gives
1700 // the program a chance to clean up. But how do we find the
1701 // main thread? It should presumably be the same one that
1702 // gets ^C exceptions, but that's all done on the Haskell side
1703 // (GHC.TopHandler).
1704 sched_state = SCHED_INTERRUPTING;
1705 goto delete_threads_and_gc;
1706 }
1707
1708 #ifdef SPARKBALANCE
1709 /* JB
1710 Once we are all together... this would be the place to balance all
1711 spark pools. No concurrent stealing or adding of new sparks can
1712 occur. Should be defined in Sparks.c. */
1713 balanceSparkPoolsCaps(n_capabilities, capabilities);
1714 #endif
1715
1716 #if defined(THREADED_RTS)
1717 if (gc_type == SYNC_GC_SEQ) {
1718 // release our stash of capabilities.
1719 releaseAllCapabilities(cap, task);
1720 }
1721 #endif
1722
1723 return;
1724 }
1725
1726 /* ---------------------------------------------------------------------------
1727 * Singleton fork(). Do not copy any running threads.
1728 * ------------------------------------------------------------------------- */
1729
1730 pid_t
1731 forkProcess(HsStablePtr *entry
1732 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1733 STG_UNUSED
1734 #endif
1735 )
1736 {
1737 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1738 pid_t pid;
1739 StgTSO* t,*next;
1740 Capability *cap;
1741 nat g;
1742 Task *task = NULL;
1743 nat i;
1744 #ifdef THREADED_RTS
1745 nat sync;
1746 #endif
1747
1748 debugTrace(DEBUG_sched, "forking!");
1749
1750 task = newBoundTask();
1751
1752 cap = NULL;
1753 waitForReturnCapability(&cap, task);
1754
1755 #ifdef THREADED_RTS
1756 do {
1757 sync = requestSync(&cap, task, SYNC_OTHER);
1758 } while (sync);
1759
1760 acquireAllCapabilities(cap,task);
1761
1762 pending_sync = 0;
1763 #endif
1764
1765 // no funny business: hold locks while we fork, otherwise if some
1766 // other thread is holding a lock when the fork happens, the data
1767 // structure protected by the lock will forever be in an
1768 // inconsistent state in the child. See also #1391.
1769 ACQUIRE_LOCK(&sched_mutex);
1770 ACQUIRE_LOCK(&sm_mutex);
1771 ACQUIRE_LOCK(&stable_mutex);
1772 ACQUIRE_LOCK(&task->lock);
1773
1774 for (i=0; i < n_capabilities; i++) {
1775 ACQUIRE_LOCK(&capabilities[i].lock);
1776 }
1777
1778 stopTimer(); // See #4074
1779
1780 #if defined(TRACING)
1781 flushEventLog(); // so that child won't inherit dirty file buffers
1782 #endif
1783
1784 pid = fork();
1785
1786 if (pid) { // parent
1787
1788 startTimer(); // #4074
1789
1790 RELEASE_LOCK(&sched_mutex);
1791 RELEASE_LOCK(&sm_mutex);
1792 RELEASE_LOCK(&stable_mutex);
1793 RELEASE_LOCK(&task->lock);
1794
1795 for (i=0; i < n_capabilities; i++) {
1796 releaseCapability_(&capabilities[i],rtsFalse);
1797 RELEASE_LOCK(&capabilities[i].lock);
1798 }
1799 boundTaskExiting(task);
1800
1801 // just return the pid
1802 return pid;
1803
1804 } else { // child
1805
1806 #if defined(THREADED_RTS)
1807 initMutex(&sched_mutex);
1808 initMutex(&sm_mutex);
1809 initMutex(&stable_mutex);
1810 initMutex(&task->lock);
1811
1812 for (i=0; i < n_capabilities; i++) {
1813 initMutex(&capabilities[i].lock);
1814 }
1815 #endif
1816
1817 #ifdef TRACING
1818 resetTracing();
1819 #endif
1820
1821 // Now, all OS threads except the thread that forked are
1822 // stopped. We need to stop all Haskell threads, including
1823 // those involved in foreign calls. Also we need to delete
1824 // all Tasks, because they correspond to OS threads that are
1825 // now gone.
1826
1827 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1828 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1829 next = t->global_link;
1830 // don't allow threads to catch the ThreadKilled
1831 // exception, but we do want to raiseAsync() because these
1832 // threads may be evaluating thunks that we need later.
1833 deleteThread_(t->cap,t);
1834
1835 // stop the GC from updating the InCall to point to
1836 // the TSO. This is only necessary because the
1837 // OSThread bound to the TSO has been killed, and
1838 // won't get a chance to exit in the usual way (see
1839 // also scheduleHandleThreadFinished).
1840 t->bound = NULL;
1841 }
1842 }
1843
1844 discardTasksExcept(task);
1845
1846 for (i=0; i < n_capabilities; i++) {
1847 cap = &capabilities[i];
1848
1849 // Empty the run queue. It seems tempting to let all the
1850 // killed threads stay on the run queue as zombies to be
1851 // cleaned up later, but some of them may correspond to
1852 // bound threads for which the corresponding Task does not
1853 // exist.
1854 cap->run_queue_hd = END_TSO_QUEUE;
1855 cap->run_queue_tl = END_TSO_QUEUE;
1856
1857 // Any suspended C-calling Tasks are no more; their OS threads
1858 // don't exist now:
1859 cap->suspended_ccalls = NULL;
1860
1861 #if defined(THREADED_RTS)
1862 // Wipe our spare workers list, they no longer exist. New
1863 // workers will be created if necessary.
1864 cap->spare_workers = NULL;
1865 cap->n_spare_workers = 0;
1866 cap->returning_tasks_hd = NULL;
1867 cap->returning_tasks_tl = NULL;
1868 #endif
1869
1870 // Release all caps except 0, we'll use that for starting
1871 // the IO manager and running the client action below.
1872 if (cap->no != 0) {
1873 task->cap = cap;
1874 releaseCapability(cap);
1875 }
1876 }
1877 cap = &capabilities[0];
1878 task->cap = cap;
1879
1880 // Empty the threads lists. Otherwise, the garbage
1881 // collector may attempt to resurrect some of these threads.
1882 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1883 generations[g].threads = END_TSO_QUEUE;
1884 }
1885
1886 // On Unix, all timers are reset in the child, so we need to start
1887 // the timer again.
1888 initTimer();
1889 startTimer();
1890
1891 #if defined(THREADED_RTS)
1892 ioManagerStartCap(&cap);
1893 #endif
1894
1895 rts_evalStableIO(&cap, entry, NULL); // run the action
1896 rts_checkSchedStatus("forkProcess",cap);
1897
1898 rts_unlock(cap);
1899 hs_exit(); // clean up and exit
1900 stg_exit(EXIT_SUCCESS);
1901 }
1902 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1903 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1904 #endif
1905 }
1906
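/* Usage sketch (illustrative only, not part of the RTS): this is the entry
 * point behind System.Posix.Process.forkProcess, which hands us a StablePtr
 * to the IO action the child should run.  Only the parent ever returns from
 * here; the child runs the action via rts_evalStableIO() and then calls
 * hs_exit()/stg_exit() itself.  Roughly:
 *
 *     HsStablePtr entry = ...;          // StablePtr to the child's IO action
 *     pid_t pid = forkProcess(&entry);
 *     // parent: pid is the child's process id
 *     // child : control never gets here
 */
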
1907 /* ---------------------------------------------------------------------------
1908 * Changing the number of Capabilities
1909 *
1910 * Changing the number of Capabilities is very tricky! We can only do
1911 * it with the system fully stopped, so we do a full sync with
1912 * requestSync(SYNC_OTHER) and grab all the capabilities.
1913 *
1914 * Then we resize the appropriate data structures, and update all
1915 * references to the old data structures which have now moved.
1916 * Finally we release the Capabilities we are holding, and start
1917 * worker Tasks on the new Capabilities we created.
1918 *
1919 * ------------------------------------------------------------------------- */
1920
1921 void
1922 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
1923 {
1924 #if !defined(THREADED_RTS)
1925 if (new_n_capabilities != 1) {
1926 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
1927 }
1928 return;
1929 #elif defined(NOSMP)
1930 if (new_n_capabilities != 1) {
1931 errorBelch("setNumCapabilities: not supported on this platform");
1932 }
1933 return;
1934 #else
1935 Task *task;
1936 Capability *cap;
1937 nat sync;
1938 StgTSO* t;
1939 nat g, n;
1940 Capability *old_capabilities = NULL;
1941
1942 if (new_n_capabilities == enabled_capabilities) return;
1943
1944 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
1945 enabled_capabilities, new_n_capabilities);
1946
1947 cap = rts_lock();
1948 task = cap->running_task;
1949
1950 do {
1951 sync = requestSync(&cap, task, SYNC_OTHER);
1952 } while (sync);
1953
1954 acquireAllCapabilities(cap,task);
1955
1956 pending_sync = 0;
1957
1958 if (new_n_capabilities < enabled_capabilities)
1959 {
1960 // Reducing the number of capabilities: we do not actually
1961 // remove the extra capabilities, we just mark them as
1962 // "disabled". This has the following effects:
1963 //
1964 // - threads on a disabled capability are migrated away by the
1965 // scheduler loop
1966 //
1967 // - disabled capabilities do not participate in GC
1968 // (see scheduleDoGC())
1969 //
1970 // - No spark threads are created on this capability
1971 // (see scheduleActivateSpark())
1972 //
1973 // - We do not attempt to migrate threads *to* a disabled
1974 // capability (see schedulePushWork()).
1975 //
1976 // but in other respects, a disabled capability remains
1977 // alive. Threads may be woken up on a disabled capability,
1978 // but they will be immediately migrated away.
1979 //
1980 // This approach is much easier than trying to actually remove
1981 // the capability; we don't have to worry about GC data
1982 // structures, the nursery, etc.
1983 //
1984 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
1985 capabilities[n].disabled = rtsTrue;
1986 traceCapDisable(&capabilities[n]);
1987 }
1988 enabled_capabilities = new_n_capabilities;
1989 }
1990 else
1991 {
1992 // Increasing the number of enabled capabilities.
1993 //
1994 // enable any disabled capabilities, up to the required number
1995 for (n = enabled_capabilities;
1996 n < new_n_capabilities && n < n_capabilities; n++) {
1997 capabilities[n].disabled = rtsFalse;
1998 traceCapEnable(&capabilities[n]);
1999 }
2000 enabled_capabilities = n;
2001
2002 if (new_n_capabilities > n_capabilities) {
2003 #if defined(TRACING)
2004 // Allocate eventlog buffers for the new capabilities. Note this
2005 // must be done before calling moreCapabilities(), because that
2006 // will emit events about creating the new capabilities and adding
2007 // them to existing capsets.
2008 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2009 #endif
2010
2011 // Resize the capabilities array
2012 // NB. after this, capabilities points somewhere new. Any pointers
2013 // of type (Capability *) are now invalid.
2014 old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
2015
2016 // update our own cap pointer
2017 cap = &capabilities[cap->no];
2018
2019 // Resize and update storage manager data structures
2020 storageAddCapabilities(n_capabilities, new_n_capabilities);
2021
2022 // Update (Capability *) refs in the Task manager.
2023 updateCapabilityRefs();
2024
2025 // Update (Capability *) refs from TSOs
2026 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2027 for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
2028 t->cap = &capabilities[t->cap->no];
2029 }
2030 }
2031 }
2032 }
2033
2034 // We're done: release the original Capabilities
2035 releaseAllCapabilities(cap,task);
2036
2037 // Start worker tasks on the new Capabilities
2038 startWorkerTasks(n_capabilities, new_n_capabilities);
2039
2040 // finally, update n_capabilities
2041 if (new_n_capabilities > n_capabilities) {
2042 n_capabilities = enabled_capabilities = new_n_capabilities;
2043 }
2044
2045 // We can't free the old array until now, because we access it
2046 // while updating pointers in updateCapabilityRefs().
2047 if (old_capabilities) {
2048 stgFree(old_capabilities);
2049 }
2050
2051 rts_unlock(cap);
2052
2053 #endif // THREADED_RTS
2054 }
2055
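/* Usage sketch (illustrative; not part of this file): Haskell code reaches
 * this via Control.Concurrent.setNumCapabilities, which foreign-imports this
 * function.  A C caller that has already initialised the RTS could do much
 * the same:
 *
 *     hs_init(&argc, &argv);
 *     setNumCapabilities(4);    // grow or shrink the set of HECs at runtime
 */
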
2056
2057
2058 /* ---------------------------------------------------------------------------
2059 * Delete all the threads in the system
2060 * ------------------------------------------------------------------------- */
2061
2062 static void
2063 deleteAllThreads ( Capability *cap )
2064 {
2065 // NOTE: only safe to call if we own all capabilities.
2066
2067 StgTSO* t, *next;
2068 nat g;
2069
2070 debugTrace(DEBUG_sched,"deleting all threads");
2071 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2072 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2073 next = t->global_link;
2074 deleteThread(cap,t);
2075 }
2076 }
2077
2078 // The run queue now contains a bunch of ThreadKilled threads. We
2079 // must not throw these away: the main thread(s) will be in there
2080 // somewhere, and the main scheduler loop has to deal with it.
2081 // Also, the run queue is the only thing keeping these threads from
2082 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2083
2084 #if !defined(THREADED_RTS)
2085 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2086 ASSERT(sleeping_queue == END_TSO_QUEUE);
2087 #endif
2088 }
2089
2090 /* -----------------------------------------------------------------------------
2091 Managing the suspended_ccalls list.
2092 Locks required: sched_mutex
2093 -------------------------------------------------------------------------- */
2094
2095 STATIC_INLINE void
2096 suspendTask (Capability *cap, Task *task)
2097 {
2098 InCall *incall;
2099
2100 incall = task->incall;
2101 ASSERT(incall->next == NULL && incall->prev == NULL);
2102 incall->next = cap->suspended_ccalls;
2103 incall->prev = NULL;
2104 if (cap->suspended_ccalls) {
2105 cap->suspended_ccalls->prev = incall;
2106 }
2107 cap->suspended_ccalls = incall;
2108 }
2109
2110 STATIC_INLINE void
2111 recoverSuspendedTask (Capability *cap, Task *task)
2112 {
2113 InCall *incall;
2114
2115 incall = task->incall;
2116 if (incall->prev) {
2117 incall->prev->next = incall->next;
2118 } else {
2119 ASSERT(cap->suspended_ccalls == incall);
2120 cap->suspended_ccalls = incall->next;
2121 }
2122 if (incall->next) {
2123 incall->next->prev = incall->prev;
2124 }
2125 incall->next = incall->prev = NULL;
2126 }
2127
2128 /* ---------------------------------------------------------------------------
2129 * Suspending & resuming Haskell threads.
2130 *
2131 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2132 * its capability before calling the C function. This allows another
2133 * task to pick up the capability and carry on running Haskell
2134 * threads. It also means that if the C call blocks, it won't lock
2135 * the whole system.
2136 *
2137 * The Haskell thread making the C call is put to sleep for the
2138 * duration of the call, on the capability's suspended_ccalls list. We
2139 * give out a token to the task, which it can use to resume the thread
2140 * on return from the C function.
2141 *
2142 * If this is an interruptible C call, the FFI call may be unceremoniously
2143 * terminated, and should therefore be scheduled on an unbound worker
2144 * thread.
2145 * ------------------------------------------------------------------------- */
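/* A rough sketch (for illustration; the real code is generated by the
 * compiler in C--) of how a "safe" foreign call brackets the call with
 * these two entry points:
 *
 *     token = suspendThread(BaseReg, interruptible);
 *     r = foo(x, y);                    // the foreign call itself
 *     BaseReg = resumeThread(token);    // may come back on a different cap
 */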
2146
2147 void *
2148 suspendThread (StgRegTable *reg, rtsBool interruptible)
2149 {
2150 Capability *cap;
2151 int saved_errno;
2152 StgTSO *tso;
2153 Task *task;
2154 #if mingw32_HOST_OS
2155 StgWord32 saved_winerror;
2156 #endif
2157
2158 saved_errno = errno;
2159 #if mingw32_HOST_OS
2160 saved_winerror = GetLastError();
2161 #endif
2162
2163 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2164 */
2165 cap = regTableToCapability(reg);
2166
2167 task = cap->running_task;
2168 tso = cap->r.rCurrentTSO;
2169
2170 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2171
2172 // XXX this might not be necessary --SDM
2173 tso->what_next = ThreadRunGHC;
2174
2175 threadPaused(cap,tso);
2176
2177 if (interruptible) {
2178 tso->why_blocked = BlockedOnCCall_Interruptible;
2179 } else {
2180 tso->why_blocked = BlockedOnCCall;
2181 }
2182
2183 // Hand back capability
2184 task->incall->suspended_tso = tso;
2185 task->incall->suspended_cap = cap;
2186
2187 ACQUIRE_LOCK(&cap->lock);
2188
2189 suspendTask(cap,task);
2190 cap->in_haskell = rtsFalse;
2191 releaseCapability_(cap,rtsFalse);
2192
2193 RELEASE_LOCK(&cap->lock);
2194
2195 errno = saved_errno;
2196 #if mingw32_HOST_OS
2197 SetLastError(saved_winerror);
2198 #endif
2199 return task;
2200 }
2201
2202 StgRegTable *
2203 resumeThread (void *task_)
2204 {
2205 StgTSO *tso;
2206 InCall *incall;
2207 Capability *cap;
2208 Task *task = task_;
2209 int saved_errno;
2210 #if mingw32_HOST_OS
2211 StgWord32 saved_winerror;
2212 #endif
2213
2214 saved_errno = errno;
2215 #if mingw32_HOST_OS
2216 saved_winerror = GetLastError();
2217 #endif
2218
2219 incall = task->incall;
2220 cap = incall->suspended_cap;
2221 task->cap = cap;
2222
2223 // Wait for permission to re-enter the RTS with the result.
2224 waitForReturnCapability(&cap,task);
2225 // we might be on a different capability now... but if so, our
2226 // entry on the suspended_ccalls list will also have been
2227 // migrated.
2228
2229 // Remove the thread from the suspended list
2230 recoverSuspendedTask(cap,task);
2231
2232 tso = incall->suspended_tso;
2233 incall->suspended_tso = NULL;
2234 incall->suspended_cap = NULL;
2235 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2236
2237 traceEventRunThread(cap, tso);
2238
2239 /* Reset blocking status */
2240 tso->why_blocked = NotBlocked;
2241
2242 if ((tso->flags & TSO_BLOCKEX) == 0) {
2243 // avoid locking the TSO if we don't have to
2244 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2245 maybePerformBlockedException(cap,tso);
2246 }
2247 }
2248
2249 cap->r.rCurrentTSO = tso;
2250 cap->in_haskell = rtsTrue;
2251 errno = saved_errno;
2252 #if mingw32_HOST_OS
2253 SetLastError(saved_winerror);
2254 #endif
2255
2256 /* We might have GC'd, mark the TSO dirty again */
2257 dirty_TSO(cap,tso);
2258 dirty_STACK(cap,tso->stackobj);
2259
2260 IF_DEBUG(sanity, checkTSO(tso));
2261
2262 return &cap->r;
2263 }
2264
2265 /* ---------------------------------------------------------------------------
2266 * scheduleThread()
2267 *
2268 * scheduleThread puts a thread on the end of the runnable queue.
2269 * This will usually be done immediately after a thread is created.
2270 * The caller of scheduleThread must create the thread using e.g.
2271 * createThread and push an appropriate closure
2272 * on this thread's stack before the scheduler is invoked.
2273 * ------------------------------------------------------------------------ */
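/* Illustrative sketch (not the actual primop code): fork# does essentially
 *
 *     tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, closure);
 *     scheduleThread(cap, tso);
 *
 * whereas rts_evalIO() pairs createStrictIOThread() with scheduleWaitThread()
 * below, so that the calling Task blocks until the new thread has finished.
 */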
2274
2275 void
2276 scheduleThread(Capability *cap, StgTSO *tso)
2277 {
2278 // The thread goes at the *end* of the run-queue, to avoid possible
2279 // starvation of any threads already on the queue.
2280 appendToRunQueue(cap,tso);
2281 }
2282
2283 void
2284 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2285 {
2286 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2287 // move this thread from now on.
2288 #if defined(THREADED_RTS)
2289 cpu %= enabled_capabilities;
2290 if (cpu == cap->no) {
2291 appendToRunQueue(cap,tso);
2292 } else {
2293 migrateThread(cap, tso, &capabilities[cpu]);
2294 }
2295 #else
2296 appendToRunQueue(cap,tso);
2297 #endif
2298 }
2299
2300 void
2301 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2302 {
2303 Task *task;
2304 DEBUG_ONLY( StgThreadID id );
2305 Capability *cap;
2306
2307 cap = *pcap;
2308
2309 // We already created/initialised the Task
2310 task = cap->running_task;
2311
2312 // This TSO is now a bound thread; make the Task and TSO
2313 // point to each other.
2314 tso->bound = task->incall;
2315 tso->cap = cap;
2316
2317 task->incall->tso = tso;
2318 task->incall->ret = ret;
2319 task->incall->stat = NoStatus;
2320
2321 appendToRunQueue(cap,tso);
2322
2323 DEBUG_ONLY( id = tso->id );
2324 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2325
2326 cap = schedule(cap,task);
2327
2328 ASSERT(task->incall->stat != NoStatus);
2329 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2330
2331 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2332 *pcap = cap;
2333 }
2334
2335 /* ----------------------------------------------------------------------------
2336 * Starting Tasks
2337 * ------------------------------------------------------------------------- */
2338
2339 #if defined(THREADED_RTS)
2340 void scheduleWorker (Capability *cap, Task *task)
2341 {
2342 // schedule() runs without a lock.
2343 cap = schedule(cap,task);
2344
2345 // On exit from schedule(), we have a Capability, but possibly not
2346 // the same one we started with.
2347
2348 // During shutdown, the requirement is that after all the
2349 // Capabilities are shut down, all workers that are shutting down
2350 // have finished workerTaskStop(). This is why we hold on to
2351 // cap->lock until we've finished workerTaskStop() below.
2352 //
2353 // There may be workers still involved in foreign calls; those
2354 // will just block in waitForReturnCapability() because the
2355 // Capability has been shut down.
2356 //
2357 ACQUIRE_LOCK(&cap->lock);
2358 releaseCapability_(cap,rtsFalse);
2359 workerTaskStop(task);
2360 RELEASE_LOCK(&cap->lock);
2361 }
2362 #endif
2363
2364 /* ---------------------------------------------------------------------------
2365 * Start new worker tasks on Capabilities from--to
2366 * -------------------------------------------------------------------------- */
2367
2368 static void
2369 startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
2370 {
2371 #if defined(THREADED_RTS)
2372 nat i;
2373 Capability *cap;
2374
2375 for (i = from; i < to; i++) {
2376 cap = &capabilities[i];
2377 ACQUIRE_LOCK(&cap->lock);
2378 startWorkerTask(cap);
2379 RELEASE_LOCK(&cap->lock);
2380 }
2381 #endif
2382 }
2383
2384 /* ---------------------------------------------------------------------------
2385 * initScheduler()
2386 *
2387 * Initialise the scheduler. This resets all the queues - if the
2388 * queues contained any threads, they'll be garbage collected at the
2389 * next pass.
2390 *
2391 * ------------------------------------------------------------------------ */
2392
2393 void
2394 initScheduler(void)
2395 {
2396 #if !defined(THREADED_RTS)
2397 blocked_queue_hd = END_TSO_QUEUE;
2398 blocked_queue_tl = END_TSO_QUEUE;
2399 sleeping_queue = END_TSO_QUEUE;
2400 #endif
2401
2402 sched_state = SCHED_RUNNING;
2403 recent_activity = ACTIVITY_YES;
2404
2405 #if defined(THREADED_RTS)
2406 /* Initialise the mutex and condition variables used by
2407 * the scheduler. */
2408 initMutex(&sched_mutex);
2409 #endif
2410
2411 ACQUIRE_LOCK(&sched_mutex);
2412
2413 /* A capability holds the state a native thread needs in
2414 * order to execute STG code. At least one capability is
2415 * floating around (only THREADED_RTS builds have more than one).
2416 */
2417 initCapabilities();
2418
2419 initTaskManager();
2420
2421 /*
2422 * Eagerly start one worker to run each Capability, except for
2423 * Capability 0. The idea is that we're probably going to start a
2424 * bound thread on Capability 0 pretty soon, so we don't want a
2425 * worker task hogging it.
2426 */
2427 startWorkerTasks(1, n_capabilities);
2428
2429 RELEASE_LOCK(&sched_mutex);
2430
2431 }
2432
2433 void
2434 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2435 /* see Capability.c, shutdownCapability() */
2436 {
2437 Task *task = NULL;
2438
2439 task = newBoundTask();
2440
2441 // If we haven't killed all the threads yet, do it now.
2442 if (sched_state < SCHED_SHUTTING_DOWN) {
2443 sched_state = SCHED_INTERRUPTING;
2444 Capability *cap = task->cap;
2445 waitForReturnCapability(&cap,task);
2446 scheduleDoGC(&cap,task,rtsFalse);
2447 ASSERT(task->incall->tso == NULL);
2448 releaseCapability(cap);
2449 }
2450 sched_state = SCHED_SHUTTING_DOWN;
2451
2452 shutdownCapabilities(task, wait_foreign);
2453
2454 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2455 // n_failed_trygrab_idles, n_idle_caps);
2456
2457 boundTaskExiting(task);
2458 }
2459
2460 void
2461 freeScheduler( void )
2462 {
2463 nat still_running;
2464
2465 ACQUIRE_LOCK(&sched_mutex);
2466 still_running = freeTaskManager();
2467 // We can only free the Capabilities if there are no Tasks still
2468 // running. We might have a Task about to return from a foreign
2469 // call into waitForReturnCapability(), for example (actually,
2470 // this should be the *only* thing that a still-running Task can
2471 // do at this point, and it will block waiting for the
2472 // Capability).
2473 if (still_running == 0) {
2474 freeCapabilities();
2475 if (n_capabilities != 1) {
2476 stgFree(capabilities);
2477 }
2478 }
2479 RELEASE_LOCK(&sched_mutex);
2480 #if defined(THREADED_RTS)
2481 closeMutex(&sched_mutex);
2482 #endif
2483 }
2484
2485 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2486 void *user USED_IF_NOT_THREADS)
2487 {
2488 #if !defined(THREADED_RTS)
2489 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2490 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2491 evac(user, (StgClosure **)(void *)&sleeping_queue);
2492 #endif
2493 }
2494
2495 /* -----------------------------------------------------------------------------
2496 performGC
2497
2498 This is the interface to the garbage collector from Haskell land.
2499 We provide this so that external C code can allocate and garbage
2500 collect when called from Haskell via _ccall_GC.
2501 -------------------------------------------------------------------------- */
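/* Usage sketch (an assumption about the public entry points, which are not
 * defined in this file): external C code normally calls the HsFFI wrapper,
 *
 *     hs_perform_gc();      // declared in HsFFI.h; forces a major collection
 *
 * while Haskell code reaches performMajorGC() below via System.Mem.performGC.
 */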
2502
2503 static void
2504 performGC_(rtsBool force_major)
2505 {
2506 Task *task;
2507 Capability *cap = NULL;
2508
2509 // We must grab a new Task here, because the existing Task may be
2510 // associated with a particular Capability, and chained onto the
2511 // suspended_ccalls queue.
2512 task = newBoundTask();
2513
2514 waitForReturnCapability(&cap,task);
2515 scheduleDoGC(&cap,task,force_major);
2516 releaseCapability(cap);
2517 boundTaskExiting(task);
2518 }
2519
2520 void
2521 performGC(void)
2522 {
2523 performGC_(rtsFalse);
2524 }
2525
2526 void
2527 performMajorGC(void)
2528 {
2529 performGC_(rtsTrue);
2530 }
2531
2532 /* ---------------------------------------------------------------------------
2533 Interrupt execution
2534 - usually called inside a signal handler so it mustn't do anything fancy.
2535 ------------------------------------------------------------------------ */
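/* Sketch (an assumption; the real handlers live elsewhere, e.g. in
 * posix/Signals.c): a ^C handler ultimately just calls this from signal
 * context, which is why it must not do anything fancy.  Hypothetical shape:
 *
 *     static void ctrlc_handler (int sig STG_UNUSED) {  // name illustrative
 *         interruptStgRts();
 *     }
 */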
2536
2537 void
2538 interruptStgRts(void)
2539 {
2540 sched_state = SCHED_INTERRUPTING;
2541 interruptAllCapabilities();
2542 #if defined(THREADED_RTS)
2543 wakeUpRts();
2544 #endif
2545 }
2546
2547 /* -----------------------------------------------------------------------------
2548 Wake up the RTS
2549
2550 This function causes at least one OS thread to wake up and run the
2551 scheduler loop. It is invoked when the RTS might be deadlocked, or
2552 an external event has arrived that may need servicing (eg. a
2553 keyboard interrupt).
2554
2555 In the single-threaded RTS we don't do anything here; we only have
2556 one thread anyway, and the event that caused us to want to wake up
2557 will have interrupted any blocking system call in progress anyway.
2558 -------------------------------------------------------------------------- */
2559
2560 #if defined(THREADED_RTS)
2561 void wakeUpRts(void)
2562 {
2563 // This forces the IO Manager thread to wakeup, which will
2564 // in turn ensure that some OS thread wakes up and runs the
2565 // scheduler loop, which will cause a GC and deadlock check.
2566 ioManagerWakeup();
2567 }
2568 #endif
2569
2570 /* -----------------------------------------------------------------------------
2571 Deleting threads
2572
2573 This is used for interruption (^C) and forking, and corresponds to
2574 raising an exception but without letting the thread catch the
2575 exception.
2576 -------------------------------------------------------------------------- */
2577
2578 static void
2579 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2580 {
2581 // NOTE: must only be called on a TSO that we have exclusive
2582 // access to, because we will call throwToSingleThreaded() below.
2583 // The TSO must be on the run queue of the Capability we own, or
2584 // we must own all Capabilities.
2585
2586 if (tso->why_blocked != BlockedOnCCall &&
2587 tso->why_blocked != BlockedOnCCall_Interruptible) {
2588 throwToSingleThreaded(tso->cap,tso,NULL);
2589 }
2590 }
2591
2592 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2593 static void
2594 deleteThread_(Capability *cap, StgTSO *tso)
2595 { // for forkProcess only:
2596 // like deleteThread(), but we delete threads in foreign calls, too.
2597
2598 if (tso->why_blocked == BlockedOnCCall ||
2599 tso->why_blocked == BlockedOnCCall_Interruptible) {
2600 tso->what_next = ThreadKilled;
2601 appendToRunQueue(tso->cap, tso);
2602 } else {
2603 deleteThread(cap,tso);
2604 }
2605 }
2606 #endif
2607
2608 /* -----------------------------------------------------------------------------
2609 raiseExceptionHelper
2610
2611 This function is called by the raise# primitive, just so that we can
2612 move some of the tricky bits of raising an exception from C-- into
2613 C. Who knows, it might be a useful reusable thing here too.
2614 -------------------------------------------------------------------------- */
2615
2616 StgWord
2617 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2618 {
2619 Capability *cap = regTableToCapability(reg);
2620 StgThunk *raise_closure = NULL;
2621 StgPtr p, next;
2622 StgRetInfoTable *info;
2623 //
2624 // This closure represents the expression 'raise# E' where E
2625 // is the exception raised. It is used to overwrite all the
2626 // thunks which are currently under evaluation.
2627 //
2628
2629 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2630 // LDV profiling: stg_raise_info has THUNK as its closure
2631 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2632 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2633 // 1 does not cause any problem unless profiling is performed.
2634 // However, when LDV profiling goes on, we need to linearly scan
2635 // small object pool, where raise_closure is stored, so we should
2636 // use MIN_UPD_SIZE.
2637 //
2638 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2639 // sizeofW(StgClosure)+1);
2640 //
2641
2642 //
2643 // Walk up the stack, looking for the catch frame. On the way,
2644 // we update any closures pointed to from update frames with the
2645 // raise closure that we just built.
2646 //
2647 p = tso->stackobj->sp;
2648 while(1) {
2649 info = get_ret_itbl((StgClosure *)p);
2650 next = p + stack_frame_sizeW((StgClosure *)p);
2651 switch (info->i.type) {
2652
2653 case UPDATE_FRAME:
2654 // Only create raise_closure if we need to.
2655 if (raise_closure == NULL) {
2656 raise_closure =
2657 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2658 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2659 raise_closure->payload[0] = exception;
2660 }
2661 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2662 (StgClosure *)raise_closure);
2663 p = next;
2664 continue;
2665
2666 case ATOMICALLY_FRAME:
2667 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2668 tso->stackobj->sp = p;
2669 return ATOMICALLY_FRAME;
2670
2671 case CATCH_FRAME:
2672 tso->stackobj->sp = p;
2673 return CATCH_FRAME;
2674
2675 case CATCH_STM_FRAME:
2676 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2677 tso->stackobj->sp = p;
2678 return CATCH_STM_FRAME;
2679
2680 case UNDERFLOW_FRAME:
2681 tso->stackobj->sp = p;
2682 threadStackUnderflow(cap,tso);
2683 p = tso->stackobj->sp;
2684 continue;
2685
2686 case STOP_FRAME:
2687 tso->stackobj->sp = p;
2688 return STOP_FRAME;
2689
2690 case CATCH_RETRY_FRAME:
2691 default:
2692 p = next;
2693 continue;
2694 }
2695 }
2696 }
2697
2698
2699 /* -----------------------------------------------------------------------------
2700 findRetryFrameHelper
2701
2702 This function is called by the retry# primitive. It traverses the stack
2703 leaving tso->stackobj->sp referring to the frame which should handle the retry.
2704
2705 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2706 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2707
2708 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested transaction that they
2709 create) because retries are not considered to be exceptions, despite the
2710 similar implementation.
2711
2712 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2713 not be created within memory transactions.
2714 -------------------------------------------------------------------------- */
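/* Illustrative stack shape (a sketch, not code): for
 * 'atomically (e1 `orElse` e2)' the walk below expects something like
 *
 *     Sp -> ... | CATCH_RETRY_FRAME (alt = e2) | ... | ATOMICALLY_FRAME | ...
 *
 * A retry# inside e1 stops at the CATCH_RETRY_FRAME and runs e2; a retry#
 * with no enclosing orElse# keeps going until the ATOMICALLY_FRAME.
 */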
2715
2716 StgWord
2717 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2718 {
2719 StgPtr p, next;
2720 StgRetInfoTable *info;
2721
2722 p = tso->stackobj->sp;
2723 while (1) {
2724 info = get_ret_itbl((StgClosure *)p);
2725 next = p + stack_frame_sizeW((StgClosure *)p);
2726 switch (info->i.type) {
2727
2728 case ATOMICALLY_FRAME:
2729 debugTrace(DEBUG_stm,
2730 "found ATOMICALLY_FRAME at %p during retry", p);
2731 tso->stackobj->sp = p;
2732 return ATOMICALLY_FRAME;
2733
2734 case CATCH_RETRY_FRAME:
2735 debugTrace(DEBUG_stm,
2736 "found CATCH_RETRY_FRAME at %p during retrry", p);
2737 tso->stackobj->sp = p;
2738 return CATCH_RETRY_FRAME;
2739
2740 case CATCH_STM_FRAME: {
2741 StgTRecHeader *trec = tso -> trec;
2742 StgTRecHeader *outer = trec -> enclosing_trec;
2743 debugTrace(DEBUG_stm,
2744 "found CATCH_STM_FRAME at %p during retry", p);
2745 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2746 stmAbortTransaction(cap, trec);
2747 stmFreeAbortedTRec(cap, trec);
2748 tso -> trec = outer;
2749 p = next;
2750 continue;
2751 }
2752
2753 case UNDERFLOW_FRAME:
2754 threadStackUnderflow(cap,tso);
2755 p = tso->stackobj->sp;
2756 continue;
2757
2758 default:
2759 ASSERT(info->i.type != CATCH_FRAME);
2760 ASSERT(info->i.type != STOP_FRAME);
2761 p = next;
2762 continue;
2763 }
2764 }
2765 }
2766
2767 /* -----------------------------------------------------------------------------
2768 resurrectThreads is called after garbage collection on the list of
2769 threads found to be garbage. Each of these threads will be woken
2770 up and sent an exception: BlockedIndefinitelyOnMVar if the thread was
2771 blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked on an STM
2772 transaction, or NonTermination if the thread was blocked on a Black Hole.
2773
2774 Locks: assumes we hold *all* the capabilities.
2775 -------------------------------------------------------------------------- */
2776
2777 void
2778 resurrectThreads (StgTSO *threads)
2779 {
2780 StgTSO *tso, *next;
2781 Capability *cap;
2782 generation *gen;
2783
2784 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2785 next = tso->global_link;
2786
2787 gen = Bdescr((P_)tso)->gen;
2788 tso->global_link = gen->threads;
2789 gen->threads = tso;
2790
2791 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2792
2793 // Wake up the thread on the Capability it was last on
2794 cap = tso->cap;
2795
2796 switch (tso->why_blocked) {
2797 case BlockedOnMVar:
2798 /* Called by GC - sched_mutex lock is currently held. */
2799 throwToSingleThreaded(cap, tso,
2800 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2801 break;
2802 case BlockedOnBlackHole:
2803 throwToSingleThreaded(cap, tso,
2804 (StgClosure *)nonTermination_closure);
2805 break;
2806 case BlockedOnSTM:
2807 throwToSingleThreaded(cap, tso,
2808 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2809 break;
2810 case NotBlocked:
2811 /* This might happen if the thread was blocked on a black hole
2812 * belonging to a thread that we've just woken up (raiseAsync
2813 * can wake up threads, remember...).
2814 */
2815 continue;
2816 case BlockedOnMsgThrowTo:
2817 // This can happen if the target is masking, blocks on a
2818 // black hole, and then is found to be unreachable. In
2819 // this case, we want to let the target wake up and carry
2820 // on, and do nothing to this thread.
2821 continue;
2822 default:
2823 barf("resurrectThreads: thread blocked in a strange way: %d",
2824 tso->why_blocked);
2825 }
2826 }
2827 }