1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
95 * exists - earlier gccs apparently didn't.
96 * -= chak
97 */
98 StgTSO dummy_tso;
99
100 /*
101 * This mutex protects most of the global scheduler data in
102 * the THREADED_RTS runtime.
103 */
104 #if defined(THREADED_RTS)
105 Mutex sched_mutex;
106 #endif
107
108 #if !defined(mingw32_HOST_OS)
109 #define FORKPROCESS_PRIMOP_SUPPORTED
110 #endif
111
112 // Local stats
113 #ifdef THREADED_RTS
114 static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
115 #endif
116
117 /* -----------------------------------------------------------------------------
118 * static function prototypes
119 * -------------------------------------------------------------------------- */
120
121 static Capability *schedule (Capability *initialCapability, Task *task);
122
123 //
124 // These functions all encapsulate parts of the scheduler loop, and are
125 // abstracted only to make the structure and control flow of the
126 // scheduler clearer.
127 //
128 static void schedulePreLoop (void);
129 static void scheduleFindWork (Capability **pcap);
130 #if defined(THREADED_RTS)
131 static void scheduleYield (Capability **pcap, Task *task);
132 #endif
133 #if defined(THREADED_RTS)
134 static nat requestSync (Capability **pcap, Task *task, nat sync_type);
135 static void acquireAllCapabilities(Capability *cap, Task *task);
136 static void releaseAllCapabilities(Capability *cap, Task *task);
137 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
138 #endif
139 static void scheduleStartSignalHandlers (Capability *cap);
140 static void scheduleCheckBlockedThreads (Capability *cap);
141 static void scheduleProcessInbox(Capability **cap);
142 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
143 static void schedulePushWork(Capability *cap, Task *task);
144 #if defined(THREADED_RTS)
145 static void scheduleActivateSpark(Capability *cap);
146 #endif
147 static void schedulePostRunThread(Capability *cap, StgTSO *t);
148 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
149 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
150 nat prev_what_next );
151 static void scheduleHandleThreadBlocked( StgTSO *t );
152 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
153 StgTSO *t );
154 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
155 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
156
157 static void deleteThread (Capability *cap, StgTSO *tso);
158 static void deleteAllThreads (Capability *cap);
159
160 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
161 static void deleteThread_(Capability *cap, StgTSO *tso);
162 #endif
163
164 /* ---------------------------------------------------------------------------
165 Main scheduling loop.
166
167 We use round-robin scheduling, each thread returning to the
168 scheduler loop when one of these conditions is detected:
169
170 * out of heap space
171 * timer expires (thread yields)
172 * thread blocks
173 * thread ends
174 * stack overflow
175
176 ------------------------------------------------------------------------ */
177
178 static Capability *
179 schedule (Capability *initialCapability, Task *task)
180 {
181 StgTSO *t;
182 Capability *cap;
183 StgThreadReturnCode ret;
184 nat prev_what_next;
185 rtsBool ready_to_gc;
186 #if defined(THREADED_RTS)
187 rtsBool first = rtsTrue;
188 #endif
189
190 cap = initialCapability;
191
192 // Pre-condition: this task owns initialCapability.
193 // The sched_mutex is *NOT* held
194 // NB. on return, we still hold a capability.
195
196 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
197
198 schedulePreLoop();
199
200 // -----------------------------------------------------------
201 // Scheduler loop starts here:
202
203 while (1) {
204
205 // Check whether we have re-entered the RTS from Haskell without
206 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
207 // call).
208 if (cap->in_haskell) {
209 errorBelch("schedule: re-entered unsafely.\n"
210 " Perhaps a 'foreign import unsafe' should be 'safe'?");
211 stg_exit(EXIT_FAILURE);
212 }
213
214 // The interruption / shutdown sequence.
215 //
216 // In order to cleanly shut down the runtime, we want to:
217 // * make sure that all main threads return to their callers
218 // with the state 'Interrupted'.
219 // * clean up all OS threads associated with the runtime
220 // * free all memory etc.
221 //
222 // So the sequence for ^C goes like this:
223 //
224 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
225 // arranges for some Capability to wake up
226 //
227 // * all threads in the system are halted, and the zombies are
228 // placed on the run queue for cleaning up. We acquire all
229 // the capabilities in order to delete the threads, this is
230 // done by scheduleDoGC() for convenience (because GC already
231 // needs to acquire all the capabilities). We can't kill
232 // threads involved in foreign calls.
233 //
234 // * somebody calls shutdownHaskell(), which calls exitScheduler()
235 //
236 // * sched_state := SCHED_SHUTTING_DOWN
237 //
238 // * all workers exit when the run queue on their capability
239 // drains. All main threads will also exit when their TSO
240 // reaches the head of the run queue and they can return.
241 //
242 // * eventually all Capabilities will shut down, and the RTS can
243 // exit.
244 //
245 // * We might be left with threads blocked in foreign calls;
246 // we should really attempt to kill these somehow (TODO).
247
248 switch (sched_state) {
249 case SCHED_RUNNING:
250 break;
251 case SCHED_INTERRUPTING:
252 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
253 /* scheduleDoGC() deletes all the threads */
254 scheduleDoGC(&cap,task,rtsFalse);
255
256 // after scheduleDoGC(), we must be shutting down. Either some
257 // other Capability did the final GC, or we did it above,
258 // either way we can fall through to the SCHED_SHUTTING_DOWN
259 // case now.
260 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
261 // fall through
262
263 case SCHED_SHUTTING_DOWN:
264 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
265 // If we are a worker, just exit. If we're a bound thread
266 // then we will exit below when we've removed our TSO from
267 // the run queue.
268 if (!isBoundTask(task) && emptyRunQueue(cap)) {
269 return cap;
270 }
271 break;
272 default:
273 barf("sched_state: %d", sched_state);
274 }
275
276 scheduleFindWork(&cap);
277
278 /* work pushing, currently relevant only for THREADED_RTS:
279 (pushes threads, wakes up idle capabilities for stealing) */
280 schedulePushWork(cap,task);
281
282 scheduleDetectDeadlock(&cap,task);
283
284 // Normally, the only way we can get here with no threads to
285 // run is if a keyboard interrupt was received during
286 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
287 // Additionally, it is not fatal for the
288 // threaded RTS to reach here with no threads to run.
289 //
290 // win32: might be here due to awaitEvent() being abandoned
291 // as a result of a console event having been delivered.
292
293 #if defined(THREADED_RTS)
294 if (first)
295 {
296 // XXX: ToDo
297 // // don't yield the first time, we want a chance to run this
298 // // thread for a bit, even if there are others banging at the
299 // // door.
300 // first = rtsFalse;
301 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
302 }
303
304 scheduleYield(&cap,task);
305
306 if (emptyRunQueue(cap)) continue; // look for work again
307 #endif
308
309 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
310 if ( emptyRunQueue(cap) ) {
311 ASSERT(sched_state >= SCHED_INTERRUPTING);
312 }
313 #endif
314
315 //
316 // Get a thread to run
317 //
318 t = popRunQueue(cap);
319
320 // Sanity check the thread we're about to run. This can be
321 // expensive if there is lots of thread switching going on...
322 IF_DEBUG(sanity,checkTSO(t));
323
324 #if defined(THREADED_RTS)
325 // Check whether we can run this thread in the current task.
326 // If not, we have to pass our capability to the right task.
327 {
328 InCall *bound = t->bound;
329
330 if (bound) {
331 if (bound->task == task) {
332 // yes, the Haskell thread is bound to the current native thread
333 } else {
334 debugTrace(DEBUG_sched,
335 "thread %lu bound to another OS thread",
336 (unsigned long)t->id);
337 // no, bound to a different Haskell thread: pass to that thread
338 pushOnRunQueue(cap,t);
339 continue;
340 }
341 } else {
342 // The thread we want to run is unbound.
343 if (task->incall->tso) {
344 debugTrace(DEBUG_sched,
345 "this OS thread cannot run thread %lu",
346 (unsigned long)t->id);
347 // no, the current native thread is bound to a different
348 // Haskell thread, so pass it to any worker thread
349 pushOnRunQueue(cap,t);
350 continue;
351 }
352 }
353 }
354 #endif
355
356 // If we're shutting down, and this thread has not yet been
357 // killed, kill it now. This sometimes happens when a finalizer
358 // thread is created by the final GC, or a thread previously
359 // in a foreign call returns.
360 if (sched_state >= SCHED_INTERRUPTING &&
361 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
362 deleteThread(cap,t);
363 }
364
365 // If this capability is disabled, migrate the thread away rather
366 // than running it. NB. but not if the thread is bound: it is
367 // really hard for a bound thread to migrate itself. Believe me,
368 // I tried several ways and couldn't find a way to do it.
369 // Instead, when everything is stopped for GC, we migrate all the
370 // threads on the run queue then (see scheduleDoGC()).
371 //
372 // ToDo: what about TSO_LOCKED? Currently we're migrating those
373 // when the number of capabilities drops, but we never migrate
374 // them back if it rises again. Presumably we should, but after
375 // the thread has been migrated we no longer know what capability
376 // it was originally on.
377 #ifdef THREADED_RTS
378 if (cap->disabled && !t->bound) {
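        // Pick the destination from among the still-enabled capabilities;
        // cap->no % enabled_capabilities spreads the displaced threads
        // across them.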
379 Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
380 migrateThread(cap, t, dest_cap);
381 continue;
382 }
383 #endif
384
385 /* context switches are initiated by the timer signal, unless
386 * the user specified "context switch as often as possible", with
387 * +RTS -C0
388 */
389 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
390 && !emptyThreadQueues(cap)) {
391 cap->context_switch = 1;
392 }
393
394 run_thread:
395
396 // CurrentTSO is the thread to run. t might be different if we
397 // loop back to run_thread, so make sure to set CurrentTSO after
398 // that.
399 cap->r.rCurrentTSO = t;
400
401 startHeapProfTimer();
402
403 // ----------------------------------------------------------------------
404 // Run the current thread
405
406 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
407 ASSERT(t->cap == cap);
408 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
409
410 prev_what_next = t->what_next;
411
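        // Restore the errno (and, on Windows, the last-error code) that this
        // thread saved when it last stopped running, so the Haskell code
        // sees the values it left behind.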
412 errno = t->saved_errno;
413 #if mingw32_HOST_OS
414 SetLastError(t->saved_winerror);
415 #endif
416
417 // reset the interrupt flag before running Haskell code
418 cap->interrupt = 0;
419
420 cap->in_haskell = rtsTrue;
421 cap->idle = 0;
422
423 dirty_TSO(cap,t);
424 dirty_STACK(cap,t->stackobj);
425
426 switch (recent_activity)
427 {
428 case ACTIVITY_DONE_GC: {
429 // ACTIVITY_DONE_GC means we turned off the timer signal to
430 // conserve power (see #1623). Re-enable it here.
431 nat prev;
432 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
433 if (prev == ACTIVITY_DONE_GC) {
434 #ifndef PROFILING
435 startTimer();
436 #endif
437 }
438 break;
439 }
440 case ACTIVITY_INACTIVE:
441 // If we reached ACTIVITY_INACTIVE, then don't reset it until
442 // we've done the GC. The thread running here might just be
443 // the IO manager thread that handle_tick() woke up via
444 // wakeUpRts().
445 break;
446 default:
447 recent_activity = ACTIVITY_YES;
448 }
449
450 traceEventRunThread(cap, t);
451
452 switch (prev_what_next) {
453
454 case ThreadKilled:
455 case ThreadComplete:
456 /* Thread already finished, return to scheduler. */
457 ret = ThreadFinished;
458 break;
459
460 case ThreadRunGHC:
461 {
462 StgRegTable *r;
463 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
464 cap = regTableToCapability(r);
465 ret = r->rRet;
466 break;
467 }
468
469 case ThreadInterpret:
470 cap = interpretBCO(cap);
471 ret = cap->r.rRet;
472 break;
473
474 default:
475 barf("schedule: invalid what_next field");
476 }
477
478 cap->in_haskell = rtsFalse;
479
480 // The TSO might have moved, eg. if it re-entered the RTS and a GC
481 // happened. So find the new location:
482 t = cap->r.rCurrentTSO;
483
484 // And save the current errno in this thread.
485 // XXX: possibly bogus for SMP because this thread might already
486 // be running again, see code below.
487 t->saved_errno = errno;
488 #if mingw32_HOST_OS
489 // Similarly for Windows error code
490 t->saved_winerror = GetLastError();
491 #endif
492
493 if (ret == ThreadBlocked) {
494 if (t->why_blocked == BlockedOnBlackHole) {
495 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
496 traceEventStopThread(cap, t, t->why_blocked + 6,
497 owner != NULL ? owner->id : 0);
498 } else {
499 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
500 }
501 } else {
502 traceEventStopThread(cap, t, ret, 0);
503 }
504
505 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
506 ASSERT(t->cap == cap);
507
508 // ----------------------------------------------------------------------
509
510 // Costs for the scheduler are assigned to CCS_SYSTEM
511 stopHeapProfTimer();
512 #if defined(PROFILING)
513 cap->r.rCCCS = CCS_SYSTEM;
514 #endif
515
516 schedulePostRunThread(cap,t);
517
518 ready_to_gc = rtsFalse;
519
520 switch (ret) {
521 case HeapOverflow:
522 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
523 break;
524
525 case StackOverflow:
526 // just adjust the stack for this thread, then pop it back
527 // on the run queue.
528 threadStackOverflow(cap, t);
529 pushOnRunQueue(cap,t);
530 break;
531
532 case ThreadYielding:
533 if (scheduleHandleYield(cap, t, prev_what_next)) {
534 // shortcut for switching between compiler/interpreter:
535 goto run_thread;
536 }
537 break;
538
539 case ThreadBlocked:
540 scheduleHandleThreadBlocked(t);
541 break;
542
543 case ThreadFinished:
544 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
545 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
546 break;
547
548 default:
549 barf("schedule: invalid thread return code %d", (int)ret);
550 }
551
552 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
553 scheduleDoGC(&cap,task,rtsFalse);
554 }
555 } /* end of while() */
556 }
557
558 /* -----------------------------------------------------------------------------
559 * Run queue operations
560 * -------------------------------------------------------------------------- */
561
562 void
563 removeFromRunQueue (Capability *cap, StgTSO *tso)
564 {
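    // The run queue is a doubly-linked list: _link is the forward pointer
    // and block_info.prev is reused as the back pointer while a TSO is on
    // the queue, which is what allows O(1) removal from the middle.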
565 if (tso->block_info.prev == END_TSO_QUEUE) {
566 ASSERT(cap->run_queue_hd == tso);
567 cap->run_queue_hd = tso->_link;
568 } else {
569 setTSOLink(cap, tso->block_info.prev, tso->_link);
570 }
571 if (tso->_link == END_TSO_QUEUE) {
572 ASSERT(cap->run_queue_tl == tso);
573 cap->run_queue_tl = tso->block_info.prev;
574 } else {
575 setTSOPrev(cap, tso->_link, tso->block_info.prev);
576 }
577 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
578
579 IF_DEBUG(sanity, checkRunQueue(cap));
580 }
581
582 /* ----------------------------------------------------------------------------
583 * Setting up the scheduler loop
584 * ------------------------------------------------------------------------- */
585
586 static void
587 schedulePreLoop(void)
588 {
589 // initialisation for scheduler - what cannot go into initScheduler()
590
591 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
592 win32AllocStack();
593 #endif
594 }
595
596 /* -----------------------------------------------------------------------------
597 * scheduleFindWork()
598 *
599 * Search for work to do, and handle messages from elsewhere.
600 * -------------------------------------------------------------------------- */
601
602 static void
603 scheduleFindWork (Capability **pcap)
604 {
605 scheduleStartSignalHandlers(*pcap);
606
607 scheduleProcessInbox(pcap);
608
609 scheduleCheckBlockedThreads(*pcap);
610
611 #if defined(THREADED_RTS)
612 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
613 #endif
614 }
615
616 #if defined(THREADED_RTS)
617 STATIC_INLINE rtsBool
618 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
619 {
620 // we need to yield this capability to someone else if..
621 // - another thread is initiating a GC, and we didn't just do a GC
622 // (see Note [GC livelock])
623 // - another Task is returning from a foreign call
624 // - the thread at the head of the run queue cannot be run
625 // by this Task (it is bound to another Task, or it is unbound
626 // and this task is bound).
627 //
628 // Note [GC livelock]
629 //
630 // If we are interrupted to do a GC, then we do not immediately do
631 // another one. This avoids a starvation situation where one
632 // Capability keeps forcing a GC and the other Capabilities make no
633 // progress at all.
634
635 return ((pending_sync && !didGcLast) ||
636 cap->returning_tasks_hd != NULL ||
637 (!emptyRunQueue(cap) && (task->incall->tso == NULL
638 ? cap->run_queue_hd->bound != NULL
639 : cap->run_queue_hd->bound != task->incall)));
640 }
641
642 // This is the single place where a Task goes to sleep. There are
643 // two reasons it might need to sleep:
644 // - there are no threads to run
645 // - we need to yield this Capability to someone else
646 // (see shouldYieldCapability())
647 //
648 // Careful: the scheduler loop is quite delicate. Make sure you run
649 // the tests in testsuite/concurrent (all ways) after modifying this,
650 // and also check the benchmarks in nofib/parallel for regressions.
651
652 static void
653 scheduleYield (Capability **pcap, Task *task)
654 {
655 Capability *cap = *pcap;
656 int didGcLast = rtsFalse;
657
658 // if we have work, and we don't need to give up the Capability, continue.
659 //
660 if (!shouldYieldCapability(cap,task,rtsFalse) &&
661 (!emptyRunQueue(cap) ||
662 !emptyInbox(cap) ||
663 sched_state >= SCHED_INTERRUPTING)) {
664 return;
665 }
666
667 // otherwise yield (sleep), and keep yielding if necessary.
668 do {
669 didGcLast = yieldCapability(&cap,task, !didGcLast);
670 }
671 while (shouldYieldCapability(cap,task,didGcLast));
672
673 // note there may still be no threads on the run queue at this
674 // point; the caller has to check.
675
676 *pcap = cap;
677 return;
678 }
679 #endif
680
681 /* -----------------------------------------------------------------------------
682 * schedulePushWork()
683 *
684 * Push work to other Capabilities if we have some.
685 * -------------------------------------------------------------------------- */
686
687 static void
688 schedulePushWork(Capability *cap USED_IF_THREADS,
689 Task *task USED_IF_THREADS)
690 {
691 /* The following code is not for PARALLEL_HASKELL. I kept the call general;
692 future GUM versions might use pushing in a distributed setup. */
693 #if defined(THREADED_RTS)
694
695 Capability *free_caps[n_capabilities], *cap0;
696 nat i, n_free_caps;
697
698 // migration can be turned off with +RTS -qm
699 if (!RtsFlags.ParFlags.migrate) return;
700
701 // Check whether we have more threads on our run queue, or sparks
702 // in our pool, that we could hand to another Capability.
703 if (cap->run_queue_hd == END_TSO_QUEUE) {
704 if (sparkPoolSizeCap(cap) < 2) return;
705 } else {
706 if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
707 sparkPoolSizeCap(cap) < 1) return;
708 }
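    // In other words, only share if we have at least two units of work
    // (runnable threads plus sparks), so that we can keep one for ourselves.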
709
710 // First grab as many free Capabilities as we can.
711 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
712 cap0 = &capabilities[i];
713 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
714 if (!emptyRunQueue(cap0)
715 || cap0->returning_tasks_hd != NULL
716 || cap0->inbox != (Message*)END_TSO_QUEUE) {
717 // it already has some work; we just grabbed it at
718 // the wrong moment. Or maybe it's deadlocked!
719 releaseCapability(cap0);
720 } else {
721 free_caps[n_free_caps++] = cap0;
722 }
723 }
724 }
725
726 // we now have n_free_caps free capabilities stashed in
727 // free_caps[]. Share our run queue equally with them. This is
728 // probably the simplest thing we could do; improvements we might
729 // want to do include:
730 //
731 // - giving high priority to moving relatively new threads, on
732 // the grounds that they haven't had time to build up a
733 // working set in the cache on this CPU/Capability.
734 //
735 // - giving low priority to moving long-lived threads
736
737 if (n_free_caps > 0) {
738 StgTSO *prev, *t, *next;
739 #ifdef SPARK_PUSHING
740 rtsBool pushed_to_all;
741 #endif
742
743 debugTrace(DEBUG_sched,
744 "cap %d: %s and %d free capabilities, sharing...",
745 cap->no,
746 (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
747 "excess threads on run queue":"sparks to share (>=2)",
748 n_free_caps);
749
750 i = 0;
751 #ifdef SPARK_PUSHING
752 pushed_to_all = rtsFalse;
753 #endif
754
755 if (cap->run_queue_hd != END_TSO_QUEUE) {
756 prev = cap->run_queue_hd;
757 t = prev->_link;
758 prev->_link = END_TSO_QUEUE;
759 for (; t != END_TSO_QUEUE; t = next) {
760 next = t->_link;
761 t->_link = END_TSO_QUEUE;
762 if (t->bound == task->incall // don't move my bound thread
763 || tsoLocked(t)) { // don't move a locked thread
764 setTSOLink(cap, prev, t);
765 setTSOPrev(cap, t, prev);
766 prev = t;
767 } else if (i == n_free_caps) {
768 #ifdef SPARK_PUSHING
769 pushed_to_all = rtsTrue;
770 #endif
771 i = 0;
772 // keep one for us
773 setTSOLink(cap, prev, t);
774 setTSOPrev(cap, t, prev);
775 prev = t;
776 } else {
777 appendToRunQueue(free_caps[i],t);
778
779 traceEventMigrateThread (cap, t, free_caps[i]->no);
780
781 if (t->bound) { t->bound->task->cap = free_caps[i]; }
782 t->cap = free_caps[i];
783 i++;
784 }
785 }
786 cap->run_queue_tl = prev;
787
788 IF_DEBUG(sanity, checkRunQueue(cap));
789 }
790
791 #ifdef SPARK_PUSHING
792 /* JB: I left this code in place; it would work but is not necessary */
793
794 // If there are some free capabilities that we didn't push any
795 // threads to, then try to push a spark to each one.
796 if (!pushed_to_all) {
797 StgClosure *spark;
798 // i is the next free capability to push to
799 for (; i < n_free_caps; i++) {
800 if (emptySparkPoolCap(free_caps[i])) {
801 spark = tryStealSpark(cap->sparks);
802 if (spark != NULL) {
803 /* TODO: if anyone wants to re-enable this code then
804 * they must consider the fizzledSpark(spark) case
805 * and update the per-cap spark statistics.
806 */
807 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
808
809 traceEventStealSpark(free_caps[i], t, cap->no);
810
811 newSpark(&(free_caps[i]->r), spark);
812 }
813 }
814 }
815 }
816 #endif /* SPARK_PUSHING */
817
818 // release the capabilities
819 for (i = 0; i < n_free_caps; i++) {
820 task->cap = free_caps[i];
821 releaseAndWakeupCapability(free_caps[i]);
822 }
823 }
824 task->cap = cap; // reset to point to our Capability.
825
826 #endif /* THREADED_RTS */
827
828 }
829
830 /* ----------------------------------------------------------------------------
831 * Start any pending signal handlers
832 * ------------------------------------------------------------------------- */
833
834 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
835 static void
836 scheduleStartSignalHandlers(Capability *cap)
837 {
838 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
839 // safe outside the lock
840 startSignalHandlers(cap);
841 }
842 }
843 #else
844 static void
845 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
846 {
847 }
848 #endif
849
850 /* ----------------------------------------------------------------------------
851 * Check for blocked threads that can be woken up.
852 * ------------------------------------------------------------------------- */
853
854 static void
855 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
856 {
857 #if !defined(THREADED_RTS)
858 //
859 // Check whether any waiting threads need to be woken up. If the
860 // run queue is empty, and there are no other tasks running, we
861 // can wait indefinitely for something to happen.
862 //
863 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
864 {
865 awaitEvent (emptyRunQueue(cap));
866 }
867 #endif
868 }
869
870 /* ----------------------------------------------------------------------------
871 * Detect deadlock conditions and attempt to resolve them.
872 * ------------------------------------------------------------------------- */
873
874 static void
875 scheduleDetectDeadlock (Capability **pcap, Task *task)
876 {
877 Capability *cap = *pcap;
878 /*
879 * Detect deadlock: when we have no threads to run, there are no
880 * threads blocked, waiting for I/O, or sleeping, and all the
881 * other tasks are waiting for work, we must have a deadlock of
882 * some description.
883 */
884 if ( emptyThreadQueues(cap) )
885 {
886 #if defined(THREADED_RTS)
887 /*
888 * In the threaded RTS, we only check for deadlock if there
889 * has been no activity in a complete timeslice. This means
890 * we won't eagerly start a full GC just because we don't have
891 * any threads to run currently.
892 */
893 if (recent_activity != ACTIVITY_INACTIVE) return;
894 #endif
895
896 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
897
898 // Garbage collection can release some new threads due to
899 // either (a) finalizers or (b) threads resurrected because
900 // they are unreachable and will therefore be sent an
901 // exception. Any threads thus released will be immediately
902 // runnable.
903 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
904 cap = *pcap;
905 // when force_major == rtsTrue, scheduleDoGC sets
906 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
907 // signal.
908
909 if ( !emptyRunQueue(cap) ) return;
910
911 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
912 /* If we have user-installed signal handlers, then wait
913 * for signals to arrive rather than bombing out with a
914 * deadlock.
915 */
916 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
917 debugTrace(DEBUG_sched,
918 "still deadlocked, waiting for signals...");
919
920 awaitUserSignals();
921
922 if (signals_pending()) {
923 startSignalHandlers(cap);
924 }
925
926 // either we have threads to run, or we were interrupted:
927 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
928
929 return;
930 }
931 #endif
932
933 #if !defined(THREADED_RTS)
934 /* Probably a real deadlock. Send the current main thread the
935 * Deadlock exception.
936 */
937 if (task->incall->tso) {
938 switch (task->incall->tso->why_blocked) {
939 case BlockedOnSTM:
940 case BlockedOnBlackHole:
941 case BlockedOnMsgThrowTo:
942 case BlockedOnMVar:
943 throwToSingleThreaded(cap, task->incall->tso,
944 (StgClosure *)nonTermination_closure);
945 return;
946 default:
947 barf("deadlock: main thread blocked in a strange way");
948 }
949 }
950 return;
951 #endif
952 }
953 }
954
955
956 /* ----------------------------------------------------------------------------
957 * Send pending messages (PARALLEL_HASKELL only)
958 * ------------------------------------------------------------------------- */
959
960 #if defined(PARALLEL_HASKELL)
961 static void
962 scheduleSendPendingMessages(void)
963 {
964
965 # if defined(PAR) // global Mem.Mgmt., omit for now
966 if (PendingFetches != END_BF_QUEUE) {
967 processFetches();
968 }
969 # endif
970
971 if (RtsFlags.ParFlags.BufferTime) {
972 // if we use message buffering, we must send away all message
973 // packets which have become too old...
974 sendOldBuffers();
975 }
976 }
977 #endif
978
979 /* ----------------------------------------------------------------------------
980 * Process messages in the current Capability's inbox
981 * ------------------------------------------------------------------------- */
982
983 static void
984 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
985 {
986 #if defined(THREADED_RTS)
987 Message *m, *next;
988 int r;
989 Capability *cap = *pcap;
990
991 while (!emptyInbox(cap)) {
992 if (cap->r.rCurrentNursery->link == NULL ||
993 g0->n_new_large_words >= large_alloc_lim) {
994 scheduleDoGC(pcap, cap->running_task, rtsFalse);
995 cap = *pcap;
996 }
997
998 // don't use a blocking acquire; if the lock is held by
999 // another thread then just carry on. This seems to avoid
1000 // getting stuck in a message ping-pong situation with other
1001 // processors. We'll check the inbox again later anyway.
1002 //
1003 // We should really use a more efficient queue data structure
1004 // here. The trickiness is that we must ensure a Capability
1005 // never goes idle if the inbox is non-empty, which is why we
1006 // use cap->lock (cap->lock is released as the last thing
1007 // before going idle; see Capability.c:releaseCapability()).
1008 r = TRY_ACQUIRE_LOCK(&cap->lock);
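        // r != 0 means the lock is currently held by someone else; give up
        // for now and look at the inbox again on a later iteration.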
1009 if (r != 0) return;
1010
1011 m = cap->inbox;
1012 cap->inbox = (Message*)END_TSO_QUEUE;
1013
1014 RELEASE_LOCK(&cap->lock);
1015
1016 while (m != (Message*)END_TSO_QUEUE) {
1017 next = m->link;
1018 executeMessage(cap, m);
1019 m = next;
1020 }
1021 }
1022 #endif
1023 }
1024
1025 /* ----------------------------------------------------------------------------
1026 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1027 * ------------------------------------------------------------------------- */
1028
1029 #if defined(THREADED_RTS)
1030 static void
1031 scheduleActivateSpark(Capability *cap)
1032 {
1033 if (anySparks() && !cap->disabled)
1034 {
1035 createSparkThread(cap);
1036 debugTrace(DEBUG_sched, "creating a spark thread");
1037 }
1038 }
1039 #endif // THREADED_RTS
1040
1041 /* ----------------------------------------------------------------------------
1042 * After running a thread...
1043 * ------------------------------------------------------------------------- */
1044
1045 static void
1046 schedulePostRunThread (Capability *cap, StgTSO *t)
1047 {
1048 // We have to be able to catch transactions that are in an
1049 // infinite loop as a result of seeing an inconsistent view of
1050 // memory, e.g.
1051 //
1052 // atomically $ do
1053 // [a,b] <- mapM readTVar [ta,tb]
1054 // when (a == b) loop
1055 //
1056 // and a is never equal to b given a consistent view of memory.
1057 //
1058 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1059 if (!stmValidateNestOfTransactions (t -> trec)) {
1060 debugTrace(DEBUG_sched | DEBUG_stm,
1061 "trec %p found wasting its time", t);
1062
1063 // strip the stack back to the
1064 // ATOMICALLY_FRAME, aborting the (nested)
1065 // transaction, and saving the stack of any
1066 // partially-evaluated thunks on the heap.
1067 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1068
1069 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1070 }
1071 }
1072
1073 /* some statistics gathering in the parallel case */
1074 }
1075
1076 /* -----------------------------------------------------------------------------
1077 * Handle a thread that returned to the scheduler with HeapOverflow
1078 * -------------------------------------------------------------------------- */
1079
1080 static rtsBool
1081 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1082 {
1083 // did the task ask for a large block?
1084 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1085 // if so, get one and push it on the front of the nursery.
1086 bdescr *bd;
1087 W_ blocks;
1088
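        // round the requested allocation up to a whole number of blocks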
1089 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1090
1091 if (blocks > BLOCKS_PER_MBLOCK) {
1092 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1093 }
1094
1095 debugTrace(DEBUG_sched,
1096 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1097 (long)t->id, what_next_strs[t->what_next], blocks);
1098
1099 // don't do this if the nursery is (nearly) full, we'll GC first.
1100 if (cap->r.rCurrentNursery->link != NULL ||
1101 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1102 // if the nursery has only one block.
1103
1104 bd = allocGroup_lock(blocks);
1105 cap->r.rNursery->n_blocks += blocks;
1106
1107 // link the new group into the list
1108 bd->link = cap->r.rCurrentNursery;
1109 bd->u.back = cap->r.rCurrentNursery->u.back;
1110 if (cap->r.rCurrentNursery->u.back != NULL) {
1111 cap->r.rCurrentNursery->u.back->link = bd;
1112 } else {
1113 cap->r.rNursery->blocks = bd;
1114 }
1115 cap->r.rCurrentNursery->u.back = bd;
1116
1117 // initialise it as a nursery block. We initialise the
1118 // step, gen_no, and flags field of *every* sub-block in
1119 // this large block, because this is easier than making
1120 // sure that we always find the block head of a large
1121 // block whenever we call Bdescr() (eg. evacuate() and
1122 // isAlive() in the GC would both have to do this, at
1123 // least).
1124 {
1125 bdescr *x;
1126 for (x = bd; x < bd + blocks; x++) {
1127 initBdescr(x,g0,g0);
1128 x->free = x->start;
1129 x->flags = 0;
1130 }
1131 }
1132
1133 // This assert can be a killer if the app is doing lots
1134 // of large block allocations.
1135 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1136
1137 // now update the nursery to point to the new block
1138 cap->r.rCurrentNursery = bd;
1139
1140 // we might be unlucky and have another thread get on the
1141 // run queue before us and steal the large block, but in that
1142 // case the thread will just end up requesting another large
1143 // block.
1144 pushOnRunQueue(cap,t);
1145 return rtsFalse; /* not actually GC'ing */
1146 }
1147 }
1148
1149 if (cap->r.rHpLim == NULL || cap->context_switch) {
1150 // Sometimes we miss a context switch, e.g. when calling
1151 // primitives in a tight loop, MAYBE_GC() doesn't check the
1152 // context switch flag, and we end up waiting for a GC.
1153 // See #1984, and concurrent/should_run/1984
1154 cap->context_switch = 0;
1155 appendToRunQueue(cap,t);
1156 } else {
1157 pushOnRunQueue(cap,t);
1158 }
1159 return rtsTrue;
1160 /* actual GC is done at the end of the while loop in schedule() */
1161 }
1162
1163 /* -----------------------------------------------------------------------------
1164 * Handle a thread that returned to the scheduler with ThreadYielding
1165 * -------------------------------------------------------------------------- */
1166
1167 static rtsBool
1168 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1169 {
1170 /* put the thread back on the run queue. Then, if we're ready to
1171 * GC, check whether this is the last task to stop. If so, wake
1172 * up the GC thread. getThread will block during a GC until the
1173 * GC is finished.
1174 */
1175
1176 ASSERT(t->_link == END_TSO_QUEUE);
1177
1178 // Shortcut if we're just switching evaluators: don't bother
1179 // doing stack squeezing (which can be expensive), just run the
1180 // thread.
1181 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1182 debugTrace(DEBUG_sched,
1183 "--<< thread %ld (%s) stopped to switch evaluators",
1184 (long)t->id, what_next_strs[t->what_next]);
1185 return rtsTrue;
1186 }
1187
1188 // Reset the context switch flag. We don't do this just before
1189 // running the thread, because that would mean we would lose ticks
1190 // during GC, which can lead to unfair scheduling (a thread hogs
1191 // the CPU because the tick always arrives during GC). This way
1192 // penalises threads that do a lot of allocation, but that seems
1193 // better than the alternative.
1194 if (cap->context_switch != 0) {
1195 cap->context_switch = 0;
1196 appendToRunQueue(cap,t);
1197 } else {
1198 pushOnRunQueue(cap,t);
1199 }
1200
1201 IF_DEBUG(sanity,
1202 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1203 checkTSO(t));
1204
1205 return rtsFalse;
1206 }
1207
1208 /* -----------------------------------------------------------------------------
1209 * Handle a thread that returned to the scheduler with ThreadBlocked
1210 * -------------------------------------------------------------------------- */
1211
1212 static void
1213 scheduleHandleThreadBlocked( StgTSO *t
1214 #if !defined(DEBUG)
1215 STG_UNUSED
1216 #endif
1217 )
1218 {
1219
1220 // We don't need to do anything. The thread is blocked, and it
1221 // has tidied up its stack and placed itself on whatever queue
1222 // it needs to be on.
1223
1224 // ASSERT(t->why_blocked != NotBlocked);
1225 // Not true: for example,
1226 // - the thread may have woken itself up already, because
1227 // threadPaused() might have raised a blocked throwTo
1228 // exception, see maybePerformBlockedException().
1229
1230 #ifdef DEBUG
1231 traceThreadStatus(DEBUG_sched, t);
1232 #endif
1233 }
1234
1235 /* -----------------------------------------------------------------------------
1236 * Handle a thread that returned to the scheduler with ThreadFinished
1237 * -------------------------------------------------------------------------- */
1238
1239 static rtsBool
1240 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1241 {
1242 /* Need to check whether this was a main thread, and if so,
1243 * return with the return value.
1244 *
1245 * We also end up here if the thread kills itself with an
1246 * uncaught exception, see Exception.cmm.
1247 */
1248
1249 // blocked exceptions can now complete, even if the thread was in
1250 // blocked mode (see #2910).
1251 awakenBlockedExceptionQueue (cap, t);
1252
1253 //
1254 // Check whether the thread that just completed was a bound
1255 // thread, and if so return with the result.
1256 //
1257 // There is an assumption here that all thread completion goes
1258 // through this point; we need to make sure that if a thread
1259 // ends up in the ThreadKilled state, that it stays on the run
1260 // queue so it can be dealt with here.
1261 //
1262
1263 if (t->bound) {
1264
1265 if (t->bound != task->incall) {
1266 #if !defined(THREADED_RTS)
1267 // Must be a bound thread that is not the topmost one. Leave
1268 // it on the run queue until the stack has unwound to the
1269 // point where we can deal with this. Leaving it on the run
1270 // queue also ensures that the garbage collector knows about
1271 // this thread and its return value (it gets dropped from the
1272 // step->threads list so there's no other way to find it).
1273 appendToRunQueue(cap,t);
1274 return rtsFalse;
1275 #else
1276 // this cannot happen in the threaded RTS, because a
1277 // bound thread can only be run by the appropriate Task.
1278 barf("finished bound thread that isn't mine");
1279 #endif
1280 }
1281
1282 ASSERT(task->incall->tso == t);
1283
1284 if (t->what_next == ThreadComplete) {
1285 if (task->incall->ret) {
1286 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1287 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1288 }
1289 task->incall->stat = Success;
1290 } else {
1291 if (task->incall->ret) {
1292 *(task->incall->ret) = NULL;
1293 }
1294 if (sched_state >= SCHED_INTERRUPTING) {
1295 if (heap_overflow) {
1296 task->incall->stat = HeapExhausted;
1297 } else {
1298 task->incall->stat = Interrupted;
1299 }
1300 } else {
1301 task->incall->stat = Killed;
1302 }
1303 }
1304 #ifdef DEBUG
1305 removeThreadLabel((StgWord)task->incall->tso->id);
1306 #endif
1307
1308 // We no longer consider this thread and task to be bound to
1309 // each other. The TSO lives on until it is GC'd, but the
1310 // task is about to be released by the caller, and we don't
1311 // want anyone following the pointer from the TSO to the
1312 // defunct task (which might have already been
1313 // re-used). This was a real bug: the GC updated
1314 // tso->bound->tso which led to a deadlock.
1315 t->bound = NULL;
1316 task->incall->tso = NULL;
1317
1318 return rtsTrue; // tells schedule() to return
1319 }
1320
1321 return rtsFalse;
1322 }
1323
1324 /* -----------------------------------------------------------------------------
1325 * Perform a heap census
1326 * -------------------------------------------------------------------------- */
1327
1328 static rtsBool
1329 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1330 {
1331 // When we have +RTS -i0 and we're heap profiling, do a census at
1332 // every GC. This lets us get repeatable runs for debugging.
1333 if (performHeapProfile ||
1334 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1335 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1336 return rtsTrue;
1337 } else {
1338 return rtsFalse;
1339 }
1340 }
1341
1342 /* -----------------------------------------------------------------------------
1343 * Start a synchronisation of all capabilities
1344 * -------------------------------------------------------------------------- */
1345
1346 // Returns:
1347 // 0 if we successfully got a sync
1348 // non-0 if there was another sync request in progress,
1349 // and we yielded to it. The value returned is the
1350 // type of the other sync request.
1351 //
1352 #if defined(THREADED_RTS)
1353 static nat requestSync (Capability **pcap, Task *task, nat sync_type)
1354 {
1355 nat prev_pending_sync;
1356
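    // cas() returns the previous value of pending_sync: zero means we won
    // the race and now own the sync; a non-zero result is the type of the
    // sync that is already in progress.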
1357 prev_pending_sync = cas(&pending_sync, 0, sync_type);
1358
1359 if (prev_pending_sync)
1360 {
1361 do {
1362 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1363 prev_pending_sync);
1364 ASSERT(*pcap);
1365 yieldCapability(pcap,task,rtsTrue);
1366 } while (pending_sync);
1367 return prev_pending_sync; // NOTE: task->cap might have changed now
1368 }
1369 else
1370 {
1371 return 0;
1372 }
1373 }
1374
1375 //
1376 // Grab all the capabilities except the one we already hold. Used
1377 // when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1378 // before a fork (SYNC_OTHER).
1379 //
1380 // Only call this after requestSync(), otherwise a deadlock might
1381 // ensue if another thread is trying to synchronise.
1382 //
1383 static void acquireAllCapabilities(Capability *cap, Task *task)
1384 {
1385 Capability *tmpcap;
1386 nat i;
1387
1388 for (i=0; i < n_capabilities; i++) {
1389 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
1390 tmpcap = &capabilities[i];
1391 if (tmpcap != cap) {
1392 // we better hope this task doesn't get migrated to
1393 // another Capability while we're waiting for this one.
1394 // It won't, because load balancing happens while we have
1395 // all the Capabilities, but even so it's a slightly
1396 // unsavoury invariant.
1397 task->cap = tmpcap;
1398 waitForReturnCapability(&tmpcap, task);
1399 if (tmpcap->no != i) {
1400 barf("acquireAllCapabilities: got the wrong capability");
1401 }
1402 }
1403 }
1404 task->cap = cap;
1405 }
1406
1407 static void releaseAllCapabilities(Capability *cap, Task *task)
1408 {
1409 nat i;
1410
1411 for (i = 0; i < n_capabilities; i++) {
1412 if (cap->no != i) {
1413 task->cap = &capabilities[i];
1414 releaseCapability(&capabilities[i]);
1415 }
1416 }
1417 task->cap = cap;
1418 }
1419 #endif
1420
1421 /* -----------------------------------------------------------------------------
1422 * Perform a garbage collection if necessary
1423 * -------------------------------------------------------------------------- */
1424
1425 static void
1426 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1427 rtsBool force_major)
1428 {
1429 Capability *cap = *pcap;
1430 rtsBool heap_census;
1431 nat collect_gen;
1432 #ifdef THREADED_RTS
1433 rtsBool idle_cap[n_capabilities];
1434 rtsBool gc_type;
1435 nat i, sync;
1436 StgTSO *tso;
1437 #endif
1438
1439 if (sched_state == SCHED_SHUTTING_DOWN) {
1440 // The final GC has already been done, and the system is
1441 // shutting down. We'll probably deadlock if we try to GC
1442 // now.
1443 return;
1444 }
1445
1446 heap_census = scheduleNeedHeapProfile(rtsTrue);
1447
1448 // Figure out which generation we are collecting, so that we can
1449 // decide whether this is a parallel GC or not.
1450 collect_gen = calcNeeded(force_major || heap_census, NULL);
1451
1452 #ifdef THREADED_RTS
1453 if (sched_state < SCHED_INTERRUPTING
1454 && RtsFlags.ParFlags.parGcEnabled
1455 && collect_gen >= RtsFlags.ParFlags.parGcGen
1456 && ! oldest_gen->mark)
1457 {
1458 gc_type = SYNC_GC_PAR;
1459 } else {
1460 gc_type = SYNC_GC_SEQ;
1461 }
1462
1463 // In order to GC, there must be no threads running Haskell code.
1464 // Therefore, the GC thread needs to hold *all* the capabilities,
1465 // and release them after the GC has completed.
1466 //
1467 // This seems to be the simplest way: previous attempts involved
1468 // making all the threads with capabilities give up their
1469 // capabilities and sleep except for the *last* one, which
1470 // actually did the GC. But it's quite hard to arrange for all
1471 // the other tasks to sleep and stay asleep.
1472 //
1473
1474 /* Other capabilities are prevented from running yet more Haskell
1475 threads if pending_sync is set. Tested inside
1476 yieldCapability() and releaseCapability() in Capability.c */
1477
1478 do {
1479 sync = requestSync(pcap, task, gc_type);
1480 cap = *pcap;
1481 if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
1482 // someone else had a pending sync request for a GC, so
1483 // let's assume GC has been done and we don't need to GC
1484 // again.
1485 return;
1486 }
1487 if (sched_state == SCHED_SHUTTING_DOWN) {
1488 // The scheduler might now be shutting down. We tested
1489 // this above, but it might have become true since then as
1490 // we yielded the capability in requestSync().
1491 return;
1492 }
1493 } while (sync);
1494
1495 interruptAllCapabilities();
1496
1497 // The final shutdown GC is always single-threaded, because it's
1498 // possible that some of the Capabilities have no worker threads.
1499
1500 if (gc_type == SYNC_GC_SEQ)
1501 {
1502 traceEventRequestSeqGc(cap);
1503 }
1504 else
1505 {
1506 traceEventRequestParGc(cap);
1507 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1508 }
1509
1510 if (gc_type == SYNC_GC_SEQ)
1511 {
1512 // single-threaded GC: grab all the capabilities
1513 acquireAllCapabilities(cap,task);
1514 }
1515 else
1516 {
1517 // If we are load-balancing collections in this
1518 // generation, then we require all GC threads to participate
1519 // in the collection. Otherwise, we only require active
1520 // threads to participate, and we set gc_threads[i]->idle for
1521 // any idle capabilities. The rationale here is that waking
1522 // up an idle Capability takes much longer than just doing any
1523 // GC work on its behalf.
1524
1525 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1526 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1527 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
1528 for (i=0; i < n_capabilities; i++) {
1529 if (capabilities[i].disabled) {
1530 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1531 } else {
1532 idle_cap[i] = rtsFalse;
1533 }
1534 }
1535 } else {
1536 for (i=0; i < n_capabilities; i++) {
1537 if (capabilities[i].disabled) {
1538 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1539 } else if (i == cap->no ||
1540 capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1541 idle_cap[i] = rtsFalse;
1542 } else {
1543 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1544 if (!idle_cap[i]) {
1545 n_failed_trygrab_idles++;
1546 } else {
1547 n_idle_caps++;
1548 }
1549 }
1550 }
1551 }
1552
1553 // We set the gc_thread[i]->idle flag if that
1554 // capability/thread is not participating in this collection.
1555 // We also keep a local record of which capabilities are idle
1556 // in idle_cap[], because scheduleDoGC() is re-entrant:
1557 // another thread might start a GC as soon as we've finished
1558 // this one, and thus the gc_thread[]->idle flags are invalid
1559 // as soon as we release any threads after GC. Getting this
1560 // wrong leads to a rare and hard to debug deadlock!
1561
1562 for (i=0; i < n_capabilities; i++) {
1563 gc_threads[i]->idle = idle_cap[i];
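        // capabilities[i].idle counts consecutive GCs for which this
        // capability has run no Haskell code; it is reset to zero in the
        // scheduler loop just before a thread is run (cap->idle = 0).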
1564 capabilities[i].idle++;
1565 }
1566
1567 // For all capabilities participating in this GC, wait until
1568 // they have stopped mutating and are standing by for GC.
1569 waitForGcThreads(cap);
1570
1571 #if defined(THREADED_RTS)
1572 // Stable point where we can do a global check on our spark counters
1573 ASSERT(checkSparkCountInvariant());
1574 #endif
1575 }
1576
1577 #endif
1578
1579 IF_DEBUG(scheduler, printAllThreads());
1580
1581 delete_threads_and_gc:
1582 /*
1583 * We now have all the capabilities; if we're in an interrupting
1584 * state, then we should take the opportunity to delete all the
1585 * threads in the system.
1586 */
1587 if (sched_state == SCHED_INTERRUPTING) {
1588 deleteAllThreads(cap);
1589 #if defined(THREADED_RTS)
1590 // Discard all the sparks from every Capability. Why?
1591 // They'll probably be GC'd anyway since we've killed all the
1592 // threads. It just avoids the GC having to do any work to
1593 // figure out that any remaining sparks are garbage.
1594 for (i = 0; i < n_capabilities; i++) {
1595 capabilities[i].spark_stats.gcd +=
1596 sparkPoolSize(capabilities[i].sparks);
1597 // No race here since all Caps are stopped.
1598 discardSparksCap(&capabilities[i]);
1599 }
1600 #endif
1601 sched_state = SCHED_SHUTTING_DOWN;
1602 }
1603
1604 /*
1605 * When there are disabled capabilities, we want to migrate any
1606 * threads away from them. Normally this happens in the
1607 * scheduler's loop, but only for unbound threads - it's really
1608 * hard for a bound thread to migrate itself. So we have another
1609 * go here.
1610 */
1611 #if defined(THREADED_RTS)
1612 for (i = enabled_capabilities; i < n_capabilities; i++) {
1613 Capability *tmp_cap, *dest_cap;
1614 tmp_cap = &capabilities[i];
1615 ASSERT(tmp_cap->disabled);
1616 if (i != cap->no) {
1617 dest_cap = &capabilities[i % enabled_capabilities];
1618 while (!emptyRunQueue(tmp_cap)) {
1619 tso = popRunQueue(tmp_cap);
1620 migrateThread(tmp_cap, tso, dest_cap);
1621 if (tso->bound) {
1622 traceTaskMigrate(tso->bound->task,
1623 tso->bound->task->cap,
1624 dest_cap);
1625 tso->bound->task->cap = dest_cap;
1626 }
1627 }
1628 }
1629 }
1630 #endif
1631
1632 #if defined(THREADED_RTS)
1633 // reset pending_sync *before* GC, so that when the GC threads
1634 // emerge they don't immediately re-enter the GC.
1635 pending_sync = 0;
1636 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1637 #else
1638 GarbageCollect(collect_gen, heap_census, 0, cap);
1639 #endif
1640
1641 traceSparkCounters(cap);
1642
1643 switch (recent_activity) {
1644 case ACTIVITY_INACTIVE:
1645 if (force_major) {
1646 // We are doing a GC because the system has been idle for a
1647 // timeslice and we need to check for deadlock. Record the
1648 // fact that we've done a GC and turn off the timer signal;
1649 // it will get re-enabled if we run any threads after the GC.
1650 recent_activity = ACTIVITY_DONE_GC;
1651 #ifndef PROFILING
1652 stopTimer();
1653 #endif
1654 break;
1655 }
1656 // fall through...
1657
1658 case ACTIVITY_MAYBE_NO:
1659 // the GC might have taken long enough for the timer to set
1660 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1661 // but we aren't necessarily deadlocked:
1662 recent_activity = ACTIVITY_YES;
1663 break;
1664
1665 case ACTIVITY_DONE_GC:
1666 // If we are actually active, the scheduler will reset the
1667 // recent_activity flag and re-enable the timer.
1668 break;
1669 }
1670
1671 #if defined(THREADED_RTS)
1672 // Stable point where we can do a global check on our spark counters
1673 ASSERT(checkSparkCountInvariant());
1674 #endif
1675
1676 // The heap census itself is done during GarbageCollect().
1677 if (heap_census) {
1678 performHeapProfile = rtsFalse;
1679 }
1680
1681 #if defined(THREADED_RTS)
1682 if (gc_type == SYNC_GC_PAR)
1683 {
1684 releaseGCThreads(cap);
1685 for (i = 0; i < n_capabilities; i++) {
1686 if (i != cap->no) {
1687 if (idle_cap[i]) {
1688 ASSERT(capabilities[i].running_task == task);
1689 task->cap = &capabilities[i];
1690 releaseCapability(&capabilities[i]);
1691 } else {
1692 ASSERT(capabilities[i].running_task != task);
1693 }
1694 }
1695 }
1696 task->cap = cap;
1697 }
1698 #endif
1699
1700 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1701 // GC set the heap_overflow flag, so we should proceed with
1702 // an orderly shutdown now. Ultimately we want the main
1703 // thread to return to its caller with HeapExhausted, at which
1704 // point the caller should call hs_exit(). The first step is
1705 // to delete all the threads.
1706 //
1707 // Another way to do this would be to raise an exception in
1708 // the main thread, which we really should do because it gives
1709 // the program a chance to clean up. But how do we find the
1710 // main thread? It should presumably be the same one that
1711 // gets ^C exceptions, but that's all done on the Haskell side
1712 // (GHC.TopHandler).
1713 sched_state = SCHED_INTERRUPTING;
1714 goto delete_threads_and_gc;
1715 }
1716
1717 #ifdef SPARKBALANCE
1718 /* JB
1719 Once we are all together... this would be the place to balance all
1720 spark pools. No concurrent stealing or adding of new sparks can
1721 occur. Should be defined in Sparks.c. */
1722 balanceSparkPoolsCaps(n_capabilities, capabilities);
1723 #endif
1724
1725 #if defined(THREADED_RTS)
1726 if (gc_type == SYNC_GC_SEQ) {
1727 // release our stash of capabilities.
1728 releaseAllCapabilities(cap, task);
1729 }
1730 #endif
1731
1732 return;
1733 }
1734
1735 /* ---------------------------------------------------------------------------
1736 * Singleton fork(). Do not copy any running threads.
1737 * ------------------------------------------------------------------------- */
1738
1739 pid_t
1740 forkProcess(HsStablePtr *entry
1741 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1742 STG_UNUSED
1743 #endif
1744 )
1745 {
1746 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1747 pid_t pid;
1748 StgTSO* t,*next;
1749 Capability *cap;
1750 nat g;
1751 Task *task = NULL;
1752 nat i;
1753 #ifdef THREADED_RTS
1754 nat sync;
1755 #endif
1756
1757 debugTrace(DEBUG_sched, "forking!");
1758
1759 task = newBoundTask();
1760
1761 cap = NULL;
1762 waitForReturnCapability(&cap, task);
1763
1764 #ifdef THREADED_RTS
1765 do {
1766 sync = requestSync(&cap, task, SYNC_OTHER);
1767 } while (sync);
1768
1769 acquireAllCapabilities(cap,task);
1770
1771 pending_sync = 0;
1772 #endif
1773
1774 // no funny business: hold locks while we fork, otherwise if some
1775 // other thread is holding a lock when the fork happens, the data
1776 // structure protected by the lock will forever be in an
1777 // inconsistent state in the child. See also #1391.
1778 ACQUIRE_LOCK(&sched_mutex);
1779 ACQUIRE_LOCK(&sm_mutex);
1780 ACQUIRE_LOCK(&stable_mutex);
1781 ACQUIRE_LOCK(&task->lock);
1782
1783 for (i=0; i < n_capabilities; i++) {
1784 ACQUIRE_LOCK(&capabilities[i].lock);
1785 }
1786
1787 stopTimer(); // See #4074
1788
1789 #if defined(TRACING)
1790 flushEventLog(); // so that child won't inherit dirty file buffers
1791 #endif
1792
1793 pid = fork();
1794
1795 if (pid) { // parent
1796
1797 startTimer(); // #4074
1798
1799 RELEASE_LOCK(&sched_mutex);
1800 RELEASE_LOCK(&sm_mutex);
1801 RELEASE_LOCK(&stable_mutex);
1802 RELEASE_LOCK(&task->lock);
1803
1804 for (i=0; i < n_capabilities; i++) {
1805 releaseCapability_(&capabilities[i],rtsFalse);
1806 RELEASE_LOCK(&capabilities[i].lock);
1807 }
1808 boundTaskExiting(task);
1809
1810 // just return the pid
1811 return pid;
1812
1813 } else { // child
1814
1815 #if defined(THREADED_RTS)
1816 initMutex(&sched_mutex);
1817 initMutex(&sm_mutex);
1818 initMutex(&stable_mutex);
1819 initMutex(&task->lock);
1820
1821 for (i=0; i < n_capabilities; i++) {
1822 initMutex(&capabilities[i].lock);
1823 }
1824 #endif
1825
1826 #ifdef TRACING
1827 resetTracing();
1828 #endif
1829
1830 // Now, all OS threads except the thread that forked are
1831 // stopped. We need to stop all Haskell threads, including
1832 // those involved in foreign calls. Also we need to delete
1833 // all Tasks, because they correspond to OS threads that are
1834 // now gone.
1835
1836 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1837 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1838 next = t->global_link;
1839 // don't allow threads to catch the ThreadKilled
1840 // exception, but we do want to raiseAsync() because these
1841 // threads may be evaluating thunks that we need later.
1842 deleteThread_(t->cap,t);
1843
1844 // stop the GC from updating the InCall to point to
1845 // the TSO. This is only necessary because the
1846 // OSThread bound to the TSO has been killed, and
1847 // won't get a chance to exit in the usual way (see
1848 // also scheduleHandleThreadFinished).
1849 t->bound = NULL;
1850 }
1851 }
1852
1853 discardTasksExcept(task);
1854
1855 for (i=0; i < n_capabilities; i++) {
1856 cap = &capabilities[i];
1857
1858 // Empty the run queue. It seems tempting to let all the
1859 // killed threads stay on the run queue as zombies to be
1860 // cleaned up later, but some of them may correspond to
1861 // bound threads for which the corresponding Task does not
1862 // exist.
1863 cap->run_queue_hd = END_TSO_QUEUE;
1864 cap->run_queue_tl = END_TSO_QUEUE;
1865
1866             // Any suspended C-calling Tasks are gone; their OS
1867             // threads no longer exist:
1868 cap->suspended_ccalls = NULL;
1869
1870 #if defined(THREADED_RTS)
1871 // Wipe our spare workers list, they no longer exist. New
1872 // workers will be created if necessary.
1873 cap->spare_workers = NULL;
1874 cap->n_spare_workers = 0;
1875 cap->returning_tasks_hd = NULL;
1876 cap->returning_tasks_tl = NULL;
1877 #endif
1878
1879 // Release all caps except 0, we'll use that for starting
1880 // the IO manager and running the client action below.
1881 if (cap->no != 0) {
1882 task->cap = cap;
1883 releaseCapability(cap);
1884 }
1885 }
1886 cap = &capabilities[0];
1887 task->cap = cap;
1888
1889 // Empty the threads lists. Otherwise, the garbage
1890 // collector may attempt to resurrect some of these threads.
1891 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1892 generations[g].threads = END_TSO_QUEUE;
1893 }
1894
1895 // On Unix, all timers are reset in the child, so we need to start
1896 // the timer again.
1897 initTimer();
1898 startTimer();
1899
1900 // TODO: need to trace various other things in the child
1901 // like startup event, capabilities, process info etc
1902 traceTaskCreate(task, cap);
1903
1904 #if defined(THREADED_RTS)
1905 ioManagerStartCap(&cap);
1906 #endif
1907
1908 rts_evalStableIO(&cap, entry, NULL); // run the action
1909 rts_checkSchedStatus("forkProcess",cap);
1910
1911 rts_unlock(cap);
1912 hs_exit(); // clean up and exit
1913 stg_exit(EXIT_SUCCESS);
1914 }
1915 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1916 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1917 #endif
1918 }
1919
1920 /* ---------------------------------------------------------------------------
1921 * Changing the number of Capabilities
1922 *
1923 * Changing the number of Capabilities is very tricky! We can only do
1924 * it with the system fully stopped, so we do a full sync with
1925 * requestSync(SYNC_OTHER) and grab all the capabilities.
1926 *
1927 * Then we resize the appropriate data structures, and update all
1928 * references to the old data structures which have now moved.
1929 * Finally we release the Capabilities we are holding, and start
1930 * worker Tasks on the new Capabilities we created.
1931 *
1932 * ------------------------------------------------------------------------- */
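/* A rough usage sketch (illustrative only, not part of the RTS itself):
 * foreign code that has already initialised the RTS with hs_init() can
 * change the Capability count at run time, e.g.
 *
 *     setNumCapabilities(4);   // grow: new Capabilities and worker Tasks
 *                              // are created
 *     ...
 *     setNumCapabilities(2);   // shrink: the surplus Capabilities are only
 *                              // marked "disabled", not freed
 *
 * Haskell code normally reaches this via GHC.Conc.setNumCapabilities.
 */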
1933
1934 void
1935 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
1936 {
1937 #if !defined(THREADED_RTS)
1938 if (new_n_capabilities != 1) {
1939 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
1940 }
1941 return;
1942 #elif defined(NOSMP)
1943 if (new_n_capabilities != 1) {
1944 errorBelch("setNumCapabilities: not supported on this platform");
1945 }
1946 return;
1947 #else
1948 Task *task;
1949 Capability *cap;
1950 nat sync;
1951 StgTSO* t;
1952 nat g, n;
1953 Capability *old_capabilities = NULL;
1954
1955 if (new_n_capabilities == enabled_capabilities) return;
1956
1957 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
1958 enabled_capabilities, new_n_capabilities);
1959
1960 cap = rts_lock();
1961 task = cap->running_task;
1962
1963 do {
1964 sync = requestSync(&cap, task, SYNC_OTHER);
1965 } while (sync);
1966
1967 acquireAllCapabilities(cap,task);
1968
1969 pending_sync = 0;
1970
1971 if (new_n_capabilities < enabled_capabilities)
1972 {
1973 // Reducing the number of capabilities: we do not actually
1974 // remove the extra capabilities, we just mark them as
1975 // "disabled". This has the following effects:
1976 //
1977 // - threads on a disabled capability are migrated away by the
1978 // scheduler loop
1979 //
1980 // - disabled capabilities do not participate in GC
1981 // (see scheduleDoGC())
1982 //
1983 // - No spark threads are created on this capability
1984 // (see scheduleActivateSpark())
1985 //
1986 // - We do not attempt to migrate threads *to* a disabled
1987 // capability (see schedulePushWork()).
1988 //
1989 // but in other respects, a disabled capability remains
1990 // alive. Threads may be woken up on a disabled capability,
1991 // but they will be immediately migrated away.
1992 //
1993 // This approach is much easier than trying to actually remove
1994 // the capability; we don't have to worry about GC data
1995 // structures, the nursery, etc.
1996 //
1997 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
1998 capabilities[n].disabled = rtsTrue;
1999 traceCapDisable(&capabilities[n]);
2000 }
2001 enabled_capabilities = new_n_capabilities;
2002 }
2003 else
2004 {
2005 // Increasing the number of enabled capabilities.
2006 //
2007 // enable any disabled capabilities, up to the required number
2008 for (n = enabled_capabilities;
2009 n < new_n_capabilities && n < n_capabilities; n++) {
2010 capabilities[n].disabled = rtsFalse;
2011 traceCapEnable(&capabilities[n]);
2012 }
2013 enabled_capabilities = n;
2014
2015 if (new_n_capabilities > n_capabilities) {
2016 #if defined(TRACING)
2017 // Allocate eventlog buffers for the new capabilities. Note this
2018 // must be done before calling moreCapabilities(), because that
2019 // will emit events about creating the new capabilities and adding
2020 // them to existing capsets.
2021 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2022 #endif
2023
2024 // Resize the capabilities array
2025 // NB. after this, capabilities points somewhere new. Any pointers
2026 // of type (Capability *) are now invalid.
2027 old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
2028
2029 // update our own cap pointer
2030 cap = &capabilities[cap->no];
2031
2032 // Resize and update storage manager data structures
2033 storageAddCapabilities(n_capabilities, new_n_capabilities);
2034
2035 // Update (Capability *) refs in the Task manager.
2036 updateCapabilityRefs();
2037
2038 // Update (Capability *) refs from TSOs
2039 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2040 for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
2041 t->cap = &capabilities[t->cap->no];
2042 }
2043 }
2044 }
2045 }
2046
2047 // We're done: release the original Capabilities
2048 releaseAllCapabilities(cap,task);
2049
2050 // Start worker tasks on the new Capabilities
2051 startWorkerTasks(n_capabilities, new_n_capabilities);
2052
2053 // finally, update n_capabilities
2054 if (new_n_capabilities > n_capabilities) {
2055 n_capabilities = enabled_capabilities = new_n_capabilities;
2056 }
2057
2058 // We can't free the old array until now, because we access it
2059 // while updating pointers in updateCapabilityRefs().
2060 if (old_capabilities) {
2061 stgFree(old_capabilities);
2062 }
2063
2064 rts_unlock(cap);
2065
2066 #endif // THREADED_RTS
2067 }
2068
2069
2070
2071 /* ---------------------------------------------------------------------------
2072 * Delete all the threads in the system
2073 * ------------------------------------------------------------------------- */
2074
2075 static void
2076 deleteAllThreads ( Capability *cap )
2077 {
2078 // NOTE: only safe to call if we own all capabilities.
2079
2080 StgTSO* t, *next;
2081 nat g;
2082
2083 debugTrace(DEBUG_sched,"deleting all threads");
2084 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2085 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2086 next = t->global_link;
2087 deleteThread(cap,t);
2088 }
2089 }
2090
2091 // The run queue now contains a bunch of ThreadKilled threads. We
2092 // must not throw these away: the main thread(s) will be in there
2093 // somewhere, and the main scheduler loop has to deal with it.
2094 // Also, the run queue is the only thing keeping these threads from
2095 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2096
2097 #if !defined(THREADED_RTS)
2098 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2099 ASSERT(sleeping_queue == END_TSO_QUEUE);
2100 #endif
2101 }
2102
2103 /* -----------------------------------------------------------------------------
2104 Managing the suspended_ccalls list.
2105 Locks required: sched_mutex
2106 -------------------------------------------------------------------------- */
2107
2108 STATIC_INLINE void
2109 suspendTask (Capability *cap, Task *task)
2110 {
2111 InCall *incall;
2112
2113 incall = task->incall;
2114 ASSERT(incall->next == NULL && incall->prev == NULL);
2115 incall->next = cap->suspended_ccalls;
2116 incall->prev = NULL;
2117 if (cap->suspended_ccalls) {
2118 cap->suspended_ccalls->prev = incall;
2119 }
2120 cap->suspended_ccalls = incall;
2121 }
2122
2123 STATIC_INLINE void
2124 recoverSuspendedTask (Capability *cap, Task *task)
2125 {
2126 InCall *incall;
2127
2128 incall = task->incall;
2129 if (incall->prev) {
2130 incall->prev->next = incall->next;
2131 } else {
2132 ASSERT(cap->suspended_ccalls == incall);
2133 cap->suspended_ccalls = incall->next;
2134 }
2135 if (incall->next) {
2136 incall->next->prev = incall->prev;
2137 }
2138 incall->next = incall->prev = NULL;
2139 }
2140
2141 /* ---------------------------------------------------------------------------
2142 * Suspending & resuming Haskell threads.
2143 *
2144 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2145 * its capability before calling the C function. This allows another
2146 * task to pick up the capability and carry on running Haskell
2147 * threads. It also means that if the C call blocks, it won't lock
2148 * the whole system.
2149 *
2150 * The Haskell thread making the C call is put to sleep for the
2151  * duration of the call, on the Capability's suspended_ccalls list.  We
2152 * give out a token to the task, which it can use to resume the thread
2153 * on return from the C function.
2154 *
2155  * If this is an interruptible C call, the FFI call may be
2156  * unceremoniously terminated, so it should be made from an
2157  * unbound worker thread.
2158 * ------------------------------------------------------------------------- */
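/* Schematically, the code generated for a "safe" foreign call does the
 * following (a sketch; the names are illustrative, not the real generated
 * code):
 *
 *     void *token = suspendThread(&cap->r, interruptible);
 *     result = some_foreign_function(args);   // may block; other Haskell
 *                                             // threads keep running
 *     reg = resumeThread(token);              // re-acquire a Capability
 *
 * The token is in fact the Task, and resumeThread() may return on a
 * different Capability from the one that was released.
 */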
2159
2160 void *
2161 suspendThread (StgRegTable *reg, rtsBool interruptible)
2162 {
2163 Capability *cap;
2164 int saved_errno;
2165 StgTSO *tso;
2166 Task *task;
2167 #if mingw32_HOST_OS
2168 StgWord32 saved_winerror;
2169 #endif
2170
2171 saved_errno = errno;
2172 #if mingw32_HOST_OS
2173 saved_winerror = GetLastError();
2174 #endif
2175
2176 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2177 */
2178 cap = regTableToCapability(reg);
2179
2180 task = cap->running_task;
2181 tso = cap->r.rCurrentTSO;
2182
2183 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2184
2185 // XXX this might not be necessary --SDM
2186 tso->what_next = ThreadRunGHC;
2187
2188 threadPaused(cap,tso);
2189
2190 if (interruptible) {
2191 tso->why_blocked = BlockedOnCCall_Interruptible;
2192 } else {
2193 tso->why_blocked = BlockedOnCCall;
2194 }
2195
2196 // Hand back capability
2197 task->incall->suspended_tso = tso;
2198 task->incall->suspended_cap = cap;
2199
2200 ACQUIRE_LOCK(&cap->lock);
2201
2202 suspendTask(cap,task);
2203 cap->in_haskell = rtsFalse;
2204 releaseCapability_(cap,rtsFalse);
2205
2206 RELEASE_LOCK(&cap->lock);
2207
2208 errno = saved_errno;
2209 #if mingw32_HOST_OS
2210 SetLastError(saved_winerror);
2211 #endif
2212 return task;
2213 }
2214
2215 StgRegTable *
2216 resumeThread (void *task_)
2217 {
2218 StgTSO *tso;
2219 InCall *incall;
2220 Capability *cap;
2221 Task *task = task_;
2222 int saved_errno;
2223 #if mingw32_HOST_OS
2224 StgWord32 saved_winerror;
2225 #endif
2226
2227 saved_errno = errno;
2228 #if mingw32_HOST_OS
2229 saved_winerror = GetLastError();
2230 #endif
2231
2232 incall = task->incall;
2233 cap = incall->suspended_cap;
2234 task->cap = cap;
2235
2236 // Wait for permission to re-enter the RTS with the result.
2237 waitForReturnCapability(&cap,task);
2238 // we might be on a different capability now... but if so, our
2239 // entry on the suspended_ccalls list will also have been
2240 // migrated.
2241
2242 // Remove the thread from the suspended list
2243 recoverSuspendedTask(cap,task);
2244
2245 tso = incall->suspended_tso;
2246 incall->suspended_tso = NULL;
2247 incall->suspended_cap = NULL;
2248 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2249
2250 traceEventRunThread(cap, tso);
2251
2252 /* Reset blocking status */
2253 tso->why_blocked = NotBlocked;
2254
2255 if ((tso->flags & TSO_BLOCKEX) == 0) {
2256 // avoid locking the TSO if we don't have to
2257 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2258 maybePerformBlockedException(cap,tso);
2259 }
2260 }
2261
2262 cap->r.rCurrentTSO = tso;
2263 cap->in_haskell = rtsTrue;
2264 errno = saved_errno;
2265 #if mingw32_HOST_OS
2266 SetLastError(saved_winerror);
2267 #endif
2268
2269 /* We might have GC'd, mark the TSO dirty again */
2270 dirty_TSO(cap,tso);
2271 dirty_STACK(cap,tso->stackobj);
2272
2273 IF_DEBUG(sanity, checkTSO(tso));
2274
2275 return &cap->r;
2276 }
2277
2278 /* ---------------------------------------------------------------------------
2279 * scheduleThread()
2280 *
2281 * scheduleThread puts a thread on the end of the runnable queue.
2282 * This will usually be done immediately after a thread is created.
2283 * The caller of scheduleThread must create the thread using e.g.
2284 * createThread and push an appropriate closure
2285 * on this thread's stack before the scheduler is invoked.
2286 * ------------------------------------------------------------------------ */
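/* A minimal sketch of the expected calling pattern (assumes we own 'cap'
 * and 'action' is a closure of type IO (); createIOThread is the helper
 * that rts_evalIO() uses to build such a thread):
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  action);
 *     scheduleThread(cap, tso);   // append to the end of cap's run queue
 */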
2287
2288 void
2289 scheduleThread(Capability *cap, StgTSO *tso)
2290 {
2291 // The thread goes at the *end* of the run-queue, to avoid possible
2292 // starvation of any threads already on the queue.
2293 appendToRunQueue(cap,tso);
2294 }
2295
2296 void
2297 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2298 {
2299 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2300 // move this thread from now on.
2301 #if defined(THREADED_RTS)
2302 cpu %= enabled_capabilities;
2303 if (cpu == cap->no) {
2304 appendToRunQueue(cap,tso);
2305 } else {
2306 migrateThread(cap, tso, &capabilities[cpu]);
2307 }
2308 #else
2309 appendToRunQueue(cap,tso);
2310 #endif
2311 }
2312
2313 void
2314 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2315 {
2316 Task *task;
2317 DEBUG_ONLY( StgThreadID id );
2318 Capability *cap;
2319
2320 cap = *pcap;
2321
2322 // We already created/initialised the Task
2323 task = cap->running_task;
2324
2325 // This TSO is now a bound thread; make the Task and TSO
2326 // point to each other.
2327 tso->bound = task->incall;
2328 tso->cap = cap;
2329
2330 task->incall->tso = tso;
2331 task->incall->ret = ret;
2332 task->incall->stat = NoStatus;
2333
2334 appendToRunQueue(cap,tso);
2335
2336 DEBUG_ONLY( id = tso->id );
2337 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2338
2339 cap = schedule(cap,task);
2340
2341 ASSERT(task->incall->stat != NoStatus);
2342 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2343
2344 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2345 *pcap = cap;
2346 }
2347
2348 /* ----------------------------------------------------------------------------
2349 * Starting Tasks
2350 * ------------------------------------------------------------------------- */
2351
2352 #if defined(THREADED_RTS)
2353 void scheduleWorker (Capability *cap, Task *task)
2354 {
2355 // schedule() runs without a lock.
2356 cap = schedule(cap,task);
2357
2358 // On exit from schedule(), we have a Capability, but possibly not
2359 // the same one we started with.
2360
2361 // During shutdown, the requirement is that after all the
2362 // Capabilities are shut down, all workers that are shutting down
2363 // have finished workerTaskStop(). This is why we hold on to
2364 // cap->lock until we've finished workerTaskStop() below.
2365 //
2366 // There may be workers still involved in foreign calls; those
2367 // will just block in waitForReturnCapability() because the
2368 // Capability has been shut down.
2369 //
2370 ACQUIRE_LOCK(&cap->lock);
2371 releaseCapability_(cap,rtsFalse);
2372 workerTaskStop(task);
2373 RELEASE_LOCK(&cap->lock);
2374 }
2375 #endif
2376
2377 /* ---------------------------------------------------------------------------
2378 * Start new worker tasks on Capabilities from--to
2379 * -------------------------------------------------------------------------- */
2380
2381 static void
2382 startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
2383 {
2384 #if defined(THREADED_RTS)
2385 nat i;
2386 Capability *cap;
2387
2388 for (i = from; i < to; i++) {
2389 cap = &capabilities[i];
2390 ACQUIRE_LOCK(&cap->lock);
2391 startWorkerTask(cap);
2392 RELEASE_LOCK(&cap->lock);
2393 }
2394 #endif
2395 }
2396
2397 /* ---------------------------------------------------------------------------
2398 * initScheduler()
2399 *
2400 * Initialise the scheduler. This resets all the queues - if the
2401 * queues contained any threads, they'll be garbage collected at the
2402 * next pass.
2403 *
2404 * ------------------------------------------------------------------------ */
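/* For orientation (a sketch, not part of this file): foreign code never
 * calls initScheduler()/exitScheduler() directly; they are reached through
 * the normal RTS startup/shutdown sequence, roughly:
 *
 *     hs_init(&argc, &argv);   // eventually calls initScheduler()
 *     ... rts_evalIO() / rts_evalStableIO() ...
 *     hs_exit();               // eventually calls exitScheduler() and
 *                              // freeScheduler()
 */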
2405
2406 void
2407 initScheduler(void)
2408 {
2409 #if !defined(THREADED_RTS)
2410 blocked_queue_hd = END_TSO_QUEUE;
2411 blocked_queue_tl = END_TSO_QUEUE;
2412 sleeping_queue = END_TSO_QUEUE;
2413 #endif
2414
2415 sched_state = SCHED_RUNNING;
2416 recent_activity = ACTIVITY_YES;
2417
2418 #if defined(THREADED_RTS)
2419 /* Initialise the mutex and condition variables used by
2420 * the scheduler. */
2421 initMutex(&sched_mutex);
2422 #endif
2423
2424 ACQUIRE_LOCK(&sched_mutex);
2425
2426 /* A capability holds the state a native thread needs in
2427 * order to execute STG code. At least one capability is
2428 * floating around (only THREADED_RTS builds have more than one).
2429 */
2430 initCapabilities();
2431
2432 initTaskManager();
2433
2434 /*
2435 * Eagerly start one worker to run each Capability, except for
2436 * Capability 0. The idea is that we're probably going to start a
2437 * bound thread on Capability 0 pretty soon, so we don't want a
2438 * worker task hogging it.
2439 */
2440 startWorkerTasks(1, n_capabilities);
2441
2442 RELEASE_LOCK(&sched_mutex);
2443
2444 }
2445
2446 void
2447 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2448 /* see Capability.c, shutdownCapability() */
2449 {
2450 Task *task = NULL;
2451
2452 task = newBoundTask();
2453
2454 // If we haven't killed all the threads yet, do it now.
2455 if (sched_state < SCHED_SHUTTING_DOWN) {
2456 sched_state = SCHED_INTERRUPTING;
2457 Capability *cap = task->cap;
2458 waitForReturnCapability(&cap,task);
2459 scheduleDoGC(&cap,task,rtsTrue);
2460 ASSERT(task->incall->tso == NULL);
2461 releaseCapability(cap);
2462 }
2463 sched_state = SCHED_SHUTTING_DOWN;
2464
2465 shutdownCapabilities(task, wait_foreign);
2466
2467 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2468 // n_failed_trygrab_idles, n_idle_caps);
2469
2470 boundTaskExiting(task);
2471 }
2472
2473 void
2474 freeScheduler( void )
2475 {
2476 nat still_running;
2477
2478 ACQUIRE_LOCK(&sched_mutex);
2479 still_running = freeTaskManager();
2480 // We can only free the Capabilities if there are no Tasks still
2481 // running. We might have a Task about to return from a foreign
2482 // call into waitForReturnCapability(), for example (actually,
2483 // this should be the *only* thing that a still-running Task can
2484 // do at this point, and it will block waiting for the
2485 // Capability).
2486 if (still_running == 0) {
2487 freeCapabilities();
2488 if (n_capabilities != 1) {
2489 stgFree(capabilities);
2490 }
2491 }
2492 RELEASE_LOCK(&sched_mutex);
2493 #if defined(THREADED_RTS)
2494 closeMutex(&sched_mutex);
2495 #endif
2496 }
2497
2498 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2499 void *user USED_IF_NOT_THREADS)
2500 {
2501 #if !defined(THREADED_RTS)
2502 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2503 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2504 evac(user, (StgClosure **)(void *)&sleeping_queue);
2505 #endif
2506 }
2507
2508 /* -----------------------------------------------------------------------------
2509 performGC
2510
2511 This is the interface to the garbage collector from Haskell land.
2512 We provide this so that external C code can allocate and garbage
2513 collect when called from Haskell via _ccall_GC.
2514 -------------------------------------------------------------------------- */
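/* For example (sketch): foreign code that has initialised the RTS can call
 * these entry points directly:
 *
 *     performGC();        // request a collection
 *     performMajorGC();   // force a major collection
 */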
2515
2516 static void
2517 performGC_(rtsBool force_major)
2518 {
2519 Task *task;
2520 Capability *cap = NULL;
2521
2522 // We must grab a new Task here, because the existing Task may be
2523 // associated with a particular Capability, and chained onto the
2524 // suspended_ccalls queue.
2525 task = newBoundTask();
2526
2527 // TODO: do we need to traceTask*() here?
2528
2529 waitForReturnCapability(&cap,task);
2530 scheduleDoGC(&cap,task,force_major);
2531 releaseCapability(cap);
2532 boundTaskExiting(task);
2533 }
2534
2535 void
2536 performGC(void)
2537 {
2538 performGC_(rtsFalse);
2539 }
2540
2541 void
2542 performMajorGC(void)
2543 {
2544 performGC_(rtsTrue);
2545 }
2546
2547 /* ---------------------------------------------------------------------------
2548 Interrupt execution
2549 - usually called inside a signal handler so it mustn't do anything fancy.
2550 ------------------------------------------------------------------------ */
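/* A typical caller is a signal handler, along these lines (sketch only;
 * the RTS's own ^C handling does essentially this):
 *
 *     static void my_interrupt_handler(int sig STG_UNUSED)
 *     {
 *         interruptStgRts();   // just sets flags and wakes the RTS
 *     }
 */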
2551
2552 void
2553 interruptStgRts(void)
2554 {
2555 sched_state = SCHED_INTERRUPTING;
2556 interruptAllCapabilities();
2557 #if defined(THREADED_RTS)
2558 wakeUpRts();
2559 #endif
2560 }
2561
2562 /* -----------------------------------------------------------------------------
2563 Wake up the RTS
2564
2565 This function causes at least one OS thread to wake up and run the
2566 scheduler loop. It is invoked when the RTS might be deadlocked, or
2567 an external event has arrived that may need servicing (eg. a
2568 keyboard interrupt).
2569
2570 In the single-threaded RTS we don't do anything here; we only have
2571 one thread anyway, and the event that caused us to want to wake up
2572 will have interrupted any blocking system call in progress anyway.
2573 -------------------------------------------------------------------------- */
2574
2575 #if defined(THREADED_RTS)
2576 void wakeUpRts(void)
2577 {
2578 // This forces the IO Manager thread to wakeup, which will
2579 // in turn ensure that some OS thread wakes up and runs the
2580 // scheduler loop, which will cause a GC and deadlock check.
2581 ioManagerWakeup();
2582 }
2583 #endif
2584
2585 /* -----------------------------------------------------------------------------
2586 Deleting threads
2587
2588 This is used for interruption (^C) and forking, and corresponds to
2589 raising an exception but without letting the thread catch the
2590 exception.
2591 -------------------------------------------------------------------------- */
2592
2593 static void
2594 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2595 {
2596 // NOTE: must only be called on a TSO that we have exclusive
2597 // access to, because we will call throwToSingleThreaded() below.
2598 // The TSO must be on the run queue of the Capability we own, or
2599 // we must own all Capabilities.
2600
2601 if (tso->why_blocked != BlockedOnCCall &&
2602 tso->why_blocked != BlockedOnCCall_Interruptible) {
2603 throwToSingleThreaded(tso->cap,tso,NULL);
2604 }
2605 }
2606
2607 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2608 static void
2609 deleteThread_(Capability *cap, StgTSO *tso)
2610 { // for forkProcess only:
2611 // like deleteThread(), but we delete threads in foreign calls, too.
2612
2613 if (tso->why_blocked == BlockedOnCCall ||
2614 tso->why_blocked == BlockedOnCCall_Interruptible) {
2615 tso->what_next = ThreadKilled;
2616 appendToRunQueue(tso->cap, tso);
2617 } else {
2618 deleteThread(cap,tso);
2619 }
2620 }
2621 #endif
2622
2623 /* -----------------------------------------------------------------------------
2624 raiseExceptionHelper
2625
2626    This function is called by the raise# primitive, just so that we can
2627    move some of the tricky bits of raising an exception from C-- into
2628    C. Who knows, it might be a useful reusable thing here too.
2629 -------------------------------------------------------------------------- */
2630
2631 StgWord
2632 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2633 {
2634 Capability *cap = regTableToCapability(reg);
2635 StgThunk *raise_closure = NULL;
2636 StgPtr p, next;
2637 StgRetInfoTable *info;
2638 //
2639     // This closure represents the expression 'raise# E' where E
2640     // is the exception being raised.  It is used to overwrite all the
2641     // thunks which are currently under evaluation.
2642 //
2643
2657 //
2658 // Walk up the stack, looking for the catch frame. On the way,
2659 // we update any closures pointed to from update frames with the
2660 // raise closure that we just built.
2661 //
2662 p = tso->stackobj->sp;
2663 while(1) {
2664 info = get_ret_itbl((StgClosure *)p);
2665 next = p + stack_frame_sizeW((StgClosure *)p);
2666 switch (info->i.type) {
2667
2668 case UPDATE_FRAME:
2669 // Only create raise_closure if we need to.
2670 if (raise_closure == NULL) {
2671 raise_closure =
2672 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2673 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2674 raise_closure->payload[0] = exception;
2675 }
2676 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2677 (StgClosure *)raise_closure);
2678 p = next;
2679 continue;
2680
2681 case ATOMICALLY_FRAME:
2682 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2683 tso->stackobj->sp = p;
2684 return ATOMICALLY_FRAME;
2685
2686 case CATCH_FRAME:
2687 tso->stackobj->sp = p;
2688 return CATCH_FRAME;
2689
2690 case CATCH_STM_FRAME:
2691 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2692 tso->stackobj->sp = p;
2693 return CATCH_STM_FRAME;
2694
2695 case UNDERFLOW_FRAME:
2696 tso->stackobj->sp = p;
2697 threadStackUnderflow(cap,tso);
2698 p = tso->stackobj->sp;
2699 continue;
2700
2701 case STOP_FRAME:
2702 tso->stackobj->sp = p;
2703 return STOP_FRAME;
2704
2705 case CATCH_RETRY_FRAME:
2706 default:
2707 p = next;
2708 continue;
2709 }
2710 }
2711 }
2712
2713
2714 /* -----------------------------------------------------------------------------
2715 findRetryFrameHelper
2716
2717 This function is called by the retry# primitive. It traverses the stack
2718    leaving tso->stackobj->sp referring to the frame which should handle the retry.
2719
2720    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2721    or an ATOMICALLY_FRAME (if the retry# reaches the top level).
2722
2723 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2724 create) because retries are not considered to be exceptions, despite the
2725 similar implementation.
2726
2727 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2728 not be created within memory transactions.
2729 -------------------------------------------------------------------------- */
2730
2731 StgWord
2732 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2733 {
2734 StgPtr p, next;
2735 StgRetInfoTable *info;
2736
2737 p = tso->stackobj->sp;
2738 while (1) {
2739 info = get_ret_itbl((StgClosure *)p);
2740 next = p + stack_frame_sizeW((StgClosure *)p);
2741 switch (info->i.type) {
2742
2743 case ATOMICALLY_FRAME:
2744 debugTrace(DEBUG_stm,
2745 "found ATOMICALLY_FRAME at %p during retry", p);
2746 tso->stackobj->sp = p;
2747 return ATOMICALLY_FRAME;
2748
2749 case CATCH_RETRY_FRAME:
2750 debugTrace(DEBUG_stm,
2751 "found CATCH_RETRY_FRAME at %p during retry", p);
2752 tso->stackobj->sp = p;
2753 return CATCH_RETRY_FRAME;
2754
2755 case CATCH_STM_FRAME: {
2756 StgTRecHeader *trec = tso -> trec;
2757 StgTRecHeader *outer = trec -> enclosing_trec;
2758 debugTrace(DEBUG_stm,
2759 "found CATCH_STM_FRAME at %p during retry", p);
2760 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2761 stmAbortTransaction(cap, trec);
2762 stmFreeAbortedTRec(cap, trec);
2763 tso -> trec = outer;
2764 p = next;
2765 continue;
2766 }
2767
2768 case UNDERFLOW_FRAME:
2769 threadStackUnderflow(cap,tso);
2770 p = tso->stackobj->sp;
2771 continue;
2772
2773 default:
2774 ASSERT(info->i.type != CATCH_FRAME);
2775 ASSERT(info->i.type != STOP_FRAME);
2776 p = next;
2777 continue;
2778 }
2779 }
2780 }
2781
2782 /* -----------------------------------------------------------------------------
2783 resurrectThreads is called after garbage collection on the list of
2784 threads found to be garbage. Each of these threads will be woken
2785    up and sent an exception: BlockedIndefinitelyOnMVar if the thread was
2786    blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked in STM,
2787    or NonTermination if it was blocked on a black hole.
2788
2789 Locks: assumes we hold *all* the capabilities.
2790 -------------------------------------------------------------------------- */
2791
2792 void
2793 resurrectThreads (StgTSO *threads)
2794 {
2795 StgTSO *tso, *next;
2796 Capability *cap;
2797 generation *gen;
2798
2799 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2800 next = tso->global_link;
2801
2802 gen = Bdescr((P_)tso)->gen;
2803 tso->global_link = gen->threads;
2804 gen->threads = tso;
2805
2806 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2807
2808 // Wake up the thread on the Capability it was last on
2809 cap = tso->cap;
2810
2811 switch (tso->why_blocked) {
2812 case BlockedOnMVar:
2813 /* Called by GC - sched_mutex lock is currently held. */
2814 throwToSingleThreaded(cap, tso,
2815 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2816 break;
2817 case BlockedOnBlackHole:
2818 throwToSingleThreaded(cap, tso,
2819 (StgClosure *)nonTermination_closure);
2820 break;
2821 case BlockedOnSTM:
2822 throwToSingleThreaded(cap, tso,
2823 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2824 break;
2825 case NotBlocked:
2826 /* This might happen if the thread was blocked on a black hole
2827 * belonging to a thread that we've just woken up (raiseAsync
2828 * can wake up threads, remember...).
2829 */
2830 continue;
2831 case BlockedOnMsgThrowTo:
2832 // This can happen if the target is masking, blocks on a
2833 // black hole, and then is found to be unreachable. In
2834 // this case, we want to let the target wake up and carry
2835 // on, and do nothing to this thread.
2836 continue;
2837 default:
2838 barf("resurrectThreads: thread blocked in a strange way: %d",
2839 tso->why_blocked);
2840 }
2841 }
2842 }