[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
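/* Added illustration (not in the original source): a minimal sketch of the
 * xchg()-based read-and-set that schedule() performs on this flag further
 * below.  Because xchg() returns the previous value, only the one
 * Capability that actually observes ACTIVITY_DONE_GC restarts the timer:
 *
 *     nat prev = xchg((P_)&recent_activity, ACTIVITY_YES);
 *     if (prev == ACTIVITY_DONE_GC) {
 *         startTimer();   // re-enable the timer signal exactly once
 *     }
 */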
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
95 * exists - earlier gccs apparently didn't.
96 * -= chak
97 */
98 StgTSO dummy_tso;
99
100 /*
101 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
102 * in an MT setting, needed to signal that a worker thread shouldn't hang around
103 * in the scheduler when it is out of work.
104 */
105 rtsBool shutting_down_scheduler = rtsFalse;
106
107 /*
108 * This mutex protects most of the global scheduler data in
109 * the THREADED_RTS runtime.
110 */
111 #if defined(THREADED_RTS)
112 Mutex sched_mutex;
113 #endif
114
115 #if !defined(mingw32_HOST_OS)
116 #define FORKPROCESS_PRIMOP_SUPPORTED
117 #endif
118
119 // Local stats
120 #ifdef THREADED_RTS
121 static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
122 #endif
123
124 /* -----------------------------------------------------------------------------
125 * static function prototypes
126 * -------------------------------------------------------------------------- */
127
128 static Capability *schedule (Capability *initialCapability, Task *task);
129
130 //
131 // These functions all encapsulate parts of the scheduler loop, and are
132 // abstracted only to make the structure and control flow of the
133 // scheduler clearer.
134 //
135 static void schedulePreLoop (void);
136 static void scheduleFindWork (Capability **pcap);
137 #if defined(THREADED_RTS)
138 static void scheduleYield (Capability **pcap, Task *task);
139 #endif
140 #if defined(THREADED_RTS)
141 static nat requestSync (Capability **pcap, Task *task, nat sync_type);
142 static void acquireAllCapabilities(Capability *cap, Task *task);
143 static void releaseAllCapabilities(Capability *cap, Task *task);
144 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
145 #endif
146 static void scheduleStartSignalHandlers (Capability *cap);
147 static void scheduleCheckBlockedThreads (Capability *cap);
148 static void scheduleProcessInbox(Capability **cap);
149 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
150 static void schedulePushWork(Capability *cap, Task *task);
151 #if defined(THREADED_RTS)
152 static void scheduleActivateSpark(Capability *cap);
153 #endif
154 static void schedulePostRunThread(Capability *cap, StgTSO *t);
155 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
156 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
157 nat prev_what_next );
158 static void scheduleHandleThreadBlocked( StgTSO *t );
159 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
160 StgTSO *t );
161 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
162 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
163
164 static void deleteThread (Capability *cap, StgTSO *tso);
165 static void deleteAllThreads (Capability *cap);
166
167 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
168 static void deleteThread_(Capability *cap, StgTSO *tso);
169 #endif
170
171 /* ---------------------------------------------------------------------------
172 Main scheduling loop.
173
174 We use round-robin scheduling, each thread returning to the
175 scheduler loop when one of these conditions is detected:
176
177 * out of heap space
178 * timer expires (thread yields)
179 * thread blocks
180 * thread ends
181 * stack overflow
182
183 GRAN version:
184 In a GranSim setup this loop iterates over the global event queue.
185 This revolves around the global event queue, which determines what
186 to do next. Therefore, it's more complicated than either the
187 concurrent or the parallel (GUM) setup.
188 This version has been entirely removed (JB 2008/08).
189
190 GUM version:
191 GUM iterates over incoming messages.
192 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
193 and sends out a fish whenever it has nothing to do; in-between
194 doing the actual reductions (shared code below) it processes the
195 incoming messages and deals with delayed operations
196 (see PendingFetches).
197 This is not the ugliest code you could imagine, but it's bloody close.
198
199 (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
200 now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
201 as well as future GUM versions. This file has been refurbished to
202 contain only valid code, which is however incomplete and refers to
203 invalid includes, etc.
204
205 ------------------------------------------------------------------------ */
206
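// Added sketch (not in the original source): how the conditions listed in
// the comment above map onto the StgThreadReturnCode values handled in the
// switch near the bottom of schedule():
//
//     out of heap space      -> HeapOverflow   (scheduleHandleHeapOverflow)
//     stack overflow         -> StackOverflow  (threadStackOverflow)
//     timer expiry / yield   -> ThreadYielding (scheduleHandleYield)
//     thread blocks          -> ThreadBlocked  (scheduleHandleThreadBlocked)
//     thread ends            -> ThreadFinished (scheduleHandleThreadFinished)
//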
207 static Capability *
208 schedule (Capability *initialCapability, Task *task)
209 {
210 StgTSO *t;
211 Capability *cap;
212 StgThreadReturnCode ret;
213 nat prev_what_next;
214 rtsBool ready_to_gc;
215 #if defined(THREADED_RTS)
216 rtsBool first = rtsTrue;
217 #endif
218
219 cap = initialCapability;
220
221 // Pre-condition: this task owns initialCapability.
222 // The sched_mutex is *NOT* held
223 // NB. on return, we still hold a capability.
224
225 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
226
227 schedulePreLoop();
228
229 // -----------------------------------------------------------
230 // Scheduler loop starts here:
231
232 while (1) {
233
234 // Check whether we have re-entered the RTS from Haskell without
235 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
236 // call).
237 if (cap->in_haskell) {
238 errorBelch("schedule: re-entered unsafely.\n"
239 " Perhaps a 'foreign import unsafe' should be 'safe'?");
240 stg_exit(EXIT_FAILURE);
241 }
242
243 // The interruption / shutdown sequence.
244 //
245 // In order to cleanly shut down the runtime, we want to:
246 // * make sure that all main threads return to their callers
247 // with the state 'Interrupted'.
248 // * clean up all OS threads associated with the runtime
249 // * free all memory etc.
250 //
251 // So the sequence for ^C goes like this:
252 //
253 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
254 // arranges for some Capability to wake up
255 //
256 // * all threads in the system are halted, and the zombies are
257 // placed on the run queue for cleaning up. We acquire all
258 // the capabilities in order to delete the threads, this is
259 // done by scheduleDoGC() for convenience (because GC already
260 // needs to acquire all the capabilities). We can't kill
261 // threads involved in foreign calls.
262 //
263 // * somebody calls shutdownHaskell(), which calls exitScheduler()
264 //
265 // * sched_state := SCHED_SHUTTING_DOWN
266 //
267 // * all workers exit when the run queue on their capability
268 // drains. All main threads will also exit when their TSO
269 // reaches the head of the run queue and they can return.
270 //
271 // * eventually all Capabilities will shut down, and the RTS can
272 // exit.
273 //
274 // * We might be left with threads blocked in foreign calls,
275 // we should really attempt to kill these somehow (TODO);
276
277 switch (sched_state) {
278 case SCHED_RUNNING:
279 break;
280 case SCHED_INTERRUPTING:
281 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
282 /* scheduleDoGC() deletes all the threads */
283 scheduleDoGC(&cap,task,rtsFalse);
284
285 // after scheduleDoGC(), we must be shutting down. Either some
286 // other Capability did the final GC, or we did it above,
287 // either way we can fall through to the SCHED_SHUTTING_DOWN
288 // case now.
289 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
290 // fall through
291
292 case SCHED_SHUTTING_DOWN:
293 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
294 // If we are a worker, just exit. If we're a bound thread
295 // then we will exit below when we've removed our TSO from
296 // the run queue.
297 if (!isBoundTask(task) && emptyRunQueue(cap)) {
298 return cap;
299 }
300 break;
301 default:
302 barf("sched_state: %d", sched_state);
303 }
304
305 scheduleFindWork(&cap);
306
307 /* work pushing, currently relevant only for THREADED_RTS:
308 (pushes threads, wakes up idle capabilities for stealing) */
309 schedulePushWork(cap,task);
310
311 scheduleDetectDeadlock(&cap,task);
312
313 // Normally, the only way we can get here with no threads to
314 // run is if a keyboard interrupt was received during
315 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
316 // Additionally, it is not fatal for the
317 // threaded RTS to reach here with no threads to run.
318 //
319 // win32: might be here due to awaitEvent() being abandoned
320 // as a result of a console event having been delivered.
321
322 #if defined(THREADED_RTS)
323 if (first)
324 {
325 // XXX: ToDo
326 // // don't yield the first time, we want a chance to run this
327 // // thread for a bit, even if there are others banging at the
328 // // door.
329 // first = rtsFalse;
330 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
331 }
332
333 scheduleYield(&cap,task);
334
335 if (emptyRunQueue(cap)) continue; // look for work again
336 #endif
337
338 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
339 if ( emptyRunQueue(cap) ) {
340 ASSERT(sched_state >= SCHED_INTERRUPTING);
341 }
342 #endif
343
344 //
345 // Get a thread to run
346 //
347 t = popRunQueue(cap);
348
349 // Sanity check the thread we're about to run. This can be
350 // expensive if there is lots of thread switching going on...
351 IF_DEBUG(sanity,checkTSO(t));
352
353 #if defined(THREADED_RTS)
354 // Check whether we can run this thread in the current task.
355 // If not, we have to pass our capability to the right task.
356 {
357 InCall *bound = t->bound;
358
359 if (bound) {
360 if (bound->task == task) {
361 // yes, the Haskell thread is bound to the current native thread
362 } else {
363 debugTrace(DEBUG_sched,
364 "thread %lu bound to another OS thread",
365 (unsigned long)t->id);
366 // no, bound to a different Haskell thread: pass to that thread
367 pushOnRunQueue(cap,t);
368 continue;
369 }
370 } else {
371 // The thread we want to run is unbound.
372 if (task->incall->tso) {
373 debugTrace(DEBUG_sched,
374 "this OS thread cannot run thread %lu",
375 (unsigned long)t->id);
376 // no, the current native thread is bound to a different
377 // Haskell thread, so pass it to any worker thread
378 pushOnRunQueue(cap,t);
379 continue;
380 }
381 }
382 }
383 #endif
384
385 // If we're shutting down, and this thread has not yet been
386 // killed, kill it now. This sometimes happens when a finalizer
387 // thread is created by the final GC, or a thread previously
388 // in a foreign call returns.
389 if (sched_state >= SCHED_INTERRUPTING &&
390 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
391 deleteThread(cap,t);
392 }
393
394 // If this capability is disabled, migrate the thread away rather
395 // than running it. NB. but not if the thread is bound: it is
396 // really hard for a bound thread to migrate itself. Believe me,
397 // I tried several ways and couldn't find a way to do it.
398 // Instead, when everything is stopped for GC, we migrate all the
399 // threads on the run queue then (see scheduleDoGC()).
400 //
401 // ToDo: what about TSO_LOCKED? Currently we're migrating those
402 // when the number of capabilities drops, but we never migrate
403 // them back if it rises again. Presumably we should, but after
404 // the thread has been migrated we no longer know what capability
405 // it was originally on.
406 #ifdef THREADED_RTS
407 if (cap->disabled && !t->bound) {
408 Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
409 migrateThread(cap, t, dest_cap);
410 continue;
411 }
412 #endif
413
414 /* context switches are initiated by the timer signal, unless
415 * the user specified "context switch as often as possible", with
416 * +RTS -C0
417 */
418 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
419 && !emptyThreadQueues(cap)) {
420 cap->context_switch = 1;
421 }
422
423 run_thread:
424
425 // CurrentTSO is the thread to run. t might be different if we
426 // loop back to run_thread, so make sure to set CurrentTSO after
427 // that.
428 cap->r.rCurrentTSO = t;
429
430 startHeapProfTimer();
431
432 // ----------------------------------------------------------------------
433 // Run the current thread
434
435 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
436 ASSERT(t->cap == cap);
437 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
438
439 prev_what_next = t->what_next;
440
441 errno = t->saved_errno;
442 #if mingw32_HOST_OS
443 SetLastError(t->saved_winerror);
444 #endif
445
446 // reset the interrupt flag before running Haskell code
447 cap->interrupt = 0;
448
449 cap->in_haskell = rtsTrue;
450 cap->idle = 0;
451
452 dirty_TSO(cap,t);
453 dirty_STACK(cap,t->stackobj);
454
455 #if defined(THREADED_RTS)
456 if (recent_activity == ACTIVITY_DONE_GC) {
457 // ACTIVITY_DONE_GC means we turned off the timer signal to
458 // conserve power (see #1623). Re-enable it here.
459 nat prev;
460 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
461 if (prev == ACTIVITY_DONE_GC) {
462 startTimer();
463 }
464 } else if (recent_activity != ACTIVITY_INACTIVE) {
465 // If we reached ACTIVITY_INACTIVE, then don't reset it until
466 // we've done the GC. The thread running here might just be
467 // the IO manager thread that handle_tick() woke up via
468 // wakeUpRts().
469 recent_activity = ACTIVITY_YES;
470 }
471 #endif
472
473 traceEventRunThread(cap, t);
474
475 switch (prev_what_next) {
476
477 case ThreadKilled:
478 case ThreadComplete:
479 /* Thread already finished, return to scheduler. */
480 ret = ThreadFinished;
481 break;
482
483 case ThreadRunGHC:
484 {
485 StgRegTable *r;
486 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
487 cap = regTableToCapability(r);
488 ret = r->rRet;
489 break;
490 }
491
492 case ThreadInterpret:
493 cap = interpretBCO(cap);
494 ret = cap->r.rRet;
495 break;
496
497 default:
498 barf("schedule: invalid what_next field");
499 }
500
501 cap->in_haskell = rtsFalse;
502
503 // The TSO might have moved, eg. if it re-entered the RTS and a GC
504 // happened. So find the new location:
505 t = cap->r.rCurrentTSO;
506
507 // And save the current errno in this thread.
508 // XXX: possibly bogus for SMP because this thread might already
509 // be running again, see code below.
510 t->saved_errno = errno;
511 #if mingw32_HOST_OS
512 // Similarly for Windows error code
513 t->saved_winerror = GetLastError();
514 #endif
515
516 if (ret == ThreadBlocked) {
517 if (t->why_blocked == BlockedOnBlackHole) {
518 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
519 traceEventStopThread(cap, t, t->why_blocked + 6,
520 owner != NULL ? owner->id : 0);
521 } else {
522 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
523 }
524 } else {
525 traceEventStopThread(cap, t, ret, 0);
526 }
527
528 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
529 ASSERT(t->cap == cap);
530
531 // ----------------------------------------------------------------------
532
533 // Costs for the scheduler are assigned to CCS_SYSTEM
534 stopHeapProfTimer();
535 #if defined(PROFILING)
536 cap->r.rCCCS = CCS_SYSTEM;
537 #endif
538
539 schedulePostRunThread(cap,t);
540
541 ready_to_gc = rtsFalse;
542
543 switch (ret) {
544 case HeapOverflow:
545 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
546 break;
547
548 case StackOverflow:
549 // just adjust the stack for this thread, then pop it back
550 // on the run queue.
551 threadStackOverflow(cap, t);
552 pushOnRunQueue(cap,t);
553 break;
554
555 case ThreadYielding:
556 if (scheduleHandleYield(cap, t, prev_what_next)) {
557 // shortcut for switching between compiler/interpreter:
558 goto run_thread;
559 }
560 break;
561
562 case ThreadBlocked:
563 scheduleHandleThreadBlocked(t);
564 break;
565
566 case ThreadFinished:
567 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
568 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
569 break;
570
571 default:
572 barf("schedule: invalid thread return code %d", (int)ret);
573 }
574
575 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
576 scheduleDoGC(&cap,task,rtsFalse);
577 }
578 } /* end of while() */
579 }
580
581 /* -----------------------------------------------------------------------------
582 * Run queue operations
583 * -------------------------------------------------------------------------- */
584
585 void
586 removeFromRunQueue (Capability *cap, StgTSO *tso)
587 {
588 if (tso->block_info.prev == END_TSO_QUEUE) {
589 ASSERT(cap->run_queue_hd == tso);
590 cap->run_queue_hd = tso->_link;
591 } else {
592 setTSOLink(cap, tso->block_info.prev, tso->_link);
593 }
594 if (tso->_link == END_TSO_QUEUE) {
595 ASSERT(cap->run_queue_tl == tso);
596 cap->run_queue_tl = tso->block_info.prev;
597 } else {
598 setTSOPrev(cap, tso->_link, tso->block_info.prev);
599 }
600 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
601
602 IF_DEBUG(sanity, checkRunQueue(cap));
603 }
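// Added illustration (not in the original source): the run queue shape that
// removeFromRunQueue() above relies on.  Forward links are tso->_link and
// backward links are tso->block_info.prev, both terminated by END_TSO_QUEUE:
//
//     cap->run_queue_hd --> t1 <--> t2 <--> ... <--> tn <-- cap->run_queue_tl
//
// setTSOLink()/setTSOPrev() are used rather than plain assignment so that
// the write barrier (dirtying the mutated TSO) is applied.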
604
605 /* ----------------------------------------------------------------------------
606 * Setting up the scheduler loop
607 * ------------------------------------------------------------------------- */
608
609 static void
610 schedulePreLoop(void)
611 {
612 // initialisation for scheduler - what cannot go into initScheduler()
613
614 #if defined(mingw32_HOST_OS) && defined(i386_HOST_ARCH)
615 win32AllocStack();
616 #endif
617 }
618
619 /* -----------------------------------------------------------------------------
620 * scheduleFindWork()
621 *
622 * Search for work to do, and handle messages from elsewhere.
623 * -------------------------------------------------------------------------- */
624
625 static void
626 scheduleFindWork (Capability **pcap)
627 {
628 scheduleStartSignalHandlers(*pcap);
629
630 scheduleProcessInbox(pcap);
631
632 scheduleCheckBlockedThreads(*pcap);
633
634 #if defined(THREADED_RTS)
635 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
636 #endif
637 }
638
639 #if defined(THREADED_RTS)
640 STATIC_INLINE rtsBool
641 shouldYieldCapability (Capability *cap, Task *task)
642 {
643 // we need to yield this capability to someone else if..
644 // - another thread is initiating a GC
645 // - another Task is returning from a foreign call
646 // - the thread at the head of the run queue cannot be run
647 // by this Task (it is bound to another Task, or it is unbound
648 // and this task is bound).
649 return (pending_sync ||
650 cap->returning_tasks_hd != NULL ||
651 (!emptyRunQueue(cap) && (task->incall->tso == NULL
652 ? cap->run_queue_hd->bound != NULL
653 : cap->run_queue_hd->bound != task->incall)));
654 }
655
656 // This is the single place where a Task goes to sleep. There are
657 // two reasons it might need to sleep:
658 // - there are no threads to run
659 // - we need to yield this Capability to someone else
660 // (see shouldYieldCapability())
661 //
662 // Careful: the scheduler loop is quite delicate. Make sure you run
663 // the tests in testsuite/concurrent (all ways) after modifying this,
664 // and also check the benchmarks in nofib/parallel for regressions.
665
666 static void
667 scheduleYield (Capability **pcap, Task *task)
668 {
669 Capability *cap = *pcap;
670
671 // if we have work, and we don't need to give up the Capability, continue.
672 //
673 if (!shouldYieldCapability(cap,task) &&
674 (!emptyRunQueue(cap) ||
675 !emptyInbox(cap) ||
676 sched_state >= SCHED_INTERRUPTING))
677 return;
678
679 // otherwise yield (sleep), and keep yielding if necessary.
680 do {
681 yieldCapability(&cap,task);
682 }
683 while (shouldYieldCapability(cap,task));
684
685 // note there may still be no threads on the run queue at this
686 // point, the caller has to check.
687
688 *pcap = cap;
689 return;
690 }
691 #endif
692
693 /* -----------------------------------------------------------------------------
694 * schedulePushWork()
695 *
696 * Push work to other Capabilities if we have some.
697 * -------------------------------------------------------------------------- */
698
699 static void
700 schedulePushWork(Capability *cap USED_IF_THREADS,
701 Task *task USED_IF_THREADS)
702 {
703 /* The following code is not for PARALLEL_HASKELL. I kept the call general;
704 future GUM versions might use pushing in a distributed setup. */
705 #if defined(THREADED_RTS)
706
707 Capability *free_caps[n_capabilities], *cap0;
708 nat i, n_free_caps;
709
710 // migration can be turned off with +RTS -qm
711 if (!RtsFlags.ParFlags.migrate) return;
712
713 // Check whether we have more threads on our run queue, or sparks
714 // in our pool, that we could hand to another Capability.
715 if (cap->run_queue_hd == END_TSO_QUEUE) {
716 if (sparkPoolSizeCap(cap) < 2) return;
717 } else {
718 if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
719 sparkPoolSizeCap(cap) < 1) return;
720 }
721
722 // First grab as many free Capabilities as we can.
723 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
724 cap0 = &capabilities[i];
725 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
726 if (!emptyRunQueue(cap0)
727 || cap0->returning_tasks_hd != NULL
728 || cap0->inbox != (Message*)END_TSO_QUEUE) {
729 // it already has some work, we just grabbed it at
730 // the wrong moment. Or maybe it's deadlocked!
731 releaseCapability(cap0);
732 } else {
733 free_caps[n_free_caps++] = cap0;
734 }
735 }
736 }
737
738 // we now have n_free_caps free capabilities stashed in
739 // free_caps[]. Share our run queue equally with them. This is
740 // probably the simplest thing we could do; improvements we might
741 // want to do include:
742 //
743 // - giving high priority to moving relatively new threads, on
744 // the grounds that they haven't had time to build up a
745 // working set in the cache on this CPU/Capability.
746 //
747 // - giving low priority to moving long-lived threads
748
749 if (n_free_caps > 0) {
750 StgTSO *prev, *t, *next;
751 #ifdef SPARK_PUSHING
752 rtsBool pushed_to_all;
753 #endif
754
755 debugTrace(DEBUG_sched,
756 "cap %d: %s and %d free capabilities, sharing...",
757 cap->no,
758 (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
759 "excess threads on run queue":"sparks to share (>=2)",
760 n_free_caps);
761
762 i = 0;
763 #ifdef SPARK_PUSHING
764 pushed_to_all = rtsFalse;
765 #endif
766
767 if (cap->run_queue_hd != END_TSO_QUEUE) {
768 prev = cap->run_queue_hd;
769 t = prev->_link;
770 prev->_link = END_TSO_QUEUE;
771 for (; t != END_TSO_QUEUE; t = next) {
772 next = t->_link;
773 t->_link = END_TSO_QUEUE;
774 if (t->bound == task->incall // don't move my bound thread
775 || tsoLocked(t)) { // don't move a locked thread
776 setTSOLink(cap, prev, t);
777 setTSOPrev(cap, t, prev);
778 prev = t;
779 } else if (i == n_free_caps) {
780 #ifdef SPARK_PUSHING
781 pushed_to_all = rtsTrue;
782 #endif
783 i = 0;
784 // keep one for us
785 setTSOLink(cap, prev, t);
786 setTSOPrev(cap, t, prev);
787 prev = t;
788 } else {
789 appendToRunQueue(free_caps[i],t);
790
791 traceEventMigrateThread (cap, t, free_caps[i]->no);
792
793 if (t->bound) { t->bound->task->cap = free_caps[i]; }
794 t->cap = free_caps[i];
795 i++;
796 }
797 }
798 cap->run_queue_tl = prev;
799
800 IF_DEBUG(sanity, checkRunQueue(cap));
801 }
802
803 #ifdef SPARK_PUSHING
804 /* JB I left this code in place, it would work but is not necessary */
805
806 // If there are some free capabilities that we didn't push any
807 // threads to, then try to push a spark to each one.
808 if (!pushed_to_all) {
809 StgClosure *spark;
810 // i is the next free capability to push to
811 for (; i < n_free_caps; i++) {
812 if (emptySparkPoolCap(free_caps[i])) {
813 spark = tryStealSpark(cap->sparks);
814 if (spark != NULL) {
815 /* TODO: if anyone wants to re-enable this code then
816 * they must consider the fizzledSpark(spark) case
817 * and update the per-cap spark statistics.
818 */
819 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
820
821 traceEventStealSpark(free_caps[i], t, cap->no);
822
823 newSpark(&(free_caps[i]->r), spark);
824 }
825 }
826 }
827 }
828 #endif /* SPARK_PUSHING */
829
830 // release the capabilities
831 for (i = 0; i < n_free_caps; i++) {
832 task->cap = free_caps[i];
833 releaseAndWakeupCapability(free_caps[i]);
834 }
835 }
836 task->cap = cap; // reset to point to our Capability.
837
838 #endif /* THREADED_RTS */
839
840 }
841
842 /* ----------------------------------------------------------------------------
843 * Start any pending signal handlers
844 * ------------------------------------------------------------------------- */
845
846 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
847 static void
848 scheduleStartSignalHandlers(Capability *cap)
849 {
850 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
851 // safe outside the lock
852 startSignalHandlers(cap);
853 }
854 }
855 #else
856 static void
857 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
858 {
859 }
860 #endif
861
862 /* ----------------------------------------------------------------------------
863 * Check for blocked threads that can be woken up.
864 * ------------------------------------------------------------------------- */
865
866 static void
867 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
868 {
869 #if !defined(THREADED_RTS)
870 //
871 // Check whether any waiting threads need to be woken up. If the
872 // run queue is empty, and there are no other tasks running, we
873 // can wait indefinitely for something to happen.
874 //
875 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
876 {
877 awaitEvent (emptyRunQueue(cap));
878 }
879 #endif
880 }
881
882 /* ----------------------------------------------------------------------------
883 * Detect deadlock conditions and attempt to resolve them.
884 * ------------------------------------------------------------------------- */
885
886 static void
887 scheduleDetectDeadlock (Capability **pcap, Task *task)
888 {
889 Capability *cap = *pcap;
890 /*
891 * Detect deadlock: when we have no threads to run, there are no
892 * threads blocked, waiting for I/O, or sleeping, and all the
893 * other tasks are waiting for work, we must have a deadlock of
894 * some description.
895 */
896 if ( emptyThreadQueues(cap) )
897 {
898 #if defined(THREADED_RTS)
899 /*
900 * In the threaded RTS, we only check for deadlock if there
901 * has been no activity in a complete timeslice. This means
902 * we won't eagerly start a full GC just because we don't have
903 * any threads to run currently.
904 */
905 if (recent_activity != ACTIVITY_INACTIVE) return;
906 #endif
907
908 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
909
910 // Garbage collection can release some new threads due to
911 // either (a) finalizers or (b) threads resurrected because
912 // they are unreachable and will therefore be sent an
913 // exception. Any threads thus released will be immediately
914 // runnable.
915 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
916 cap = *pcap;
917 // when force_major == rtsTrue, scheduleDoGC sets
918 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
919 // signal.
920
921 if ( !emptyRunQueue(cap) ) return;
922
923 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
924 /* If we have user-installed signal handlers, then wait
925 * for signals to arrive rather than bombing out with a
926 * deadlock.
927 */
928 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
929 debugTrace(DEBUG_sched,
930 "still deadlocked, waiting for signals...");
931
932 awaitUserSignals();
933
934 if (signals_pending()) {
935 startSignalHandlers(cap);
936 }
937
938 // either we have threads to run, or we were interrupted:
939 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
940
941 return;
942 }
943 #endif
944
945 #if !defined(THREADED_RTS)
946 /* Probably a real deadlock. Send the current main thread the
947 * Deadlock exception.
948 */
949 if (task->incall->tso) {
950 switch (task->incall->tso->why_blocked) {
951 case BlockedOnSTM:
952 case BlockedOnBlackHole:
953 case BlockedOnMsgThrowTo:
954 case BlockedOnMVar:
955 throwToSingleThreaded(cap, task->incall->tso,
956 (StgClosure *)nonTermination_closure);
957 return;
958 default:
959 barf("deadlock: main thread blocked in a strange way");
960 }
961 }
962 return;
963 #endif
964 }
965 }
966
967
968 /* ----------------------------------------------------------------------------
969 * Send pending messages (PARALLEL_HASKELL only)
970 * ------------------------------------------------------------------------- */
971
972 #if defined(PARALLEL_HASKELL)
973 static void
974 scheduleSendPendingMessages(void)
975 {
976
977 # if defined(PAR) // global Mem.Mgmt., omit for now
978 if (PendingFetches != END_BF_QUEUE) {
979 processFetches();
980 }
981 # endif
982
983 if (RtsFlags.ParFlags.BufferTime) {
984 // if we use message buffering, we must send away all message
985 // packets which have become too old...
986 sendOldBuffers();
987 }
988 }
989 #endif
990
991 /* ----------------------------------------------------------------------------
992 * Process message in the current Capability's inbox
993 * ------------------------------------------------------------------------- */
994
995 static void
996 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
997 {
998 #if defined(THREADED_RTS)
999 Message *m, *next;
1000 int r;
1001 Capability *cap = *pcap;
1002
1003 while (!emptyInbox(cap)) {
1004 if (cap->r.rCurrentNursery->link == NULL ||
1005 g0->n_new_large_words >= large_alloc_lim) {
1006 scheduleDoGC(pcap, cap->running_task, rtsFalse);
1007 cap = *pcap;
1008 }
1009
1010 // don't use a blocking acquire; if the lock is held by
1011 // another thread then just carry on. This seems to avoid
1012 // getting stuck in a message ping-pong situation with other
1013 // processors. We'll check the inbox again later anyway.
1014 //
1015 // We should really use a more efficient queue data structure
1016 // here. The trickiness is that we must ensure a Capability
1017 // never goes idle if the inbox is non-empty, which is why we
1018 // use cap->lock (cap->lock is released as the last thing
1019 // before going idle; see Capability.c:releaseCapability()).
1020 r = TRY_ACQUIRE_LOCK(&cap->lock);
1021 if (r != 0) return;
1022
1023 m = cap->inbox;
1024 cap->inbox = (Message*)END_TSO_QUEUE;
1025
1026 RELEASE_LOCK(&cap->lock);
1027
1028 while (m != (Message*)END_TSO_QUEUE) {
1029 next = m->link;
1030 executeMessage(cap, m);
1031 m = next;
1032 }
1033 }
1034 #endif
1035 }
1036
1037 /* ----------------------------------------------------------------------------
1038 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1039 * ------------------------------------------------------------------------- */
1040
1041 #if defined(THREADED_RTS)
1042 static void
1043 scheduleActivateSpark(Capability *cap)
1044 {
1045 if (anySparks() && !cap->disabled)
1046 {
1047 createSparkThread(cap);
1048 debugTrace(DEBUG_sched, "creating a spark thread");
1049 }
1050 }
1051 #endif // THREADED_RTS
1052
1053 /* ----------------------------------------------------------------------------
1054 * After running a thread...
1055 * ------------------------------------------------------------------------- */
1056
1057 static void
1058 schedulePostRunThread (Capability *cap, StgTSO *t)
1059 {
1060 // We have to be able to catch transactions that are in an
1061 // infinite loop as a result of seeing an inconsistent view of
1062 // memory, e.g.
1063 //
1064 // atomically $ do
1065 // [a,b] <- mapM readTVar [ta,tb]
1066 // when (a == b) loop
1067 //
1068 // and a is never equal to b given a consistent view of memory.
1069 //
1070 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1071 if (!stmValidateNestOfTransactions (t -> trec)) {
1072 debugTrace(DEBUG_sched | DEBUG_stm,
1073 "trec %p found wasting its time", t);
1074
1075 // strip the stack back to the
1076 // ATOMICALLY_FRAME, aborting the (nested)
1077 // transaction, and saving the stack of any
1078 // partially-evaluated thunks on the heap.
1079 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1080
1081 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1082 }
1083 }
1084
1085 /* some statistics gathering in the parallel case */
1086 }
1087
1088 /* -----------------------------------------------------------------------------
1089 * Handle a thread that returned to the scheduler with HeapOverflow
1090 * -------------------------------------------------------------------------- */
1091
1092 static rtsBool
1093 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1094 {
1095 // did the task ask for a large block?
1096 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1097 // if so, get one and push it on the front of the nursery.
1098 bdescr *bd;
1099 lnat blocks;
1100
1101 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1102
1103 if (blocks > BLOCKS_PER_MBLOCK) {
1104 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1105 }
1106
1107 debugTrace(DEBUG_sched,
1108 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1109 (long)t->id, what_next_strs[t->what_next], blocks);
1110
1111 // don't do this if the nursery is (nearly) full, we'll GC first.
1112 if (cap->r.rCurrentNursery->link != NULL ||
1113 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1114 // if the nursery has only one block.
1115
1116 bd = allocGroup_lock(blocks);
1117 cap->r.rNursery->n_blocks += blocks;
1118
1119 // link the new group into the list
1120 bd->link = cap->r.rCurrentNursery;
1121 bd->u.back = cap->r.rCurrentNursery->u.back;
1122 if (cap->r.rCurrentNursery->u.back != NULL) {
1123 cap->r.rCurrentNursery->u.back->link = bd;
1124 } else {
1125 cap->r.rNursery->blocks = bd;
1126 }
1127 cap->r.rCurrentNursery->u.back = bd;
1128
1129 // initialise it as a nursery block. We initialise the
1130 // step, gen_no, and flags field of *every* sub-block in
1131 // this large block, because this is easier than making
1132 // sure that we always find the block head of a large
1133 // block whenever we call Bdescr() (eg. evacuate() and
1134 // isAlive() in the GC would both have to do this, at
1135 // least).
1136 {
1137 bdescr *x;
1138 for (x = bd; x < bd + blocks; x++) {
1139 initBdescr(x,g0,g0);
1140 x->free = x->start;
1141 x->flags = 0;
1142 }
1143 }
1144
1145 // This assert can be a killer if the app is doing lots
1146 // of large block allocations.
1147 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1148
1149 // now update the nursery to point to the new block
1150 cap->r.rCurrentNursery = bd;
1151
1152 // we might be unlucky and have another thread get on the
1153 // run queue before us and steal the large block, but in that
1154 // case the thread will just end up requesting another large
1155 // block.
1156 pushOnRunQueue(cap,t);
1157 return rtsFalse; /* not actually GC'ing */
1158 }
1159 }
1160
1161 if (cap->r.rHpLim == NULL || cap->context_switch) {
1162 // Sometimes we miss a context switch, e.g. when calling
1163 // primitives in a tight loop, MAYBE_GC() doesn't check the
1164 // context switch flag, and we end up waiting for a GC.
1165 // See #1984, and concurrent/should_run/1984
1166 cap->context_switch = 0;
1167 appendToRunQueue(cap,t);
1168 } else {
1169 pushOnRunQueue(cap,t);
1170 }
1171 return rtsTrue;
1172 /* actual GC is done at the end of the while loop in schedule() */
1173 }
1174
1175 /* -----------------------------------------------------------------------------
1176 * Handle a thread that returned to the scheduler with ThreadYielding
1177 * -------------------------------------------------------------------------- */
1178
1179 static rtsBool
1180 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1181 {
1182 /* put the thread back on the run queue. Then, if we're ready to
1183 * GC, check whether this is the last task to stop. If so, wake
1184 * up the GC thread. getThread will block during a GC until the
1185 * GC is finished.
1186 */
1187
1188 ASSERT(t->_link == END_TSO_QUEUE);
1189
1190 // Shortcut if we're just switching evaluators: don't bother
1191 // doing stack squeezing (which can be expensive), just run the
1192 // thread.
1193 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1194 debugTrace(DEBUG_sched,
1195 "--<< thread %ld (%s) stopped to switch evaluators",
1196 (long)t->id, what_next_strs[t->what_next]);
1197 return rtsTrue;
1198 }
1199
1200 // Reset the context switch flag. We don't do this just before
1201 // running the thread, because that would mean we would lose ticks
1202 // during GC, which can lead to unfair scheduling (a thread hogs
1203 // the CPU because the tick always arrives during GC). This way
1204 // penalises threads that do a lot of allocation, but that seems
1205 // better than the alternative.
1206 if (cap->context_switch != 0) {
1207 cap->context_switch = 0;
1208 appendToRunQueue(cap,t);
1209 } else {
1210 pushOnRunQueue(cap,t);
1211 }
1212
1213 IF_DEBUG(sanity,
1214 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1215 checkTSO(t));
1216
1217 return rtsFalse;
1218 }
1219
1220 /* -----------------------------------------------------------------------------
1221 * Handle a thread that returned to the scheduler with ThreadBlocked
1222 * -------------------------------------------------------------------------- */
1223
1224 static void
1225 scheduleHandleThreadBlocked( StgTSO *t
1226 #if !defined(DEBUG)
1227 STG_UNUSED
1228 #endif
1229 )
1230 {
1231
1232 // We don't need to do anything. The thread is blocked, and it
1233 // has tidied up its stack and placed itself on whatever queue
1234 // it needs to be on.
1235
1236 // ASSERT(t->why_blocked != NotBlocked);
1237 // Not true: for example,
1238 // - the thread may have woken itself up already, because
1239 // threadPaused() might have raised a blocked throwTo
1240 // exception, see maybePerformBlockedException().
1241
1242 #ifdef DEBUG
1243 traceThreadStatus(DEBUG_sched, t);
1244 #endif
1245 }
1246
1247 /* -----------------------------------------------------------------------------
1248 * Handle a thread that returned to the scheduler with ThreadFinished
1249 * -------------------------------------------------------------------------- */
1250
1251 static rtsBool
1252 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1253 {
1254 /* Need to check whether this was a main thread, and if so,
1255 * return with the return value.
1256 *
1257 * We also end up here if the thread kills itself with an
1258 * uncaught exception, see Exception.cmm.
1259 */
1260
1261 // blocked exceptions can now complete, even if the thread was in
1262 // blocked mode (see #2910).
1263 awakenBlockedExceptionQueue (cap, t);
1264
1265 //
1266 // Check whether the thread that just completed was a bound
1267 // thread, and if so return with the result.
1268 //
1269 // There is an assumption here that all thread completion goes
1270 // through this point; we need to make sure that if a thread
1271 // ends up in the ThreadKilled state, that it stays on the run
1272 // queue so it can be dealt with here.
1273 //
1274
1275 if (t->bound) {
1276
1277 if (t->bound != task->incall) {
1278 #if !defined(THREADED_RTS)
1279 // Must be a bound thread that is not the topmost one. Leave
1280 // it on the run queue until the stack has unwound to the
1281 // point where we can deal with this. Leaving it on the run
1282 // queue also ensures that the garbage collector knows about
1283 // this thread and its return value (it gets dropped from the
1284 // step->threads list so there's no other way to find it).
1285 appendToRunQueue(cap,t);
1286 return rtsFalse;
1287 #else
1288 // this cannot happen in the threaded RTS, because a
1289 // bound thread can only be run by the appropriate Task.
1290 barf("finished bound thread that isn't mine");
1291 #endif
1292 }
1293
1294 ASSERT(task->incall->tso == t);
1295
1296 if (t->what_next == ThreadComplete) {
1297 if (task->incall->ret) {
1298 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1299 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1300 }
1301 task->incall->stat = Success;
1302 } else {
1303 if (task->incall->ret) {
1304 *(task->incall->ret) = NULL;
1305 }
1306 if (sched_state >= SCHED_INTERRUPTING) {
1307 if (heap_overflow) {
1308 task->incall->stat = HeapExhausted;
1309 } else {
1310 task->incall->stat = Interrupted;
1311 }
1312 } else {
1313 task->incall->stat = Killed;
1314 }
1315 }
1316 #ifdef DEBUG
1317 removeThreadLabel((StgWord)task->incall->tso->id);
1318 #endif
1319
1320 // We no longer consider this thread and task to be bound to
1321 // each other. The TSO lives on until it is GC'd, but the
1322 // task is about to be released by the caller, and we don't
1323 // want anyone following the pointer from the TSO to the
1324 // defunct task (which might have already been
1325 // re-used). This was a real bug: the GC updated
1326 // tso->bound->tso which led to a deadlock.
1327 t->bound = NULL;
1328 task->incall->tso = NULL;
1329
1330 return rtsTrue; // tells schedule() to return
1331 }
1332
1333 return rtsFalse;
1334 }
1335
1336 /* -----------------------------------------------------------------------------
1337 * Perform a heap census
1338 * -------------------------------------------------------------------------- */
1339
1340 static rtsBool
1341 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1342 {
1343 // When we have +RTS -i0 and we're heap profiling, do a census at
1344 // every GC. This lets us get repeatable runs for debugging.
1345 if (performHeapProfile ||
1346 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1347 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1348 return rtsTrue;
1349 } else {
1350 return rtsFalse;
1351 }
1352 }
1353
1354 /* -----------------------------------------------------------------------------
1355 * Start a synchronisation of all capabilities
1356 * -------------------------------------------------------------------------- */
1357
1358 // Returns:
1359 // 0 if we successfully got a sync
1360 // non-0 if there was another sync request in progress,
1361 // and we yielded to it. The value returned is the
1362 // type of the other sync request.
1363 //
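// Added usage sketch (not in the original source; it mirrors the call sites
// in scheduleDoGC() and forkProcess() below): callers loop until their own
// request wins, because requestSync() may have yielded the Capability and
// any previously-tested state may have changed in the meantime:
//
//     do {
//         sync = requestSync(&cap, task, SYNC_OTHER);
//     } while (sync);
//     // ... we now own pending_sync; reset it (pending_sync = 0) when done.
//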
1364 #if defined(THREADED_RTS)
1365 static nat requestSync (Capability **pcap, Task *task, nat sync_type)
1366 {
1367 nat prev_pending_sync;
1368
1369 prev_pending_sync = cas(&pending_sync, 0, sync_type);
1370
1371 if (prev_pending_sync)
1372 {
1373 do {
1374 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1375 prev_pending_sync);
1376 ASSERT(*pcap);
1377 yieldCapability(pcap,task);
1378 } while (pending_sync);
1379 return prev_pending_sync; // NOTE: task->cap might have changed now
1380 }
1381 else
1382 {
1383 return 0;
1384 }
1385 }
1386
1387 //
1388 // Grab all the capabilities except the one we already hold. Used
1389 // when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1390 // before a fork (SYNC_OTHER).
1391 //
1392 // Only call this after requestSync(), otherwise a deadlock might
1393 // ensue if another thread is trying to synchronise.
1394 //
1395 static void acquireAllCapabilities(Capability *cap, Task *task)
1396 {
1397 Capability *tmpcap;
1398 nat i;
1399
1400 for (i=0; i < n_capabilities; i++) {
1401 debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)", i, n_capabilities);
1402 tmpcap = &capabilities[i];
1403 if (tmpcap != cap) {
1404 // we better hope this task doesn't get migrated to
1405 // another Capability while we're waiting for this one.
1406 // It won't, because load balancing happens while we have
1407 // all the Capabilities, but even so it's a slightly
1408 // unsavoury invariant.
1409 task->cap = tmpcap;
1410 waitForReturnCapability(&tmpcap, task);
1411 if (tmpcap->no != i) {
1412 barf("acquireAllCapabilities: got the wrong capability");
1413 }
1414 }
1415 }
1416 task->cap = cap;
1417 }
1418
1419 static void releaseAllCapabilities(Capability *cap, Task *task)
1420 {
1421 nat i;
1422
1423 for (i = 0; i < n_capabilities; i++) {
1424 if (cap->no != i) {
1425 task->cap = &capabilities[i];
1426 releaseCapability(&capabilities[i]);
1427 }
1428 }
1429 task->cap = cap;
1430 }
1431 #endif
1432
1433 /* -----------------------------------------------------------------------------
1434 * Perform a garbage collection if necessary
1435 * -------------------------------------------------------------------------- */
1436
1437 static void
1438 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1439 rtsBool force_major)
1440 {
1441 Capability *cap = *pcap;
1442 rtsBool heap_census;
1443 #ifdef THREADED_RTS
1444 rtsBool idle_cap[n_capabilities];
1445 rtsBool gc_type;
1446 nat i, sync;
1447 StgTSO *tso;
1448 #endif
1449
1450 if (sched_state == SCHED_SHUTTING_DOWN) {
1451 // The final GC has already been done, and the system is
1452 // shutting down. We'll probably deadlock if we try to GC
1453 // now.
1454 return;
1455 }
1456
1457 #ifdef THREADED_RTS
1458 if (sched_state < SCHED_INTERRUPTING
1459 && RtsFlags.ParFlags.parGcEnabled
1460 && N >= RtsFlags.ParFlags.parGcGen
1461 && ! oldest_gen->mark)
1462 {
1463 gc_type = SYNC_GC_PAR;
1464 } else {
1465 gc_type = SYNC_GC_SEQ;
1466 }
1467
1468 // In order to GC, there must be no threads running Haskell code.
1469 // Therefore, the GC thread needs to hold *all* the capabilities,
1470 // and release them after the GC has completed.
1471 //
1472 // This seems to be the simplest way: previous attempts involved
1473 // making all the threads with capabilities give up their
1474 // capabilities and sleep except for the *last* one, which
1475 // actually did the GC. But it's quite hard to arrange for all
1476 // the other tasks to sleep and stay asleep.
1477 //
1478
1479 /* Other capabilities are prevented from running yet more Haskell
1480 threads if pending_sync is set. Tested inside
1481 yieldCapability() and releaseCapability() in Capability.c */
1482
1483 do {
1484 sync = requestSync(pcap, task, gc_type);
1485 cap = *pcap;
1486 if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
1487 // someone else had a pending sync request for a GC, so
1488 // let's assume GC has been done and we don't need to GC
1489 // again.
1490 return;
1491 }
1492 if (sched_state == SCHED_SHUTTING_DOWN) {
1493 // The scheduler might now be shutting down. We tested
1494 // this above, but it might have become true since then as
1495 // we yielded the capability in requestSync().
1496 return;
1497 }
1498 } while (sync);
1499
1500 interruptAllCapabilities();
1501
1502 // The final shutdown GC is always single-threaded, because it's
1503 // possible that some of the Capabilities have no worker threads.
1504
1505 if (gc_type == SYNC_GC_SEQ)
1506 {
1507 traceEventRequestSeqGc(cap);
1508 }
1509 else
1510 {
1511 traceEventRequestParGc(cap);
1512 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1513 }
1514
1515 if (gc_type == SYNC_GC_SEQ)
1516 {
1517 // single-threaded GC: grab all the capabilities
1518 acquireAllCapabilities(cap,task);
1519 }
1520 else
1521 {
1522 // If we are load-balancing collections in this
1523 // generation, then we require all GC threads to participate
1524 // in the collection. Otherwise, we only require active
1525 // threads to participate, and we set gc_threads[i]->idle for
1526 // any idle capabilities. The rationale here is that waking
1527 // up an idle Capability takes much longer than just doing any
1528 // GC work on its behalf.
1529
1530 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1531 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1532 N >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
1533 for (i=0; i < n_capabilities; i++) {
1534 if (capabilities[i].disabled) {
1535 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1536 } else {
1537 idle_cap[i] = rtsFalse;
1538 }
1539 }
1540 } else {
1541 for (i=0; i < n_capabilities; i++) {
1542 if (capabilities[i].disabled) {
1543 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1544 } else if (i == cap->no ||
1545 capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1546 idle_cap[i] = rtsFalse;
1547 } else {
1548 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1549 if (!idle_cap[i]) {
1550 n_failed_trygrab_idles++;
1551 } else {
1552 n_idle_caps++;
1553 }
1554 }
1555 }
1556 }
1557
1558 // We set the gc_threads[i]->idle flag if that
1559 // capability/thread is not participating in this collection.
1560 // We also keep a local record of which capabilities are idle
1561 // in idle_cap[], because scheduleDoGC() is re-entrant:
1562 // another thread might start a GC as soon as we've finished
1563 // this one, and thus the gc_threads[]->idle flags are invalid
1564 // as soon as we release any threads after GC. Getting this
1565 // wrong leads to a rare and hard to debug deadlock!
1566
1567 for (i=0; i < n_capabilities; i++) {
1568 gc_threads[i]->idle = idle_cap[i];
1569 capabilities[i].idle++;
1570 }
1571
1572 // For all capabilities participating in this GC, wait until
1573 // they have stopped mutating and are standing by for GC.
1574 waitForGcThreads(cap);
1575
1576 #if defined(THREADED_RTS)
1577 // Stable point where we can do a global check on our spark counters
1578 ASSERT(checkSparkCountInvariant());
1579 #endif
1580 }
1581
1582 #endif
1583
1584 IF_DEBUG(scheduler, printAllThreads());
1585
1586 delete_threads_and_gc:
1587 /*
1588 * We now have all the capabilities; if we're in an interrupting
1589 * state, then we should take the opportunity to delete all the
1590 * threads in the system.
1591 */
1592 if (sched_state == SCHED_INTERRUPTING) {
1593 deleteAllThreads(cap);
1594 #if defined(THREADED_RTS)
1595 // Discard all the sparks from every Capability. Why?
1596 // They'll probably be GC'd anyway since we've killed all the
1597 // threads. It just avoids the GC having to do any work to
1598 // figure out that any remaining sparks are garbage.
1599 for (i = 0; i < n_capabilities; i++) {
1600 capabilities[i].spark_stats.gcd +=
1601 sparkPoolSize(capabilities[i].sparks);
1602 // No race here since all Caps are stopped.
1603 discardSparksCap(&capabilities[i]);
1604 }
1605 #endif
1606 sched_state = SCHED_SHUTTING_DOWN;
1607 }
1608
1609 /*
1610 * When there are disabled capabilities, we want to migrate any
1611 * threads away from them. Normally this happens in the
1612 * scheduler's loop, but only for unbound threads - it's really
1613 * hard for a bound thread to migrate itself. So we have another
1614 * go here.
1615 */
1616 #if defined(THREADED_RTS)
1617 for (i = enabled_capabilities; i < n_capabilities; i++) {
1618 Capability *tmp_cap, *dest_cap;
1619 tmp_cap = &capabilities[i];
1620 ASSERT(tmp_cap->disabled);
1621 if (i != cap->no) {
1622 dest_cap = &capabilities[i % enabled_capabilities];
1623 while (!emptyRunQueue(tmp_cap)) {
1624 tso = popRunQueue(tmp_cap);
1625 migrateThread(tmp_cap, tso, dest_cap);
1626 if (tso->bound) { tso->bound->task->cap = dest_cap; }
1627 }
1628 }
1629 }
1630 #endif
1631
1632 heap_census = scheduleNeedHeapProfile(rtsTrue);
1633
1634 traceEventGcStart(cap);
1635 #if defined(THREADED_RTS)
1636 // reset pending_sync *before* GC, so that when the GC threads
1637 // emerge they don't immediately re-enter the GC.
1638 pending_sync = 0;
1639 GarbageCollect(force_major || heap_census, heap_census, gc_type, cap);
1640 #else
1641 GarbageCollect(force_major || heap_census, heap_census, 0, cap);
1642 #endif
1643 traceEventGcEnd(cap);
1644
1645 traceSparkCounters(cap);
1646
1647 if (recent_activity == ACTIVITY_INACTIVE && force_major)
1648 {
1649 // We are doing a GC because the system has been idle for a
1650 // timeslice and we need to check for deadlock. Record the
1651 // fact that we've done a GC and turn off the timer signal;
1652 // it will get re-enabled if we run any threads after the GC.
1653 recent_activity = ACTIVITY_DONE_GC;
1654 stopTimer();
1655 }
1656 else
1657 {
1658 // the GC might have taken long enough for the timer to set
1659 // recent_activity = ACTIVITY_INACTIVE, but we aren't
1660 // necessarily deadlocked:
1661 recent_activity = ACTIVITY_YES;
1662 }
1663
1664 #if defined(THREADED_RTS)
1665 // Stable point where we can do a global check on our spark counters
1666 ASSERT(checkSparkCountInvariant());
1667 #endif
1668
1669 // The heap census itself is done during GarbageCollect().
1670 if (heap_census) {
1671 performHeapProfile = rtsFalse;
1672 }
1673
1674 #if defined(THREADED_RTS)
1675 if (gc_type == SYNC_GC_PAR)
1676 {
1677 releaseGCThreads(cap);
1678 for (i = 0; i < n_capabilities; i++) {
1679 if (i != cap->no) {
1680 if (idle_cap[i]) {
1681 ASSERT(capabilities[i].running_task == task);
1682 task->cap = &capabilities[i];
1683 releaseCapability(&capabilities[i]);
1684 } else {
1685 ASSERT(capabilities[i].running_task != task);
1686 }
1687 }
1688 }
1689 task->cap = cap;
1690 }
1691 #endif
1692
1693 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1694 // GC set the heap_overflow flag, so we should proceed with
1695 // an orderly shutdown now. Ultimately we want the main
1696 // thread to return to its caller with HeapExhausted, at which
1697 // point the caller should call hs_exit(). The first step is
1698 // to delete all the threads.
1699 //
1700 // Another way to do this would be to raise an exception in
1701 // the main thread, which we really should do because it gives
1702 // the program a chance to clean up. But how do we find the
1703 // main thread? It should presumably be the same one that
1704 // gets ^C exceptions, but that's all done on the Haskell side
1705 // (GHC.TopHandler).
1706 sched_state = SCHED_INTERRUPTING;
1707 goto delete_threads_and_gc;
1708 }
1709
1710 #ifdef SPARKBALANCE
1711 /* JB
1712 Once we are all together... this would be the place to balance all
1713 spark pools. No concurrent stealing or adding of new sparks can
1714 occur. Should be defined in Sparks.c. */
1715 balanceSparkPoolsCaps(n_capabilities, capabilities);
1716 #endif
1717
1718 #if defined(THREADED_RTS)
1719 if (gc_type == SYNC_GC_SEQ) {
1720 // release our stash of capabilities.
1721 releaseAllCapabilities(cap, task);
1722 }
1723 #endif
1724
1725 return;
1726 }
1727
1728 /* ---------------------------------------------------------------------------
1729 * Singleton fork(). Do not copy any running threads.
1730 * ------------------------------------------------------------------------- */
1731
1732 pid_t
1733 forkProcess(HsStablePtr *entry
1734 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1735 STG_UNUSED
1736 #endif
1737 )
1738 {
1739 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1740 pid_t pid;
1741 StgTSO* t,*next;
1742 Capability *cap;
1743 nat g;
1744 Task *task = NULL;
1745 nat i;
1746 #ifdef THREADED_RTS
1747 nat sync;
1748 #endif
1749
1750 debugTrace(DEBUG_sched, "forking!");
1751
1752 task = newBoundTask();
1753
1754 cap = NULL;
1755 waitForReturnCapability(&cap, task);
1756
1757 #ifdef THREADED_RTS
1758 do {
1759 sync = requestSync(&cap, task, SYNC_OTHER);
1760 } while (sync);
1761
1762 acquireAllCapabilities(cap,task);
1763
1764 pending_sync = 0;
1765 #endif
1766
1767 // no funny business: hold locks while we fork, otherwise if some
1768 // other thread is holding a lock when the fork happens, the data
1769 // structure protected by the lock will forever be in an
1770 // inconsistent state in the child. See also #1391.
1771 ACQUIRE_LOCK(&sched_mutex);
1772 ACQUIRE_LOCK(&sm_mutex);
1773 ACQUIRE_LOCK(&stable_mutex);
1774 ACQUIRE_LOCK(&task->lock);
1775
1776 for (i=0; i < n_capabilities; i++) {
1777 ACQUIRE_LOCK(&capabilities[i].lock);
1778 }
1779
1780 stopTimer(); // See #4074
1781
1782 #if defined(TRACING)
1783 flushEventLog(); // so that child won't inherit dirty file buffers
1784 #endif
1785
1786 pid = fork();
1787
1788 if (pid) { // parent
1789
1790 startTimer(); // #4074
1791
1792 RELEASE_LOCK(&sched_mutex);
1793 RELEASE_LOCK(&sm_mutex);
1794 RELEASE_LOCK(&stable_mutex);
1795 RELEASE_LOCK(&task->lock);
1796
1797 for (i=0; i < n_capabilities; i++) {
1798 releaseCapability_(&capabilities[i],rtsFalse);
1799 RELEASE_LOCK(&capabilities[i].lock);
1800 }
1801 boundTaskExiting(task);
1802
1803 // just return the pid
1804 return pid;
1805
1806 } else { // child
1807
1808 #if defined(THREADED_RTS)
1809 initMutex(&sched_mutex);
1810 initMutex(&sm_mutex);
1811 initMutex(&stable_mutex);
1812 initMutex(&task->lock);
1813
1814 for (i=0; i < n_capabilities; i++) {
1815 initMutex(&capabilities[i].lock);
1816 }
1817 #endif
1818
1819 #ifdef TRACING
1820 resetTracing();
1821 #endif
1822
1823 // Now, all OS threads except the thread that forked are
1824 // stopped. We need to stop all Haskell threads, including
1825 // those involved in foreign calls. Also we need to delete
1826 // all Tasks, because they correspond to OS threads that are
1827 // now gone.
1828
1829 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1830 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1831 next = t->global_link;
1832 // don't allow threads to catch the ThreadKilled
1833 // exception, but we do want to raiseAsync() because these
1834 // threads may be evaluating thunks that we need later.
1835 deleteThread_(t->cap,t);
1836
1837 // stop the GC from updating the InCall to point to
1838 // the TSO. This is only necessary because the
1839 // OSThread bound to the TSO has been killed, and
1840 // won't get a chance to exit in the usual way (see
1841 // also scheduleHandleThreadFinished).
1842 t->bound = NULL;
1843 }
1844 }
1845
1846 discardTasksExcept(task);
1847
1848 for (i=0; i < n_capabilities; i++) {
1849 cap = &capabilities[i];
1850
1851 // Empty the run queue. It seems tempting to let all the
1852 // killed threads stay on the run queue as zombies to be
1853 // cleaned up later, but some of them may correspond to
1854 // bound threads for which the corresponding Task does not
1855 // exist.
1856 cap->run_queue_hd = END_TSO_QUEUE;
1857 cap->run_queue_tl = END_TSO_QUEUE;
1858
1859         // Any suspended C-calling Tasks are gone; their OS threads
1860         // no longer exist:
1861 cap->suspended_ccalls = NULL;
1862
1863 #if defined(THREADED_RTS)
1864 // Wipe our spare workers list, they no longer exist. New
1865 // workers will be created if necessary.
1866 cap->spare_workers = NULL;
1867 cap->n_spare_workers = 0;
1868 cap->returning_tasks_hd = NULL;
1869 cap->returning_tasks_tl = NULL;
1870 #endif
1871
1872 // Release all caps except 0, we'll use that for starting
1873 // the IO manager and running the client action below.
1874 if (cap->no != 0) {
1875 task->cap = cap;
1876 releaseCapability(cap);
1877 }
1878 }
1879 cap = &capabilities[0];
1880 task->cap = cap;
1881
1882 // Empty the threads lists. Otherwise, the garbage
1883 // collector may attempt to resurrect some of these threads.
1884 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1885 generations[g].threads = END_TSO_QUEUE;
1886 }
1887
1888 // On Unix, all timers are reset in the child, so we need to start
1889 // the timer again.
1890 initTimer();
1891 startTimer();
1892
1893 #if defined(THREADED_RTS)
1894 ioManagerStartCap(&cap);
1895 #endif
1896
1897 rts_evalStableIO(&cap, entry, NULL); // run the action
1898 rts_checkSchedStatus("forkProcess",cap);
1899
1900 rts_unlock(cap);
1901 hs_exit(); // clean up and exit
1902 stg_exit(EXIT_SUCCESS);
1903 }
1904 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1905 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1906 #endif
1907 }
1908
1909 /* ---------------------------------------------------------------------------
1910 * Changing the number of Capabilities
1911 *
1912 * Changing the number of Capabilities is very tricky! We can only do
1913 * it with the system fully stopped, so we do a full sync with
1914 * requestSync(SYNC_OTHER) and grab all the capabilities.
1915 *
1916 * Then we resize the appropriate data structures, and update all
1917 * references to the old data structures which have now moved.
1918 * Finally we release the Capabilities we are holding, and start
1919 * worker Tasks on the new Capabilities we created.
1920 *
1921 * ------------------------------------------------------------------------- */
1922
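/* In outline (the function below is the reference), the sequence is roughly:
 *
 *     cap = rts_lock();
 *     do { sync = requestSync(&cap, task, SYNC_OTHER); } while (sync);
 *     acquireAllCapabilities(cap, task);   // system fully stopped
 *     ...disable capabilities, or grow the capabilities array...
 *     releaseAllCapabilities(cap, task);
 *     startWorkerTasks(n_capabilities, new_n_capabilities);
 *     rts_unlock(cap);
 */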
1923 void
1924 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
1925 {
1926 #if !defined(THREADED_RTS)
1927 if (new_n_capabilities != 1) {
1928 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
1929 }
1930 return;
1931 #elif defined(NOSMP)
1932 if (new_n_capabilities != 1) {
1933 errorBelch("setNumCapabilities: not supported on this platform");
1934 }
1935 return;
1936 #else
1937 Task *task;
1938 Capability *cap;
1939 nat sync;
1940 StgTSO* t;
1941 nat g, n;
1942 Capability *old_capabilities = NULL;
1943
1944 if (new_n_capabilities == enabled_capabilities) return;
1945
1946 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
1947 enabled_capabilities, new_n_capabilities);
1948
1949 cap = rts_lock();
1950 task = cap->running_task;
1951
1952 do {
1953 sync = requestSync(&cap, task, SYNC_OTHER);
1954 } while (sync);
1955
1956 acquireAllCapabilities(cap,task);
1957
1958 pending_sync = 0;
1959
1960 if (new_n_capabilities < enabled_capabilities)
1961 {
1962 // Reducing the number of capabilities: we do not actually
1963 // remove the extra capabilities, we just mark them as
1964 // "disabled". This has the following effects:
1965 //
1966 // - threads on a disabled capability are migrated away by the
1967 // scheduler loop
1968 //
1969 // - disabled capabilities do not participate in GC
1970 // (see scheduleDoGC())
1971 //
1972 // - No spark threads are created on this capability
1973 // (see scheduleActivateSpark())
1974 //
1975 // - We do not attempt to migrate threads *to* a disabled
1976 // capability (see schedulePushWork()).
1977 //
1978 // but in other respects, a disabled capability remains
1979 // alive. Threads may be woken up on a disabled capability,
1980 // but they will be immediately migrated away.
1981 //
1982 // This approach is much easier than trying to actually remove
1983 // the capability; we don't have to worry about GC data
1984 // structures, the nursery, etc.
1985 //
1986 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
1987 capabilities[n].disabled = rtsTrue;
1988 }
1989 enabled_capabilities = new_n_capabilities;
1990 }
1991 else
1992 {
1993 // Increasing the number of enabled capabilities.
1994 //
1995 // enable any disabled capabilities, up to the required number
1996 for (n = enabled_capabilities;
1997 n < new_n_capabilities && n < n_capabilities; n++) {
1998 capabilities[n].disabled = rtsFalse;
1999 }
2000 enabled_capabilities = n;
2001
2002 if (new_n_capabilities > n_capabilities) {
2003 #if defined(TRACING)
2004 // Allocate eventlog buffers for the new capabilities. Note this
2005 // must be done before calling moreCapabilities(), because that
2006 // will emit events to add the new capabilities to capsets.
2007 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2008 #endif
2009
2010 // Resize the capabilities array
2011 // NB. after this, capabilities points somewhere new. Any pointers
2012 // of type (Capability *) are now invalid.
2013 old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
2014
2015 // update our own cap pointer
2016 cap = &capabilities[cap->no];
2017
2018 // Resize and update storage manager data structures
2019 storageAddCapabilities(n_capabilities, new_n_capabilities);
2020
2021 // Update (Capability *) refs in the Task manager.
2022 updateCapabilityRefs();
2023
2024 // Update (Capability *) refs from TSOs
2025 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2026 for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
2027 t->cap = &capabilities[t->cap->no];
2028 }
2029 }
2030 }
2031 }
2032
2033 // We're done: release the original Capabilities
2034 releaseAllCapabilities(cap,task);
2035
2036 // Start worker tasks on the new Capabilities
2037 startWorkerTasks(n_capabilities, new_n_capabilities);
2038
2039 // finally, update n_capabilities
2040 if (new_n_capabilities > n_capabilities) {
2041 n_capabilities = enabled_capabilities = new_n_capabilities;
2042 }
2043
2044 // We can't free the old array until now, because we access it
2045 // while updating pointers in updateCapabilityRefs().
2046 if (old_capabilities) {
2047 stgFree(old_capabilities);
2048 }
2049
2050 rts_unlock(cap);
2051
2052 #endif // THREADED_RTS
2053 }
2054
2055
2056
2057 /* ---------------------------------------------------------------------------
2058 * Delete all the threads in the system
2059 * ------------------------------------------------------------------------- */
2060
2061 static void
2062 deleteAllThreads ( Capability *cap )
2063 {
2064 // NOTE: only safe to call if we own all capabilities.
2065
2066 StgTSO* t, *next;
2067 nat g;
2068
2069 debugTrace(DEBUG_sched,"deleting all threads");
2070 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2071 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2072 next = t->global_link;
2073 deleteThread(cap,t);
2074 }
2075 }
2076
2077 // The run queue now contains a bunch of ThreadKilled threads. We
2078 // must not throw these away: the main thread(s) will be in there
2079 // somewhere, and the main scheduler loop has to deal with it.
2080 // Also, the run queue is the only thing keeping these threads from
2081 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2082
2083 #if !defined(THREADED_RTS)
2084 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2085 ASSERT(sleeping_queue == END_TSO_QUEUE);
2086 #endif
2087 }
2088
2089 /* -----------------------------------------------------------------------------
2090 Managing the suspended_ccalls list.
2091 Locks required: sched_mutex
2092 -------------------------------------------------------------------------- */
2093
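/* The suspended_ccalls list is a doubly-linked list of InCall structures,
 * threaded through incall->prev/incall->next, with its head in
 * cap->suspended_ccalls.  suspendTask() links the current InCall onto the
 * front of the list; recoverSuspendedTask() unlinks it again, wherever it
 * now sits in the list.
 */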
2094 STATIC_INLINE void
2095 suspendTask (Capability *cap, Task *task)
2096 {
2097 InCall *incall;
2098
2099 incall = task->incall;
2100 ASSERT(incall->next == NULL && incall->prev == NULL);
2101 incall->next = cap->suspended_ccalls;
2102 incall->prev = NULL;
2103 if (cap->suspended_ccalls) {
2104 cap->suspended_ccalls->prev = incall;
2105 }
2106 cap->suspended_ccalls = incall;
2107 }
2108
2109 STATIC_INLINE void
2110 recoverSuspendedTask (Capability *cap, Task *task)
2111 {
2112 InCall *incall;
2113
2114 incall = task->incall;
2115 if (incall->prev) {
2116 incall->prev->next = incall->next;
2117 } else {
2118 ASSERT(cap->suspended_ccalls == incall);
2119 cap->suspended_ccalls = incall->next;
2120 }
2121 if (incall->next) {
2122 incall->next->prev = incall->prev;
2123 }
2124 incall->next = incall->prev = NULL;
2125 }
2126
2127 /* ---------------------------------------------------------------------------
2128 * Suspending & resuming Haskell threads.
2129 *
2130 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2131 * its capability before calling the C function. This allows another
2132 * task to pick up the capability and carry on running Haskell
2133 * threads. It also means that if the C call blocks, it won't lock
2134 * the whole system.
2135 *
2136 * The Haskell thread making the C call is put to sleep for the
2137  * duration of the call, on the Capability's suspended_ccalls list. We
2138 * give out a token to the task, which it can use to resume the thread
2139 * on return from the C function.
2140 *
2141 * If this is an interruptible C call, this means that the FFI call may be
2142 * unceremoniously terminated and should be scheduled on an
2143 * unbound worker thread.
2144 * ------------------------------------------------------------------------- */
2145
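/* Schematically, the code generated for a safe foreign call brackets the
 * call with these two entry points (the code generator is the authoritative
 * reference; the names below are illustrative only):
 *
 *     token = suspendThread(BaseReg, interruptible);
 *     r = foreign_function(args);
 *     BaseReg = resumeThread(token);
 *
 * The token is in fact the Task, and resumeThread() may hand back a
 * different Capability from the one given up in suspendThread().
 */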
2146 void *
2147 suspendThread (StgRegTable *reg, rtsBool interruptible)
2148 {
2149 Capability *cap;
2150 int saved_errno;
2151 StgTSO *tso;
2152 Task *task;
2153 #if mingw32_HOST_OS
2154 StgWord32 saved_winerror;
2155 #endif
2156
2157 saved_errno = errno;
2158 #if mingw32_HOST_OS
2159 saved_winerror = GetLastError();
2160 #endif
2161
2162 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2163 */
2164 cap = regTableToCapability(reg);
2165
2166 task = cap->running_task;
2167 tso = cap->r.rCurrentTSO;
2168
2169 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2170
2171 // XXX this might not be necessary --SDM
2172 tso->what_next = ThreadRunGHC;
2173
2174 threadPaused(cap,tso);
2175
2176 if (interruptible) {
2177 tso->why_blocked = BlockedOnCCall_Interruptible;
2178 } else {
2179 tso->why_blocked = BlockedOnCCall;
2180 }
2181
2182 // Hand back capability
2183 task->incall->suspended_tso = tso;
2184 task->incall->suspended_cap = cap;
2185
2186 ACQUIRE_LOCK(&cap->lock);
2187
2188 suspendTask(cap,task);
2189 cap->in_haskell = rtsFalse;
2190 releaseCapability_(cap,rtsFalse);
2191
2192 RELEASE_LOCK(&cap->lock);
2193
2194 errno = saved_errno;
2195 #if mingw32_HOST_OS
2196 SetLastError(saved_winerror);
2197 #endif
2198 return task;
2199 }
2200
2201 StgRegTable *
2202 resumeThread (void *task_)
2203 {
2204 StgTSO *tso;
2205 InCall *incall;
2206 Capability *cap;
2207 Task *task = task_;
2208 int saved_errno;
2209 #if mingw32_HOST_OS
2210 StgWord32 saved_winerror;
2211 #endif
2212
2213 saved_errno = errno;
2214 #if mingw32_HOST_OS
2215 saved_winerror = GetLastError();
2216 #endif
2217
2218 incall = task->incall;
2219 cap = incall->suspended_cap;
2220 task->cap = cap;
2221
2222 // Wait for permission to re-enter the RTS with the result.
2223 waitForReturnCapability(&cap,task);
2224 // we might be on a different capability now... but if so, our
2225 // entry on the suspended_ccalls list will also have been
2226 // migrated.
2227
2228 // Remove the thread from the suspended list
2229 recoverSuspendedTask(cap,task);
2230
2231 tso = incall->suspended_tso;
2232 incall->suspended_tso = NULL;
2233 incall->suspended_cap = NULL;
2234 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2235
2236 traceEventRunThread(cap, tso);
2237
2238 /* Reset blocking status */
2239 tso->why_blocked = NotBlocked;
2240
2241 if ((tso->flags & TSO_BLOCKEX) == 0) {
2242 // avoid locking the TSO if we don't have to
2243 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2244 maybePerformBlockedException(cap,tso);
2245 }
2246 }
2247
2248 cap->r.rCurrentTSO = tso;
2249 cap->in_haskell = rtsTrue;
2250 errno = saved_errno;
2251 #if mingw32_HOST_OS
2252 SetLastError(saved_winerror);
2253 #endif
2254
2255 /* We might have GC'd, mark the TSO dirty again */
2256 dirty_TSO(cap,tso);
2257 dirty_STACK(cap,tso->stackobj);
2258
2259 IF_DEBUG(sanity, checkTSO(tso));
2260
2261 return &cap->r;
2262 }
2263
2264 /* ---------------------------------------------------------------------------
2265 * scheduleThread()
2266 *
2267 * scheduleThread puts a thread on the end of the runnable queue.
2268 * This will usually be done immediately after a thread is created.
2269 * The caller of scheduleThread must create the thread using e.g.
2270 * createThread and push an appropriate closure
2271 * on this thread's stack before the scheduler is invoked.
2272 * ------------------------------------------------------------------------ */
2273
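/* A minimal sketch of the expected usage (for real examples see RtsAPI.c,
 * e.g. rts_evalIO(), which goes via scheduleWaitThread() below instead):
 *
 *     tso = createThread(cap, stack_size);
 *     ...push the closure to evaluate, plus a suitable return frame...
 *     scheduleThread(cap, tso);
 */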
2274 void
2275 scheduleThread(Capability *cap, StgTSO *tso)
2276 {
2277 // The thread goes at the *end* of the run-queue, to avoid possible
2278 // starvation of any threads already on the queue.
2279 appendToRunQueue(cap,tso);
2280 }
2281
2282 void
2283 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2284 {
2285 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2286 // move this thread from now on.
2287 #if defined(THREADED_RTS)
2288 cpu %= enabled_capabilities;
2289 if (cpu == cap->no) {
2290 appendToRunQueue(cap,tso);
2291 } else {
2292 migrateThread(cap, tso, &capabilities[cpu]);
2293 }
2294 #else
2295 appendToRunQueue(cap,tso);
2296 #endif
2297 }
2298
2299 void
2300 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2301 {
2302 Task *task;
2303 DEBUG_ONLY( StgThreadID id );
2304 Capability *cap;
2305
2306 cap = *pcap;
2307
2308 // We already created/initialised the Task
2309 task = cap->running_task;
2310
2311 // This TSO is now a bound thread; make the Task and TSO
2312 // point to each other.
2313 tso->bound = task->incall;
2314 tso->cap = cap;
2315
2316 task->incall->tso = tso;
2317 task->incall->ret = ret;
2318 task->incall->stat = NoStatus;
2319
2320 appendToRunQueue(cap,tso);
2321
2322 DEBUG_ONLY( id = tso->id );
2323 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2324
2325 cap = schedule(cap,task);
2326
2327 ASSERT(task->incall->stat != NoStatus);
2328 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2329
2330 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2331 *pcap = cap;
2332 }
2333
2334 /* ----------------------------------------------------------------------------
2335 * Starting Tasks
2336 * ------------------------------------------------------------------------- */
2337
2338 #if defined(THREADED_RTS)
2339 void scheduleWorker (Capability *cap, Task *task)
2340 {
2341 // schedule() runs without a lock.
2342 cap = schedule(cap,task);
2343
2344 // On exit from schedule(), we have a Capability, but possibly not
2345 // the same one we started with.
2346
2347 // During shutdown, the requirement is that after all the
2348 // Capabilities are shut down, all workers that are shutting down
2349 // have finished workerTaskStop(). This is why we hold on to
2350 // cap->lock until we've finished workerTaskStop() below.
2351 //
2352 // There may be workers still involved in foreign calls; those
2353 // will just block in waitForReturnCapability() because the
2354 // Capability has been shut down.
2355 //
2356 ACQUIRE_LOCK(&cap->lock);
2357 releaseCapability_(cap,rtsFalse);
2358 workerTaskStop(task);
2359 RELEASE_LOCK(&cap->lock);
2360 }
2361 #endif
2362
2363 /* ---------------------------------------------------------------------------
2364 * Start new worker tasks on Capabilities from--to
2365 * -------------------------------------------------------------------------- */
2366
2367 static void
2368 startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
2369 {
2370 #if defined(THREADED_RTS)
2371 nat i;
2372 Capability *cap;
2373
2374 for (i = from; i < to; i++) {
2375 cap = &capabilities[i];
2376 ACQUIRE_LOCK(&cap->lock);
2377 startWorkerTask(cap);
2378 RELEASE_LOCK(&cap->lock);
2379 }
2380 #endif
2381 }
2382
2383 /* ---------------------------------------------------------------------------
2384 * initScheduler()
2385 *
2386 * Initialise the scheduler. This resets all the queues - if the
2387 * queues contained any threads, they'll be garbage collected at the
2388 * next pass.
2389 *
2390 * ------------------------------------------------------------------------ */
2391
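/* initScheduler() is driven from RTS startup (hs_init(), see RtsStartup.c);
 * exitScheduler() and freeScheduler() below are its counterparts on the
 * shutdown path (hs_exit()).
 */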
2392 void
2393 initScheduler(void)
2394 {
2395 #if !defined(THREADED_RTS)
2396 blocked_queue_hd = END_TSO_QUEUE;
2397 blocked_queue_tl = END_TSO_QUEUE;
2398 sleeping_queue = END_TSO_QUEUE;
2399 #endif
2400
2401 sched_state = SCHED_RUNNING;
2402 recent_activity = ACTIVITY_YES;
2403
2404 #if defined(THREADED_RTS)
2405 /* Initialise the mutex and condition variables used by
2406 * the scheduler. */
2407 initMutex(&sched_mutex);
2408 #endif
2409
2410 ACQUIRE_LOCK(&sched_mutex);
2411
2412 /* A capability holds the state a native thread needs in
2413 * order to execute STG code. At least one capability is
2414 * floating around (only THREADED_RTS builds have more than one).
2415 */
2416 initCapabilities();
2417
2418 initTaskManager();
2419
2420 /*
2421 * Eagerly start one worker to run each Capability, except for
2422 * Capability 0. The idea is that we're probably going to start a
2423 * bound thread on Capability 0 pretty soon, so we don't want a
2424 * worker task hogging it.
2425 */
2426 startWorkerTasks(1, n_capabilities);
2427
2428 RELEASE_LOCK(&sched_mutex);
2429
2430 }
2431
2432 void
2433 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2434 /* see Capability.c, shutdownCapability() */
2435 {
2436 Task *task = NULL;
2437
2438 task = newBoundTask();
2439
2440 // If we haven't killed all the threads yet, do it now.
2441 if (sched_state < SCHED_SHUTTING_DOWN) {
2442 sched_state = SCHED_INTERRUPTING;
2443 Capability *cap = task->cap;
2444 waitForReturnCapability(&cap,task);
2445 scheduleDoGC(&cap,task,rtsFalse);
2446 ASSERT(task->incall->tso == NULL);
2447 releaseCapability(cap);
2448 }
2449 sched_state = SCHED_SHUTTING_DOWN;
2450
2451 shutdownCapabilities(task, wait_foreign);
2452
2453 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2454 // n_failed_trygrab_idles, n_idle_caps);
2455
2456 boundTaskExiting(task);
2457 }
2458
2459 void
2460 freeScheduler( void )
2461 {
2462 nat still_running;
2463
2464 ACQUIRE_LOCK(&sched_mutex);
2465 still_running = freeTaskManager();
2466 // We can only free the Capabilities if there are no Tasks still
2467 // running. We might have a Task about to return from a foreign
2468 // call into waitForReturnCapability(), for example (actually,
2469 // this should be the *only* thing that a still-running Task can
2470 // do at this point, and it will block waiting for the
2471 // Capability).
2472 if (still_running == 0) {
2473 freeCapabilities();
2474 if (n_capabilities != 1) {
2475 stgFree(capabilities);
2476 }
2477 }
2478 RELEASE_LOCK(&sched_mutex);
2479 #if defined(THREADED_RTS)
2480 closeMutex(&sched_mutex);
2481 #endif
2482 }
2483
2484 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2485 void *user USED_IF_NOT_THREADS)
2486 {
2487 #if !defined(THREADED_RTS)
2488 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2489 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2490 evac(user, (StgClosure **)(void *)&sleeping_queue);
2491 #endif
2492 }
2493
2494 /* -----------------------------------------------------------------------------
2495 performGC
2496
2497 This is the interface to the garbage collector from Haskell land.
2498 We provide this so that external C code can allocate and garbage
2499 collect when called from Haskell via _ccall_GC.
2500 -------------------------------------------------------------------------- */
2501
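/* A minimal usage sketch: foreign C code that was entered from Haskell via a
 * safe call can force a collection with
 *
 *     performMajorGC();    // or performGC() for a GC that need not be major
 *
 * Both wrappers obtain a Capability themselves, via newBoundTask() and
 * waitForReturnCapability() below.
 */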
2502 static void
2503 performGC_(rtsBool force_major)
2504 {
2505 Task *task;
2506 Capability *cap = NULL;
2507
2508 // We must grab a new Task here, because the existing Task may be
2509 // associated with a particular Capability, and chained onto the
2510 // suspended_ccalls queue.
2511 task = newBoundTask();
2512
2513 waitForReturnCapability(&cap,task);
2514 scheduleDoGC(&cap,task,force_major);
2515 releaseCapability(cap);
2516 boundTaskExiting(task);
2517 }
2518
2519 void
2520 performGC(void)
2521 {
2522 performGC_(rtsFalse);
2523 }
2524
2525 void
2526 performMajorGC(void)
2527 {
2528 performGC_(rtsTrue);
2529 }
2530
2531 /* ---------------------------------------------------------------------------
2532 Interrupt execution
2533 - usually called inside a signal handler so it mustn't do anything fancy.
2534 ------------------------------------------------------------------------ */
2535
2536 void
2537 interruptStgRts(void)
2538 {
2539 sched_state = SCHED_INTERRUPTING;
2540 interruptAllCapabilities();
2541 #if defined(THREADED_RTS)
2542 wakeUpRts();
2543 #endif
2544 }
2545
2546 /* -----------------------------------------------------------------------------
2547 Wake up the RTS
2548
2549 This function causes at least one OS thread to wake up and run the
2550 scheduler loop. It is invoked when the RTS might be deadlocked, or
2551    an external event has arrived that may need servicing (e.g. a
2552 keyboard interrupt).
2553
2554 In the single-threaded RTS we don't do anything here; we only have
2555 one thread anyway, and the event that caused us to want to wake up
2556 will have interrupted any blocking system call in progress anyway.
2557 -------------------------------------------------------------------------- */
2558
2559 #if defined(THREADED_RTS)
2560 void wakeUpRts(void)
2561 {
2562 // This forces the IO Manager thread to wakeup, which will
2563 // in turn ensure that some OS thread wakes up and runs the
2564 // scheduler loop, which will cause a GC and deadlock check.
2565 ioManagerWakeup();
2566 }
2567 #endif
2568
2569 /* -----------------------------------------------------------------------------
2570 Deleting threads
2571
2572 This is used for interruption (^C) and forking, and corresponds to
2573 raising an exception but without letting the thread catch the
2574 exception.
2575 -------------------------------------------------------------------------- */
2576
2577 static void
2578 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2579 {
2580 // NOTE: must only be called on a TSO that we have exclusive
2581 // access to, because we will call throwToSingleThreaded() below.
2582 // The TSO must be on the run queue of the Capability we own, or
2583 // we must own all Capabilities.
2584
2585 if (tso->why_blocked != BlockedOnCCall &&
2586 tso->why_blocked != BlockedOnCCall_Interruptible) {
2587 throwToSingleThreaded(tso->cap,tso,NULL);
2588 }
2589 }
2590
2591 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2592 static void
2593 deleteThread_(Capability *cap, StgTSO *tso)
2594 { // for forkProcess only:
2595 // like deleteThread(), but we delete threads in foreign calls, too.
2596
2597 if (tso->why_blocked == BlockedOnCCall ||
2598 tso->why_blocked == BlockedOnCCall_Interruptible) {
2599 tso->what_next = ThreadKilled;
2600 appendToRunQueue(tso->cap, tso);
2601 } else {
2602 deleteThread(cap,tso);
2603 }
2604 }
2605 #endif
2606
2607 /* -----------------------------------------------------------------------------
2608 raiseExceptionHelper
2609
2610    This function is called by the raise# primitive, just so that we can
2611    move some of the tricky bits of raising an exception from C-- into
2612    C. Who knows, it might be a useful reusable thing here too.
2613 -------------------------------------------------------------------------- */
2614
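/* Schematically, the raise# entry code (stg_raisezh, see Exception.cmm)
 * switches on the frame type returned here, roughly:
 *
 *     switch (raiseExceptionHelper(BaseReg, CurrentTSO, exception)) {
 *     case CATCH_FRAME:      ...jump to the handler...
 *     case ATOMICALLY_FRAME: ...abort the transaction, then re-raise...
 *     case STOP_FRAME:       ...no handler: the exception is uncaught...
 *     ...
 *     }
 *
 * (A sketch only; Exception.cmm is the authoritative code.)
 */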
2615 StgWord
2616 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2617 {
2618 Capability *cap = regTableToCapability(reg);
2619 StgThunk *raise_closure = NULL;
2620 StgPtr p, next;
2621 StgRetInfoTable *info;
2622 //
2623 // This closure represents the expression 'raise# E' where E
2624     // is the exception raised. It is used to overwrite all the
2625     // thunks which are currently under evaluation.
2626 //
2627
2628 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2629 // LDV profiling: stg_raise_info has THUNK as its closure
2630 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2631     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2632 // 1 does not cause any problem unless profiling is performed.
2633 // However, when LDV profiling goes on, we need to linearly scan
2634 // small object pool, where raise_closure is stored, so we should
2635 // use MIN_UPD_SIZE.
2636 //
2637 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2638 // sizeofW(StgClosure)+1);
2639 //
2640
2641 //
2642 // Walk up the stack, looking for the catch frame. On the way,
2643 // we update any closures pointed to from update frames with the
2644 // raise closure that we just built.
2645 //
2646 p = tso->stackobj->sp;
2647 while(1) {
2648 info = get_ret_itbl((StgClosure *)p);
2649 next = p + stack_frame_sizeW((StgClosure *)p);
2650 switch (info->i.type) {
2651
2652 case UPDATE_FRAME:
2653 // Only create raise_closure if we need to.
2654 if (raise_closure == NULL) {
2655 raise_closure =
2656 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2657 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2658 raise_closure->payload[0] = exception;
2659 }
2660 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2661 (StgClosure *)raise_closure);
2662 p = next;
2663 continue;
2664
2665 case ATOMICALLY_FRAME:
2666 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2667 tso->stackobj->sp = p;
2668 return ATOMICALLY_FRAME;
2669
2670 case CATCH_FRAME:
2671 tso->stackobj->sp = p;
2672 return CATCH_FRAME;
2673
2674 case CATCH_STM_FRAME:
2675 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2676 tso->stackobj->sp = p;
2677 return CATCH_STM_FRAME;
2678
2679 case UNDERFLOW_FRAME:
2680 tso->stackobj->sp = p;
2681 threadStackUnderflow(cap,tso);
2682 p = tso->stackobj->sp;
2683 continue;
2684
2685 case STOP_FRAME:
2686 tso->stackobj->sp = p;
2687 return STOP_FRAME;
2688
2689 case CATCH_RETRY_FRAME:
2690 default:
2691 p = next;
2692 continue;
2693 }
2694 }
2695 }
2696
2697
2698 /* -----------------------------------------------------------------------------
2699 findRetryFrameHelper
2700
2701 This function is called by the retry# primitive. It traverses the stack
2702 leaving tso->sp referring to the frame which should handle the retry.
2703
2704 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2705 or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).
2706    or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2707 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2708 create) because retries are not considered to be exceptions, despite the
2709 similar implementation.
2710
2711 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2712 not be created within memory transactions.
2713 -------------------------------------------------------------------------- */
2714
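/* Schematically, the retry# entry code (stg_retryzh, see PrimOps.cmm) acts on
 * the frame type returned here: for CATCH_RETRY_FRAME it runs the other
 * branch of the orElse#, and for ATOMICALLY_FRAME it blocks the thread until
 * one of the TVars read by the transaction is updated.  (A sketch only;
 * PrimOps.cmm is the authoritative code.)
 */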
2715 StgWord
2716 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2717 {
2718 StgPtr p, next;
2719 StgRetInfoTable *info;
2720
2721 p = tso->stackobj->sp;
2722 while (1) {
2723 info = get_ret_itbl((StgClosure *)p);
2724 next = p + stack_frame_sizeW((StgClosure *)p);
2725 switch (info->i.type) {
2726
2727 case ATOMICALLY_FRAME:
2728 debugTrace(DEBUG_stm,
2729 "found ATOMICALLY_FRAME at %p during retry", p);
2730 tso->stackobj->sp = p;
2731 return ATOMICALLY_FRAME;
2732
2733 case CATCH_RETRY_FRAME:
2734 debugTrace(DEBUG_stm,
2735                        "found CATCH_RETRY_FRAME at %p during retry", p);
2736 tso->stackobj->sp = p;
2737 return CATCH_RETRY_FRAME;
2738
2739 case CATCH_STM_FRAME: {
2740 StgTRecHeader *trec = tso -> trec;
2741 StgTRecHeader *outer = trec -> enclosing_trec;
2742 debugTrace(DEBUG_stm,
2743 "found CATCH_STM_FRAME at %p during retry", p);
2744 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2745 stmAbortTransaction(cap, trec);
2746 stmFreeAbortedTRec(cap, trec);
2747 tso -> trec = outer;
2748 p = next;
2749 continue;
2750 }
2751
2752 case UNDERFLOW_FRAME:
2753 threadStackUnderflow(cap,tso);
2754 p = tso->stackobj->sp;
2755 continue;
2756
2757 default:
2758 ASSERT(info->i.type != CATCH_FRAME);
2759 ASSERT(info->i.type != STOP_FRAME);
2760 p = next;
2761 continue;
2762 }
2763 }
2764 }
2765
2766 /* -----------------------------------------------------------------------------
2767 resurrectThreads is called after garbage collection on the list of
2768 threads found to be garbage. Each of these threads will be woken
2769    up and sent an exception: BlockedIndefinitelyOnMVar if the thread was
2770    blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked in an STM
2771    transaction, or NonTermination if the thread was blocked on a Black Hole.
2772
2773 Locks: assumes we hold *all* the capabilities.
2774 -------------------------------------------------------------------------- */
2775
2776 void
2777 resurrectThreads (StgTSO *threads)
2778 {
2779 StgTSO *tso, *next;
2780 Capability *cap;
2781 generation *gen;
2782
2783 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2784 next = tso->global_link;
2785
2786 gen = Bdescr((P_)tso)->gen;
2787 tso->global_link = gen->threads;
2788 gen->threads = tso;
2789
2790 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2791
2792 // Wake up the thread on the Capability it was last on
2793 cap = tso->cap;
2794
2795 switch (tso->why_blocked) {
2796 case BlockedOnMVar:
2797 /* Called by GC - sched_mutex lock is currently held. */
2798 throwToSingleThreaded(cap, tso,
2799 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2800 break;
2801 case BlockedOnBlackHole:
2802 throwToSingleThreaded(cap, tso,
2803 (StgClosure *)nonTermination_closure);
2804 break;
2805 case BlockedOnSTM:
2806 throwToSingleThreaded(cap, tso,
2807 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2808 break;
2809 case NotBlocked:
2810 /* This might happen if the thread was blocked on a black hole
2811 * belonging to a thread that we've just woken up (raiseAsync
2812 * can wake up threads, remember...).
2813 */
2814 continue;
2815 case BlockedOnMsgThrowTo:
2816 // This can happen if the target is masking, blocks on a
2817 // black hole, and then is found to be unreachable. In
2818 // this case, we want to let the target wake up and carry
2819 // on, and do nothing to this thread.
2820 continue;
2821 default:
2822 barf("resurrectThreads: thread blocked in a strange way: %d",
2823 tso->why_blocked);
2824 }
2825 }
2826 }