1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
95 * exists - earlier gccs apparently didn't.
96 * -= chak
97 */
98 StgTSO dummy_tso;
99
100 /*
101 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
102 * in an MT setting, needed to signal that a worker thread shouldn't hang around
103 * in the scheduler when it is out of work.
104 */
105 rtsBool shutting_down_scheduler = rtsFalse;
106
107 /*
108 * This mutex protects most of the global scheduler data in
109 * the THREADED_RTS runtime.
110 */
111 #if defined(THREADED_RTS)
112 Mutex sched_mutex;
113 #endif
114
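// fork() is not available on Windows (mingw32), so the forkProcess
// primop is only supported on the other platforms; see forkProcess()
// below, which is guarded by this define.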
115 #if !defined(mingw32_HOST_OS)
116 #define FORKPROCESS_PRIMOP_SUPPORTED
117 #endif
118
119 // Local stats
120 #ifdef THREADED_RTS
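// Counters used by the parallel GC sync in scheduleDoGC(): how many
// capabilities we marked as idle for a GC, and how often we failed to
// grab an idle capability.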
121 static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
122 #endif
123
124 /* -----------------------------------------------------------------------------
125 * static function prototypes
126 * -------------------------------------------------------------------------- */
127
128 static Capability *schedule (Capability *initialCapability, Task *task);
129
130 //
131 // These functions all encapsulate parts of the scheduler loop, and are
132 // abstracted only to make the structure and control flow of the
133 // scheduler clearer.
134 //
135 static void schedulePreLoop (void);
136 static void scheduleFindWork (Capability **pcap);
137 #if defined(THREADED_RTS)
138 static void scheduleYield (Capability **pcap, Task *task);
139 #endif
140 #if defined(THREADED_RTS)
141 static nat requestSync (Capability **pcap, Task *task, nat sync_type);
142 static void acquireAllCapabilities(Capability *cap, Task *task);
143 static void releaseAllCapabilities(Capability *cap, Task *task);
144 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
145 #endif
146 static void scheduleStartSignalHandlers (Capability *cap);
147 static void scheduleCheckBlockedThreads (Capability *cap);
148 static void scheduleProcessInbox(Capability **cap);
149 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
150 static void schedulePushWork(Capability *cap, Task *task);
151 #if defined(THREADED_RTS)
152 static void scheduleActivateSpark(Capability *cap);
153 #endif
154 static void schedulePostRunThread(Capability *cap, StgTSO *t);
155 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
156 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
157 nat prev_what_next );
158 static void scheduleHandleThreadBlocked( StgTSO *t );
159 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
160 StgTSO *t );
161 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
162 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
163
164 static void deleteThread (Capability *cap, StgTSO *tso);
165 static void deleteAllThreads (Capability *cap);
166
167 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
168 static void deleteThread_(Capability *cap, StgTSO *tso);
169 #endif
170
171 /* ---------------------------------------------------------------------------
172 Main scheduling loop.
173
174 We use round-robin scheduling, each thread returning to the
175 scheduler loop when one of these conditions is detected:
176
177 * out of heap space
178 * timer expires (thread yields)
179 * thread blocks
180 * thread ends
181 * stack overflow
182
183 GRAN version:
184 In a GranSim setup this loop iterates over the global event queue.
185 This revolves around the global event queue, which determines what
186 to do next. Therefore, it's more complicated than either the
187 concurrent or the parallel (GUM) setup.
188 This version has been entirely removed (JB 2008/08).
189
190 GUM version:
191 GUM iterates over incoming messages.
192 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
193 and sends out a fish whenever it has nothing to do; in-between
194 doing the actual reductions (shared code below) it processes the
195 incoming messages and deals with delayed operations
196 (see PendingFetches).
197 This is not the ugliest code you could imagine, but it's bloody close.
198
199 (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
200 now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
201 as well as future GUM versions. This file has been refurbished to
202 contain only valid code, which is however incomplete and refers to
203 invalid includes etc.
204
205 ------------------------------------------------------------------------ */
206
207 static Capability *
208 schedule (Capability *initialCapability, Task *task)
209 {
210 StgTSO *t;
211 Capability *cap;
212 StgThreadReturnCode ret;
213 nat prev_what_next;
214 rtsBool ready_to_gc;
215 #if defined(THREADED_RTS)
216 rtsBool first = rtsTrue;
217 #endif
218
219 cap = initialCapability;
220
221 // Pre-condition: this task owns initialCapability.
222 // The sched_mutex is *NOT* held
223 // NB. on return, we still hold a capability.
224
225 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
226
227 schedulePreLoop();
228
229 // -----------------------------------------------------------
230 // Scheduler loop starts here:
231
232 while (1) {
233
234 // Check whether we have re-entered the RTS from Haskell without
235 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
236 // call).
237 if (cap->in_haskell) {
238 errorBelch("schedule: re-entered unsafely.\n"
239 " Perhaps a 'foreign import unsafe' should be 'safe'?");
240 stg_exit(EXIT_FAILURE);
241 }
242
243 // The interruption / shutdown sequence.
244 //
245 // In order to cleanly shut down the runtime, we want to:
246 // * make sure that all main threads return to their callers
247 // with the state 'Interrupted'.
248 // * clean up all OS threads associated with the runtime
249 // * free all memory etc.
250 //
251 // So the sequence for ^C goes like this:
252 //
253 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
254 // arranges for some Capability to wake up
255 //
256 // * all threads in the system are halted, and the zombies are
257 // placed on the run queue for cleaning up. We acquire all
258 // the capabilities in order to delete the threads, this is
259 // done by scheduleDoGC() for convenience (because GC already
260 // needs to acquire all the capabilities). We can't kill
261 // threads involved in foreign calls.
262 //
263 // * somebody calls shutdownHaskell(), which calls exitScheduler()
264 //
265 // * sched_state := SCHED_SHUTTING_DOWN
266 //
267 // * all workers exit when the run queue on their capability
268 // drains. All main threads will also exit when their TSO
269 // reaches the head of the run queue and they can return.
270 //
271 // * eventually all Capabilities will shut down, and the RTS can
272 // exit.
273 //
274 // * We might be left with threads blocked in foreign calls,
275 // we should really attempt to kill these somehow (TODO);
276
277 switch (sched_state) {
278 case SCHED_RUNNING:
279 break;
280 case SCHED_INTERRUPTING:
281 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
282 /* scheduleDoGC() deletes all the threads */
283 scheduleDoGC(&cap,task,rtsFalse);
284
285 // after scheduleDoGC(), we must be shutting down. Either some
286 // other Capability did the final GC, or we did it above,
287 // either way we can fall through to the SCHED_SHUTTING_DOWN
288 // case now.
289 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
290 // fall through
291
292 case SCHED_SHUTTING_DOWN:
293 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
294 // If we are a worker, just exit. If we're a bound thread
295 // then we will exit below when we've removed our TSO from
296 // the run queue.
297 if (!isBoundTask(task) && emptyRunQueue(cap)) {
298 return cap;
299 }
300 break;
301 default:
302 barf("sched_state: %d", sched_state);
303 }
304
305 scheduleFindWork(&cap);
306
307 /* work pushing, currently relevant only for THREADED_RTS:
308 (pushes threads, wakes up idle capabilities for stealing) */
309 schedulePushWork(cap,task);
310
311 scheduleDetectDeadlock(&cap,task);
312
313 // Normally, the only way we can get here with no threads to
314 // run is if a keyboard interrupt was received during
315 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
316 // Additionally, it is not fatal for the
317 // threaded RTS to reach here with no threads to run.
318 //
319 // win32: might be here due to awaitEvent() being abandoned
320 // as a result of a console event having been delivered.
321
322 #if defined(THREADED_RTS)
323 if (first)
324 {
325 // XXX: ToDo
326 // // don't yield the first time, we want a chance to run this
327 // // thread for a bit, even if there are others banging at the
328 // // door.
329 // first = rtsFalse;
330 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
331 }
332
333 scheduleYield(&cap,task);
334
335 if (emptyRunQueue(cap)) continue; // look for work again
336 #endif
337
338 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
339 if ( emptyRunQueue(cap) ) {
340 ASSERT(sched_state >= SCHED_INTERRUPTING);
341 }
342 #endif
343
344 //
345 // Get a thread to run
346 //
347 t = popRunQueue(cap);
348
349 // Sanity check the thread we're about to run. This can be
350 // expensive if there is lots of thread switching going on...
351 IF_DEBUG(sanity,checkTSO(t));
352
353 #if defined(THREADED_RTS)
354 // Check whether we can run this thread in the current task.
355 // If not, we have to pass our capability to the right task.
356 {
357 InCall *bound = t->bound;
358
359 if (bound) {
360 if (bound->task == task) {
361 // yes, the Haskell thread is bound to the current native thread
362 } else {
363 debugTrace(DEBUG_sched,
364 "thread %lu bound to another OS thread",
365 (unsigned long)t->id);
366 // no, bound to a different Haskell thread: pass to that thread
367 pushOnRunQueue(cap,t);
368 continue;
369 }
370 } else {
371 // The thread we want to run is unbound.
372 if (task->incall->tso) {
373 debugTrace(DEBUG_sched,
374 "this OS thread cannot run thread %lu",
375 (unsigned long)t->id);
376 // no, the current native thread is bound to a different
377 // Haskell thread, so pass it to any worker thread
378 pushOnRunQueue(cap,t);
379 continue;
380 }
381 }
382 }
383 #endif
384
385 // If we're shutting down, and this thread has not yet been
386 // killed, kill it now. This sometimes happens when a finalizer
387 // thread is created by the final GC, or a thread previously
388 // in a foreign call returns.
389 if (sched_state >= SCHED_INTERRUPTING &&
390 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
391 deleteThread(cap,t);
392 }
393
394 // If this capability is disabled, migrate the thread away rather
395 // than running it. NB. but not if the thread is bound: it is
396 // really hard for a bound thread to migrate itself. Believe me,
397 // I tried several ways and couldn't find a way to do it.
398 // Instead, when everything is stopped for GC, we migrate all the
399 // threads on the run queue then (see scheduleDoGC()).
400 //
401 // ToDo: what about TSO_LOCKED? Currently we're migrating those
402 // when the number of capabilities drops, but we never migrate
403 // them back if it rises again. Presumably we should, but after
404 // the thread has been migrated we no longer know what capability
405 // it was originally on.
406 #ifdef THREADED_RTS
407 if (cap->disabled && !t->bound) {
408 Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
409 migrateThread(cap, t, dest_cap);
410 continue;
411 }
412 #endif
413
414 /* context switches are initiated by the timer signal, unless
415 * the user specified "context switch as often as possible", with
416 * +RTS -C0
417 */
418 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
419 && !emptyThreadQueues(cap)) {
420 cap->context_switch = 1;
421 }
422
423 run_thread:
424
425 // CurrentTSO is the thread to run. t might be different if we
426 // loop back to run_thread, so make sure to set CurrentTSO after
427 // that.
428 cap->r.rCurrentTSO = t;
429
430 startHeapProfTimer();
431
432 // ----------------------------------------------------------------------
433 // Run the current thread
434
435 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
436 ASSERT(t->cap == cap);
437 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
438
439 prev_what_next = t->what_next;
440
441 errno = t->saved_errno;
442 #if mingw32_HOST_OS
443 SetLastError(t->saved_winerror);
444 #endif
445
446 // reset the interrupt flag before running Haskell code
447 cap->interrupt = 0;
448
449 cap->in_haskell = rtsTrue;
450 cap->idle = 0;
451
452 dirty_TSO(cap,t);
453 dirty_STACK(cap,t->stackobj);
454
455 #if defined(THREADED_RTS)
456 if (recent_activity == ACTIVITY_DONE_GC) {
457 // ACTIVITY_DONE_GC means we turned off the timer signal to
458 // conserve power (see #1623). Re-enable it here.
459 nat prev;
460 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
461 if (prev == ACTIVITY_DONE_GC) {
462 startTimer();
463 }
464 } else if (recent_activity != ACTIVITY_INACTIVE) {
465 // If we reached ACTIVITY_INACTIVE, then don't reset it until
466 // we've done the GC. The thread running here might just be
467 // the IO manager thread that handle_tick() woke up via
468 // wakeUpRts().
469 recent_activity = ACTIVITY_YES;
470 }
471 #endif
472
473 traceEventRunThread(cap, t);
474
475 switch (prev_what_next) {
476
477 case ThreadKilled:
478 case ThreadComplete:
479 /* Thread already finished, return to scheduler. */
480 ret = ThreadFinished;
481 break;
482
483 case ThreadRunGHC:
484 {
485 StgRegTable *r;
486 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
487 cap = regTableToCapability(r);
488 ret = r->rRet;
489 break;
490 }
491
492 case ThreadInterpret:
493 cap = interpretBCO(cap);
494 ret = cap->r.rRet;
495 break;
496
497 default:
498 barf("schedule: invalid what_next field");
499 }
500
501 cap->in_haskell = rtsFalse;
502
503 // The TSO might have moved, eg. if it re-entered the RTS and a GC
504 // happened. So find the new location:
505 t = cap->r.rCurrentTSO;
506
507 // And save the current errno in this thread.
508 // XXX: possibly bogus for SMP because this thread might already
509 // be running again, see code below.
510 t->saved_errno = errno;
511 #if mingw32_HOST_OS
512 // Similarly for Windows error code
513 t->saved_winerror = GetLastError();
514 #endif
515
516 if (ret == ThreadBlocked) {
517 if (t->why_blocked == BlockedOnBlackHole) {
518 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
519 traceEventStopThread(cap, t, t->why_blocked + 6,
520 owner != NULL ? owner->id : 0);
521 } else {
522 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
523 }
524 } else {
525 traceEventStopThread(cap, t, ret, 0);
526 }
527
528 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
529 ASSERT(t->cap == cap);
530
531 // ----------------------------------------------------------------------
532
533 // Costs for the scheduler are assigned to CCS_SYSTEM
534 stopHeapProfTimer();
535 #if defined(PROFILING)
536 cap->r.rCCCS = CCS_SYSTEM;
537 #endif
538
539 schedulePostRunThread(cap,t);
540
541 ready_to_gc = rtsFalse;
542
543 switch (ret) {
544 case HeapOverflow:
545 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
546 break;
547
548 case StackOverflow:
549 // just adjust the stack for this thread, then pop it back
550 // on the run queue.
551 threadStackOverflow(cap, t);
552 pushOnRunQueue(cap,t);
553 break;
554
555 case ThreadYielding:
556 if (scheduleHandleYield(cap, t, prev_what_next)) {
557 // shortcut for switching between compiler/interpreter:
558 goto run_thread;
559 }
560 break;
561
562 case ThreadBlocked:
563 scheduleHandleThreadBlocked(t);
564 break;
565
566 case ThreadFinished:
567 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
568 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
569 break;
570
571 default:
572 barf("schedule: invalid thread return code %d", (int)ret);
573 }
574
575 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
576 scheduleDoGC(&cap,task,rtsFalse);
577 }
578 } /* end of while() */
579 }
580
581 /* -----------------------------------------------------------------------------
582 * Run queue operations
583 * -------------------------------------------------------------------------- */
584
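// Remove a TSO from the doubly-linked run queue of the given Capability.
// The back pointers live in tso->block_info.prev, so we relink both
// neighbours (or adjust run_queue_hd/run_queue_tl at the ends) and then
// clear the TSO's own link fields.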
585 void
586 removeFromRunQueue (Capability *cap, StgTSO *tso)
587 {
588 if (tso->block_info.prev == END_TSO_QUEUE) {
589 ASSERT(cap->run_queue_hd == tso);
590 cap->run_queue_hd = tso->_link;
591 } else {
592 setTSOLink(cap, tso->block_info.prev, tso->_link);
593 }
594 if (tso->_link == END_TSO_QUEUE) {
595 ASSERT(cap->run_queue_tl == tso);
596 cap->run_queue_tl = tso->block_info.prev;
597 } else {
598 setTSOPrev(cap, tso->_link, tso->block_info.prev);
599 }
600 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
601
602 IF_DEBUG(sanity, checkRunQueue(cap));
603 }
604
605 /* ----------------------------------------------------------------------------
606 * Setting up the scheduler loop
607 * ------------------------------------------------------------------------- */
608
609 static void
610 schedulePreLoop(void)
611 {
612 // initialisation for scheduler - what cannot go into initScheduler()
613
614 #if defined(mingw32_HOST_OS)
615 win32AllocStack();
616 #endif
617 }
618
619 /* -----------------------------------------------------------------------------
620 * scheduleFindWork()
621 *
622 * Search for work to do, and handle messages from elsewhere.
623 * -------------------------------------------------------------------------- */
624
625 static void
626 scheduleFindWork (Capability **pcap)
627 {
628 scheduleStartSignalHandlers(*pcap);
629
630 scheduleProcessInbox(pcap);
631
632 scheduleCheckBlockedThreads(*pcap);
633
634 #if defined(THREADED_RTS)
635 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
636 #endif
637 }
638
639 #if defined(THREADED_RTS)
640 STATIC_INLINE rtsBool
641 shouldYieldCapability (Capability *cap, Task *task)
642 {
643 // we need to yield this capability to someone else if..
644 // - another thread is initiating a GC
645 // - another Task is returning from a foreign call
646 // - the thread at the head of the run queue cannot be run
647 // by this Task (it is bound to another Task, or it is unbound
648 // and this task is bound).
649 return (pending_sync ||
650 cap->returning_tasks_hd != NULL ||
651 (!emptyRunQueue(cap) && (task->incall->tso == NULL
652 ? cap->run_queue_hd->bound != NULL
653 : cap->run_queue_hd->bound != task->incall)));
654 }
655
656 // This is the single place where a Task goes to sleep. There are
657 // two reasons it might need to sleep:
658 // - there are no threads to run
659 // - we need to yield this Capability to someone else
660 // (see shouldYieldCapability())
661 //
662 // Careful: the scheduler loop is quite delicate. Make sure you run
663 // the tests in testsuite/concurrent (all ways) after modifying this,
664 // and also check the benchmarks in nofib/parallel for regressions.
665
666 static void
667 scheduleYield (Capability **pcap, Task *task)
668 {
669 Capability *cap = *pcap;
670
671 // if we have work, and we don't need to give up the Capability, continue.
672 //
673 if (!shouldYieldCapability(cap,task) &&
674 (!emptyRunQueue(cap) ||
675 !emptyInbox(cap) ||
676 sched_state >= SCHED_INTERRUPTING))
677 return;
678
679 // otherwise yield (sleep), and keep yielding if necessary.
680 do {
681 yieldCapability(&cap,task);
682 }
683 while (shouldYieldCapability(cap,task));
684
685 // note there may still be no threads on the run queue at this
686 // point, the caller has to check.
687
688 *pcap = cap;
689 return;
690 }
691 #endif
692
693 /* -----------------------------------------------------------------------------
694 * schedulePushWork()
695 *
696 * Push work to other Capabilities if we have some.
697 * -------------------------------------------------------------------------- */
698
699 static void
700 schedulePushWork(Capability *cap USED_IF_THREADS,
701 Task *task USED_IF_THREADS)
702 {
703 /* following code not for PARALLEL_HASKELL. I kept the call general,
704 future GUM versions might use pushing in a distributed setup */
705 #if defined(THREADED_RTS)
706
707 Capability *free_caps[n_capabilities], *cap0;
708 nat i, n_free_caps;
709
710 // migration can be turned off with +RTS -qm
711 if (!RtsFlags.ParFlags.migrate) return;
712
713 // Check whether we have more threads on our run queue, or sparks
714 // in our pool, that we could hand to another Capability.
715 if (cap->run_queue_hd == END_TSO_QUEUE) {
716 if (sparkPoolSizeCap(cap) < 2) return;
717 } else {
718 if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
719 sparkPoolSizeCap(cap) < 1) return;
720 }
721
722 // First grab as many free Capabilities as we can.
723 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
724 cap0 = &capabilities[i];
725 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
726 if (!emptyRunQueue(cap0)
727 || cap0->returning_tasks_hd != NULL
728 || cap0->inbox != (Message*)END_TSO_QUEUE) {
729 // it already has some work, we just grabbed it at
730 // the wrong moment. Or maybe it's deadlocked!
731 releaseCapability(cap0);
732 } else {
733 free_caps[n_free_caps++] = cap0;
734 }
735 }
736 }
737
738 // we now have n_free_caps free capabilities stashed in
739 // free_caps[]. Share our run queue equally with them. This is
740 // probably the simplest thing we could do; improvements we might
741 // want to do include:
742 //
743 // - giving high priority to moving relatively new threads, on
744 // the grounds that they haven't had time to build up a
745 // working set in the cache on this CPU/Capability.
746 //
747 // - giving low priority to moving long-lived threads
748
749 if (n_free_caps > 0) {
750 StgTSO *prev, *t, *next;
751 #ifdef SPARK_PUSHING
752 rtsBool pushed_to_all;
753 #endif
754
755 debugTrace(DEBUG_sched,
756 "cap %d: %s and %d free capabilities, sharing...",
757 cap->no,
758 (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
759 "excess threads on run queue":"sparks to share (>=2)",
760 n_free_caps);
761
762 i = 0;
763 #ifdef SPARK_PUSHING
764 pushed_to_all = rtsFalse;
765 #endif
766
767 if (cap->run_queue_hd != END_TSO_QUEUE) {
768 prev = cap->run_queue_hd;
769 t = prev->_link;
770 prev->_link = END_TSO_QUEUE;
771 for (; t != END_TSO_QUEUE; t = next) {
772 next = t->_link;
773 t->_link = END_TSO_QUEUE;
774 if (t->bound == task->incall // don't move my bound thread
775 || tsoLocked(t)) { // don't move a locked thread
776 setTSOLink(cap, prev, t);
777 setTSOPrev(cap, t, prev);
778 prev = t;
779 } else if (i == n_free_caps) {
780 #ifdef SPARK_PUSHING
781 pushed_to_all = rtsTrue;
782 #endif
783 i = 0;
784 // keep one for us
785 setTSOLink(cap, prev, t);
786 setTSOPrev(cap, t, prev);
787 prev = t;
788 } else {
789 appendToRunQueue(free_caps[i],t);
790
791 traceEventMigrateThread (cap, t, free_caps[i]->no);
792
793 if (t->bound) { t->bound->task->cap = free_caps[i]; }
794 t->cap = free_caps[i];
795 i++;
796 }
797 }
798 cap->run_queue_tl = prev;
799
800 IF_DEBUG(sanity, checkRunQueue(cap));
801 }
802
803 #ifdef SPARK_PUSHING
804 /* JB I left this code in place, it would work but is not necessary */
805
806 // If there are some free capabilities that we didn't push any
807 // threads to, then try to push a spark to each one.
808 if (!pushed_to_all) {
809 StgClosure *spark;
810 // i is the next free capability to push to
811 for (; i < n_free_caps; i++) {
812 if (emptySparkPoolCap(free_caps[i])) {
813 spark = tryStealSpark(cap->sparks);
814 if (spark != NULL) {
815 /* TODO: if anyone wants to re-enable this code then
816 * they must consider the fizzledSpark(spark) case
817 * and update the per-cap spark statistics.
818 */
819 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
820
821 traceEventStealSpark(free_caps[i], t, cap->no);
822
823 newSpark(&(free_caps[i]->r), spark);
824 }
825 }
826 }
827 }
828 #endif /* SPARK_PUSHING */
829
830 // release the capabilities
831 for (i = 0; i < n_free_caps; i++) {
832 task->cap = free_caps[i];
833 releaseAndWakeupCapability(free_caps[i]);
834 }
835 }
836 task->cap = cap; // reset to point to our Capability.
837
838 #endif /* THREADED_RTS */
839
840 }
841
842 /* ----------------------------------------------------------------------------
843 * Start any pending signal handlers
844 * ------------------------------------------------------------------------- */
845
846 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
847 static void
848 scheduleStartSignalHandlers(Capability *cap)
849 {
850 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
851 // safe outside the lock
852 startSignalHandlers(cap);
853 }
854 }
855 #else
856 static void
857 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
858 {
859 }
860 #endif
861
862 /* ----------------------------------------------------------------------------
863 * Check for blocked threads that can be woken up.
864 * ------------------------------------------------------------------------- */
865
866 static void
867 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
868 {
869 #if !defined(THREADED_RTS)
870 //
871 // Check whether any waiting threads need to be woken up. If the
872 // run queue is empty, and there are no other tasks running, we
873 // can wait indefinitely for something to happen.
874 //
875 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
876 {
877 awaitEvent (emptyRunQueue(cap));
878 }
879 #endif
880 }
881
882 /* ----------------------------------------------------------------------------
883 * Detect deadlock conditions and attempt to resolve them.
884 * ------------------------------------------------------------------------- */
885
886 static void
887 scheduleDetectDeadlock (Capability **pcap, Task *task)
888 {
889 Capability *cap = *pcap;
890 /*
891 * Detect deadlock: when we have no threads to run, there are no
892 * threads blocked, waiting for I/O, or sleeping, and all the
893 * other tasks are waiting for work, we must have a deadlock of
894 * some description.
895 */
896 if ( emptyThreadQueues(cap) )
897 {
898 #if defined(THREADED_RTS)
899 /*
900 * In the threaded RTS, we only check for deadlock if there
901 * has been no activity in a complete timeslice. This means
902 * we won't eagerly start a full GC just because we don't have
903 * any threads to run currently.
904 */
905 if (recent_activity != ACTIVITY_INACTIVE) return;
906 #endif
907
908 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
909
910 // Garbage collection can release some new threads due to
911 // either (a) finalizers or (b) threads resurrected because
912 // they are unreachable and will therefore be sent an
913 // exception. Any threads thus released will be immediately
914 // runnable.
915 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
916 cap = *pcap;
917 // when force_major == rtsTrue, scheduleDoGC sets
918 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
919 // signal.
920
921 if ( !emptyRunQueue(cap) ) return;
922
923 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
924 /* If we have user-installed signal handlers, then wait
925 * for signals to arrive rather than bombing out with a
926 * deadlock.
927 */
928 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
929 debugTrace(DEBUG_sched,
930 "still deadlocked, waiting for signals...");
931
932 awaitUserSignals();
933
934 if (signals_pending()) {
935 startSignalHandlers(cap);
936 }
937
938 // either we have threads to run, or we were interrupted:
939 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
940
941 return;
942 }
943 #endif
944
945 #if !defined(THREADED_RTS)
946 /* Probably a real deadlock. Send the current main thread the
947 * Deadlock exception.
948 */
949 if (task->incall->tso) {
950 switch (task->incall->tso->why_blocked) {
951 case BlockedOnSTM:
952 case BlockedOnBlackHole:
953 case BlockedOnMsgThrowTo:
954 case BlockedOnMVar:
955 throwToSingleThreaded(cap, task->incall->tso,
956 (StgClosure *)nonTermination_closure);
957 return;
958 default:
959 barf("deadlock: main thread blocked in a strange way");
960 }
961 }
962 return;
963 #endif
964 }
965 }
966
967
968 /* ----------------------------------------------------------------------------
969 * Send pending messages (PARALLEL_HASKELL only)
970 * ------------------------------------------------------------------------- */
971
972 #if defined(PARALLEL_HASKELL)
973 static void
974 scheduleSendPendingMessages(void)
975 {
976
977 # if defined(PAR) // global Mem.Mgmt., omit for now
978 if (PendingFetches != END_BF_QUEUE) {
979 processFetches();
980 }
981 # endif
982
983 if (RtsFlags.ParFlags.BufferTime) {
984 // if we use message buffering, we must send away all message
985 // packets which have become too old...
986 sendOldBuffers();
987 }
988 }
989 #endif
990
991 /* ----------------------------------------------------------------------------
992 * Process message in the current Capability's inbox
993 * ------------------------------------------------------------------------- */
994
995 static void
996 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
997 {
998 #if defined(THREADED_RTS)
999 Message *m, *next;
1000 int r;
1001 Capability *cap = *pcap;
1002
1003 while (!emptyInbox(cap)) {
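        // The nursery is exhausted, or we have allocated a lot of large
        // objects: GC before processing more messages (executing a
        // message may itself allocate).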
1004 if (cap->r.rCurrentNursery->link == NULL ||
1005 g0->n_new_large_words >= large_alloc_lim) {
1006 scheduleDoGC(pcap, cap->running_task, rtsFalse);
1007 cap = *pcap;
1008 }
1009
1010 // don't use a blocking acquire; if the lock is held by
1011 // another thread then just carry on. This seems to avoid
1012 // getting stuck in a message ping-pong situation with other
1013 // processors. We'll check the inbox again later anyway.
1014 //
1015 // We should really use a more efficient queue data structure
1016 // here. The trickiness is that we must ensure a Capability
1017 // never goes idle if the inbox is non-empty, which is why we
1018 // use cap->lock (cap->lock is released as the last thing
1019 // before going idle; see Capability.c:releaseCapability()).
1020 r = TRY_ACQUIRE_LOCK(&cap->lock);
1021 if (r != 0) return;
1022
1023 m = cap->inbox;
1024 cap->inbox = (Message*)END_TSO_QUEUE;
1025
1026 RELEASE_LOCK(&cap->lock);
1027
1028 while (m != (Message*)END_TSO_QUEUE) {
1029 next = m->link;
1030 executeMessage(cap, m);
1031 m = next;
1032 }
1033 }
1034 #endif
1035 }
1036
1037 /* ----------------------------------------------------------------------------
1038 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1039 * ------------------------------------------------------------------------- */
1040
1041 #if defined(THREADED_RTS)
1042 static void
1043 scheduleActivateSpark(Capability *cap)
1044 {
1045 if (anySparks() && !cap->disabled)
1046 {
1047 createSparkThread(cap);
1048 debugTrace(DEBUG_sched, "creating a spark thread");
1049 }
1050 }
1051 #endif // THREADED_RTS
1052
1053 /* ----------------------------------------------------------------------------
1054 * After running a thread...
1055 * ------------------------------------------------------------------------- */
1056
1057 static void
1058 schedulePostRunThread (Capability *cap, StgTSO *t)
1059 {
1060 // We have to be able to catch transactions that are in an
1061 // infinite loop as a result of seeing an inconsistent view of
1062 // memory, e.g.
1063 //
1064 // atomically $ do
1065 // [a,b] <- mapM readTVar [ta,tb]
1066 // when (a == b) loop
1067 //
1068 // and a is never equal to b given a consistent view of memory.
1069 //
1070 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1071 if (!stmValidateNestOfTransactions (t -> trec)) {
1072 debugTrace(DEBUG_sched | DEBUG_stm,
1073 "trec %p found wasting its time", t);
1074
1075 // strip the stack back to the
1076 // ATOMICALLY_FRAME, aborting the (nested)
1077 // transaction, and saving the stack of any
1078 // partially-evaluated thunks on the heap.
1079 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1080
1081 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1082 }
1083 }
1084
1085 /* some statistics gathering in the parallel case */
1086 }
1087
1088 /* -----------------------------------------------------------------------------
1089 * Handle a thread that returned to the scheduler with HeapOverflow
1090 * -------------------------------------------------------------------------- */
1091
1092 static rtsBool
1093 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1094 {
1095 // did the task ask for a large block?
1096 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1097 // if so, get one and push it on the front of the nursery.
1098 bdescr *bd;
1099 lnat blocks;
1100
1101 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1102
1103 if (blocks > BLOCKS_PER_MBLOCK) {
1104 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1105 }
1106
1107 debugTrace(DEBUG_sched,
1108 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1109 (long)t->id, what_next_strs[t->what_next], blocks);
1110
1111 // don't do this if the nursery is (nearly) full, we'll GC first.
1112 if (cap->r.rCurrentNursery->link != NULL ||
1113 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1114 // if the nursery has only one block.
1115
1116 bd = allocGroup_lock(blocks);
1117 cap->r.rNursery->n_blocks += blocks;
1118
1119 // link the new group into the list
1120 bd->link = cap->r.rCurrentNursery;
1121 bd->u.back = cap->r.rCurrentNursery->u.back;
1122 if (cap->r.rCurrentNursery->u.back != NULL) {
1123 cap->r.rCurrentNursery->u.back->link = bd;
1124 } else {
1125 cap->r.rNursery->blocks = bd;
1126 }
1127 cap->r.rCurrentNursery->u.back = bd;
1128
1129 // initialise it as a nursery block. We initialise the
1130 // step, gen_no, and flags field of *every* sub-block in
1131 // this large block, because this is easier than making
1132 // sure that we always find the block head of a large
1133 // block whenever we call Bdescr() (eg. evacuate() and
1134 // isAlive() in the GC would both have to do this, at
1135 // least).
1136 {
1137 bdescr *x;
1138 for (x = bd; x < bd + blocks; x++) {
1139 initBdescr(x,g0,g0);
1140 x->free = x->start;
1141 x->flags = 0;
1142 }
1143 }
1144
1145 // This assert can be a killer if the app is doing lots
1146 // of large block allocations.
1147 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1148
1149 // now update the nursery to point to the new block
1150 cap->r.rCurrentNursery = bd;
1151
1152 // we might be unlucky and have another thread get on the
1153 // run queue before us and steal the large block, but in that
1154 // case the thread will just end up requesting another large
1155 // block.
1156 pushOnRunQueue(cap,t);
1157 return rtsFalse; /* not actually GC'ing */
1158 }
1159 }
1160
1161 if (cap->r.rHpLim == NULL || cap->context_switch) {
1162 // Sometimes we miss a context switch, e.g. when calling
1163 // primitives in a tight loop, MAYBE_GC() doesn't check the
1164 // context switch flag, and we end up waiting for a GC.
1165 // See #1984, and concurrent/should_run/1984
1166 cap->context_switch = 0;
1167 appendToRunQueue(cap,t);
1168 } else {
1169 pushOnRunQueue(cap,t);
1170 }
1171 return rtsTrue;
1172 /* actual GC is done at the end of the while loop in schedule() */
1173 }
1174
1175 /* -----------------------------------------------------------------------------
1176 * Handle a thread that returned to the scheduler with ThreadYielding
1177 * -------------------------------------------------------------------------- */
1178
1179 static rtsBool
1180 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1181 {
1182 /* put the thread back on the run queue. Then, if we're ready to
1183 * GC, check whether this is the last task to stop. If so, wake
1184 * up the GC thread. getThread will block during a GC until the
1185 * GC is finished.
1186 */
1187
1188 ASSERT(t->_link == END_TSO_QUEUE);
1189
1190 // Shortcut if we're just switching evaluators: don't bother
1191 // doing stack squeezing (which can be expensive), just run the
1192 // thread.
1193 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1194 debugTrace(DEBUG_sched,
1195 "--<< thread %ld (%s) stopped to switch evaluators",
1196 (long)t->id, what_next_strs[t->what_next]);
1197 return rtsTrue;
1198 }
1199
1200 // Reset the context switch flag. We don't do this just before
1201 // running the thread, because that would mean we would lose ticks
1202 // during GC, which can lead to unfair scheduling (a thread hogs
1203 // the CPU because the tick always arrives during GC). This way
1204 // penalises threads that do a lot of allocation, but that seems
1205 // better than the alternative.
1206 if (cap->context_switch != 0) {
1207 cap->context_switch = 0;
1208 appendToRunQueue(cap,t);
1209 } else {
1210 pushOnRunQueue(cap,t);
1211 }
1212
1213 IF_DEBUG(sanity,
1214 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1215 checkTSO(t));
1216
1217 return rtsFalse;
1218 }
1219
1220 /* -----------------------------------------------------------------------------
1221 * Handle a thread that returned to the scheduler with ThreadBlocked
1222 * -------------------------------------------------------------------------- */
1223
1224 static void
1225 scheduleHandleThreadBlocked( StgTSO *t
1226 #if !defined(DEBUG)
1227 STG_UNUSED
1228 #endif
1229 )
1230 {
1231
1232 // We don't need to do anything. The thread is blocked, and it
1233 // has tidied up its stack and placed itself on whatever queue
1234 // it needs to be on.
1235
1236 // ASSERT(t->why_blocked != NotBlocked);
1237 // Not true: for example,
1238 // - the thread may have woken itself up already, because
1239 // threadPaused() might have raised a blocked throwTo
1240 // exception, see maybePerformBlockedException().
1241
1242 #ifdef DEBUG
1243 traceThreadStatus(DEBUG_sched, t);
1244 #endif
1245 }
1246
1247 /* -----------------------------------------------------------------------------
1248 * Handle a thread that returned to the scheduler with ThreadFinished
1249 * -------------------------------------------------------------------------- */
1250
1251 static rtsBool
1252 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1253 {
1254 /* Need to check whether this was a main thread, and if so,
1255 * return with the return value.
1256 *
1257 * We also end up here if the thread kills itself with an
1258 * uncaught exception, see Exception.cmm.
1259 */
1260
1261 // blocked exceptions can now complete, even if the thread was in
1262 // blocked mode (see #2910).
1263 awakenBlockedExceptionQueue (cap, t);
1264
1265 //
1266 // Check whether the thread that just completed was a bound
1267 // thread, and if so return with the result.
1268 //
1269 // There is an assumption here that all thread completion goes
1270 // through this point; we need to make sure that if a thread
1271 // ends up in the ThreadKilled state, it stays on the run
1272 // queue so it can be dealt with here.
1273 //
1274
1275 if (t->bound) {
1276
1277 if (t->bound != task->incall) {
1278 #if !defined(THREADED_RTS)
1279 // Must be a bound thread that is not the topmost one. Leave
1280 // it on the run queue until the stack has unwound to the
1281 // point where we can deal with this. Leaving it on the run
1282 // queue also ensures that the garbage collector knows about
1283 // this thread and its return value (it gets dropped from the
1284 // step->threads list so there's no other way to find it).
1285 appendToRunQueue(cap,t);
1286 return rtsFalse;
1287 #else
1288 // this cannot happen in the threaded RTS, because a
1289 // bound thread can only be run by the appropriate Task.
1290 barf("finished bound thread that isn't mine");
1291 #endif
1292 }
1293
1294 ASSERT(task->incall->tso == t);
1295
1296 if (t->what_next == ThreadComplete) {
1297 if (task->incall->ret) {
1298 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1299 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1300 }
1301 task->incall->stat = Success;
1302 } else {
1303 if (task->incall->ret) {
1304 *(task->incall->ret) = NULL;
1305 }
1306 if (sched_state >= SCHED_INTERRUPTING) {
1307 if (heap_overflow) {
1308 task->incall->stat = HeapExhausted;
1309 } else {
1310 task->incall->stat = Interrupted;
1311 }
1312 } else {
1313 task->incall->stat = Killed;
1314 }
1315 }
1316 #ifdef DEBUG
1317 removeThreadLabel((StgWord)task->incall->tso->id);
1318 #endif
1319
1320 // We no longer consider this thread and task to be bound to
1321 // each other. The TSO lives on until it is GC'd, but the
1322 // task is about to be released by the caller, and we don't
1323 // want anyone following the pointer from the TSO to the
1324 // defunct task (which might have already been
1325 // re-used). This was a real bug: the GC updated
1326 // tso->bound->tso which led to a deadlock.
1327 t->bound = NULL;
1328 task->incall->tso = NULL;
1329
1330 return rtsTrue; // tells schedule() to return
1331 }
1332
1333 return rtsFalse;
1334 }
1335
1336 /* -----------------------------------------------------------------------------
1337 * Perform a heap census
1338 * -------------------------------------------------------------------------- */
1339
1340 static rtsBool
1341 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1342 {
1343 // When we have +RTS -i0 and we're heap profiling, do a census at
1344 // every GC. This lets us get repeatable runs for debugging.
1345 if (performHeapProfile ||
1346 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1347 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1348 return rtsTrue;
1349 } else {
1350 return rtsFalse;
1351 }
1352 }
1353
1354 /* -----------------------------------------------------------------------------
1355 * Start a synchronisation of all capabilities
1356 * -------------------------------------------------------------------------- */
1357
1358 // Returns:
1359 // 0 if we successfully got a sync
1360 // non-0 if there was another sync request in progress,
1361 // and we yielded to it. The value returned is the
1362 // type of the other sync request.
1363 //
1364 #if defined(THREADED_RTS)
1365 static nat requestSync (Capability **pcap, Task *task, nat sync_type)
1366 {
1367 nat prev_pending_sync;
1368
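    // Atomically claim the sync: cas() writes sync_type into pending_sync
    // only if it was 0, and returns the previous value, so a non-zero
    // result means another sync request got there first.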
1369 prev_pending_sync = cas(&pending_sync, 0, sync_type);
1370
1371 if (prev_pending_sync)
1372 {
1373 do {
1374 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1375 prev_pending_sync);
1376 ASSERT(*pcap);
1377 yieldCapability(pcap,task);
1378 } while (pending_sync);
1379 return prev_pending_sync; // NOTE: task->cap might have changed now
1380 }
1381 else
1382 {
1383 return 0;
1384 }
1385 }
1386
1387 //
1388 // Grab all the capabilities except the one we already hold. Used
1389 // when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1390 // before a fork (SYNC_OTHER).
1391 //
1392 // Only call this after requestSync(), otherwise a deadlock might
1393 // ensue if another thread is trying to synchronise.
1394 //
1395 static void acquireAllCapabilities(Capability *cap, Task *task)
1396 {
1397 Capability *tmpcap;
1398 nat i;
1399
1400 for (i=0; i < n_capabilities; i++) {
1401 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
1402 tmpcap = &capabilities[i];
1403 if (tmpcap != cap) {
1404 // we better hope this task doesn't get migrated to
1405 // another Capability while we're waiting for this one.
1406 // It won't, because load balancing happens while we have
1407 // all the Capabilities, but even so it's a slightly
1408 // unsavoury invariant.
1409 task->cap = tmpcap;
1410 waitForReturnCapability(&tmpcap, task);
1411 if (tmpcap->no != i) {
1412 barf("acquireAllCapabilities: got the wrong capability");
1413 }
1414 }
1415 }
1416 task->cap = cap;
1417 }
1418
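// Release all the Capabilities acquired by acquireAllCapabilities(),
// except the one (cap) that this Task already held on entry.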
1419 static void releaseAllCapabilities(Capability *cap, Task *task)
1420 {
1421 nat i;
1422
1423 for (i = 0; i < n_capabilities; i++) {
1424 if (cap->no != i) {
1425 task->cap = &capabilities[i];
1426 releaseCapability(&capabilities[i]);
1427 }
1428 }
1429 task->cap = cap;
1430 }
1431 #endif
1432
1433 /* -----------------------------------------------------------------------------
1434 * Perform a garbage collection if necessary
1435 * -------------------------------------------------------------------------- */
1436
1437 static void
1438 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1439 rtsBool force_major)
1440 {
1441 Capability *cap = *pcap;
1442 rtsBool heap_census;
1443 #ifdef THREADED_RTS
1444 rtsBool idle_cap[n_capabilities];
1445 rtsBool gc_type;
1446 nat i, sync;
1447 StgTSO *tso;
1448 #endif
1449
1450 if (sched_state == SCHED_SHUTTING_DOWN) {
1451 // The final GC has already been done, and the system is
1452 // shutting down. We'll probably deadlock if we try to GC
1453 // now.
1454 return;
1455 }
1456
1457 #ifdef THREADED_RTS
1458 if (sched_state < SCHED_INTERRUPTING
1459 && RtsFlags.ParFlags.parGcEnabled
1460 && N >= RtsFlags.ParFlags.parGcGen
1461 && ! oldest_gen->mark)
1462 {
1463 gc_type = SYNC_GC_PAR;
1464 } else {
1465 gc_type = SYNC_GC_SEQ;
1466 }
1467
1468 // In order to GC, there must be no threads running Haskell code.
1469 // Therefore, the GC thread needs to hold *all* the capabilities,
1470 // and release them after the GC has completed.
1471 //
1472 // This seems to be the simplest way: previous attempts involved
1473 // making all the threads with capabilities give up their
1474 // capabilities and sleep except for the *last* one, which
1475 // actually did the GC. But it's quite hard to arrange for all
1476 // the other tasks to sleep and stay asleep.
1477 //
1478
1479 /* Other capabilities are prevented from running yet more Haskell
1480 threads if pending_sync is set. Tested inside
1481 yieldCapability() and releaseCapability() in Capability.c */
1482
1483 do {
1484 sync = requestSync(pcap, task, gc_type);
1485 cap = *pcap;
1486 if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
1487 // someone else had a pending sync request for a GC, so
1488 // let's assume GC has been done and we don't need to GC
1489 // again.
1490 return;
1491 }
1492 if (sched_state == SCHED_SHUTTING_DOWN) {
1493 // The scheduler might now be shutting down. We tested
1494 // this above, but it might have become true since then as
1495 // we yielded the capability in requestSync().
1496 return;
1497 }
1498 } while (sync);
1499
1500 interruptAllCapabilities();
1501
1502 // The final shutdown GC is always single-threaded, because it's
1503 // possible that some of the Capabilities have no worker threads.
1504
1505 if (gc_type == SYNC_GC_SEQ)
1506 {
1507 traceEventRequestSeqGc(cap);
1508 }
1509 else
1510 {
1511 traceEventRequestParGc(cap);
1512 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1513 }
1514
1515 if (gc_type == SYNC_GC_SEQ)
1516 {
1517 // single-threaded GC: grab all the capabilities
1518 acquireAllCapabilities(cap,task);
1519 }
1520 else
1521 {
1522 // If we are load-balancing collections in this
1523 // generation, then we require all GC threads to participate
1524 // in the collection. Otherwise, we only require active
1525 // threads to participate, and we set gc_threads[i]->idle for
1526 // any idle capabilities. The rationale here is that waking
1527 // up an idle Capability takes much longer than just doing any
1528 // GC work on its behalf.
1529
1530 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1531 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1532 N >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
1533 for (i=0; i < n_capabilities; i++) {
1534 if (capabilities[i].disabled) {
1535 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1536 } else {
1537 idle_cap[i] = rtsFalse;
1538 }
1539 }
1540 } else {
1541 for (i=0; i < n_capabilities; i++) {
1542 if (capabilities[i].disabled) {
1543 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1544 } else if (i == cap->no ||
1545 capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1546 idle_cap[i] = rtsFalse;
1547 } else {
1548 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1549 if (!idle_cap[i]) {
1550 n_failed_trygrab_idles++;
1551 } else {
1552 n_idle_caps++;
1553 }
1554 }
1555 }
1556 }
1557
1558 // We set the gc_thread[i]->idle flag if that
1559 // capability/thread is not participating in this collection.
1560 // We also keep a local record of which capabilities are idle
1561 // in idle_cap[], because scheduleDoGC() is re-entrant:
1562 // another thread might start a GC as soon as we've finished
1563 // this one, and thus the gc_thread[]->idle flags are invalid
1564 // as soon as we release any threads after GC. Getting this
1565 // wrong leads to a rare and hard to debug deadlock!
1566
1567 for (i=0; i < n_capabilities; i++) {
1568 gc_threads[i]->idle = idle_cap[i];
1569 capabilities[i].idle++;
1570 }
1571
1572 // For all capabilities participating in this GC, wait until
1573 // they have stopped mutating and are standing by for GC.
1574 waitForGcThreads(cap);
1575
1576 #if defined(THREADED_RTS)
1577 // Stable point where we can do a global check on our spark counters
1578 ASSERT(checkSparkCountInvariant());
1579 #endif
1580 }
1581
1582 #endif
1583
1584 IF_DEBUG(scheduler, printAllThreads());
1585
1586 delete_threads_and_gc:
1587 /*
1588 * We now have all the capabilities; if we're in an interrupting
1589 * state, then we should take the opportunity to delete all the
1590 * threads in the system.
1591 */
1592 if (sched_state == SCHED_INTERRUPTING) {
1593 deleteAllThreads(cap);
1594 #if defined(THREADED_RTS)
1595 // Discard all the sparks from every Capability. Why?
1596 // They'll probably be GC'd anyway since we've killed all the
1597 // threads. It just avoids the GC having to do any work to
1598 // figure out that any remaining sparks are garbage.
1599 for (i = 0; i < n_capabilities; i++) {
1600 capabilities[i].spark_stats.gcd +=
1601 sparkPoolSize(capabilities[i].sparks);
1602 // No race here since all Caps are stopped.
1603 discardSparksCap(&capabilities[i]);
1604 }
1605 #endif
1606 sched_state = SCHED_SHUTTING_DOWN;
1607 }
1608
1609 /*
1610 * When there are disabled capabilities, we want to migrate any
1611 * threads away from them. Normally this happens in the
1612 * scheduler's loop, but only for unbound threads - it's really
1613 * hard for a bound thread to migrate itself. So we have another
1614 * go here.
1615 */
1616 #if defined(THREADED_RTS)
1617 for (i = enabled_capabilities; i < n_capabilities; i++) {
1618 Capability *tmp_cap, *dest_cap;
1619 tmp_cap = &capabilities[i];
1620 ASSERT(tmp_cap->disabled);
1621 if (i != cap->no) {
1622 dest_cap = &capabilities[i % enabled_capabilities];
1623 while (!emptyRunQueue(tmp_cap)) {
1624 tso = popRunQueue(tmp_cap);
1625 migrateThread(tmp_cap, tso, dest_cap);
1626 if (tso->bound) { tso->bound->task->cap = dest_cap; }
1627 }
1628 }
1629 }
1630 #endif
1631
1632 heap_census = scheduleNeedHeapProfile(rtsTrue);
1633
1634 traceEventGcStart(cap);
1635 #if defined(THREADED_RTS)
1636 // reset pending_sync *before* GC, so that when the GC threads
1637 // emerge they don't immediately re-enter the GC.
1638 pending_sync = 0;
1639 GarbageCollect(force_major || heap_census, heap_census, gc_type, cap);
1640 #else
1641 GarbageCollect(force_major || heap_census, heap_census, 0, cap);
1642 #endif
1643 traceEventGcEnd(cap);
1644
1645 traceSparkCounters(cap);
1646
1647 if (recent_activity == ACTIVITY_INACTIVE && force_major)
1648 {
1649 // We are doing a GC because the system has been idle for a
1650 // timeslice and we need to check for deadlock. Record the
1651 // fact that we've done a GC and turn off the timer signal;
1652 // it will get re-enabled if we run any threads after the GC.
1653 recent_activity = ACTIVITY_DONE_GC;
1654 stopTimer();
1655 }
1656 else
1657 {
1658 // the GC might have taken long enough for the timer to set
1659 // recent_activity = ACTIVITY_INACTIVE, but we aren't
1660 // necessarily deadlocked:
1661 recent_activity = ACTIVITY_YES;
1662 }
1663
1664 #if defined(THREADED_RTS)
1665 // Stable point where we can do a global check on our spark counters
1666 ASSERT(checkSparkCountInvariant());
1667 #endif
1668
1669 // The heap census itself is done during GarbageCollect().
1670 if (heap_census) {
1671 performHeapProfile = rtsFalse;
1672 }
1673
1674 #if defined(THREADED_RTS)
1675 if (gc_type == SYNC_GC_PAR)
1676 {
1677 releaseGCThreads(cap);
1678 for (i = 0; i < n_capabilities; i++) {
1679 if (i != cap->no) {
1680 if (idle_cap[i]) {
1681 ASSERT(capabilities[i].running_task == task);
1682 task->cap = &capabilities[i];
1683 releaseCapability(&capabilities[i]);
1684 } else {
1685 ASSERT(capabilities[i].running_task != task);
1686 }
1687 }
1688 }
1689 task->cap = cap;
1690 }
1691 #endif
1692
1693 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1694 // GC set the heap_overflow flag, so we should proceed with
1695 // an orderly shutdown now. Ultimately we want the main
1696 // thread to return to its caller with HeapExhausted, at which
1697 // point the caller should call hs_exit(). The first step is
1698 // to delete all the threads.
1699 //
1700 // Another way to do this would be to raise an exception in
1701 // the main thread, which we really should do because it gives
1702 // the program a chance to clean up. But how do we find the
1703 // main thread? It should presumably be the same one that
1704 // gets ^C exceptions, but that's all done on the Haskell side
1705 // (GHC.TopHandler).
1706 sched_state = SCHED_INTERRUPTING;
1707 goto delete_threads_and_gc;
1708 }
1709
1710 #ifdef SPARKBALANCE
1711 /* JB
1712 Once we are all together... this would be the place to balance all
1713 spark pools. No concurrent stealing or adding of new sparks can
1714 occur. Should be defined in Sparks.c. */
1715 balanceSparkPoolsCaps(n_capabilities, capabilities);
1716 #endif
1717
1718 #if defined(THREADED_RTS)
1719 if (gc_type == SYNC_GC_SEQ) {
1720 // release our stash of capabilities.
1721 releaseAllCapabilities(cap, task);
1722 }
1723 #endif
1724
1725 return;
1726 }
1727
1728 /* ---------------------------------------------------------------------------
1729 * Singleton fork(). Do not copy any running threads.
1730 * ------------------------------------------------------------------------- */
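/* A sketch of how this entry point is typically reached (illustrative only;
 * the surrounding code is not part of this file): the caller obtains a
 * StablePtr to the child's IO action and hands it to forkProcess(), which
 * runs that action in the child via rts_evalStableIO() below:
 *
 *   HsStablePtr entry = ...;          // StablePtr to the child's IO action,
 *                                     // normally created on the Haskell side
 *   pid_t pid = forkProcess(&entry);
 *   // parent: carries on with the child's pid.
 *   // child: forkProcess() never returns; it runs the action and then
 *   // shuts down via hs_exit()/stg_exit().
 */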
1731
1732 pid_t
1733 forkProcess(HsStablePtr *entry
1734 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1735 STG_UNUSED
1736 #endif
1737 )
1738 {
1739 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1740 pid_t pid;
1741 StgTSO* t,*next;
1742 Capability *cap;
1743 nat g;
1744 Task *task = NULL;
1745 nat i;
1746 #ifdef THREADED_RTS
1747 nat sync;
1748 #endif
1749
1750 debugTrace(DEBUG_sched, "forking!");
1751
1752 task = newBoundTask();
1753
1754 cap = NULL;
1755 waitForReturnCapability(&cap, task);
1756
1757 #ifdef THREADED_RTS
1758 do {
1759 sync = requestSync(&cap, task, SYNC_OTHER);
1760 } while (sync);
1761
1762 acquireAllCapabilities(cap,task);
1763
1764 pending_sync = 0;
1765 #endif
1766
1767 // no funny business: hold locks while we fork, otherwise if some
1768 // other thread is holding a lock when the fork happens, the data
1769 // structure protected by the lock will forever be in an
1770 // inconsistent state in the child. See also #1391.
1771 ACQUIRE_LOCK(&sched_mutex);
1772 ACQUIRE_LOCK(&sm_mutex);
1773 ACQUIRE_LOCK(&stable_mutex);
1774 ACQUIRE_LOCK(&task->lock);
1775
1776 for (i=0; i < n_capabilities; i++) {
1777 ACQUIRE_LOCK(&capabilities[i].lock);
1778 }
1779
1780 stopTimer(); // See #4074
1781
1782 #if defined(TRACING)
1783 flushEventLog(); // so that child won't inherit dirty file buffers
1784 #endif
1785
1786 pid = fork();
1787
1788 if (pid) { // parent
1789
1790 startTimer(); // #4074
1791
1792 RELEASE_LOCK(&sched_mutex);
1793 RELEASE_LOCK(&sm_mutex);
1794 RELEASE_LOCK(&stable_mutex);
1795 RELEASE_LOCK(&task->lock);
1796
1797 for (i=0; i < n_capabilities; i++) {
1798 releaseCapability_(&capabilities[i],rtsFalse);
1799 RELEASE_LOCK(&capabilities[i].lock);
1800 }
1801 boundTaskExiting(task);
1802
1803 // just return the pid
1804 return pid;
1805
1806 } else { // child
1807
1808 #if defined(THREADED_RTS)
1809 initMutex(&sched_mutex);
1810 initMutex(&sm_mutex);
1811 initMutex(&stable_mutex);
1812 initMutex(&task->lock);
1813
1814 for (i=0; i < n_capabilities; i++) {
1815 initMutex(&capabilities[i].lock);
1816 }
1817 #endif
1818
1819 #ifdef TRACING
1820 resetTracing();
1821 #endif
1822
1823 // Now, all OS threads except the thread that forked are
1824 // stopped. We need to stop all Haskell threads, including
1825 // those involved in foreign calls. Also we need to delete
1826 // all Tasks, because they correspond to OS threads that are
1827 // now gone.
1828
1829 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1830 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1831 next = t->global_link;
1832 // don't allow threads to catch the ThreadKilled
1833 // exception, but we do want to raiseAsync() because these
1834 // threads may be evaluating thunks that we need later.
1835 deleteThread_(t->cap,t);
1836
1837 // stop the GC from updating the InCall to point to
1838 // the TSO. This is only necessary because the
1839 // OSThread bound to the TSO has been killed, and
1840 // won't get a chance to exit in the usual way (see
1841 // also scheduleHandleThreadFinished).
1842 t->bound = NULL;
1843 }
1844 }
1845
1846 discardTasksExcept(task);
1847
1848 for (i=0; i < n_capabilities; i++) {
1849 cap = &capabilities[i];
1850
1851 // Empty the run queue. It seems tempting to let all the
1852 // killed threads stay on the run queue as zombies to be
1853 // cleaned up later, but some of them may correspond to
1854 // bound threads for which the corresponding Task does not
1855 // exist.
1856 cap->run_queue_hd = END_TSO_QUEUE;
1857 cap->run_queue_tl = END_TSO_QUEUE;
1858
1859 // Any suspended C-calling Tasks are gone; their OS threads
1860 // no longer exist:
1861 cap->suspended_ccalls = NULL;
1862
1863 #if defined(THREADED_RTS)
1864 // Wipe our spare workers list, they no longer exist. New
1865 // workers will be created if necessary.
1866 cap->spare_workers = NULL;
1867 cap->n_spare_workers = 0;
1868 cap->returning_tasks_hd = NULL;
1869 cap->returning_tasks_tl = NULL;
1870 #endif
1871
1872 // Release all caps except 0; we'll use that one for starting
1873 // the IO manager and running the client action below.
1874 if (cap->no != 0) {
1875 task->cap = cap;
1876 releaseCapability(cap);
1877 }
1878 }
1879 cap = &capabilities[0];
1880 task->cap = cap;
1881
1882 // Empty the threads lists. Otherwise, the garbage
1883 // collector may attempt to resurrect some of these threads.
1884 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1885 generations[g].threads = END_TSO_QUEUE;
1886 }
1887
1888 // On Unix, all timers are reset in the child, so we need to start
1889 // the timer again.
1890 initTimer();
1891 startTimer();
1892
1893 #if defined(THREADED_RTS)
1894 ioManagerStartCap(&cap);
1895 #endif
1896
1897 rts_evalStableIO(&cap, entry, NULL); // run the action
1898 rts_checkSchedStatus("forkProcess",cap);
1899
1900 rts_unlock(cap);
1901 hs_exit(); // clean up and exit
1902 stg_exit(EXIT_SUCCESS);
1903 }
1904 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1905 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1906 #endif
1907 }
1908
1909 /* ---------------------------------------------------------------------------
1910 * Changing the number of Capabilities
1911 *
1912 * Changing the number of Capabilities is very tricky! We can only do
1913 * it with the system fully stopped, so we do a full sync with
1914 * requestSync(SYNC_OTHER) and grab all the capabilities.
1915 *
1916 * Then we resize the appropriate data structures, and update all
1917 * references to the old data structures which have now moved.
1918 * Finally we release the Capabilities we are holding, and start
1919 * worker Tasks on the new Capabilities we created.
1920 *
1921 * ------------------------------------------------------------------------- */
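/* Condensed, the THREADED_RTS code below has the following shape (a sketch of
 * the code in this function, not a separate API):
 *
 *   cap = rts_lock();                       // become a bound task with a cap
 *   do { sync = requestSync(&cap, task, SYNC_OTHER); } while (sync);
 *   acquireAllCapabilities(cap, task);      // world stopped
 *   // either mark surplus capabilities as disabled, or grow the
 *   // capabilities[] array and fix up all (Capability *) references
 *   releaseAllCapabilities(cap, task);      // world restarted
 *   startWorkerTasks(n_capabilities, new_n_capabilities);
 *   rts_unlock(cap);
 *
 * Haskell code normally reaches this via GHC.Conc.setNumCapabilities, which
 * is essentially a foreign call to setNumCapabilities().
 */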
1922
1923 void
1924 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
1925 {
1926 #if !defined(THREADED_RTS)
1927
1928 barf("setNumCapabilities: not supported in the non-threaded RTS");
1929
1930 #else
1931 Task *task;
1932 Capability *cap;
1933 nat sync;
1934 StgTSO* t;
1935 nat g, n;
1936 Capability *old_capabilities = NULL;
1937
1938 if (new_n_capabilities == enabled_capabilities) return;
1939
1940 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
1941 enabled_capabilities, new_n_capabilities);
1942
1943 cap = rts_lock();
1944 task = cap->running_task;
1945
1946 do {
1947 sync = requestSync(&cap, task, SYNC_OTHER);
1948 } while (sync);
1949
1950 acquireAllCapabilities(cap,task);
1951
1952 pending_sync = 0;
1953
1954 if (new_n_capabilities < enabled_capabilities)
1955 {
1956 // Reducing the number of capabilities: we do not actually
1957 // remove the extra capabilities, we just mark them as
1958 // "disabled". This has the following effects:
1959 //
1960 // - threads on a disabled capability are migrated away by the
1961 // scheduler loop
1962 //
1963 // - disabled capabilities do not participate in GC
1964 // (see scheduleDoGC())
1965 //
1966 // - No spark threads are created on this capability
1967 // (see scheduleActivateSpark())
1968 //
1969 // - We do not attempt to migrate threads *to* a disabled
1970 // capability (see schedulePushWork()).
1971 //
1972 // but in other respects, a disabled capability remains
1973 // alive. Threads may be woken up on a disabled capability,
1974 // but they will be immediately migrated away.
1975 //
1976 // This approach is much easier than trying to actually remove
1977 // the capability; we don't have to worry about GC data
1978 // structures, the nursery, etc.
1979 //
1980 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
1981 capabilities[n].disabled = rtsTrue;
1982 }
1983 enabled_capabilities = new_n_capabilities;
1984 }
1985 else
1986 {
1987 // Increasing the number of enabled capabilities.
1988 //
1989 // enable any disabled capabilities, up to the required number
1990 for (n = enabled_capabilities;
1991 n < new_n_capabilities && n < n_capabilities; n++) {
1992 capabilities[n].disabled = rtsFalse;
1993 }
1994 enabled_capabilities = n;
1995
1996 if (new_n_capabilities > n_capabilities) {
1997 #if defined(TRACING)
1998 // Allocate eventlog buffers for the new capabilities. Note this
1999 // must be done before calling moreCapabilities(), because that
2000 // will emit events to add the new capabilities to capsets.
2001 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2002 #endif
2003
2004 // Resize the capabilities array
2005 // NB. after this, capabilities points somewhere new. Any pointers
2006 // of type (Capability *) are now invalid.
2007 old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
2008
2009 // update our own cap pointer
2010 cap = &capabilities[cap->no];
2011
2012 // Resize and update storage manager data structures
2013 storageAddCapabilities(n_capabilities, new_n_capabilities);
2014
2015 // Update (Capability *) refs in the Task manager.
2016 updateCapabilityRefs();
2017
2018 // Update (Capability *) refs from TSOs
2019 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2020 for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
2021 t->cap = &capabilities[t->cap->no];
2022 }
2023 }
2024 }
2025 }
2026
2027 // We're done: release the original Capabilities
2028 releaseAllCapabilities(cap,task);
2029
2030 // Start worker tasks on the new Capabilities
2031 startWorkerTasks(n_capabilities, new_n_capabilities);
2032
2033 // finally, update n_capabilities
2034 if (new_n_capabilities > n_capabilities) {
2035 n_capabilities = enabled_capabilities = new_n_capabilities;
2036 }
2037
2038 // We can't free the old array until now, because we access it
2039 // while updating pointers in updateCapabilityRefs().
2040 if (old_capabilities) {
2041 stgFree(old_capabilities);
2042 }
2043
2044 rts_unlock(cap);
2045
2046 #endif // THREADED_RTS
2047 }
2048
2049
2050
2051 /* ---------------------------------------------------------------------------
2052 * Delete all the threads in the system
2053 * ------------------------------------------------------------------------- */
2054
2055 static void
2056 deleteAllThreads ( Capability *cap )
2057 {
2058 // NOTE: only safe to call if we own all capabilities.
2059
2060 StgTSO* t, *next;
2061 nat g;
2062
2063 debugTrace(DEBUG_sched,"deleting all threads");
2064 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2065 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2066 next = t->global_link;
2067 deleteThread(cap,t);
2068 }
2069 }
2070
2071 // The run queue now contains a bunch of ThreadKilled threads. We
2072 // must not throw these away: the main thread(s) will be in there
2073 // somewhere, and the main scheduler loop has to deal with it.
2074 // Also, the run queue is the only thing keeping these threads from
2075 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2076
2077 #if !defined(THREADED_RTS)
2078 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2079 ASSERT(sleeping_queue == END_TSO_QUEUE);
2080 #endif
2081 }
2082
2083 /* -----------------------------------------------------------------------------
2084 Managing the suspended_ccalls list.
2085 Locks required: sched_mutex
2086 -------------------------------------------------------------------------- */
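/* suspended_ccalls is a doubly-linked list of InCall structures, threaded
 * through incall->next and incall->prev, with the head hanging off the
 * Capability.  Roughly:
 *
 *   cap->suspended_ccalls
 *         |
 *         v
 *       InCall A  <-->  InCall B  <-->  InCall C  -->  NULL
 *      (prev == NULL)
 *
 * suspendTask() pushes the current task's InCall on the front;
 * recoverSuspendedTask() unlinks it from wherever it sits in the list.
 */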
2087
2088 STATIC_INLINE void
2089 suspendTask (Capability *cap, Task *task)
2090 {
2091 InCall *incall;
2092
2093 incall = task->incall;
2094 ASSERT(incall->next == NULL && incall->prev == NULL);
2095 incall->next = cap->suspended_ccalls;
2096 incall->prev = NULL;
2097 if (cap->suspended_ccalls) {
2098 cap->suspended_ccalls->prev = incall;
2099 }
2100 cap->suspended_ccalls = incall;
2101 }
2102
2103 STATIC_INLINE void
2104 recoverSuspendedTask (Capability *cap, Task *task)
2105 {
2106 InCall *incall;
2107
2108 incall = task->incall;
2109 if (incall->prev) {
2110 incall->prev->next = incall->next;
2111 } else {
2112 ASSERT(cap->suspended_ccalls == incall);
2113 cap->suspended_ccalls = incall->next;
2114 }
2115 if (incall->next) {
2116 incall->next->prev = incall->prev;
2117 }
2118 incall->next = incall->prev = NULL;
2119 }
2120
2121 /* ---------------------------------------------------------------------------
2122 * Suspending & resuming Haskell threads.
2123 *
2124 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2125 * its capability before calling the C function. This allows another
2126 * task to pick up the capability and carry on running Haskell
2127 * threads. It also means that if the C call blocks, it won't lock
2128 * the whole system.
2129 *
2130 * The Haskell thread making the C call is put to sleep for the
2131 * duration of the call, on the suspended_ccalling_threads queue. We
2132 * give out a token to the task, which it can use to resume the thread
2133 * on return from the C function.
2134 *
2135 * If this is an interruptible C call, this means that the FFI call may be
2136 * unceremoniously terminated and should be scheduled on an
2137 * unbound worker thread.
2138 * ------------------------------------------------------------------------- */
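/* For orientation, the code around a "safe" foreign call uses these two
 * functions roughly like this (a sketch, not the literal generated code;
 * 'foo' and its arguments are placeholders):
 *
 *   token = suspendThread(BaseReg, interruptible);
 *   r = foo(args);                       // the actual C call; RTS not held
 *   BaseReg = resumeThread(token);       // may come back on a different cap
 *
 * The token is really the Task, and resumeThread() returns the StgRegTable of
 * whichever Capability the thread ends up resuming on.
 */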
2139
2140 void *
2141 suspendThread (StgRegTable *reg, rtsBool interruptible)
2142 {
2143 Capability *cap;
2144 int saved_errno;
2145 StgTSO *tso;
2146 Task *task;
2147 #if mingw32_HOST_OS
2148 StgWord32 saved_winerror;
2149 #endif
2150
2151 saved_errno = errno;
2152 #if mingw32_HOST_OS
2153 saved_winerror = GetLastError();
2154 #endif
2155
2156 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2157 */
2158 cap = regTableToCapability(reg);
2159
2160 task = cap->running_task;
2161 tso = cap->r.rCurrentTSO;
2162
2163 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2164
2165 // XXX this might not be necessary --SDM
2166 tso->what_next = ThreadRunGHC;
2167
2168 threadPaused(cap,tso);
2169
2170 if (interruptible) {
2171 tso->why_blocked = BlockedOnCCall_Interruptible;
2172 } else {
2173 tso->why_blocked = BlockedOnCCall;
2174 }
2175
2176 // Hand back capability
2177 task->incall->suspended_tso = tso;
2178 task->incall->suspended_cap = cap;
2179
2180 ACQUIRE_LOCK(&cap->lock);
2181
2182 suspendTask(cap,task);
2183 cap->in_haskell = rtsFalse;
2184 releaseCapability_(cap,rtsFalse);
2185
2186 RELEASE_LOCK(&cap->lock);
2187
2188 errno = saved_errno;
2189 #if mingw32_HOST_OS
2190 SetLastError(saved_winerror);
2191 #endif
2192 return task;
2193 }
2194
2195 StgRegTable *
2196 resumeThread (void *task_)
2197 {
2198 StgTSO *tso;
2199 InCall *incall;
2200 Capability *cap;
2201 Task *task = task_;
2202 int saved_errno;
2203 #if mingw32_HOST_OS
2204 StgWord32 saved_winerror;
2205 #endif
2206
2207 saved_errno = errno;
2208 #if mingw32_HOST_OS
2209 saved_winerror = GetLastError();
2210 #endif
2211
2212 incall = task->incall;
2213 cap = incall->suspended_cap;
2214 task->cap = cap;
2215
2216 // Wait for permission to re-enter the RTS with the result.
2217 waitForReturnCapability(&cap,task);
2218 // we might be on a different capability now... but if so, our
2219 // entry on the suspended_ccalls list will also have been
2220 // migrated.
2221
2222 // Remove the thread from the suspended list
2223 recoverSuspendedTask(cap,task);
2224
2225 tso = incall->suspended_tso;
2226 incall->suspended_tso = NULL;
2227 incall->suspended_cap = NULL;
2228 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2229
2230 traceEventRunThread(cap, tso);
2231
2232 /* Reset blocking status */
2233 tso->why_blocked = NotBlocked;
2234
2235 if ((tso->flags & TSO_BLOCKEX) == 0) {
2236 // avoid locking the TSO if we don't have to
2237 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2238 maybePerformBlockedException(cap,tso);
2239 }
2240 }
2241
2242 cap->r.rCurrentTSO = tso;
2243 cap->in_haskell = rtsTrue;
2244 errno = saved_errno;
2245 #if mingw32_HOST_OS
2246 SetLastError(saved_winerror);
2247 #endif
2248
2249 /* We might have GC'd, mark the TSO dirty again */
2250 dirty_TSO(cap,tso);
2251 dirty_STACK(cap,tso->stackobj);
2252
2253 IF_DEBUG(sanity, checkTSO(tso));
2254
2255 return &cap->r;
2256 }
2257
2258 /* ---------------------------------------------------------------------------
2259 * scheduleThread()
2260 *
2261 * scheduleThread puts a thread on the end of the runnable queue.
2262 * This will usually be done immediately after a thread is created.
2263 * The caller of scheduleThread must create the thread using e.g.
2264 * createThread and push an appropriate closure
2265 * on this thread's stack before the scheduler is invoked.
2266 * ------------------------------------------------------------------------ */
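/* A minimal sketch of the expected usage ('closure' stands for an IO ()
 * closure supplied by the caller; createIOThread() lives in Threads.c):
 *
 *   StgTSO *t = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, closure);
 *   scheduleThread(cap, t);
 *
 * This is essentially what rts_evalIO()/rts_evalLazyIO() do, except that they
 * go through scheduleWaitThread() below so the caller can wait for a result.
 */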
2267
2268 void
2269 scheduleThread(Capability *cap, StgTSO *tso)
2270 {
2271 // The thread goes at the *end* of the run-queue, to avoid possible
2272 // starvation of any threads already on the queue.
2273 appendToRunQueue(cap,tso);
2274 }
2275
2276 void
2277 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2278 {
2279 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2280 // move this thread from now on.
2281 #if defined(THREADED_RTS)
2282 cpu %= enabled_capabilities;
2283 if (cpu == cap->no) {
2284 appendToRunQueue(cap,tso);
2285 } else {
2286 migrateThread(cap, tso, &capabilities[cpu]);
2287 }
2288 #else
2289 appendToRunQueue(cap,tso);
2290 #endif
2291 }
2292
2293 void
2294 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2295 {
2296 Task *task;
2297 DEBUG_ONLY( StgThreadID id );
2298 Capability *cap;
2299
2300 cap = *pcap;
2301
2302 // We already created/initialised the Task
2303 task = cap->running_task;
2304
2305 // This TSO is now a bound thread; make the Task and TSO
2306 // point to each other.
2307 tso->bound = task->incall;
2308 tso->cap = cap;
2309
2310 task->incall->tso = tso;
2311 task->incall->ret = ret;
2312 task->incall->stat = NoStatus;
2313
2314 appendToRunQueue(cap,tso);
2315
2316 DEBUG_ONLY( id = tso->id );
2317 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2318
2319 cap = schedule(cap,task);
2320
2321 ASSERT(task->incall->stat != NoStatus);
2322 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2323
2324 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2325 *pcap = cap;
2326 }
2327
2328 /* ----------------------------------------------------------------------------
2329 * Starting Tasks
2330 * ------------------------------------------------------------------------- */
2331
2332 #if defined(THREADED_RTS)
2333 void scheduleWorker (Capability *cap, Task *task)
2334 {
2335 // schedule() runs without a lock.
2336 cap = schedule(cap,task);
2337
2338 // On exit from schedule(), we have a Capability, but possibly not
2339 // the same one we started with.
2340
2341 // During shutdown, the requirement is that after all the
2342 // Capabilities are shut down, all workers that are shutting down
2343 // have finished workerTaskStop(). This is why we hold on to
2344 // cap->lock until we've finished workerTaskStop() below.
2345 //
2346 // There may be workers still involved in foreign calls; those
2347 // will just block in waitForReturnCapability() because the
2348 // Capability has been shut down.
2349 //
2350 ACQUIRE_LOCK(&cap->lock);
2351 releaseCapability_(cap,rtsFalse);
2352 workerTaskStop(task);
2353 RELEASE_LOCK(&cap->lock);
2354 }
2355 #endif
2356
2357 /* ---------------------------------------------------------------------------
2358 * Start new worker tasks on Capabilities from--to
2359 * -------------------------------------------------------------------------- */
2360
2361 static void
2362 startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
2363 {
2364 #if defined(THREADED_RTS)
2365 nat i;
2366 Capability *cap;
2367
2368 for (i = from; i < to; i++) {
2369 cap = &capabilities[i];
2370 ACQUIRE_LOCK(&cap->lock);
2371 startWorkerTask(cap);
2372 RELEASE_LOCK(&cap->lock);
2373 }
2374 #endif
2375 }
2376
2377 /* ---------------------------------------------------------------------------
2378 * initScheduler()
2379 *
2380 * Initialise the scheduler. This resets all the queues - if the
2381 * queues contained any threads, they'll be garbage collected at the
2382 * next pass.
2383 *
2384 * ------------------------------------------------------------------------ */
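/* initScheduler() is not called directly by client code; it runs as part of
 * RTS startup.  From the point of view of a program embedding the RTS, the
 * lifecycle looks like this (a sketch using only the public hs_* API):
 *
 *   int main (int argc, char *argv[])
 *   {
 *       hs_init(&argc, &argv);    // eventually calls initScheduler()
 *       // ... call into Haskell via foreign exports ...
 *       hs_exit();                // eventually calls exitScheduler() and
 *                                 // freeScheduler()
 *       return 0;
 *   }
 */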
2385
2386 void
2387 initScheduler(void)
2388 {
2389 #if !defined(THREADED_RTS)
2390 blocked_queue_hd = END_TSO_QUEUE;
2391 blocked_queue_tl = END_TSO_QUEUE;
2392 sleeping_queue = END_TSO_QUEUE;
2393 #endif
2394
2395 sched_state = SCHED_RUNNING;
2396 recent_activity = ACTIVITY_YES;
2397
2398 #if defined(THREADED_RTS)
2399 /* Initialise the mutex and condition variables used by
2400 * the scheduler. */
2401 initMutex(&sched_mutex);
2402 #endif
2403
2404 ACQUIRE_LOCK(&sched_mutex);
2405
2406 /* A capability holds the state a native thread needs in
2407 * order to execute STG code. At least one capability is
2408 * floating around (only THREADED_RTS builds have more than one).
2409 */
2410 initCapabilities();
2411
2412 initTaskManager();
2413
2414 /*
2415 * Eagerly start one worker to run each Capability, except for
2416 * Capability 0. The idea is that we're probably going to start a
2417 * bound thread on Capability 0 pretty soon, so we don't want a
2418 * worker task hogging it.
2419 */
2420 startWorkerTasks(1, n_capabilities);
2421
2422 RELEASE_LOCK(&sched_mutex);
2423
2424 }
2425
2426 void
2427 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2428 /* see Capability.c, shutdownCapability() */
2429 {
2430 Task *task = NULL;
2431
2432 task = newBoundTask();
2433
2434 // If we haven't killed all the threads yet, do it now.
2435 if (sched_state < SCHED_SHUTTING_DOWN) {
2436 sched_state = SCHED_INTERRUPTING;
2437 Capability *cap = task->cap;
2438 waitForReturnCapability(&cap,task);
2439 scheduleDoGC(&cap,task,rtsFalse);
2440 ASSERT(task->incall->tso == NULL);
2441 releaseCapability(cap);
2442 }
2443 sched_state = SCHED_SHUTTING_DOWN;
2444
2445 shutdownCapabilities(task, wait_foreign);
2446
2447 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2448 // n_failed_trygrab_idles, n_idle_caps);
2449
2450 boundTaskExiting(task);
2451 }
2452
2453 void
2454 freeScheduler( void )
2455 {
2456 nat still_running;
2457
2458 ACQUIRE_LOCK(&sched_mutex);
2459 still_running = freeTaskManager();
2460 // We can only free the Capabilities if there are no Tasks still
2461 // running. We might have a Task about to return from a foreign
2462 // call into waitForReturnCapability(), for example (actually,
2463 // this should be the *only* thing that a still-running Task can
2464 // do at this point, and it will block waiting for the
2465 // Capability).
2466 if (still_running == 0) {
2467 freeCapabilities();
2468 if (n_capabilities != 1) {
2469 stgFree(capabilities);
2470 }
2471 }
2472 RELEASE_LOCK(&sched_mutex);
2473 #if defined(THREADED_RTS)
2474 closeMutex(&sched_mutex);
2475 #endif
2476 }
2477
2478 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2479 void *user USED_IF_NOT_THREADS)
2480 {
2481 #if !defined(THREADED_RTS)
2482 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2483 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2484 evac(user, (StgClosure **)(void *)&sleeping_queue);
2485 #endif
2486 }
2487
2488 /* -----------------------------------------------------------------------------
2489 performGC
2490
2491 This is the interface to the garbage collector from Haskell land.
2492 We provide this so that external C code can allocate and garbage
2493 collect when called from Haskell via _ccall_GC.
2494 -------------------------------------------------------------------------- */
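/* A sketch of the intended use: a C function invoked from Haskell through a
 * safe foreign call can force a collection before doing something
 * memory-hungry ('doSomethingBig' is a placeholder, not part of the RTS):
 *
 *   void myCallback (void)
 *   {
 *       performMajorGC();     // or performGC() if a major GC isn't required
 *       doSomethingBig();
 *   }
 *
 * The Haskell-level System.Mem.performGC bottoms out in these entry points
 * as well.
 */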
2495
2496 static void
2497 performGC_(rtsBool force_major)
2498 {
2499 Task *task;
2500 Capability *cap = NULL;
2501
2502 // We must grab a new Task here, because the existing Task may be
2503 // associated with a particular Capability, and chained onto the
2504 // suspended_ccalls queue.
2505 task = newBoundTask();
2506
2507 waitForReturnCapability(&cap,task);
2508 scheduleDoGC(&cap,task,force_major);
2509 releaseCapability(cap);
2510 boundTaskExiting(task);
2511 }
2512
2513 void
2514 performGC(void)
2515 {
2516 performGC_(rtsFalse);
2517 }
2518
2519 void
2520 performMajorGC(void)
2521 {
2522 performGC_(rtsTrue);
2523 }
2524
2525 /* ---------------------------------------------------------------------------
2526 Interrupt execution
2527 - usually called inside a signal handler so it mustn't do anything fancy.
2528 ------------------------------------------------------------------------ */
2529
2530 void
2531 interruptStgRts(void)
2532 {
2533 sched_state = SCHED_INTERRUPTING;
2534 interruptAllCapabilities();
2535 #if defined(THREADED_RTS)
2536 wakeUpRts();
2537 #endif
2538 }
2539
2540 /* -----------------------------------------------------------------------------
2541 Wake up the RTS
2542
2543 This function causes at least one OS thread to wake up and run the
2544 scheduler loop. It is invoked when the RTS might be deadlocked, or
2545 an external event has arrived that may need servicing (eg. a
2546 keyboard interrupt).
2547
2548 In the single-threaded RTS we don't do anything here; we only have
2549 one thread anyway, and the event that caused us to want to wake up
2550 will have interrupted any blocking system call in progress.
2551 -------------------------------------------------------------------------- */
2552
2553 #if defined(THREADED_RTS)
2554 void wakeUpRts(void)
2555 {
2556 // This forces the IO Manager thread to wakeup, which will
2557 // in turn ensure that some OS thread wakes up and runs the
2558 // scheduler loop, which will cause a GC and deadlock check.
2559 ioManagerWakeup();
2560 }
2561 #endif
2562
2563 /* -----------------------------------------------------------------------------
2564 Deleting threads
2565
2566 This is used for interruption (^C) and forking, and corresponds to
2567 raising an exception but without letting the thread catch the
2568 exception.
2569 -------------------------------------------------------------------------- */
2570
2571 static void
2572 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2573 {
2574 // NOTE: must only be called on a TSO that we have exclusive
2575 // access to, because we will call throwToSingleThreaded() below.
2576 // The TSO must be on the run queue of the Capability we own, or
2577 // we must own all Capabilities.
2578
2579 if (tso->why_blocked != BlockedOnCCall &&
2580 tso->why_blocked != BlockedOnCCall_Interruptible) {
2581 throwToSingleThreaded(tso->cap,tso,NULL);
2582 }
2583 }
2584
2585 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2586 static void
2587 deleteThread_(Capability *cap, StgTSO *tso)
2588 { // for forkProcess only:
2589 // like deleteThread(), but we delete threads in foreign calls, too.
2590
2591 if (tso->why_blocked == BlockedOnCCall ||
2592 tso->why_blocked == BlockedOnCCall_Interruptible) {
2593 tso->what_next = ThreadKilled;
2594 appendToRunQueue(tso->cap, tso);
2595 } else {
2596 deleteThread(cap,tso);
2597 }
2598 }
2599 #endif
2600
2601 /* -----------------------------------------------------------------------------
2602 raiseExceptionHelper
2603
2604 This function is called by the raise# primitive, just so that we can
2605 move some of the tricky bits of raising an exception from C-- into
2606 C. Who knows, it might be a useful reusable thing here too.
2607 -------------------------------------------------------------------------- */
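/* To make the loop below concrete: suppose the raising thread's stack looks
 * like this (top of stack first):
 *
 *     UPDATE_FRAME t1
 *     UPDATE_FRAME t2
 *     CATCH_FRAME  h
 *     ...
 *
 * The walk overwrites t1 and t2 with a single 'raise# E' thunk (so any other
 * thread that later demands them re-raises E), then stops at the CATCH_FRAME,
 * leaving tso->stackobj->sp pointing at it and returning CATCH_FRAME so that
 * the C-- caller can run the handler h.
 */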
2608
2609 StgWord
2610 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2611 {
2612 Capability *cap = regTableToCapability(reg);
2613 StgThunk *raise_closure = NULL;
2614 StgPtr p, next;
2615 StgRetInfoTable *info;
2616 //
2617 // This closure represents the expression 'raise# E' where E
2618 // is the exception being raised. It is used to overwrite all the
2619 // thunks which are currently under evaluation.
2620 //
2621
2622 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2623 // LDV profiling: stg_raise_info has THUNK as its closure
2624 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2625 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2626 // 1 does not cause any problem unless profiling is performed.
2627 // However, when LDV profiling goes on, we need to linearly scan
2628 // small object pool, where raise_closure is stored, so we should
2629 // use MIN_UPD_SIZE.
2630 //
2631 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2632 // sizeofW(StgClosure)+1);
2633 //
2634
2635 //
2636 // Walk up the stack, looking for the catch frame. On the way,
2637 // we update any closures pointed to from update frames with the
2638 // raise closure that we just built.
2639 //
2640 p = tso->stackobj->sp;
2641 while(1) {
2642 info = get_ret_itbl((StgClosure *)p);
2643 next = p + stack_frame_sizeW((StgClosure *)p);
2644 switch (info->i.type) {
2645
2646 case UPDATE_FRAME:
2647 // Only create raise_closure if we need to.
2648 if (raise_closure == NULL) {
2649 raise_closure =
2650 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2651 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2652 raise_closure->payload[0] = exception;
2653 }
2654 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2655 (StgClosure *)raise_closure);
2656 p = next;
2657 continue;
2658
2659 case ATOMICALLY_FRAME:
2660 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2661 tso->stackobj->sp = p;
2662 return ATOMICALLY_FRAME;
2663
2664 case CATCH_FRAME:
2665 tso->stackobj->sp = p;
2666 return CATCH_FRAME;
2667
2668 case CATCH_STM_FRAME:
2669 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2670 tso->stackobj->sp = p;
2671 return CATCH_STM_FRAME;
2672
2673 case UNDERFLOW_FRAME:
2674 tso->stackobj->sp = p;
2675 threadStackUnderflow(cap,tso);
2676 p = tso->stackobj->sp;
2677 continue;
2678
2679 case STOP_FRAME:
2680 tso->stackobj->sp = p;
2681 return STOP_FRAME;
2682
2683 case CATCH_RETRY_FRAME:
2684 default:
2685 p = next;
2686 continue;
2687 }
2688 }
2689 }
2690
2691
2692 /* -----------------------------------------------------------------------------
2693 findRetryFrameHelper
2694
2695 This function is called by the retry# primitive. It traverses the stack
2696 leaving tso->sp referring to the frame which should handle the retry.
2697
2698 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2699 or an ATOMICALLY_FRAME (if the retry# reaches the top level).
2700
2701 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2702 create) because retries are not considered to be exceptions, despite the
2703 similar implementation.
2704
2705 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2706 not be created within memory transactions.
2707 -------------------------------------------------------------------------- */
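/* As a concrete example, for 'atomically (a `orElse` b)' the stack while 'a'
 * runs looks roughly like this (top of stack first):
 *
 *     ... frames of a ...
 *     CATCH_RETRY_FRAME    (pushed by orElse#, holding alternative b)
 *     ATOMICALLY_FRAME     (pushed by atomically#)
 *
 * A retry# inside 'a' therefore finds the CATCH_RETRY_FRAME first and we run
 * 'b' instead; a retry# that propagates out of both alternatives finds the
 * ATOMICALLY_FRAME, and the whole transaction blocks and is re-run later.
 */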
2708
2709 StgWord
2710 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2711 {
2712 StgPtr p, next;
2713 StgRetInfoTable *info;
2714
2715 p = tso->stackobj->sp;
2716 while (1) {
2717 info = get_ret_itbl((StgClosure *)p);
2718 next = p + stack_frame_sizeW((StgClosure *)p);
2719 switch (info->i.type) {
2720
2721 case ATOMICALLY_FRAME:
2722 debugTrace(DEBUG_stm,
2723 "found ATOMICALLY_FRAME at %p during retry", p);
2724 tso->stackobj->sp = p;
2725 return ATOMICALLY_FRAME;
2726
2727 case CATCH_RETRY_FRAME:
2728 debugTrace(DEBUG_stm,
2729 "found CATCH_RETRY_FRAME at %p during retrry", p);
2730 tso->stackobj->sp = p;
2731 return CATCH_RETRY_FRAME;
2732
2733 case CATCH_STM_FRAME: {
2734 StgTRecHeader *trec = tso -> trec;
2735 StgTRecHeader *outer = trec -> enclosing_trec;
2736 debugTrace(DEBUG_stm,
2737 "found CATCH_STM_FRAME at %p during retry", p);
2738 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2739 stmAbortTransaction(cap, trec);
2740 stmFreeAbortedTRec(cap, trec);
2741 tso -> trec = outer;
2742 p = next;
2743 continue;
2744 }
2745
2746 case UNDERFLOW_FRAME:
2747 threadStackUnderflow(cap,tso);
2748 p = tso->stackobj->sp;
2749 continue;
2750
2751 default:
2752 ASSERT(info->i.type != CATCH_FRAME);
2753 ASSERT(info->i.type != STOP_FRAME);
2754 p = next;
2755 continue;
2756 }
2757 }
2758 }
2759
2760 /* -----------------------------------------------------------------------------
2761 resurrectThreads is called after garbage collection on the list of
2762 threads found to be garbage. Each of these threads will be woken
2763 up and sent an exception: BlockedIndefinitelyOnMVar if the thread was
2764 blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked on STM, or
2765 NonTermination if the thread was blocked on a Black Hole.
2766
2767 Locks: assumes we hold *all* the capabilities.
2768 -------------------------------------------------------------------------- */
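/* The classic way to end up here is a thread that does a takeMVar on an MVar
 * that no other thread can ever fill.  Once nothing else references the MVar
 * or the blocked thread, the GC finds the thread unreachable and
 * resurrectThreads() delivers BlockedIndefinitelyOnMVar to it, which the user
 * sees as the "thread blocked indefinitely in an MVar operation" exception.
 */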
2769
2770 void
2771 resurrectThreads (StgTSO *threads)
2772 {
2773 StgTSO *tso, *next;
2774 Capability *cap;
2775 generation *gen;
2776
2777 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2778 next = tso->global_link;
2779
2780 gen = Bdescr((P_)tso)->gen;
2781 tso->global_link = gen->threads;
2782 gen->threads = tso;
2783
2784 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2785
2786 // Wake up the thread on the Capability it was last on
2787 cap = tso->cap;
2788
2789 switch (tso->why_blocked) {
2790 case BlockedOnMVar:
2791 /* Called by GC - sched_mutex lock is currently held. */
2792 throwToSingleThreaded(cap, tso,
2793 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2794 break;
2795 case BlockedOnBlackHole:
2796 throwToSingleThreaded(cap, tso,
2797 (StgClosure *)nonTermination_closure);
2798 break;
2799 case BlockedOnSTM:
2800 throwToSingleThreaded(cap, tso,
2801 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2802 break;
2803 case NotBlocked:
2804 /* This might happen if the thread was blocked on a black hole
2805 * belonging to a thread that we've just woken up (raiseAsync
2806 * can wake up threads, remember...).
2807 */
2808 continue;
2809 case BlockedOnMsgThrowTo:
2810 // This can happen if the target is masking, blocks on a
2811 // black hole, and then is found to be unreachable. In
2812 // this case, we want to let the target wake up and carry
2813 // on, and do nothing to this thread.
2814 continue;
2815 default:
2816 barf("resurrectThreads: thread blocked in a strange way: %d",
2817 tso->why_blocked);
2818 }
2819 }
2820 }