1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
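// A sketch of the possible values, gathered from their uses below:
// ACTIVITY_YES (we ran some Haskell recently), ACTIVITY_MAYBE_NO (an
// intermediate state set by the timer), ACTIVITY_INACTIVE (a whole
// timeslice went by with no activity), and ACTIVITY_DONE_GC (the idle
// GC has run and the timer signal is switched off until something runs).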
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
95 * exists - earlier gccs apparently didn't.
96 * -= chak
97 */
98 StgTSO dummy_tso;
99
100 /*
101 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
102 * in an MT setting, needed to signal that a worker thread shouldn't hang around
103 * in the scheduler when it is out of work.
104 */
105 rtsBool shutting_down_scheduler = rtsFalse;
106
107 /*
108 * This mutex protects most of the global scheduler data in
109 * the THREADED_RTS runtime.
110 */
111 #if defined(THREADED_RTS)
112 Mutex sched_mutex;
113 #endif
114
115 #if !defined(mingw32_HOST_OS)
116 #define FORKPROCESS_PRIMOP_SUPPORTED
117 #endif
118
119 // Local stats
120 #ifdef THREADED_RTS
121 static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
122 #endif
123
124 /* -----------------------------------------------------------------------------
125 * static function prototypes
126 * -------------------------------------------------------------------------- */
127
128 static Capability *schedule (Capability *initialCapability, Task *task);
129
130 //
131 // These functions all encapsulate parts of the scheduler loop, and are
132 // abstracted only to make the structure and control flow of the
133 // scheduler clearer.
134 //
135 static void schedulePreLoop (void);
136 static void scheduleFindWork (Capability **pcap);
137 #if defined(THREADED_RTS)
138 static void scheduleYield (Capability **pcap, Task *task);
139 #endif
140 #if defined(THREADED_RTS)
141 static nat requestSync (Capability **pcap, Task *task, nat sync_type);
142 static void acquireAllCapabilities(Capability *cap, Task *task);
143 static void releaseAllCapabilities(Capability *cap, Task *task);
144 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
145 #endif
146 static void scheduleStartSignalHandlers (Capability *cap);
147 static void scheduleCheckBlockedThreads (Capability *cap);
148 static void scheduleProcessInbox(Capability **cap);
149 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
150 static void schedulePushWork(Capability *cap, Task *task);
151 #if defined(THREADED_RTS)
152 static void scheduleActivateSpark(Capability *cap);
153 #endif
154 static void schedulePostRunThread(Capability *cap, StgTSO *t);
155 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
156 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
157 nat prev_what_next );
158 static void scheduleHandleThreadBlocked( StgTSO *t );
159 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
160 StgTSO *t );
161 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
162 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
163
164 static void deleteThread (Capability *cap, StgTSO *tso);
165 static void deleteAllThreads (Capability *cap);
166
167 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
168 static void deleteThread_(Capability *cap, StgTSO *tso);
169 #endif
170
171 /* ---------------------------------------------------------------------------
172 Main scheduling loop.
173
174 We use round-robin scheduling, each thread returning to the
175 scheduler loop when one of these conditions is detected:
176
177 * out of heap space
178 * timer expires (thread yields)
179 * thread blocks
180 * thread ends
181 * stack overflow
182
183 GRAN version:
184 In a GranSim setup this loop revolves around the global event
185 queue, which determines what to do next. Therefore, it's more
186 complicated than either the concurrent or the parallel (GUM)
187 setup.
188 This version has been entirely removed (JB 2008/08).
189
190 GUM version:
191 GUM iterates over incoming messages.
192 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
193 and sends out a fish whenever it has nothing to do; in-between
194 doing the actual reductions (shared code below) it processes the
195 incoming messages and deals with delayed operations
196 (see PendingFetches).
197 This is not the ugliest code you could imagine, but it's bloody close.
198
199 (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
200 now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
201 as well as future GUM versions. This file has been refurbished to
202 only contain valid code, which is however incomplete, refers to
203 invalid includes etc.
204
205 ------------------------------------------------------------------------ */
206
207 static Capability *
208 schedule (Capability *initialCapability, Task *task)
209 {
210 StgTSO *t;
211 Capability *cap;
212 StgThreadReturnCode ret;
213 nat prev_what_next;
214 rtsBool ready_to_gc;
215 #if defined(THREADED_RTS)
216 rtsBool first = rtsTrue;
217 #endif
218
219 cap = initialCapability;
220
221 // Pre-condition: this task owns initialCapability.
222 // The sched_mutex is *NOT* held
223 // NB. on return, we still hold a capability.
224
225 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
226
227 schedulePreLoop();
228
229 // -----------------------------------------------------------
230 // Scheduler loop starts here:
231
232 while (1) {
233
234 // Check whether we have re-entered the RTS from Haskell without
235 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
236 // call).
237 if (cap->in_haskell) {
238 errorBelch("schedule: re-entered unsafely.\n"
239 " Perhaps a 'foreign import unsafe' should be 'safe'?");
240 stg_exit(EXIT_FAILURE);
241 }
242
243 // The interruption / shutdown sequence.
244 //
245 // In order to cleanly shut down the runtime, we want to:
246 // * make sure that all main threads return to their callers
247 // with the state 'Interrupted'.
248 // * clean up all OS threads associated with the runtime
249 // * free all memory etc.
250 //
251 // So the sequence for ^C goes like this:
252 //
253 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
254 // arranges for some Capability to wake up
255 //
256 // * all threads in the system are halted, and the zombies are
257 // placed on the run queue for cleaning up. We acquire all
258 // the capabilities in order to delete the threads; this is
259 // done by scheduleDoGC() for convenience (because GC already
260 // needs to acquire all the capabilities). We can't kill
261 // threads involved in foreign calls.
262 //
263 // * somebody calls shutdownHaskell(), which calls exitScheduler()
264 //
265 // * sched_state := SCHED_SHUTTING_DOWN
266 //
267 // * all workers exit when the run queue on their capability
268 // drains. All main threads will also exit when their TSO
269 // reaches the head of the run queue and they can return.
270 //
271 // * eventually all Capabilities will shut down, and the RTS can
272 // exit.
273 //
274 // * We might be left with threads blocked in foreign calls,
275 // we should really attempt to kill these somehow (TODO);
276
277 switch (sched_state) {
278 case SCHED_RUNNING:
279 break;
280 case SCHED_INTERRUPTING:
281 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
282 /* scheduleDoGC() deletes all the threads */
283 scheduleDoGC(&cap,task,rtsFalse);
284
285 // after scheduleDoGC(), we must be shutting down. Either some
286 // other Capability did the final GC, or we did it above;
287 // either way we can fall through to the SCHED_SHUTTING_DOWN
288 // case now.
289 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
290 // fall through
291
292 case SCHED_SHUTTING_DOWN:
293 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
294 // If we are a worker, just exit. If we're a bound thread
295 // then we will exit below when we've removed our TSO from
296 // the run queue.
297 if (!isBoundTask(task) && emptyRunQueue(cap)) {
298 return cap;
299 }
300 break;
301 default:
302 barf("sched_state: %d", sched_state);
303 }
304
305 scheduleFindWork(&cap);
306
307 /* work pushing, currently relevant only for THREADED_RTS:
308 (pushes threads, wakes up idle capabilities for stealing) */
309 schedulePushWork(cap,task);
310
311 scheduleDetectDeadlock(&cap,task);
312
313 // Normally, the only way we can get here with no threads to
314 // run is if a keyboard interrupt was received during
315 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
316 // Additionally, it is not fatal for the
317 // threaded RTS to reach here with no threads to run.
318 //
319 // win32: might be here due to awaitEvent() being abandoned
320 // as a result of a console event having been delivered.
321
322 #if defined(THREADED_RTS)
323 if (first)
324 {
325 // XXX: ToDo
326 // // don't yield the first time, we want a chance to run this
327 // // thread for a bit, even if there are others banging at the
328 // // door.
329 // first = rtsFalse;
330 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
331 }
332
333 scheduleYield(&cap,task);
334
335 if (emptyRunQueue(cap)) continue; // look for work again
336 #endif
337
338 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
339 if ( emptyRunQueue(cap) ) {
340 ASSERT(sched_state >= SCHED_INTERRUPTING);
341 }
342 #endif
343
344 //
345 // Get a thread to run
346 //
347 t = popRunQueue(cap);
348
349 // Sanity check the thread we're about to run. This can be
350 // expensive if there is lots of thread switching going on...
351 IF_DEBUG(sanity,checkTSO(t));
352
353 #if defined(THREADED_RTS)
354 // Check whether we can run this thread in the current task.
355 // If not, we have to pass our capability to the right task.
356 {
357 InCall *bound = t->bound;
358
359 if (bound) {
360 if (bound->task == task) {
361 // yes, the Haskell thread is bound to the current native thread
362 } else {
363 debugTrace(DEBUG_sched,
364 "thread %lu bound to another OS thread",
365 (unsigned long)t->id);
366 // no, bound to a different OS thread: put it back for that Task to run
367 pushOnRunQueue(cap,t);
368 continue;
369 }
370 } else {
371 // The thread we want to run is unbound.
372 if (task->incall->tso) {
373 debugTrace(DEBUG_sched,
374 "this OS thread cannot run thread %lu",
375 (unsigned long)t->id);
376 // no, the current native thread is bound to a different
377 // Haskell thread, so pass it to any worker thread
378 pushOnRunQueue(cap,t);
379 continue;
380 }
381 }
382 }
383 #endif
384
385 // If we're shutting down, and this thread has not yet been
386 // killed, kill it now. This sometimes happens when a finalizer
387 // thread is created by the final GC, or a thread previously
388 // in a foreign call returns.
389 if (sched_state >= SCHED_INTERRUPTING &&
390 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
391 deleteThread(cap,t);
392 }
393
394 // If this capability is disabled, migrate the thread away rather
395 // than running it. NB. but not if the thread is bound: it is
396 // really hard for a bound thread to migrate itself. Believe me,
397 // I tried several ways and couldn't find a way to do it.
398 // Instead, when everything is stopped for GC, we migrate all the
399 // threads on the run queue then (see scheduleDoGC()).
400 //
401 // ToDo: what about TSO_LOCKED? Currently we're migrating those
402 // when the number of capabilities drops, but we never migrate
403 // them back if it rises again. Presumably we should, but after
404 // the thread has been migrated we no longer know what capability
405 // it was originally on.
406 #ifdef THREADED_RTS
407 if (cap->disabled && !t->bound) {
408 Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
409 migrateThread(cap, t, dest_cap);
410 continue;
411 }
412 #endif
413
414 /* context switches are initiated by the timer signal, unless
415 * the user specified "context switch as often as possible", with
416 * +RTS -C0
417 */
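        /* For example (illustrative only): running a program with
         *     ./prog +RTS -C0 -RTS
         * sets ctxtSwitchTicks to 0, so the test below requests a context
         * switch whenever other threads are queued on this Capability.
         */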
418 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
419 && !emptyThreadQueues(cap)) {
420 cap->context_switch = 1;
421 }
422
423 run_thread:
424
425 // CurrentTSO is the thread to run. t might be different if we
426 // loop back to run_thread, so make sure to set CurrentTSO after
427 // that.
428 cap->r.rCurrentTSO = t;
429
430 startHeapProfTimer();
431
432 // ----------------------------------------------------------------------
433 // Run the current thread
434
435 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
436 ASSERT(t->cap == cap);
437 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
438
439 prev_what_next = t->what_next;
440
441 errno = t->saved_errno;
442 #if mingw32_HOST_OS
443 SetLastError(t->saved_winerror);
444 #endif
445
446 // reset the interrupt flag before running Haskell code
447 cap->interrupt = 0;
448
449 cap->in_haskell = rtsTrue;
450 cap->idle = 0;
451
452 dirty_TSO(cap,t);
453 dirty_STACK(cap,t->stackobj);
454
455 #if defined(THREADED_RTS)
456 if (recent_activity == ACTIVITY_DONE_GC) {
457 // ACTIVITY_DONE_GC means we turned off the timer signal to
458 // conserve power (see #1623). Re-enable it here.
459 nat prev;
460 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
461 if (prev == ACTIVITY_DONE_GC) {
462 startTimer();
463 }
464 } else if (recent_activity != ACTIVITY_INACTIVE) {
465 // If we reached ACTIVITY_INACTIVE, then don't reset it until
466 // we've done the GC. The thread running here might just be
467 // the IO manager thread that handle_tick() woke up via
468 // wakeUpRts().
469 recent_activity = ACTIVITY_YES;
470 }
471 #endif
472
473 traceEventRunThread(cap, t);
474
475 switch (prev_what_next) {
476
477 case ThreadKilled:
478 case ThreadComplete:
479 /* Thread already finished, return to scheduler. */
480 ret = ThreadFinished;
481 break;
482
483 case ThreadRunGHC:
484 {
485 StgRegTable *r;
486 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
487 cap = regTableToCapability(r);
488 ret = r->rRet;
489 break;
490 }
491
492 case ThreadInterpret:
493 cap = interpretBCO(cap);
494 ret = cap->r.rRet;
495 break;
496
497 default:
498 barf("schedule: invalid what_next field");
499 }
500
501 cap->in_haskell = rtsFalse;
502
503 // The TSO might have moved, eg. if it re-entered the RTS and a GC
504 // happened. So find the new location:
505 t = cap->r.rCurrentTSO;
506
507 // And save the current errno in this thread.
508 // XXX: possibly bogus for SMP because this thread might already
509 // be running again, see code below.
510 t->saved_errno = errno;
511 #if mingw32_HOST_OS
512 // Similarly for Windows error code
513 t->saved_winerror = GetLastError();
514 #endif
515
516 if (ret == ThreadBlocked) {
517 if (t->why_blocked == BlockedOnBlackHole) {
518 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
519 traceEventStopThread(cap, t, t->why_blocked + 6,
520 owner != NULL ? owner->id : 0);
521 } else {
522 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
523 }
524 } else {
525 traceEventStopThread(cap, t, ret, 0);
526 }
527
528 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
529 ASSERT(t->cap == cap);
530
531 // ----------------------------------------------------------------------
532
533 // Costs for the scheduler are assigned to CCS_SYSTEM
534 stopHeapProfTimer();
535 #if defined(PROFILING)
536 cap->r.rCCCS = CCS_SYSTEM;
537 #endif
538
539 schedulePostRunThread(cap,t);
540
541 ready_to_gc = rtsFalse;
542
543 switch (ret) {
544 case HeapOverflow:
545 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
546 break;
547
548 case StackOverflow:
549 // just adjust the stack for this thread, then pop it back
550 // on the run queue.
551 threadStackOverflow(cap, t);
552 pushOnRunQueue(cap,t);
553 break;
554
555 case ThreadYielding:
556 if (scheduleHandleYield(cap, t, prev_what_next)) {
557 // shortcut for switching between compiler/interpreter:
558 goto run_thread;
559 }
560 break;
561
562 case ThreadBlocked:
563 scheduleHandleThreadBlocked(t);
564 break;
565
566 case ThreadFinished:
567 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
568 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
569 break;
570
571 default:
572 barf("schedule: invalid thread return code %d", (int)ret);
573 }
574
575 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
576 scheduleDoGC(&cap,task,rtsFalse);
577 }
578 } /* end of while() */
579 }
580
581 /* -----------------------------------------------------------------------------
582 * Run queue operations
583 * -------------------------------------------------------------------------- */
584
585 void
586 removeFromRunQueue (Capability *cap, StgTSO *tso)
587 {
588 if (tso->block_info.prev == END_TSO_QUEUE) {
589 ASSERT(cap->run_queue_hd == tso);
590 cap->run_queue_hd = tso->_link;
591 } else {
592 setTSOLink(cap, tso->block_info.prev, tso->_link);
593 }
594 if (tso->_link == END_TSO_QUEUE) {
595 ASSERT(cap->run_queue_tl == tso);
596 cap->run_queue_tl = tso->block_info.prev;
597 } else {
598 setTSOPrev(cap, tso->_link, tso->block_info.prev);
599 }
600 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
601
602 IF_DEBUG(sanity, checkRunQueue(cap));
603 }
604
605 /* ----------------------------------------------------------------------------
606 * Setting up the scheduler loop
607 * ------------------------------------------------------------------------- */
608
609 static void
610 schedulePreLoop(void)
611 {
612 // initialisation for scheduler - what cannot go into initScheduler()
613
614 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
615 win32AllocStack();
616 #endif
617 }
618
619 /* -----------------------------------------------------------------------------
620 * scheduleFindWork()
621 *
622 * Search for work to do, and handle messages from elsewhere.
623 * -------------------------------------------------------------------------- */
624
625 static void
626 scheduleFindWork (Capability **pcap)
627 {
628 scheduleStartSignalHandlers(*pcap);
629
630 scheduleProcessInbox(pcap);
631
632 scheduleCheckBlockedThreads(*pcap);
633
634 #if defined(THREADED_RTS)
635 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
636 #endif
637 }
638
639 #if defined(THREADED_RTS)
640 STATIC_INLINE rtsBool
641 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
642 {
643 // we need to yield this capability to someone else if..
644 // - another thread is initiating a GC, and we didn't just do a GC
645 // (see Note [GC livelock])
646 // - another Task is returning from a foreign call
647 // - the thread at the head of the run queue cannot be run
648 // by this Task (it is bound to another Task, or it is unbound
649 // and this Task is bound).
650 //
651 // Note [GC livelock]
652 //
653 // If we are interrupted to do a GC, then we do not immediately do
654 // another one. This avoids a starvation situation where one
655 // Capability keeps forcing a GC and the other Capabilities make no
656 // progress at all.
657
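    // The expression below checks the three conditions above in order:
    // (pending_sync && !didGcLast) is the GC case, a non-empty
    // returning_tasks queue is the returning-foreign-call case, and the
    // last clause checks whether the thread at the head of the run queue
    // can be run by this Task.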
658 return ((pending_sync && !didGcLast) ||
659 cap->returning_tasks_hd != NULL ||
660 (!emptyRunQueue(cap) && (task->incall->tso == NULL
661 ? cap->run_queue_hd->bound != NULL
662 : cap->run_queue_hd->bound != task->incall)));
663 }
664
665 // This is the single place where a Task goes to sleep. There are
666 // two reasons it might need to sleep:
667 // - there are no threads to run
668 // - we need to yield this Capability to someone else
669 // (see shouldYieldCapability())
670 //
671 // Careful: the scheduler loop is quite delicate. Make sure you run
672 // the tests in testsuite/concurrent (all ways) after modifying this,
673 // and also check the benchmarks in nofib/parallel for regressions.
674
675 static void
676 scheduleYield (Capability **pcap, Task *task)
677 {
678 Capability *cap = *pcap;
679 int didGcLast = rtsFalse;
680
681 // if we have work, and we don't need to give up the Capability, continue.
682 //
683 if (!shouldYieldCapability(cap,task,rtsFalse) &&
684 (!emptyRunQueue(cap) ||
685 !emptyInbox(cap) ||
686 sched_state >= SCHED_INTERRUPTING)) {
687 return;
688 }
689
690 // otherwise yield (sleep), and keep yielding if necessary.
691 do {
692 didGcLast = yieldCapability(&cap,task, !didGcLast);
693 }
694 while (shouldYieldCapability(cap,task,didGcLast));
695
696 // note there may still be no threads on the run queue at this
697 // point; the caller has to check.
698
699 *pcap = cap;
700 return;
701 }
702 #endif
703
704 /* -----------------------------------------------------------------------------
705 * schedulePushWork()
706 *
707 * Push work to other Capabilities if we have some.
708 * -------------------------------------------------------------------------- */
709
710 static void
711 schedulePushWork(Capability *cap USED_IF_THREADS,
712 Task *task USED_IF_THREADS)
713 {
714 /* following code not for PARALLEL_HASKELL. I kept the call general,
715 future GUM versions might use pushing in a distributed setup */
716 #if defined(THREADED_RTS)
717
718 Capability *free_caps[n_capabilities], *cap0;
719 nat i, n_free_caps;
720
721 // migration can be turned off with +RTS -qm
722 if (!RtsFlags.ParFlags.migrate) return;
723
724 // Check whether we have more threads on our run queue, or sparks
725 // in our pool, that we could hand to another Capability.
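    // (Specifically: we only bother if we have at least two runnable
    // threads, one thread plus at least one spark, or, with an empty run
    // queue, at least two sparks.)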
726 if (cap->run_queue_hd == END_TSO_QUEUE) {
727 if (sparkPoolSizeCap(cap) < 2) return;
728 } else {
729 if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
730 sparkPoolSizeCap(cap) < 1) return;
731 }
732
733 // First grab as many free Capabilities as we can.
734 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
735 cap0 = &capabilities[i];
736 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
737 if (!emptyRunQueue(cap0)
738 || cap0->returning_tasks_hd != NULL
739 || cap0->inbox != (Message*)END_TSO_QUEUE) {
740 // it already has some work, we just grabbed it at
741 // the wrong moment. Or maybe it's deadlocked!
742 releaseCapability(cap0);
743 } else {
744 free_caps[n_free_caps++] = cap0;
745 }
746 }
747 }
748
749 // we now have n_free_caps free capabilities stashed in
750 // free_caps[]. Share our run queue equally with them. This is
751 // probably the simplest thing we could do; improvements we might
752 // want to do include:
753 //
754 // - giving high priority to moving relatively new threads, on
755 // the grounds that they haven't had time to build up a
756 // working set in the cache on this CPU/Capability.
757 //
758 // - giving low priority to moving long-lived threads
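    //
    // The loop below keeps the thread at the head of our run queue and
    // walks the rest once, dealing movable threads out to
    // free_caps[0..n_free_caps-1] in round-robin fashion; bound or locked
    // threads, and one thread in every (n_free_caps+1), stay here.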
759
760 if (n_free_caps > 0) {
761 StgTSO *prev, *t, *next;
762 #ifdef SPARK_PUSHING
763 rtsBool pushed_to_all;
764 #endif
765
766 debugTrace(DEBUG_sched,
767 "cap %d: %s and %d free capabilities, sharing...",
768 cap->no,
769 (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
770 "excess threads on run queue":"sparks to share (>=2)",
771 n_free_caps);
772
773 i = 0;
774 #ifdef SPARK_PUSHING
775 pushed_to_all = rtsFalse;
776 #endif
777
778 if (cap->run_queue_hd != END_TSO_QUEUE) {
779 prev = cap->run_queue_hd;
780 t = prev->_link;
781 prev->_link = END_TSO_QUEUE;
782 for (; t != END_TSO_QUEUE; t = next) {
783 next = t->_link;
784 t->_link = END_TSO_QUEUE;
785 if (t->bound == task->incall // don't move my bound thread
786 || tsoLocked(t)) { // don't move a locked thread
787 setTSOLink(cap, prev, t);
788 setTSOPrev(cap, t, prev);
789 prev = t;
790 } else if (i == n_free_caps) {
791 #ifdef SPARK_PUSHING
792 pushed_to_all = rtsTrue;
793 #endif
794 i = 0;
795 // keep one for us
796 setTSOLink(cap, prev, t);
797 setTSOPrev(cap, t, prev);
798 prev = t;
799 } else {
800 appendToRunQueue(free_caps[i],t);
801
802 traceEventMigrateThread (cap, t, free_caps[i]->no);
803
804 if (t->bound) { t->bound->task->cap = free_caps[i]; }
805 t->cap = free_caps[i];
806 i++;
807 }
808 }
809 cap->run_queue_tl = prev;
810
811 IF_DEBUG(sanity, checkRunQueue(cap));
812 }
813
814 #ifdef SPARK_PUSHING
815 /* JB I left this code in place, it would work but is not necessary */
816
817 // If there are some free capabilities that we didn't push any
818 // threads to, then try to push a spark to each one.
819 if (!pushed_to_all) {
820 StgClosure *spark;
821 // i is the next free capability to push to
822 for (; i < n_free_caps; i++) {
823 if (emptySparkPoolCap(free_caps[i])) {
824 spark = tryStealSpark(cap->sparks);
825 if (spark != NULL) {
826 /* TODO: if anyone wants to re-enable this code then
827 * they must consider the fizzledSpark(spark) case
828 * and update the per-cap spark statistics.
829 */
830 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
831
832 traceEventStealSpark(free_caps[i], t, cap->no);
833
834 newSpark(&(free_caps[i]->r), spark);
835 }
836 }
837 }
838 }
839 #endif /* SPARK_PUSHING */
840
841 // release the capabilities
842 for (i = 0; i < n_free_caps; i++) {
843 task->cap = free_caps[i];
844 releaseAndWakeupCapability(free_caps[i]);
845 }
846 }
847 task->cap = cap; // reset to point to our Capability.
848
849 #endif /* THREADED_RTS */
850
851 }
852
853 /* ----------------------------------------------------------------------------
854 * Start any pending signal handlers
855 * ------------------------------------------------------------------------- */
856
857 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
858 static void
859 scheduleStartSignalHandlers(Capability *cap)
860 {
861 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
862 // safe outside the lock
863 startSignalHandlers(cap);
864 }
865 }
866 #else
867 static void
868 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
869 {
870 }
871 #endif
872
873 /* ----------------------------------------------------------------------------
874 * Check for blocked threads that can be woken up.
875 * ------------------------------------------------------------------------- */
876
877 static void
878 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
879 {
880 #if !defined(THREADED_RTS)
881 //
882 // Check whether any waiting threads need to be woken up. If the
883 // run queue is empty, and there are no other tasks running, we
884 // can wait indefinitely for something to happen.
885 //
886 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
887 {
888 awaitEvent (emptyRunQueue(cap));
889 }
890 #endif
891 }
892
893 /* ----------------------------------------------------------------------------
894 * Detect deadlock conditions and attempt to resolve them.
895 * ------------------------------------------------------------------------- */
896
897 static void
898 scheduleDetectDeadlock (Capability **pcap, Task *task)
899 {
900 Capability *cap = *pcap;
901 /*
902 * Detect deadlock: when we have no threads to run, there are no
903 * threads blocked, waiting for I/O, or sleeping, and all the
904 * other tasks are waiting for work, we must have a deadlock of
905 * some description.
906 */
907 if ( emptyThreadQueues(cap) )
908 {
909 #if defined(THREADED_RTS)
910 /*
911 * In the threaded RTS, we only check for deadlock if there
912 * has been no activity in a complete timeslice. This means
913 * we won't eagerly start a full GC just because we don't have
914 * any threads to run currently.
915 */
916 if (recent_activity != ACTIVITY_INACTIVE) return;
917 #endif
918
919 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
920
921 // Garbage collection can release some new threads due to
922 // either (a) finalizers or (b) threads resurrected because
923 // they are unreachable and will therefore be sent an
924 // exception. Any threads thus released will be immediately
925 // runnable.
926 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
927 cap = *pcap;
928 // When force_major == rtsTrue, scheduleDoGC sets
929 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
930 // signal.
931
932 if ( !emptyRunQueue(cap) ) return;
933
934 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
935 /* If we have user-installed signal handlers, then wait
936 * for signals to arrive rather than bombing out with a
937 * deadlock.
938 */
939 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
940 debugTrace(DEBUG_sched,
941 "still deadlocked, waiting for signals...");
942
943 awaitUserSignals();
944
945 if (signals_pending()) {
946 startSignalHandlers(cap);
947 }
948
949 // either we have threads to run, or we were interrupted:
950 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
951
952 return;
953 }
954 #endif
955
956 #if !defined(THREADED_RTS)
957 /* Probably a real deadlock. Send the current main thread the
958 * NonTermination exception (nonTermination_closure, below).
959 */
960 if (task->incall->tso) {
961 switch (task->incall->tso->why_blocked) {
962 case BlockedOnSTM:
963 case BlockedOnBlackHole:
964 case BlockedOnMsgThrowTo:
965 case BlockedOnMVar:
966 throwToSingleThreaded(cap, task->incall->tso,
967 (StgClosure *)nonTermination_closure);
968 return;
969 default:
970 barf("deadlock: main thread blocked in a strange way");
971 }
972 }
973 return;
974 #endif
975 }
976 }
977
978
979 /* ----------------------------------------------------------------------------
980 * Send pending messages (PARALLEL_HASKELL only)
981 * ------------------------------------------------------------------------- */
982
983 #if defined(PARALLEL_HASKELL)
984 static void
985 scheduleSendPendingMessages(void)
986 {
987
988 # if defined(PAR) // global Mem.Mgmt., omit for now
989 if (PendingFetches != END_BF_QUEUE) {
990 processFetches();
991 }
992 # endif
993
994 if (RtsFlags.ParFlags.BufferTime) {
995 // if we use message buffering, we must send away all message
996 // packets which have become too old...
997 sendOldBuffers();
998 }
999 }
1000 #endif
1001
1002 /* ----------------------------------------------------------------------------
1003 * Process message in the current Capability's inbox
1004 * ------------------------------------------------------------------------- */
1005
1006 static void
1007 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
1008 {
1009 #if defined(THREADED_RTS)
1010 Message *m, *next;
1011 int r;
1012 Capability *cap = *pcap;
1013
1014 while (!emptyInbox(cap)) {
1015 if (cap->r.rCurrentNursery->link == NULL ||
1016 g0->n_new_large_words >= large_alloc_lim) {
1017 scheduleDoGC(pcap, cap->running_task, rtsFalse);
1018 cap = *pcap;
1019 }
1020
1021 // don't use a blocking acquire; if the lock is held by
1022 // another thread then just carry on. This seems to avoid
1023 // getting stuck in a message ping-pong situation with other
1024 // processors. We'll check the inbox again later anyway.
1025 //
1026 // We should really use a more efficient queue data structure
1027 // here. The trickiness is that we must ensure a Capability
1028 // never goes idle if the inbox is non-empty, which is why we
1029 // use cap->lock (cap->lock is released as the last thing
1030 // before going idle; see Capability.c:releaseCapability()).
1031 r = TRY_ACQUIRE_LOCK(&cap->lock);
1032 if (r != 0) return;
1033
1034 m = cap->inbox;
1035 cap->inbox = (Message*)END_TSO_QUEUE;
1036
1037 RELEASE_LOCK(&cap->lock);
1038
1039 while (m != (Message*)END_TSO_QUEUE) {
1040 next = m->link;
1041 executeMessage(cap, m);
1042 m = next;
1043 }
1044 }
1045 #endif
1046 }
1047
1048 /* ----------------------------------------------------------------------------
1049 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1050 * ------------------------------------------------------------------------- */
1051
1052 #if defined(THREADED_RTS)
1053 static void
1054 scheduleActivateSpark(Capability *cap)
1055 {
1056 if (anySparks() && !cap->disabled)
1057 {
1058 createSparkThread(cap);
1059 debugTrace(DEBUG_sched, "creating a spark thread");
1060 }
1061 }
1062 #endif // THREADED_RTS
1063
1064 /* ----------------------------------------------------------------------------
1065 * After running a thread...
1066 * ------------------------------------------------------------------------- */
1067
1068 static void
1069 schedulePostRunThread (Capability *cap, StgTSO *t)
1070 {
1071 // We have to be able to catch transactions that are in an
1072 // infinite loop as a result of seeing an inconsistent view of
1073 // memory, e.g.
1074 //
1075 // atomically $ do
1076 // [a,b] <- mapM readTVar [ta,tb]
1077 // when (a == b) loop
1078 //
1079 // and a is never equal to b given a consistent view of memory.
1080 //
1081 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1082 if (!stmValidateNestOfTransactions (t -> trec)) {
1083 debugTrace(DEBUG_sched | DEBUG_stm,
1084 "trec %p found wasting its time", t);
1085
1086 // strip the stack back to the
1087 // ATOMICALLY_FRAME, aborting the (nested)
1088 // transaction, and saving the stack of any
1089 // partially-evaluated thunks on the heap.
1090 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1091
1092 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1093 }
1094 }
1095
1096 /* some statistics gathering in the parallel case */
1097 }
1098
1099 /* -----------------------------------------------------------------------------
1100 * Handle a thread that returned to the scheduler with HeapOverflow
1101 * -------------------------------------------------------------------------- */
1102
1103 static rtsBool
1104 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1105 {
1106 // did the task ask for a large block?
1107 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1108 // if so, get one and push it on the front of the nursery.
1109 bdescr *bd;
1110 lnat blocks;
1111
1112 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
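        // For example (assuming the usual 4Kbyte BLOCK_SIZE): a request
        // for rHpAlloc = 10000 bytes rounds up to 3 blocks.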
1113
1114 if (blocks > BLOCKS_PER_MBLOCK) {
1115 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1116 }
1117
1118 debugTrace(DEBUG_sched,
1119 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1120 (long)t->id, what_next_strs[t->what_next], blocks);
1121
1122 // don't do this if the nursery is (nearly) full, we'll GC first.
1123 if (cap->r.rCurrentNursery->link != NULL ||
1124 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1125 // if the nursery has only one block.
1126
1127 bd = allocGroup_lock(blocks);
1128 cap->r.rNursery->n_blocks += blocks;
1129
1130 // link the new group into the list
1131 bd->link = cap->r.rCurrentNursery;
1132 bd->u.back = cap->r.rCurrentNursery->u.back;
1133 if (cap->r.rCurrentNursery->u.back != NULL) {
1134 cap->r.rCurrentNursery->u.back->link = bd;
1135 } else {
1136 cap->r.rNursery->blocks = bd;
1137 }
1138 cap->r.rCurrentNursery->u.back = bd;
1139
1140 // initialise it as a nursery block. We initialise the
1141 // step, gen_no, and flags field of *every* sub-block in
1142 // this large block, because this is easier than making
1143 // sure that we always find the block head of a large
1144 // block whenever we call Bdescr() (eg. evacuate() and
1145 // isAlive() in the GC would both have to do this, at
1146 // least).
1147 {
1148 bdescr *x;
1149 for (x = bd; x < bd + blocks; x++) {
1150 initBdescr(x,g0,g0);
1151 x->free = x->start;
1152 x->flags = 0;
1153 }
1154 }
1155
1156 // This assert can be a killer if the app is doing lots
1157 // of large block allocations.
1158 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1159
1160 // now update the nursery to point to the new block
1161 cap->r.rCurrentNursery = bd;
1162
1163 // we might be unlucky and have another thread get on the
1164 // run queue before us and steal the large block, but in that
1165 // case the thread will just end up requesting another large
1166 // block.
1167 pushOnRunQueue(cap,t);
1168 return rtsFalse; /* not actually GC'ing */
1169 }
1170 }
1171
1172 if (cap->r.rHpLim == NULL || cap->context_switch) {
1173 // Sometimes we miss a context switch, e.g. when calling
1174 // primitives in a tight loop, MAYBE_GC() doesn't check the
1175 // context switch flag, and we end up waiting for a GC.
1176 // See #1984, and concurrent/should_run/1984
1177 cap->context_switch = 0;
1178 appendToRunQueue(cap,t);
1179 } else {
1180 pushOnRunQueue(cap,t);
1181 }
1182 return rtsTrue;
1183 /* actual GC is done at the end of the while loop in schedule() */
1184 }
1185
1186 /* -----------------------------------------------------------------------------
1187 * Handle a thread that returned to the scheduler with ThreadYielding
1188 * -------------------------------------------------------------------------- */
1189
1190 static rtsBool
1191 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1192 {
1193 /* put the thread back on the run queue. Then, if we're ready to
1194 * GC, check whether this is the last task to stop. If so, wake
1195 * up the GC thread. getThread will block during a GC until the
1196 * GC is finished.
1197 */
1198
1199 ASSERT(t->_link == END_TSO_QUEUE);
1200
1201 // Shortcut if we're just switching evaluators: don't bother
1202 // doing stack squeezing (which can be expensive), just run the
1203 // thread.
1204 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1205 debugTrace(DEBUG_sched,
1206 "--<< thread %ld (%s) stopped to switch evaluators",
1207 (long)t->id, what_next_strs[t->what_next]);
1208 return rtsTrue;
1209 }
1210
1211 // Reset the context switch flag. We don't do this just before
1212 // running the thread, because that would mean we would lose ticks
1213 // during GC, which can lead to unfair scheduling (a thread hogs
1214 // the CPU because the tick always arrives during GC). This way
1215 // penalises threads that do a lot of allocation, but that seems
1216 // better than the alternative.
1217 if (cap->context_switch != 0) {
1218 cap->context_switch = 0;
1219 appendToRunQueue(cap,t);
1220 } else {
1221 pushOnRunQueue(cap,t);
1222 }
1223
1224 IF_DEBUG(sanity,
1225 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1226 checkTSO(t));
1227
1228 return rtsFalse;
1229 }
1230
1231 /* -----------------------------------------------------------------------------
1232 * Handle a thread that returned to the scheduler with ThreadBlocked
1233 * -------------------------------------------------------------------------- */
1234
1235 static void
1236 scheduleHandleThreadBlocked( StgTSO *t
1237 #if !defined(DEBUG)
1238 STG_UNUSED
1239 #endif
1240 )
1241 {
1242
1243 // We don't need to do anything. The thread is blocked, and it
1244 // has tidied up its stack and placed itself on whatever queue
1245 // it needs to be on.
1246
1247 // ASSERT(t->why_blocked != NotBlocked);
1248 // Not true: for example,
1249 // - the thread may have woken itself up already, because
1250 // threadPaused() might have raised a blocked throwTo
1251 // exception, see maybePerformBlockedException().
1252
1253 #ifdef DEBUG
1254 traceThreadStatus(DEBUG_sched, t);
1255 #endif
1256 }
1257
1258 /* -----------------------------------------------------------------------------
1259 * Handle a thread that returned to the scheduler with ThreadFinished
1260 * -------------------------------------------------------------------------- */
1261
1262 static rtsBool
1263 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1264 {
1265 /* Need to check whether this was a main thread, and if so,
1266 * return with the return value.
1267 *
1268 * We also end up here if the thread kills itself with an
1269 * uncaught exception, see Exception.cmm.
1270 */
1271
1272 // blocked exceptions can now complete, even if the thread was in
1273 // blocked mode (see #2910).
1274 awakenBlockedExceptionQueue (cap, t);
1275
1276 //
1277 // Check whether the thread that just completed was a bound
1278 // thread, and if so return with the result.
1279 //
1280 // There is an assumption here that all thread completion goes
1281 // through this point; we need to make sure that if a thread
1282 // ends up in the ThreadKilled state, that it stays on the run
1283 // queue so it can be dealt with here.
1284 //
1285
1286 if (t->bound) {
1287
1288 if (t->bound != task->incall) {
1289 #if !defined(THREADED_RTS)
1290 // Must be a bound thread that is not the topmost one. Leave
1291 // it on the run queue until the stack has unwound to the
1292 // point where we can deal with this. Leaving it on the run
1293 // queue also ensures that the garbage collector knows about
1294 // this thread and its return value (it gets dropped from the
1295 // step->threads list so there's no other way to find it).
1296 appendToRunQueue(cap,t);
1297 return rtsFalse;
1298 #else
1299 // this cannot happen in the threaded RTS, because a
1300 // bound thread can only be run by the appropriate Task.
1301 barf("finished bound thread that isn't mine");
1302 #endif
1303 }
1304
1305 ASSERT(task->incall->tso == t);
1306
1307 if (t->what_next == ThreadComplete) {
1308 if (task->incall->ret) {
1309 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1310 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1311 }
1312 task->incall->stat = Success;
1313 } else {
1314 if (task->incall->ret) {
1315 *(task->incall->ret) = NULL;
1316 }
1317 if (sched_state >= SCHED_INTERRUPTING) {
1318 if (heap_overflow) {
1319 task->incall->stat = HeapExhausted;
1320 } else {
1321 task->incall->stat = Interrupted;
1322 }
1323 } else {
1324 task->incall->stat = Killed;
1325 }
1326 }
1327 #ifdef DEBUG
1328 removeThreadLabel((StgWord)task->incall->tso->id);
1329 #endif
1330
1331 // We no longer consider this thread and task to be bound to
1332 // each other. The TSO lives on until it is GC'd, but the
1333 // task is about to be released by the caller, and we don't
1334 // want anyone following the pointer from the TSO to the
1335 // defunct task (which might have already been
1336 // re-used). This was a real bug: the GC updated
1337 // tso->bound->tso which led to a deadlock.
1338 t->bound = NULL;
1339 task->incall->tso = NULL;
1340
1341 return rtsTrue; // tells schedule() to return
1342 }
1343
1344 return rtsFalse;
1345 }
1346
1347 /* -----------------------------------------------------------------------------
1348 * Perform a heap census
1349 * -------------------------------------------------------------------------- */
1350
1351 static rtsBool
1352 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1353 {
1354 // When we have +RTS -i0 and we're heap profiling, do a census at
1355 // every GC. This lets us get repeatable runs for debugging.
1356 if (performHeapProfile ||
1357 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1358 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1359 return rtsTrue;
1360 } else {
1361 return rtsFalse;
1362 }
1363 }
1364
1365 /* -----------------------------------------------------------------------------
1366 * Start a synchronisation of all capabilities
1367 * -------------------------------------------------------------------------- */
1368
1369 // Returns:
1370 // 0 if we successfully got a sync
1371 // non-0 if there was another sync request in progress,
1372 // and we yielded to it. The value returned is the
1373 // type of the other sync request.
1374 //
1375 #if defined(THREADED_RTS)
1376 static nat requestSync (Capability **pcap, Task *task, nat sync_type)
1377 {
1378 nat prev_pending_sync;
1379
1380 prev_pending_sync = cas(&pending_sync, 0, sync_type);
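    // cas() writes sync_type into pending_sync only if it was previously 0
    // and returns the old value, so a non-zero result means another Task
    // already has a sync request in flight and we yield to it below.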
1381
1382 if (prev_pending_sync)
1383 {
1384 do {
1385 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1386 prev_pending_sync);
1387 ASSERT(*pcap);
1388 yieldCapability(pcap,task,rtsTrue);
1389 } while (pending_sync);
1390 return prev_pending_sync; // NOTE: task->cap might have changed now
1391 }
1392 else
1393 {
1394 return 0;
1395 }
1396 }
1397
1398 //
1399 // Grab all the capabilities except the one we already hold. Used
1400 // when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1401 // before a fork (SYNC_OTHER).
1402 //
1403 // Only call this after requestSync(), otherwise a deadlock might
1404 // ensue if another thread is trying to synchronise.
1405 //
1406 static void acquireAllCapabilities(Capability *cap, Task *task)
1407 {
1408 Capability *tmpcap;
1409 nat i;
1410
1411 for (i=0; i < n_capabilities; i++) {
1412 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
1413 tmpcap = &capabilities[i];
1414 if (tmpcap != cap) {
1415 // we better hope this task doesn't get migrated to
1416 // another Capability while we're waiting for this one.
1417 // It won't, because load balancing happens while we have
1418 // all the Capabilities, but even so it's a slightly
1419 // unsavoury invariant.
1420 task->cap = tmpcap;
1421 waitForReturnCapability(&tmpcap, task);
1422 if (tmpcap->no != i) {
1423 barf("acquireAllCapabilities: got the wrong capability");
1424 }
1425 }
1426 }
1427 task->cap = cap;
1428 }
1429
1430 static void releaseAllCapabilities(Capability *cap, Task *task)
1431 {
1432 nat i;
1433
1434 for (i = 0; i < n_capabilities; i++) {
1435 if (cap->no != i) {
1436 task->cap = &capabilities[i];
1437 releaseCapability(&capabilities[i]);
1438 }
1439 }
1440 task->cap = cap;
1441 }
1442 #endif
1443
1444 /* -----------------------------------------------------------------------------
1445 * Perform a garbage collection if necessary
1446 * -------------------------------------------------------------------------- */
1447
1448 static void
1449 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1450 rtsBool force_major)
1451 {
1452 Capability *cap = *pcap;
1453 rtsBool heap_census;
1454 nat collect_gen;
1455 #ifdef THREADED_RTS
1456 rtsBool idle_cap[n_capabilities];
1457 rtsBool gc_type;
1458 nat i, sync;
1459 StgTSO *tso;
1460 #endif
1461
1462 if (sched_state == SCHED_SHUTTING_DOWN) {
1463 // The final GC has already been done, and the system is
1464 // shutting down. We'll probably deadlock if we try to GC
1465 // now.
1466 return;
1467 }
1468
1469 heap_census = scheduleNeedHeapProfile(rtsTrue);
1470
1471 // Figure out which generation we are collecting, so that we can
1472 // decide whether this is a parallel GC or not.
1473 collect_gen = calcNeeded(force_major || heap_census, NULL);
1474
1475 #ifdef THREADED_RTS
1476 if (sched_state < SCHED_INTERRUPTING
1477 && RtsFlags.ParFlags.parGcEnabled
1478 && collect_gen >= RtsFlags.ParFlags.parGcGen
1479 && ! oldest_gen->mark)
1480 {
1481 gc_type = SYNC_GC_PAR;
1482 } else {
1483 gc_type = SYNC_GC_SEQ;
1484 }
1485
1486 // In order to GC, there must be no threads running Haskell code.
1487 // Therefore, the GC thread needs to hold *all* the capabilities,
1488 // and release them after the GC has completed.
1489 //
1490 // This seems to be the simplest way: previous attempts involved
1491 // making all the threads with capabilities give up their
1492 // capabilities and sleep except for the *last* one, which
1493 // actually did the GC. But it's quite hard to arrange for all
1494 // the other tasks to sleep and stay asleep.
1495 //
1496
1497 /* Other capabilities are prevented from running yet more Haskell
1498 threads if pending_sync is set. Tested inside
1499 yieldCapability() and releaseCapability() in Capability.c */
1500
1501 do {
1502 sync = requestSync(pcap, task, gc_type);
1503 cap = *pcap;
1504 if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
1505 // someone else had a pending sync request for a GC, so
1506 // let's assume GC has been done and we don't need to GC
1507 // again.
1508 return;
1509 }
1510 if (sched_state == SCHED_SHUTTING_DOWN) {
1511 // The scheduler might now be shutting down. We tested
1512 // this above, but it might have become true since then as
1513 // we yielded the capability in requestSync().
1514 return;
1515 }
1516 } while (sync);
1517
1518 interruptAllCapabilities();
1519
1520 // The final shutdown GC is always single-threaded, because it's
1521 // possible that some of the Capabilities have no worker threads.
1522
1523 if (gc_type == SYNC_GC_SEQ)
1524 {
1525 traceEventRequestSeqGc(cap);
1526 }
1527 else
1528 {
1529 traceEventRequestParGc(cap);
1530 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1531 }
1532
1533 if (gc_type == SYNC_GC_SEQ)
1534 {
1535 // single-threaded GC: grab all the capabilities
1536 acquireAllCapabilities(cap,task);
1537 }
1538 else
1539 {
1540 // If we are load-balancing collections in this
1541 // generation, then we require all GC threads to participate
1542 // in the collection. Otherwise, we only require active
1543 // threads to participate, and we set gc_threads[i]->idle for
1544 // any idle capabilities. The rationale here is that waking
1545 // up an idle Capability takes much longer than just doing any
1546 // GC work on its behalf.
1547
1548 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1549 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1550 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
1551 for (i=0; i < n_capabilities; i++) {
1552 if (capabilities[i].disabled) {
1553 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1554 } else {
1555 idle_cap[i] = rtsFalse;
1556 }
1557 }
1558 } else {
1559 for (i=0; i < n_capabilities; i++) {
1560 if (capabilities[i].disabled) {
1561 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1562 } else if (i == cap->no ||
1563 capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1564 idle_cap[i] = rtsFalse;
1565 } else {
1566 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1567 if (!idle_cap[i]) {
1568 n_failed_trygrab_idles++;
1569 } else {
1570 n_idle_caps++;
1571 }
1572 }
1573 }
1574 }
1575
1576 // We set the gc_thread[i]->idle flag if that
1577 // capability/thread is not participating in this collection.
1578 // We also keep a local record of which capabilities are idle
1579 // in idle_cap[], because scheduleDoGC() is re-entrant:
1580 // another thread might start a GC as soon as we've finished
1581 // this one, and thus the gc_thread[]->idle flags are invalid
1582 // as soon as we release any threads after GC. Getting this
1583 // wrong leads to a rare and hard to debug deadlock!
1584
1585 for (i=0; i < n_capabilities; i++) {
1586 gc_threads[i]->idle = idle_cap[i];
1587 capabilities[i].idle++;
1588 }
1589
1590 // For all capabilities participating in this GC, wait until
1591 // they have stopped mutating and are standing by for GC.
1592 waitForGcThreads(cap);
1593
1594 #if defined(THREADED_RTS)
1595 // Stable point where we can do a global check on our spark counters
1596 ASSERT(checkSparkCountInvariant());
1597 #endif
1598 }
1599
1600 #endif
1601
1602 IF_DEBUG(scheduler, printAllThreads());
1603
1604 delete_threads_and_gc:
1605 /*
1606 * We now have all the capabilities; if we're in an interrupting
1607 * state, then we should take the opportunity to delete all the
1608 * threads in the system.
1609 */
1610 if (sched_state == SCHED_INTERRUPTING) {
1611 deleteAllThreads(cap);
1612 #if defined(THREADED_RTS)
1613 // Discard all the sparks from every Capability. Why?
1614 // They'll probably be GC'd anyway since we've killed all the
1615 // threads. It just avoids the GC having to do any work to
1616 // figure out that any remaining sparks are garbage.
1617 for (i = 0; i < n_capabilities; i++) {
1618 capabilities[i].spark_stats.gcd +=
1619 sparkPoolSize(capabilities[i].sparks);
1620 // No race here since all Caps are stopped.
1621 discardSparksCap(&capabilities[i]);
1622 }
1623 #endif
1624 sched_state = SCHED_SHUTTING_DOWN;
1625 }
1626
1627 /*
1628 * When there are disabled capabilities, we want to migrate any
1629 * threads away from them. Normally this happens in the
1630 * scheduler's loop, but only for unbound threads - it's really
1631 * hard for a bound thread to migrate itself. So we have another
1632 * go here.
1633 */
1634 #if defined(THREADED_RTS)
1635 for (i = enabled_capabilities; i < n_capabilities; i++) {
1636 Capability *tmp_cap, *dest_cap;
1637 tmp_cap = &capabilities[i];
1638 ASSERT(tmp_cap->disabled);
1639 if (i != cap->no) {
1640 dest_cap = &capabilities[i % enabled_capabilities];
1641 while (!emptyRunQueue(tmp_cap)) {
1642 tso = popRunQueue(tmp_cap);
1643 migrateThread(tmp_cap, tso, dest_cap);
1644 if (tso->bound) {
1645 traceTaskMigrate(tso->bound->task,
1646 tso->bound->task->cap,
1647 dest_cap);
1648 tso->bound->task->cap = dest_cap;
1649 }
1650 }
1651 }
1652 }
1653 #endif
1654
1655 #if defined(THREADED_RTS)
1656 // reset pending_sync *before* GC, so that when the GC threads
1657 // emerge they don't immediately re-enter the GC.
1658 pending_sync = 0;
1659 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1660 #else
1661 GarbageCollect(collect_gen, heap_census, 0, cap);
1662 #endif
1663
1664 traceSparkCounters(cap);
1665
1666 switch (recent_activity) {
1667 case ACTIVITY_INACTIVE:
1668 if (force_major) {
1669 // We are doing a GC because the system has been idle for a
1670 // timeslice and we need to check for deadlock. Record the
1671 // fact that we've done a GC and turn off the timer signal;
1672 // it will get re-enabled if we run any threads after the GC.
1673 recent_activity = ACTIVITY_DONE_GC;
1674 stopTimer();
1675 break;
1676 }
1677 // fall through...
1678
1679 case ACTIVITY_MAYBE_NO:
1680 // the GC might have taken long enough for the timer to set
1681 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1682 // but we aren't necessarily deadlocked:
1683 recent_activity = ACTIVITY_YES;
1684 break;
1685
1686 case ACTIVITY_DONE_GC:
1687 // If we are actually active, the scheduler will reset the
1688 // recent_activity flag and re-enable the timer.
1689 break;
1690 }
1691
1692 #if defined(THREADED_RTS)
1693 // Stable point where we can do a global check on our spark counters
1694 ASSERT(checkSparkCountInvariant());
1695 #endif
1696
1697 // The heap census itself is done during GarbageCollect().
1698 if (heap_census) {
1699 performHeapProfile = rtsFalse;
1700 }
1701
1702 #if defined(THREADED_RTS)
1703 if (gc_type == SYNC_GC_PAR)
1704 {
1705 releaseGCThreads(cap);
1706 for (i = 0; i < n_capabilities; i++) {
1707 if (i != cap->no) {
1708 if (idle_cap[i]) {
1709 ASSERT(capabilities[i].running_task == task);
1710 task->cap = &capabilities[i];
1711 releaseCapability(&capabilities[i]);
1712 } else {
1713 ASSERT(capabilities[i].running_task != task);
1714 }
1715 }
1716 }
1717 task->cap = cap;
1718 }
1719 #endif
1720
1721 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1722 // GC set the heap_overflow flag, so we should proceed with
1723 // an orderly shutdown now. Ultimately we want the main
1724 // thread to return to its caller with HeapExhausted, at which
1725 // point the caller should call hs_exit(). The first step is
1726 // to delete all the threads.
1727 //
1728 // Another way to do this would be to raise an exception in
1729 // the main thread, which we really should do because it gives
1730 // the program a chance to clean up. But how do we find the
1731 // main thread? It should presumably be the same one that
1732 // gets ^C exceptions, but that's all done on the Haskell side
1733 // (GHC.TopHandler).
1734 sched_state = SCHED_INTERRUPTING;
1735 goto delete_threads_and_gc;
1736 }
1737
1738 #ifdef SPARKBALANCE
1739 /* JB
1740 Once we are all together... this would be the place to balance all
1741 spark pools. No concurrent stealing or adding of new sparks can
1742 occur. Should be defined in Sparks.c. */
1743 balanceSparkPoolsCaps(n_capabilities, capabilities);
1744 #endif
1745
1746 #if defined(THREADED_RTS)
1747 if (gc_type == SYNC_GC_SEQ) {
1748 // release our stash of capabilities.
1749 releaseAllCapabilities(cap, task);
1750 }
1751 #endif
1752
1753 return;
1754 }
1755
1756 /* ---------------------------------------------------------------------------
1757 * Singleton fork(). Do not copy any running threads.
1758 * ------------------------------------------------------------------------- */
1759
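/* A rough sketch of the calling convention as seen from the C side
 * (hypothetical caller; the real entry point is the forkProcess wrapper
 * in the unix package, which passes down a StablePtr to the child's
 * IO action):
 *
 *     HsStablePtr child_action = ...;        // StablePtr to an IO ()
 *     pid_t pid = forkProcess(&child_action);
 *     // In the parent, pid is the child's process id and execution
 *     // continues normally.  In the child, forkProcess() never returns:
 *     // it runs the action via rts_evalStableIO() and then calls
 *     // hs_exit()/stg_exit() itself (see the code below).
 */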
1760 pid_t
1761 forkProcess(HsStablePtr *entry
1762 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1763 STG_UNUSED
1764 #endif
1765 )
1766 {
1767 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1768 pid_t pid;
1769 StgTSO* t,*next;
1770 Capability *cap;
1771 nat g;
1772 Task *task = NULL;
1773 nat i;
1774 #ifdef THREADED_RTS
1775 nat sync;
1776 #endif
1777
1778 debugTrace(DEBUG_sched, "forking!");
1779
1780 task = newBoundTask();
1781
1782 cap = NULL;
1783 waitForReturnCapability(&cap, task);
1784
1785 #ifdef THREADED_RTS
1786 do {
1787 sync = requestSync(&cap, task, SYNC_OTHER);
1788 } while (sync);
1789
1790 acquireAllCapabilities(cap,task);
1791
1792 pending_sync = 0;
1793 #endif
1794
1795 // no funny business: hold locks while we fork, otherwise if some
1796 // other thread is holding a lock when the fork happens, the data
1797 // structure protected by the lock will forever be in an
1798 // inconsistent state in the child. See also #1391.
1799 ACQUIRE_LOCK(&sched_mutex);
1800 ACQUIRE_LOCK(&sm_mutex);
1801 ACQUIRE_LOCK(&stable_mutex);
1802 ACQUIRE_LOCK(&task->lock);
1803
1804 for (i=0; i < n_capabilities; i++) {
1805 ACQUIRE_LOCK(&capabilities[i].lock);
1806 }
1807
1808 stopTimer(); // See #4074
1809
1810 #if defined(TRACING)
1811 flushEventLog(); // so that child won't inherit dirty file buffers
1812 #endif
1813
1814 pid = fork();
1815
1816 if (pid) { // parent
1817
1818 startTimer(); // #4074
1819
1820 RELEASE_LOCK(&sched_mutex);
1821 RELEASE_LOCK(&sm_mutex);
1822 RELEASE_LOCK(&stable_mutex);
1823 RELEASE_LOCK(&task->lock);
1824
1825 for (i=0; i < n_capabilities; i++) {
1826 releaseCapability_(&capabilities[i],rtsFalse);
1827 RELEASE_LOCK(&capabilities[i].lock);
1828 }
1829 boundTaskExiting(task);
1830
1831 // just return the pid
1832 return pid;
1833
1834 } else { // child
1835
1836 #if defined(THREADED_RTS)
1837 initMutex(&sched_mutex);
1838 initMutex(&sm_mutex);
1839 initMutex(&stable_mutex);
1840 initMutex(&task->lock);
1841
1842 for (i=0; i < n_capabilities; i++) {
1843 initMutex(&capabilities[i].lock);
1844 }
1845 #endif
1846
1847 #ifdef TRACING
1848 resetTracing();
1849 #endif
1850
1851 // Now, all OS threads except the thread that forked are
1852 // stopped. We need to stop all Haskell threads, including
1853 // those involved in foreign calls. Also we need to delete
1854 // all Tasks, because they correspond to OS threads that are
1855 // now gone.
1856
1857 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1858 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1859 next = t->global_link;
1860 // don't allow threads to catch the ThreadKilled
1861 // exception, but we do want to raiseAsync() because these
1862 // threads may be evaluating thunks that we need later.
1863 deleteThread_(t->cap,t);
1864
1865 // stop the GC from updating the InCall to point to
1866 // the TSO. This is only necessary because the
1867 // OSThread bound to the TSO has been killed, and
1868 // won't get a chance to exit in the usual way (see
1869 // also scheduleHandleThreadFinished).
1870 t->bound = NULL;
1871 }
1872 }
1873
1874 discardTasksExcept(task);
1875
1876 for (i=0; i < n_capabilities; i++) {
1877 cap = &capabilities[i];
1878
1879 // Empty the run queue. It seems tempting to let all the
1880 // killed threads stay on the run queue as zombies to be
1881 // cleaned up later, but some of them may correspond to
1882 // bound threads for which the corresponding Task does not
1883 // exist.
1884 cap->run_queue_hd = END_TSO_QUEUE;
1885 cap->run_queue_tl = END_TSO_QUEUE;
1886
1887 // Any suspended C-calling Tasks are gone; their OS threads
1888 // no longer exist:
1889 cap->suspended_ccalls = NULL;
1890
1891 #if defined(THREADED_RTS)
1892 // Wipe our spare workers list, they no longer exist. New
1893 // workers will be created if necessary.
1894 cap->spare_workers = NULL;
1895 cap->n_spare_workers = 0;
1896 cap->returning_tasks_hd = NULL;
1897 cap->returning_tasks_tl = NULL;
1898 #endif
1899
1900 // Release all caps except 0, we'll use that for starting
1901 // the IO manager and running the client action below.
1902 if (cap->no != 0) {
1903 task->cap = cap;
1904 releaseCapability(cap);
1905 }
1906 }
1907 cap = &capabilities[0];
1908 task->cap = cap;
1909
1910 // Empty the threads lists. Otherwise, the garbage
1911 // collector may attempt to resurrect some of these threads.
1912 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1913 generations[g].threads = END_TSO_QUEUE;
1914 }
1915
1916 // On Unix, all timers are reset in the child, so we need to start
1917 // the timer again.
1918 initTimer();
1919 startTimer();
1920
1921 // TODO: need to trace various other things in the child
1922 // like startup event, capabilities, process info etc
1923 traceTaskCreate(task, cap);
1924
1925 #if defined(THREADED_RTS)
1926 ioManagerStartCap(&cap);
1927 #endif
1928
1929 rts_evalStableIO(&cap, entry, NULL); // run the action
1930 rts_checkSchedStatus("forkProcess",cap);
1931
1932 rts_unlock(cap);
1933 hs_exit(); // clean up and exit
1934 stg_exit(EXIT_SUCCESS);
1935 }
1936 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1937 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1938 #endif
1939 }
1940
1941 /* ---------------------------------------------------------------------------
1942 * Changing the number of Capabilities
1943 *
1944 * Changing the number of Capabilities is very tricky! We can only do
1945 * it with the system fully stopped, so we do a full sync with
1946 * requestSync(SYNC_OTHER) and grab all the capabilities.
1947 *
1948 * Then we resize the appropriate data structures, and update all
1949 * references to the old data structures which have now moved.
1950 * Finally we release the Capabilities we are holding, and start
1951 * worker Tasks on the new Capabilities we created.
1952 *
1953 * ------------------------------------------------------------------------- */
1954
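/* A minimal sketch of how this is reached (hedged; the Haskell wrapper
 * names are from memory and are not defined in this file): from Haskell,
 * Control.Concurrent.setNumCapabilities makes a foreign call that ends
 * up here, and an embedding C program can call it directly once the RTS
 * is initialised, e.g.
 *
 *     #include "Rts.h"
 *
 *     int main (int argc, char *argv[])
 *     {
 *         hs_init(&argc, &argv);   // starts the scheduler (see +RTS -N)
 *         setNumCapabilities(4);   // grow to 4 capabilities
 *         // ... run some Haskell via the RtsAPI ...
 *         setNumCapabilities(1);   // shrink: caps 1..3 are merely disabled
 *         hs_exit();
 *         return 0;
 *     }
 */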
1955 void
1956 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
1957 {
1958 #if !defined(THREADED_RTS)
1959 if (new_n_capabilities != 1) {
1960 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
1961 }
1962 return;
1963 #elif defined(NOSMP)
1964 if (new_n_capabilities != 1) {
1965 errorBelch("setNumCapabilities: not supported on this platform");
1966 }
1967 return;
1968 #else
1969 Task *task;
1970 Capability *cap;
1971 nat sync;
1972 StgTSO* t;
1973 nat g, n;
1974 Capability *old_capabilities = NULL;
1975
1976 if (new_n_capabilities == enabled_capabilities) return;
1977
1978 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
1979 enabled_capabilities, new_n_capabilities);
1980
1981 cap = rts_lock();
1982 task = cap->running_task;
1983
1984 do {
1985 sync = requestSync(&cap, task, SYNC_OTHER);
1986 } while (sync);
1987
1988 acquireAllCapabilities(cap,task);
1989
1990 pending_sync = 0;
1991
1992 if (new_n_capabilities < enabled_capabilities)
1993 {
1994 // Reducing the number of capabilities: we do not actually
1995 // remove the extra capabilities, we just mark them as
1996 // "disabled". This has the following effects:
1997 //
1998 // - threads on a disabled capability are migrated away by the
1999 // scheduler loop
2000 //
2001 // - disabled capabilities do not participate in GC
2002 // (see scheduleDoGC())
2003 //
2004 // - No spark threads are created on this capability
2005 // (see scheduleActivateSpark())
2006 //
2007 // - We do not attempt to migrate threads *to* a disabled
2008 // capability (see schedulePushWork()).
2009 //
2010 // but in other respects, a disabled capability remains
2011 // alive. Threads may be woken up on a disabled capability,
2012 // but they will be immediately migrated away.
2013 //
2014 // This approach is much easier than trying to actually remove
2015 // the capability; we don't have to worry about GC data
2016 // structures, the nursery, etc.
2017 //
2018 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2019 capabilities[n].disabled = rtsTrue;
2020 traceCapDisable(&capabilities[n]);
2021 }
2022 enabled_capabilities = new_n_capabilities;
2023 }
2024 else
2025 {
2026 // Increasing the number of enabled capabilities.
2027 //
2028 // enable any disabled capabilities, up to the required number
2029 for (n = enabled_capabilities;
2030 n < new_n_capabilities && n < n_capabilities; n++) {
2031 capabilities[n].disabled = rtsFalse;
2032 traceCapEnable(&capabilities[n]);
2033 }
2034 enabled_capabilities = n;
2035
2036 if (new_n_capabilities > n_capabilities) {
2037 #if defined(TRACING)
2038 // Allocate eventlog buffers for the new capabilities. Note this
2039 // must be done before calling moreCapabilities(), because that
2040 // will emit events about creating the new capabilities and adding
2041 // them to existing capsets.
2042 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2043 #endif
2044
2045 // Resize the capabilities array
2046 // NB. after this, capabilities points somewhere new. Any pointers
2047 // of type (Capability *) are now invalid.
2048 old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
2049
2050 // update our own cap pointer
2051 cap = &capabilities[cap->no];
2052
2053 // Resize and update storage manager data structures
2054 storageAddCapabilities(n_capabilities, new_n_capabilities);
2055
2056 // Update (Capability *) refs in the Task manager.
2057 updateCapabilityRefs();
2058
2059 // Update (Capability *) refs from TSOs
2060 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2061 for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
2062 t->cap = &capabilities[t->cap->no];
2063 }
2064 }
2065 }
2066 }
2067
2068 // We're done: release the original Capabilities
2069 releaseAllCapabilities(cap,task);
2070
2071 // Start worker tasks on the new Capabilities
2072 startWorkerTasks(n_capabilities, new_n_capabilities);
2073
2074 // finally, update n_capabilities
2075 if (new_n_capabilities > n_capabilities) {
2076 n_capabilities = enabled_capabilities = new_n_capabilities;
2077 }
2078
2079 // We can't free the old array until now, because we access it
2080 // while updating pointers in updateCapabilityRefs().
2081 if (old_capabilities) {
2082 stgFree(old_capabilities);
2083 }
2084
2085 rts_unlock(cap);
2086
2087 #endif // THREADED_RTS
2088 }
2089
2090
2091
2092 /* ---------------------------------------------------------------------------
2093 * Delete all the threads in the system
2094 * ------------------------------------------------------------------------- */
2095
2096 static void
2097 deleteAllThreads ( Capability *cap )
2098 {
2099 // NOTE: only safe to call if we own all capabilities.
2100
2101 StgTSO* t, *next;
2102 nat g;
2103
2104 debugTrace(DEBUG_sched,"deleting all threads");
2105 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2106 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2107 next = t->global_link;
2108 deleteThread(cap,t);
2109 }
2110 }
2111
2112 // The run queue now contains a bunch of ThreadKilled threads. We
2113 // must not throw these away: the main thread(s) will be in there
2114 // somewhere, and the main scheduler loop has to deal with it.
2115 // Also, the run queue is the only thing keeping these threads from
2116 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2117
2118 #if !defined(THREADED_RTS)
2119 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2120 ASSERT(sleeping_queue == END_TSO_QUEUE);
2121 #endif
2122 }
2123
2124 /* -----------------------------------------------------------------------------
2125 Managing the suspended_ccalls list.
2126 Locks required: sched_mutex
2127 -------------------------------------------------------------------------- */
2128
2129 STATIC_INLINE void
2130 suspendTask (Capability *cap, Task *task)
2131 {
2132 InCall *incall;
2133
2134 incall = task->incall;
2135 ASSERT(incall->next == NULL && incall->prev == NULL);
2136 incall->next = cap->suspended_ccalls;
2137 incall->prev = NULL;
2138 if (cap->suspended_ccalls) {
2139 cap->suspended_ccalls->prev = incall;
2140 }
2141 cap->suspended_ccalls = incall;
2142 }
2143
2144 STATIC_INLINE void
2145 recoverSuspendedTask (Capability *cap, Task *task)
2146 {
2147 InCall *incall;
2148
2149 incall = task->incall;
2150 if (incall->prev) {
2151 incall->prev->next = incall->next;
2152 } else {
2153 ASSERT(cap->suspended_ccalls == incall);
2154 cap->suspended_ccalls = incall->next;
2155 }
2156 if (incall->next) {
2157 incall->next->prev = incall->prev;
2158 }
2159 incall->next = incall->prev = NULL;
2160 }
2161
2162 /* ---------------------------------------------------------------------------
2163 * Suspending & resuming Haskell threads.
2164 *
2165 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2166 * its capability before calling the C function. This allows another
2167 * task to pick up the capability and carry on running Haskell
2168 * threads. It also means that if the C call blocks, it won't lock
2169 * the whole system.
2170 *
2171 * The Haskell thread making the C call is put to sleep for the
2172 * duration of the call, on the suspended_ccalls list. We
2173 * give out a token to the task, which it can use to resume the thread
2174 * on return from the C function.
2175 *
2176 * If this is an interruptible C call, this means that the FFI call may be
2177 * unceremoniously terminated and should be scheduled on an
2178 * unbound worker thread.
2179 * ------------------------------------------------------------------------- */
2180
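/* Schematically, the code GHC generates around a safe foreign call
 * 'foo' uses this pair of functions like so (a simplified sketch; the
 * real code is emitted by the code generator and also saves/restores
 * the thread's register state; BaseReg is the StgRegTable pointer):
 *
 *     void *token;
 *     token = suspendThread(BaseReg, interruptible);
 *     r = foo(arg);                   // may block or call back into Haskell
 *     BaseReg = resumeThread(token);
 *     // ... continue, possibly on a different Capability ...
 */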
2181 void *
2182 suspendThread (StgRegTable *reg, rtsBool interruptible)
2183 {
2184 Capability *cap;
2185 int saved_errno;
2186 StgTSO *tso;
2187 Task *task;
2188 #if mingw32_HOST_OS
2189 StgWord32 saved_winerror;
2190 #endif
2191
2192 saved_errno = errno;
2193 #if mingw32_HOST_OS
2194 saved_winerror = GetLastError();
2195 #endif
2196
2197 /* assume that reg points to the StgRegTable part of a Capability.
2198 */
2199 cap = regTableToCapability(reg);
2200
2201 task = cap->running_task;
2202 tso = cap->r.rCurrentTSO;
2203
2204 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2205
2206 // XXX this might not be necessary --SDM
2207 tso->what_next = ThreadRunGHC;
2208
2209 threadPaused(cap,tso);
2210
2211 if (interruptible) {
2212 tso->why_blocked = BlockedOnCCall_Interruptible;
2213 } else {
2214 tso->why_blocked = BlockedOnCCall;
2215 }
2216
2217 // Hand back capability
2218 task->incall->suspended_tso = tso;
2219 task->incall->suspended_cap = cap;
2220
2221 ACQUIRE_LOCK(&cap->lock);
2222
2223 suspendTask(cap,task);
2224 cap->in_haskell = rtsFalse;
2225 releaseCapability_(cap,rtsFalse);
2226
2227 RELEASE_LOCK(&cap->lock);
2228
2229 errno = saved_errno;
2230 #if mingw32_HOST_OS
2231 SetLastError(saved_winerror);
2232 #endif
2233 return task;
2234 }
2235
2236 StgRegTable *
2237 resumeThread (void *task_)
2238 {
2239 StgTSO *tso;
2240 InCall *incall;
2241 Capability *cap;
2242 Task *task = task_;
2243 int saved_errno;
2244 #if mingw32_HOST_OS
2245 StgWord32 saved_winerror;
2246 #endif
2247
2248 saved_errno = errno;
2249 #if mingw32_HOST_OS
2250 saved_winerror = GetLastError();
2251 #endif
2252
2253 incall = task->incall;
2254 cap = incall->suspended_cap;
2255 task->cap = cap;
2256
2257 // Wait for permission to re-enter the RTS with the result.
2258 waitForReturnCapability(&cap,task);
2259 // we might be on a different capability now... but if so, our
2260 // entry on the suspended_ccalls list will also have been
2261 // migrated.
2262
2263 // Remove the thread from the suspended list
2264 recoverSuspendedTask(cap,task);
2265
2266 tso = incall->suspended_tso;
2267 incall->suspended_tso = NULL;
2268 incall->suspended_cap = NULL;
2269 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2270
2271 traceEventRunThread(cap, tso);
2272
2273 /* Reset blocking status */
2274 tso->why_blocked = NotBlocked;
2275
2276 if ((tso->flags & TSO_BLOCKEX) == 0) {
2277 // avoid locking the TSO if we don't have to
2278 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2279 maybePerformBlockedException(cap,tso);
2280 }
2281 }
2282
2283 cap->r.rCurrentTSO = tso;
2284 cap->in_haskell = rtsTrue;
2285 errno = saved_errno;
2286 #if mingw32_HOST_OS
2287 SetLastError(saved_winerror);
2288 #endif
2289
2290 /* We might have GC'd, mark the TSO dirty again */
2291 dirty_TSO(cap,tso);
2292 dirty_STACK(cap,tso->stackobj);
2293
2294 IF_DEBUG(sanity, checkTSO(tso));
2295
2296 return &cap->r;
2297 }
2298
2299 /* ---------------------------------------------------------------------------
2300 * scheduleThread()
2301 *
2302 * scheduleThread puts a thread on the end of the runnable queue.
2303 * This will usually be done immediately after a thread is created.
2304 * The caller of scheduleThread must create the thread using e.g.
2305 * createThread and push an appropriate closure
2306 * on this thread's stack before the scheduler is invoked.
2307 * ------------------------------------------------------------------------ */
2308
2309 void
2310 scheduleThread(Capability *cap, StgTSO *tso)
2311 {
2312 // The thread goes at the *end* of the run-queue, to avoid possible
2313 // starvation of any threads already on the queue.
2314 appendToRunQueue(cap,tso);
2315 }
2316
2317 void
2318 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2319 {
2320 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2321 // move this thread from now on.
2322 #if defined(THREADED_RTS)
2323 cpu %= enabled_capabilities;
2324 if (cpu == cap->no) {
2325 appendToRunQueue(cap,tso);
2326 } else {
2327 migrateThread(cap, tso, &capabilities[cpu]);
2328 }
2329 #else
2330 appendToRunQueue(cap,tso);
2331 #endif
2332 }
2333
2334 void
2335 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2336 {
2337 Task *task;
2338 DEBUG_ONLY( StgThreadID id );
2339 Capability *cap;
2340
2341 cap = *pcap;
2342
2343 // We already created/initialised the Task
2344 task = cap->running_task;
2345
2346 // This TSO is now a bound thread; make the Task and TSO
2347 // point to each other.
2348 tso->bound = task->incall;
2349 tso->cap = cap;
2350
2351 task->incall->tso = tso;
2352 task->incall->ret = ret;
2353 task->incall->stat = NoStatus;
2354
2355 appendToRunQueue(cap,tso);
2356
2357 DEBUG_ONLY( id = tso->id );
2358 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2359
2360 cap = schedule(cap,task);
2361
2362 ASSERT(task->incall->stat != NoStatus);
2363 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2364
2365 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2366 *pcap = cap;
2367 }
2368
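/* For reference, this is roughly how the RtsAPI uses the functions
 * above when an external caller runs an IO action to completion (a
 * sketch of the rts_evalIO() path; error handling and the strict/lazy
 * variants are omitted; 'action' is the closure to run and 'ret'
 * receives the result):
 *
 *     Capability *cap = rts_lock();            // become a bound Task
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  (StgClosure *)action);
 *     scheduleWaitThread(tso, &ret, &cap);     // run it and wait
 *     rts_checkSchedStatus("rts_evalIO", cap);
 *     rts_unlock(cap);
 */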
2369 /* ----------------------------------------------------------------------------
2370 * Starting Tasks
2371 * ------------------------------------------------------------------------- */
2372
2373 #if defined(THREADED_RTS)
2374 void scheduleWorker (Capability *cap, Task *task)
2375 {
2376 // schedule() runs without a lock.
2377 cap = schedule(cap,task);
2378
2379 // On exit from schedule(), we have a Capability, but possibly not
2380 // the same one we started with.
2381
2382 // During shutdown, the requirement is that after all the
2383 // Capabilities are shut down, all workers that are shutting down
2384 // have finished workerTaskStop(). This is why we hold on to
2385 // cap->lock until we've finished workerTaskStop() below.
2386 //
2387 // There may be workers still involved in foreign calls; those
2388 // will just block in waitForReturnCapability() because the
2389 // Capability has been shut down.
2390 //
2391 ACQUIRE_LOCK(&cap->lock);
2392 releaseCapability_(cap,rtsFalse);
2393 workerTaskStop(task);
2394 RELEASE_LOCK(&cap->lock);
2395 }
2396 #endif
2397
2398 /* ---------------------------------------------------------------------------
2399 * Start new worker tasks on Capabilities from--to
2400 * -------------------------------------------------------------------------- */
2401
2402 static void
2403 startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
2404 {
2405 #if defined(THREADED_RTS)
2406 nat i;
2407 Capability *cap;
2408
2409 for (i = from; i < to; i++) {
2410 cap = &capabilities[i];
2411 ACQUIRE_LOCK(&cap->lock);
2412 startWorkerTask(cap);
2413 RELEASE_LOCK(&cap->lock);
2414 }
2415 #endif
2416 }
2417
2418 /* ---------------------------------------------------------------------------
2419 * initScheduler()
2420 *
2421 * Initialise the scheduler. This resets all the queues - if the
2422 * queues contained any threads, they'll be garbage collected at the
2423 * next pass.
2424 *
2425 * ------------------------------------------------------------------------ */
2426
2427 void
2428 initScheduler(void)
2429 {
2430 #if !defined(THREADED_RTS)
2431 blocked_queue_hd = END_TSO_QUEUE;
2432 blocked_queue_tl = END_TSO_QUEUE;
2433 sleeping_queue = END_TSO_QUEUE;
2434 #endif
2435
2436 sched_state = SCHED_RUNNING;
2437 recent_activity = ACTIVITY_YES;
2438
2439 #if defined(THREADED_RTS)
2440 /* Initialise the mutex and condition variables used by
2441 * the scheduler. */
2442 initMutex(&sched_mutex);
2443 #endif
2444
2445 ACQUIRE_LOCK(&sched_mutex);
2446
2447 /* A capability holds the state a native thread needs in
2448 * order to execute STG code. At least one capability is
2449 * floating around (only THREADED_RTS builds have more than one).
2450 */
2451 initCapabilities();
2452
2453 initTaskManager();
2454
2455 /*
2456 * Eagerly start one worker to run each Capability, except for
2457 * Capability 0. The idea is that we're probably going to start a
2458 * bound thread on Capability 0 pretty soon, so we don't want a
2459 * worker task hogging it.
2460 */
2461 startWorkerTasks(1, n_capabilities);
2462
2463 RELEASE_LOCK(&sched_mutex);
2464
2465 }
2466
2467 void
2468 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2469 /* see Capability.c, shutdownCapability() */
2470 {
2471 Task *task = NULL;
2472
2473 task = newBoundTask();
2474
2475 // If we haven't killed all the threads yet, do it now.
2476 if (sched_state < SCHED_SHUTTING_DOWN) {
2477 sched_state = SCHED_INTERRUPTING;
2478 Capability *cap = task->cap;
2479 waitForReturnCapability(&cap,task);
2480 scheduleDoGC(&cap,task,rtsTrue);
2481 ASSERT(task->incall->tso == NULL);
2482 releaseCapability(cap);
2483 }
2484 sched_state = SCHED_SHUTTING_DOWN;
2485
2486 shutdownCapabilities(task, wait_foreign);
2487
2488 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2489 // n_failed_trygrab_idles, n_idle_caps);
2490
2491 boundTaskExiting(task);
2492 }
2493
2494 void
2495 freeScheduler( void )
2496 {
2497 nat still_running;
2498
2499 ACQUIRE_LOCK(&sched_mutex);
2500 still_running = freeTaskManager();
2501 // We can only free the Capabilities if there are no Tasks still
2502 // running. We might have a Task about to return from a foreign
2503 // call into waitForReturnCapability(), for example (actually,
2504 // this should be the *only* thing that a still-running Task can
2505 // do at this point, and it will block waiting for the
2506 // Capability).
2507 if (still_running == 0) {
2508 freeCapabilities();
2509 if (n_capabilities != 1) {
2510 stgFree(capabilities);
2511 }
2512 }
2513 RELEASE_LOCK(&sched_mutex);
2514 #if defined(THREADED_RTS)
2515 closeMutex(&sched_mutex);
2516 #endif
2517 }
2518
2519 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2520 void *user USED_IF_NOT_THREADS)
2521 {
2522 #if !defined(THREADED_RTS)
2523 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2524 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2525 evac(user, (StgClosure **)(void *)&sleeping_queue);
2526 #endif
2527 }
2528
2529 /* -----------------------------------------------------------------------------
2530 performGC
2531
2532 This is the interface to the garbage collector from Haskell land.
2533 We provide this so that external C code can allocate and garbage
2534 collect when called from Haskell via _ccall_GC.
2535 -------------------------------------------------------------------------- */
2536
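/* A minimal sketch of the intended use: a C function entered from
 * Haskell via a 'safe' foreign call (hypothetical example, not part of
 * the RTS):
 *
 *     // Haskell side:  foreign import ccall safe "c_work" cWork :: IO ()
 *     void c_work (void)
 *     {
 *         // ... allocate and do work on the C side ...
 *         performMajorGC();   // request a full collection before returning
 *     }
 */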
2537 static void
2538 performGC_(rtsBool force_major)
2539 {
2540 Task *task;
2541 Capability *cap = NULL;
2542
2543 // We must grab a new Task here, because the existing Task may be
2544 // associated with a particular Capability, and chained onto the
2545 // suspended_ccalls queue.
2546 task = newBoundTask();
2547
2548 // TODO: do we need to traceTask*() here?
2549
2550 waitForReturnCapability(&cap,task);
2551 scheduleDoGC(&cap,task,force_major);
2552 releaseCapability(cap);
2553 boundTaskExiting(task);
2554 }
2555
2556 void
2557 performGC(void)
2558 {
2559 performGC_(rtsFalse);
2560 }
2561
2562 void
2563 performMajorGC(void)
2564 {
2565 performGC_(rtsTrue);
2566 }
2567
2568 /* ---------------------------------------------------------------------------
2569 Interrupt execution
2570 - usually called inside a signal handler so it mustn't do anything fancy.
2571 ------------------------------------------------------------------------ */
2572
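/* For example, a (hypothetical) shutdown signal handler need do no more
 * than this; the real handlers live in posix/Signals.c and
 * win32/ConsoleHandler.c:
 *
 *     static void shutdown_handler (int sig STG_UNUSED)
 *     {
 *         interruptStgRts();   // ask every Capability to stop soon
 *     }
 */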
2573 void
2574 interruptStgRts(void)
2575 {
2576 sched_state = SCHED_INTERRUPTING;
2577 interruptAllCapabilities();
2578 #if defined(THREADED_RTS)
2579 wakeUpRts();
2580 #endif
2581 }
2582
2583 /* -----------------------------------------------------------------------------
2584 Wake up the RTS
2585
2586 This function causes at least one OS thread to wake up and run the
2587 scheduler loop. It is invoked when the RTS might be deadlocked, or
2588 an external event has arrived that may need servicing (eg. a
2589 keyboard interrupt).
2590
2591 In the single-threaded RTS we don't do anything here; we only have
2592 one thread anyway, and the event that caused us to want to wake up
2593 will have interrupted any blocking system call in progress anyway.
2594 -------------------------------------------------------------------------- */
2595
2596 #if defined(THREADED_RTS)
2597 void wakeUpRts(void)
2598 {
2599 // This forces the IO Manager thread to wakeup, which will
2600 // in turn ensure that some OS thread wakes up and runs the
2601 // scheduler loop, which will cause a GC and deadlock check.
2602 ioManagerWakeup();
2603 }
2604 #endif
2605
2606 /* -----------------------------------------------------------------------------
2607 Deleting threads
2608
2609 This is used for interruption (^C) and forking, and corresponds to
2610 raising an exception but without letting the thread catch the
2611 exception.
2612 -------------------------------------------------------------------------- */
2613
2614 static void
2615 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2616 {
2617 // NOTE: must only be called on a TSO that we have exclusive
2618 // access to, because we will call throwToSingleThreaded() below.
2619 // The TSO must be on the run queue of the Capability we own, or
2620 // we must own all Capabilities.
2621
2622 if (tso->why_blocked != BlockedOnCCall &&
2623 tso->why_blocked != BlockedOnCCall_Interruptible) {
2624 throwToSingleThreaded(tso->cap,tso,NULL);
2625 }
2626 }
2627
2628 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2629 static void
2630 deleteThread_(Capability *cap, StgTSO *tso)
2631 { // for forkProcess only:
2632 // like deleteThread(), but we delete threads in foreign calls, too.
2633
2634 if (tso->why_blocked == BlockedOnCCall ||
2635 tso->why_blocked == BlockedOnCCall_Interruptible) {
2636 tso->what_next = ThreadKilled;
2637 appendToRunQueue(tso->cap, tso);
2638 } else {
2639 deleteThread(cap,tso);
2640 }
2641 }
2642 #endif
2643
2644 /* -----------------------------------------------------------------------------
2645 raiseExceptionHelper
2646
2647 This function is called by the raise# primitive, just so that we can
2648 move some of the tricky bits of raising an exception from C-- into
2649 C. Who knows, it might be a useful reusable thing here too.
2650 -------------------------------------------------------------------------- */
2651
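/* The caller (stg_raisezh in Exception.cmm) switches on the frame type
 * we return, roughly along these lines (a C-flavoured sketch of the
 * C-- code, not literal source):
 *
 *     switch (raiseExceptionHelper(reg, tso, exception)) {
 *     case CATCH_FRAME:      // pop the frame and enter the handler
 *     case CATCH_STM_FRAME:  // ditto, after aborting the nested transaction
 *         ...
 *     case ATOMICALLY_FRAME: // escaped an atomically#: re-validate the
 *         ...                // transaction and either re-raise or restart
 *     case STOP_FRAME:       // uncaught: the thread is finished
 *         ...
 *     }
 */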
2652 StgWord
2653 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2654 {
2655 Capability *cap = regTableToCapability(reg);
2656 StgThunk *raise_closure = NULL;
2657 StgPtr p, next;
2658 StgRetInfoTable *info;
2659 //
2660 // This closure represents the expression 'raise# E' where E
2661 // is the exception raised. It is used to overwrite all the
2662 // thunks which are currently under evaluation.
2663 //
2664
2665 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2666 // LDV profiling: stg_raise_info has THUNK as its closure
2667 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2668 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2669 // 1 does not cause any problem unless profiling is performed.
2670 // However, when LDV profiling goes on, we need to linearly scan
2671 // small object pool, where raise_closure is stored, so we should
2672 // use MIN_UPD_SIZE.
2673 //
2674 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2675 // sizeofW(StgClosure)+1);
2676 //
2677
2678 //
2679 // Walk up the stack, looking for the catch frame. On the way,
2680 // we update any closures pointed to from update frames with the
2681 // raise closure that we just built.
2682 //
2683 p = tso->stackobj->sp;
2684 while(1) {
2685 info = get_ret_itbl((StgClosure *)p);
2686 next = p + stack_frame_sizeW((StgClosure *)p);
2687 switch (info->i.type) {
2688
2689 case UPDATE_FRAME:
2690 // Only create raise_closure if we need to.
2691 if (raise_closure == NULL) {
2692 raise_closure =
2693 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2694 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2695 raise_closure->payload[0] = exception;
2696 }
2697 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2698 (StgClosure *)raise_closure);
2699 p = next;
2700 continue;
2701
2702 case ATOMICALLY_FRAME:
2703 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2704 tso->stackobj->sp = p;
2705 return ATOMICALLY_FRAME;
2706
2707 case CATCH_FRAME:
2708 tso->stackobj->sp = p;
2709 return CATCH_FRAME;
2710
2711 case CATCH_STM_FRAME:
2712 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2713 tso->stackobj->sp = p;
2714 return CATCH_STM_FRAME;
2715
2716 case UNDERFLOW_FRAME:
2717 tso->stackobj->sp = p;
2718 threadStackUnderflow(cap,tso);
2719 p = tso->stackobj->sp;
2720 continue;
2721
2722 case STOP_FRAME:
2723 tso->stackobj->sp = p;
2724 return STOP_FRAME;
2725
2726 case CATCH_RETRY_FRAME:
2727 default:
2728 p = next;
2729 continue;
2730 }
2731 }
2732 }
2733
2734
2735 /* -----------------------------------------------------------------------------
2736 findRetryFrameHelper
2737
2738 This function is called by the retry# primitive. It traverses the stack
2739 leaving tso->stackobj->sp referring to the frame which should handle the retry.
2740
2741 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2742 or an ATOMICALLY_FRAME (if the retry# reaches the top level).
2743
2744 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2745 create) because retries are not considered to be exceptions, despite the
2746 similar implementation.
2747
2748 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2749 not be created within memory transactions.
2750 -------------------------------------------------------------------------- */
2751
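/* The caller (stg_retryzh in PrimOps.cmm) acts on the frame we find,
 * roughly as follows (a C-flavoured sketch of the C-- code, not
 * literal source):
 *
 *     switch (findRetryFrameHelper(cap, tso)) {
 *     case CATCH_RETRY_FRAME:  // inside orElse#: try the alternative,
 *         ...                  // or propagate the retry if both arms retried
 *     case ATOMICALLY_FRAME:   // top level: block on the TVars read so far
 *         ...                  // and re-run the transaction when one changes
 *     }
 */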
2752 StgWord
2753 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2754 {
2755 StgPtr p, next;
2756 StgRetInfoTable *info;
2757
2758 p = tso->stackobj->sp;
2759 while (1) {
2760 info = get_ret_itbl((StgClosure *)p);
2761 next = p + stack_frame_sizeW((StgClosure *)p);
2762 switch (info->i.type) {
2763
2764 case ATOMICALLY_FRAME:
2765 debugTrace(DEBUG_stm,
2766 "found ATOMICALLY_FRAME at %p during retry", p);
2767 tso->stackobj->sp = p;
2768 return ATOMICALLY_FRAME;
2769
2770 case CATCH_RETRY_FRAME:
2771 debugTrace(DEBUG_stm,
2772 "found CATCH_RETRY_FRAME at %p during retrry", p);
2773 tso->stackobj->sp = p;
2774 return CATCH_RETRY_FRAME;
2775
2776 case CATCH_STM_FRAME: {
2777 StgTRecHeader *trec = tso -> trec;
2778 StgTRecHeader *outer = trec -> enclosing_trec;
2779 debugTrace(DEBUG_stm,
2780 "found CATCH_STM_FRAME at %p during retry", p);
2781 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2782 stmAbortTransaction(cap, trec);
2783 stmFreeAbortedTRec(cap, trec);
2784 tso -> trec = outer;
2785 p = next;
2786 continue;
2787 }
2788
2789 case UNDERFLOW_FRAME:
2790 threadStackUnderflow(cap,tso);
2791 p = tso->stackobj->sp;
2792 continue;
2793
2794 default:
2795 ASSERT(info->i.type != CATCH_FRAME);
2796 ASSERT(info->i.type != STOP_FRAME);
2797 p = next;
2798 continue;
2799 }
2800 }
2801 }
2802
2803 /* -----------------------------------------------------------------------------
2804 resurrectThreads is called after garbage collection on the list of
2805 threads found to be garbage. Each of these threads will be woken
2806 up and sent an exception: BlockedIndefinitelyOnMVar if the thread was
2807 blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked on an STM
2808 transaction, or NonTermination if it was blocked on a Black Hole.
2809
2810 Locks: assumes we hold *all* the capabilities.
2811 -------------------------------------------------------------------------- */
2812
2813 void
2814 resurrectThreads (StgTSO *threads)
2815 {
2816 StgTSO *tso, *next;
2817 Capability *cap;
2818 generation *gen;
2819
2820 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2821 next = tso->global_link;
2822
2823 gen = Bdescr((P_)tso)->gen;
2824 tso->global_link = gen->threads;
2825 gen->threads = tso;
2826
2827 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2828
2829 // Wake up the thread on the Capability it was last on
2830 cap = tso->cap;
2831
2832 switch (tso->why_blocked) {
2833 case BlockedOnMVar:
2834 /* Called by GC - sched_mutex lock is currently held. */
2835 throwToSingleThreaded(cap, tso,
2836 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2837 break;
2838 case BlockedOnBlackHole:
2839 throwToSingleThreaded(cap, tso,
2840 (StgClosure *)nonTermination_closure);
2841 break;
2842 case BlockedOnSTM:
2843 throwToSingleThreaded(cap, tso,
2844 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2845 break;
2846 case NotBlocked:
2847 /* This might happen if the thread was blocked on a black hole
2848 * belonging to a thread that we've just woken up (raiseAsync
2849 * can wake up threads, remember...).
2850 */
2851 continue;
2852 case BlockedOnMsgThrowTo:
2853 // This can happen if the target is masking, blocks on a
2854 // black hole, and then is found to be unreachable. In
2855 // this case, we want to let the target wake up and carry
2856 // on, and do nothing to this thread.
2857 continue;
2858 default:
2859 barf("resurrectThreads: thread blocked in a strange way: %d",
2860 tso->why_blocked);
2861 }
2862 }
2863 }