Strings and comments only: 'to to ' fixes
[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
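//
// Rough life cycle, as used later in this file: the tick handler
// (handle_tick()) steps ACTIVITY_YES -> ACTIVITY_MAYBE_NO ->
// ACTIVITY_INACTIVE while nothing runs; an idle GC then sets
// ACTIVITY_DONE_GC and switches the timer off, and the scheduler resets
// the flag to ACTIVITY_YES (re-enabling the timer) when a thread runs.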
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
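// sched_state only ever advances: SCHED_RUNNING -> SCHED_INTERRUPTING ->
// SCHED_SHUTTING_DOWN (see the switch at the top of the schedule() loop).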
93
94 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
95 * exists - earlier gccs apparently didn't.
96 * -= chak
97 */
98 StgTSO dummy_tso;
99
100 /*
101 * This mutex protects most of the global scheduler data in
102 * the THREADED_RTS runtime.
103 */
104 #if defined(THREADED_RTS)
105 Mutex sched_mutex;
106 #endif
107
108 #if !defined(mingw32_HOST_OS)
109 #define FORKPROCESS_PRIMOP_SUPPORTED
110 #endif
111
112 // Local stats
113 #ifdef THREADED_RTS
114 static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
115 #endif
116
117 /* -----------------------------------------------------------------------------
118 * static function prototypes
119 * -------------------------------------------------------------------------- */
120
121 static Capability *schedule (Capability *initialCapability, Task *task);
122
123 //
124 // These functions all encapsulate parts of the scheduler loop, and are
125 // abstracted only to make the structure and control flow of the
126 // scheduler clearer.
127 //
128 static void schedulePreLoop (void);
129 static void scheduleFindWork (Capability **pcap);
130 #if defined(THREADED_RTS)
131 static void scheduleYield (Capability **pcap, Task *task);
132 #endif
133 #if defined(THREADED_RTS)
134 static nat requestSync (Capability **pcap, Task *task, nat sync_type);
135 static void acquireAllCapabilities(Capability *cap, Task *task);
136 static void releaseAllCapabilities(nat n, Capability *cap, Task *task);
137 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
138 #endif
139 static void scheduleStartSignalHandlers (Capability *cap);
140 static void scheduleCheckBlockedThreads (Capability *cap);
141 static void scheduleProcessInbox(Capability **cap);
142 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
143 static void schedulePushWork(Capability *cap, Task *task);
144 #if defined(THREADED_RTS)
145 static void scheduleActivateSpark(Capability *cap);
146 #endif
147 static void schedulePostRunThread(Capability *cap, StgTSO *t);
148 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
149 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
150 nat prev_what_next );
151 static void scheduleHandleThreadBlocked( StgTSO *t );
152 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
153 StgTSO *t );
154 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
155 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
156
157 static void deleteThread (Capability *cap, StgTSO *tso);
158 static void deleteAllThreads (Capability *cap);
159
160 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
161 static void deleteThread_(Capability *cap, StgTSO *tso);
162 #endif
163
164 /* ---------------------------------------------------------------------------
165 Main scheduling loop.
166
167 We use round-robin scheduling, each thread returning to the
168 scheduler loop when one of these conditions is detected:
169
170 * out of heap space
171 * timer expires (thread yields)
172 * thread blocks
173 * thread ends
174 * stack overflow
175
176 ------------------------------------------------------------------------ */
177
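// Each Task (OS thread) that owns a Capability drives this loop. It
// returns only when a bound thread it was running has finished (see
// scheduleHandleThreadFinished()) or, during shutdown, when the Task is a
// worker whose run queue has drained.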
178 static Capability *
179 schedule (Capability *initialCapability, Task *task)
180 {
181 StgTSO *t;
182 Capability *cap;
183 StgThreadReturnCode ret;
184 nat prev_what_next;
185 rtsBool ready_to_gc;
186 #if defined(THREADED_RTS)
187 rtsBool first = rtsTrue;
188 #endif
189
190 cap = initialCapability;
191
192 // Pre-condition: this task owns initialCapability.
193 // The sched_mutex is *NOT* held
194 // NB. on return, we still hold a capability.
195
196 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
197
198 schedulePreLoop();
199
200 // -----------------------------------------------------------
201 // Scheduler loop starts here:
202
203 while (1) {
204
205 // Check whether we have re-entered the RTS from Haskell without
206 // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
207 // call).
208 if (cap->in_haskell) {
209 errorBelch("schedule: re-entered unsafely.\n"
210 " Perhaps a 'foreign import unsafe' should be 'safe'?");
211 stg_exit(EXIT_FAILURE);
212 }
213
214 // The interruption / shutdown sequence.
215 //
216 // In order to cleanly shut down the runtime, we want to:
217 // * make sure that all main threads return to their callers
218 // with the state 'Interrupted'.
219 // * clean up all OS threads associated with the runtime
220 // * free all memory etc.
221 //
222 // So the sequence for ^C goes like this:
223 //
224 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
225 // arranges for some Capability to wake up
226 //
227 // * all threads in the system are halted, and the zombies are
228 // placed on the run queue for cleaning up. We acquire all
229 // the capabilities in order to delete the threads, this is
230 // done by scheduleDoGC() for convenience (because GC already
231 // needs to acquire all the capabilities). We can't kill
232 // threads involved in foreign calls.
233 //
234 // * somebody calls shutdownHaskell(), which calls exitScheduler()
235 //
236 // * sched_state := SCHED_SHUTTING_DOWN
237 //
238 // * all workers exit when the run queue on their capability
239 // drains. All main threads will also exit when their TSO
240 // reaches the head of the run queue and they can return.
241 //
242 // * eventually all Capabilities will shut down, and the RTS can
243 // exit.
244 //
245 // * We might be left with threads blocked in foreign calls;
246 // we should really attempt to kill these somehow (TODO).
247
248 switch (sched_state) {
249 case SCHED_RUNNING:
250 break;
251 case SCHED_INTERRUPTING:
252 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
253 /* scheduleDoGC() deletes all the threads */
254 scheduleDoGC(&cap,task,rtsFalse);
255
256 // after scheduleDoGC(), we must be shutting down. Either some
257 // other Capability did the final GC, or we did it above,
258 // either way we can fall through to the SCHED_SHUTTING_DOWN
259 // case now.
260 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
261 // fall through
262
263 case SCHED_SHUTTING_DOWN:
264 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
265 // If we are a worker, just exit. If we're a bound thread
266 // then we will exit below when we've removed our TSO from
267 // the run queue.
268 if (!isBoundTask(task) && emptyRunQueue(cap)) {
269 return cap;
270 }
271 break;
272 default:
273 barf("sched_state: %d", sched_state);
274 }
275
276 scheduleFindWork(&cap);
277
278 /* work pushing, currently relevant only for THREADED_RTS:
279 (pushes threads, wakes up idle capabilities for stealing) */
280 schedulePushWork(cap,task);
281
282 scheduleDetectDeadlock(&cap,task);
283
284 // Normally, the only way we can get here with no threads to
285 // run is if a keyboard interrupt was received during
286 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
287 // Additionally, it is not fatal for the
288 // threaded RTS to reach here with no threads to run.
289 //
290 // win32: might be here due to awaitEvent() being abandoned
291 // as a result of a console event having been delivered.
292
293 #if defined(THREADED_RTS)
294 if (first)
295 {
296 // XXX: ToDo
297 // // don't yield the first time, we want a chance to run this
298 // // thread for a bit, even if there are others banging at the
299 // // door.
300 // first = rtsFalse;
301 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
302 }
303
304 scheduleYield(&cap,task);
305
306 if (emptyRunQueue(cap)) continue; // look for work again
307 #endif
308
309 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
310 if ( emptyRunQueue(cap) ) {
311 ASSERT(sched_state >= SCHED_INTERRUPTING);
312 }
313 #endif
314
315 //
316 // Get a thread to run
317 //
318 t = popRunQueue(cap);
319
320 // Sanity check the thread we're about to run. This can be
321 // expensive if there is lots of thread switching going on...
322 IF_DEBUG(sanity,checkTSO(t));
323
324 #if defined(THREADED_RTS)
325 // Check whether we can run this thread in the current task.
326 // If not, we have to pass our capability to the right task.
327 {
328 InCall *bound = t->bound;
329
330 if (bound) {
331 if (bound->task == task) {
332 // yes, the Haskell thread is bound to the current native thread
333 } else {
334 debugTrace(DEBUG_sched,
335 "thread %lu bound to another OS thread",
336 (unsigned long)t->id);
337 // no, bound to a different Haskell thread: pass to that thread
338 pushOnRunQueue(cap,t);
339 continue;
340 }
341 } else {
342 // The thread we want to run is unbound.
343 if (task->incall->tso) {
344 debugTrace(DEBUG_sched,
345 "this OS thread cannot run thread %lu",
346 (unsigned long)t->id);
347 // no, the current native thread is bound to a different
348 // Haskell thread, so pass it to any worker thread
349 pushOnRunQueue(cap,t);
350 continue;
351 }
352 }
353 }
354 #endif
355
356 // If we're shutting down, and this thread has not yet been
357 // killed, kill it now. This sometimes happens when a finalizer
358 // thread is created by the final GC, or a thread previously
359 // in a foreign call returns.
360 if (sched_state >= SCHED_INTERRUPTING &&
361 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
362 deleteThread(cap,t);
363 }
364
365 // If this capability is disabled, migrate the thread away rather
366 // than running it. NB. but not if the thread is bound: it is
367 // really hard for a bound thread to migrate itself. Believe me,
368 // I tried several ways and couldn't find a way to do it.
369 // Instead, when everything is stopped for GC, we migrate all the
370 // threads on the run queue then (see scheduleDoGC()).
371 //
372 // ToDo: what about TSO_LOCKED? Currently we're migrating those
373 // when the number of capabilities drops, but we never migrate
374 // them back if it rises again. Presumably we should, but after
375 // the thread has been migrated we no longer know what capability
376 // it was originally on.
377 #ifdef THREADED_RTS
378 if (cap->disabled && !t->bound) {
379 Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
380 migrateThread(cap, t, dest_cap);
381 continue;
382 }
383 #endif
384
385 /* context switches are initiated by the timer signal, unless
386 * the user specified "context switch as often as possible", with
387 * +RTS -C0
388 */
389 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
390 && !emptyThreadQueues(cap)) {
391 cap->context_switch = 1;
392 }
393
394 run_thread:
395
396 // CurrentTSO is the thread to run. t might be different if we
397 // loop back to run_thread, so make sure to set CurrentTSO after
398 // that.
399 cap->r.rCurrentTSO = t;
400
401 startHeapProfTimer();
402
403 // ----------------------------------------------------------------------
404 // Run the current thread
405
406 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
407 ASSERT(t->cap == cap);
408 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
409
410 prev_what_next = t->what_next;
411
412 errno = t->saved_errno;
413 #if mingw32_HOST_OS
414 SetLastError(t->saved_winerror);
415 #endif
416
417 // reset the interrupt flag before running Haskell code
418 cap->interrupt = 0;
419
420 cap->in_haskell = rtsTrue;
421 cap->idle = 0;
422
423 dirty_TSO(cap,t);
424 dirty_STACK(cap,t->stackobj);
425
426 switch (recent_activity)
427 {
428 case ACTIVITY_DONE_GC: {
429 // ACTIVITY_DONE_GC means we turned off the timer signal to
430 // conserve power (see #1623). Re-enable it here.
431 nat prev;
432 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
433 if (prev == ACTIVITY_DONE_GC) {
434 #ifndef PROFILING
435 startTimer();
436 #endif
437 }
438 break;
439 }
440 case ACTIVITY_INACTIVE:
441 // If we reached ACTIVITY_INACTIVE, then don't reset it until
442 // we've done the GC. The thread running here might just be
443 // the IO manager thread that handle_tick() woke up via
444 // wakeUpRts().
445 break;
446 default:
447 recent_activity = ACTIVITY_YES;
448 }
449
450 traceEventRunThread(cap, t);
451
452 switch (prev_what_next) {
453
454 case ThreadKilled:
455 case ThreadComplete:
456 /* Thread already finished, return to scheduler. */
457 ret = ThreadFinished;
458 break;
459
460 case ThreadRunGHC:
461 {
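// StgRun() enters compiled Haskell code via stg_returnToStackTop and
// runs the thread until it yields, blocks, finishes, or overflows its
// heap or stack; it hands back the register table, from which we
// recover the Capability and the thread's return code.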
462 StgRegTable *r;
463 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
464 cap = regTableToCapability(r);
465 ret = r->rRet;
466 break;
467 }
468
469 case ThreadInterpret:
470 cap = interpretBCO(cap);
471 ret = cap->r.rRet;
472 break;
473
474 default:
475 barf("schedule: invalid what_next field");
476 }
477
478 cap->in_haskell = rtsFalse;
479
480 // The TSO might have moved, eg. if it re-entered the RTS and a GC
481 // happened. So find the new location:
482 t = cap->r.rCurrentTSO;
483
484 // And save the current errno in this thread.
485 // XXX: possibly bogus for SMP because this thread might already
486 // be running again, see code below.
487 t->saved_errno = errno;
488 #if mingw32_HOST_OS
489 // Similarly for Windows error code
490 t->saved_winerror = GetLastError();
491 #endif
492
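// (The "+ 6" below maps the BlockedOn* why_blocked codes past the six
// basic thread-stop statuses, so each blocking reason gets its own
// stop code in the eventlog; see the eventlog format definitions for
// the exact mapping.)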
493 if (ret == ThreadBlocked) {
494 if (t->why_blocked == BlockedOnBlackHole) {
495 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
496 traceEventStopThread(cap, t, t->why_blocked + 6,
497 owner != NULL ? owner->id : 0);
498 } else {
499 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
500 }
501 } else {
502 traceEventStopThread(cap, t, ret, 0);
503 }
504
505 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
506 ASSERT(t->cap == cap);
507
508 // ----------------------------------------------------------------------
509
510 // Costs for the scheduler are assigned to CCS_SYSTEM
511 stopHeapProfTimer();
512 #if defined(PROFILING)
513 cap->r.rCCCS = CCS_SYSTEM;
514 #endif
515
516 schedulePostRunThread(cap,t);
517
518 ready_to_gc = rtsFalse;
519
520 switch (ret) {
521 case HeapOverflow:
522 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
523 break;
524
525 case StackOverflow:
526 // just adjust the stack for this thread, then push it back
527 // on the run queue.
528 threadStackOverflow(cap, t);
529 pushOnRunQueue(cap,t);
530 break;
531
532 case ThreadYielding:
533 if (scheduleHandleYield(cap, t, prev_what_next)) {
534 // shortcut for switching between compiler/interpreter:
535 goto run_thread;
536 }
537 break;
538
539 case ThreadBlocked:
540 scheduleHandleThreadBlocked(t);
541 break;
542
543 case ThreadFinished:
544 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
545 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
546 break;
547
548 default:
549 barf("schedule: invalid thread return code %d", (int)ret);
550 }
551
552 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
553 scheduleDoGC(&cap,task,rtsFalse);
554 }
555 } /* end of while() */
556 }
557
558 /* -----------------------------------------------------------------------------
559 * Run queue operations
560 * -------------------------------------------------------------------------- */
561
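// The run queue is a doubly-linked list threaded through the TSOs
// themselves: tso->_link points at the next thread and, while the thread
// is on a run queue, tso->block_info.prev points at the previous one;
// END_TSO_QUEUE marks both ends.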
562 void
563 removeFromRunQueue (Capability *cap, StgTSO *tso)
564 {
565 if (tso->block_info.prev == END_TSO_QUEUE) {
566 ASSERT(cap->run_queue_hd == tso);
567 cap->run_queue_hd = tso->_link;
568 } else {
569 setTSOLink(cap, tso->block_info.prev, tso->_link);
570 }
571 if (tso->_link == END_TSO_QUEUE) {
572 ASSERT(cap->run_queue_tl == tso);
573 cap->run_queue_tl = tso->block_info.prev;
574 } else {
575 setTSOPrev(cap, tso->_link, tso->block_info.prev);
576 }
577 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
578
579 IF_DEBUG(sanity, checkRunQueue(cap));
580 }
581
582 void
583 promoteInRunQueue (Capability *cap, StgTSO *tso)
584 {
585 removeFromRunQueue(cap, tso);
586 pushOnRunQueue(cap, tso);
587 }
588
589 /* ----------------------------------------------------------------------------
590 * Setting up the scheduler loop
591 * ------------------------------------------------------------------------- */
592
593 static void
594 schedulePreLoop(void)
595 {
596 // initialisation for scheduler - what cannot go into initScheduler()
597
598 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
599 win32AllocStack();
600 #endif
601 }
602
603 /* -----------------------------------------------------------------------------
604 * scheduleFindWork()
605 *
606 * Search for work to do, and handle messages from elsewhere.
607 * -------------------------------------------------------------------------- */
608
609 static void
610 scheduleFindWork (Capability **pcap)
611 {
612 scheduleStartSignalHandlers(*pcap);
613
614 scheduleProcessInbox(pcap);
615
616 scheduleCheckBlockedThreads(*pcap);
617
618 #if defined(THREADED_RTS)
619 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
620 #endif
621 }
622
623 #if defined(THREADED_RTS)
624 STATIC_INLINE rtsBool
625 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
626 {
627 // we need to yield this capability to someone else if..
628 // - another thread is initiating a GC, and we didn't just do a GC
629 // (see Note [GC livelock])
630 // - another Task is returning from a foreign call
631 // - the thread at the head of the run queue cannot be run
632 // by this Task (it is bound to another Task, or it is unbound
633 // and this task is bound).
634 //
635 // Note [GC livelock]
636 //
637 // If we are interrupted to do a GC, then we do not immediately do
638 // another one. This avoids a starvation situation where one
639 // Capability keeps forcing a GC and the other Capabilities make no
640 // progress at all.
641
642 return ((pending_sync && !didGcLast) ||
643 cap->returning_tasks_hd != NULL ||
644 (!emptyRunQueue(cap) && (task->incall->tso == NULL
645 ? peekRunQueue(cap)->bound != NULL
646 : peekRunQueue(cap)->bound != task->incall)));
647 }
648
649 // This is the single place where a Task goes to sleep. There are
650 // two reasons it might need to sleep:
651 // - there are no threads to run
652 // - we need to yield this Capability to someone else
653 // (see shouldYieldCapability())
654 //
655 // Careful: the scheduler loop is quite delicate. Make sure you run
656 // the tests in testsuite/concurrent (all ways) after modifying this,
657 // and also check the benchmarks in nofib/parallel for regressions.
658
659 static void
660 scheduleYield (Capability **pcap, Task *task)
661 {
662 Capability *cap = *pcap;
663 int didGcLast = rtsFalse;
664
665 // if we have work, and we don't need to give up the Capability, continue.
666 //
667 if (!shouldYieldCapability(cap,task,rtsFalse) &&
668 (!emptyRunQueue(cap) ||
669 !emptyInbox(cap) ||
670 sched_state >= SCHED_INTERRUPTING)) {
671 return;
672 }
673
674 // otherwise yield (sleep), and keep yielding if necessary.
675 do {
676 didGcLast = yieldCapability(&cap,task, !didGcLast);
677 }
678 while (shouldYieldCapability(cap,task,didGcLast));
679
680 // note there may still be no threads on the run queue at this
681 // point, the caller has to check.
682
683 *pcap = cap;
684 return;
685 }
686 #endif
687
688 /* -----------------------------------------------------------------------------
689 * schedulePushWork()
690 *
691 * Push work to other Capabilities if we have some.
692 * -------------------------------------------------------------------------- */
693
694 static void
695 schedulePushWork(Capability *cap USED_IF_THREADS,
696 Task *task USED_IF_THREADS)
697 {
698 /* The following code is not for PARALLEL_HASKELL. The call is kept general;
699 future GUM versions might use pushing in a distributed setup. */
700 #if defined(THREADED_RTS)
701
702 Capability *free_caps[n_capabilities], *cap0;
703 nat i, n_free_caps;
704
705 // migration can be turned off with +RTS -qm
706 if (!RtsFlags.ParFlags.migrate) return;
707
708 // Check whether we have more threads on our run queue, or sparks
709 // in our pool, that we could hand to another Capability.
710 if (emptyRunQueue(cap)) {
711 if (sparkPoolSizeCap(cap) < 2) return;
712 } else {
713 if (singletonRunQueue(cap) &&
714 sparkPoolSizeCap(cap) < 1) return;
715 }
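// i.e. only bother when there is surplus work beyond the one item we keep
// for ourselves: at least two sparks if our run queue is empty, or, if it
// isn't, either a second runnable thread or at least one spark.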
716
717 // First grab as many free Capabilities as we can.
718 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
719 cap0 = &capabilities[i];
720 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
721 if (!emptyRunQueue(cap0)
722 || cap0->returning_tasks_hd != NULL
723 || cap0->inbox != (Message*)END_TSO_QUEUE) {
724 // it already has some work, we just grabbed it at
725 // the wrong moment. Or maybe it's deadlocked!
726 releaseCapability(cap0);
727 } else {
728 free_caps[n_free_caps++] = cap0;
729 }
730 }
731 }
732
733 // we now have n_free_caps free capabilities stashed in
734 // free_caps[]. Share our run queue equally with them. This is
735 // probably the simplest thing we could do; improvements we might
736 // want to do include:
737 //
738 // - giving high priority to moving relatively new threads, on
739 // the grounds that they haven't had time to build up a
740 // working set in the cache on this CPU/Capability.
741 //
742 // - giving low priority to moving long-lived threads
743
744 if (n_free_caps > 0) {
745 StgTSO *prev, *t, *next;
746 #ifdef SPARK_PUSHING
747 rtsBool pushed_to_all;
748 #endif
749
750 debugTrace(DEBUG_sched,
751 "cap %d: %s and %d free capabilities, sharing...",
752 cap->no,
753 (!emptyRunQueue(cap) && !singletonRunQueue(cap))?
754 "excess threads on run queue":"sparks to share (>=2)",
755 n_free_caps);
756
757 i = 0;
758 #ifdef SPARK_PUSHING
759 pushed_to_all = rtsFalse;
760 #endif
761
762 if (cap->run_queue_hd != END_TSO_QUEUE) {
763 prev = cap->run_queue_hd;
764 t = prev->_link;
765 prev->_link = END_TSO_QUEUE;
766 for (; t != END_TSO_QUEUE; t = next) {
767 next = t->_link;
768 t->_link = END_TSO_QUEUE;
769 if (t->bound == task->incall // don't move my bound thread
770 || tsoLocked(t)) { // don't move a locked thread
771 setTSOLink(cap, prev, t);
772 setTSOPrev(cap, t, prev);
773 prev = t;
774 } else if (i == n_free_caps) {
775 #ifdef SPARK_PUSHING
776 pushed_to_all = rtsTrue;
777 #endif
778 i = 0;
779 // keep one for us
780 setTSOLink(cap, prev, t);
781 setTSOPrev(cap, t, prev);
782 prev = t;
783 } else {
784 appendToRunQueue(free_caps[i],t);
785
786 traceEventMigrateThread (cap, t, free_caps[i]->no);
787
788 if (t->bound) { t->bound->task->cap = free_caps[i]; }
789 t->cap = free_caps[i];
790 i++;
791 }
792 }
793 cap->run_queue_tl = prev;
794
795 IF_DEBUG(sanity, checkRunQueue(cap));
796 }
797
798 #ifdef SPARK_PUSHING
799 /* JB I left this code in place, it would work but is not necessary */
800
801 // If there are some free capabilities that we didn't push any
802 // threads to, then try to push a spark to each one.
803 if (!pushed_to_all) {
804 StgClosure *spark;
805 // i is the next free capability to push to
806 for (; i < n_free_caps; i++) {
807 if (emptySparkPoolCap(free_caps[i])) {
808 spark = tryStealSpark(cap->sparks);
809 if (spark != NULL) {
810 /* TODO: if anyone wants to re-enable this code then
811 * they must consider the fizzledSpark(spark) case
812 * and update the per-cap spark statistics.
813 */
814 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
815
816 traceEventStealSpark(free_caps[i], t, cap->no);
817
818 newSpark(&(free_caps[i]->r), spark);
819 }
820 }
821 }
822 }
823 #endif /* SPARK_PUSHING */
824
825 // release the capabilities
826 for (i = 0; i < n_free_caps; i++) {
827 task->cap = free_caps[i];
828 releaseAndWakeupCapability(free_caps[i]);
829 }
830 }
831 task->cap = cap; // reset to point to our Capability.
832
833 #endif /* THREADED_RTS */
834
835 }
836
837 /* ----------------------------------------------------------------------------
838 * Start any pending signal handlers
839 * ------------------------------------------------------------------------- */
840
841 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
842 static void
843 scheduleStartSignalHandlers(Capability *cap)
844 {
845 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
846 // safe outside the lock
847 startSignalHandlers(cap);
848 }
849 }
850 #else
851 static void
852 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
853 {
854 }
855 #endif
856
857 /* ----------------------------------------------------------------------------
858 * Check for blocked threads that can be woken up.
859 * ------------------------------------------------------------------------- */
860
861 static void
862 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
863 {
864 #if !defined(THREADED_RTS)
865 //
866 // Check whether any waiting threads need to be woken up. If the
867 // run queue is empty, and there are no other tasks running, we
868 // can wait indefinitely for something to happen.
869 //
870 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
871 {
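// awaitEvent(wait): if 'wait' is rtsTrue (our run queue is empty) block
// until an I/O or timer event arrives; otherwise just poll and return.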
872 awaitEvent (emptyRunQueue(cap));
873 }
874 #endif
875 }
876
877 /* ----------------------------------------------------------------------------
878 * Detect deadlock conditions and attempt to resolve them.
879 * ------------------------------------------------------------------------- */
880
881 static void
882 scheduleDetectDeadlock (Capability **pcap, Task *task)
883 {
884 Capability *cap = *pcap;
885 /*
886 * Detect deadlock: when we have no threads to run, there are no
887 * threads blocked, waiting for I/O, or sleeping, and all the
888 * other tasks are waiting for work, we must have a deadlock of
889 * some description.
890 */
891 if ( emptyThreadQueues(cap) )
892 {
893 #if defined(THREADED_RTS)
894 /*
895 * In the threaded RTS, we only check for deadlock if there
896 * has been no activity in a complete timeslice. This means
897 * we won't eagerly start a full GC just because we don't have
898 * any threads to run currently.
899 */
900 if (recent_activity != ACTIVITY_INACTIVE) return;
901 #endif
902
903 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
904
905 // Garbage collection can release some new threads due to
906 // either (a) finalizers or (b) threads resurrected because
907 // they are unreachable and will therefore be sent an
908 // exception. Any threads thus released will be immediately
909 // runnable.
910 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
911 cap = *pcap;
912 // when force_major == rtsTrue, scheduleDoGC sets
913 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
914 // signal.
915
916 if ( !emptyRunQueue(cap) ) return;
917
918 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
919 /* If we have user-installed signal handlers, then wait
920 * for signals to arrive rather than bombing out with a
921 * deadlock.
922 */
923 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
924 debugTrace(DEBUG_sched,
925 "still deadlocked, waiting for signals...");
926
927 awaitUserSignals();
928
929 if (signals_pending()) {
930 startSignalHandlers(cap);
931 }
932
933 // either we have threads to run, or we were interrupted:
934 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
935
936 return;
937 }
938 #endif
939
940 #if !defined(THREADED_RTS)
941 /* Probably a real deadlock. Send the current main thread the
942 * Deadlock exception.
943 */
944 if (task->incall->tso) {
945 switch (task->incall->tso->why_blocked) {
946 case BlockedOnSTM:
947 case BlockedOnBlackHole:
948 case BlockedOnMsgThrowTo:
949 case BlockedOnMVar:
950 case BlockedOnMVarRead:
951 throwToSingleThreaded(cap, task->incall->tso,
952 (StgClosure *)nonTermination_closure);
953 return;
954 default:
955 barf("deadlock: main thread blocked in a strange way");
956 }
957 }
958 return;
959 #endif
960 }
961 }
962
963
964 /* ----------------------------------------------------------------------------
965 * Send pending messages (PARALLEL_HASKELL only)
966 * ------------------------------------------------------------------------- */
967
968 #if defined(PARALLEL_HASKELL)
969 static void
970 scheduleSendPendingMessages(void)
971 {
972
973 # if defined(PAR) // global Mem.Mgmt., omit for now
974 if (PendingFetches != END_BF_QUEUE) {
975 processFetches();
976 }
977 # endif
978
979 if (RtsFlags.ParFlags.BufferTime) {
980 // if we use message buffering, we must send away all message
981 // packets which have become too old...
982 sendOldBuffers();
983 }
984 }
985 #endif
986
987 /* ----------------------------------------------------------------------------
988 * Process message in the current Capability's inbox
989 * ------------------------------------------------------------------------- */
990
991 static void
992 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
993 {
994 #if defined(THREADED_RTS)
995 Message *m, *next;
996 int r;
997 Capability *cap = *pcap;
998
999 while (!emptyInbox(cap)) {
1000 if (cap->r.rCurrentNursery->link == NULL ||
1001 g0->n_new_large_words >= large_alloc_lim) {
1002 scheduleDoGC(pcap, cap->running_task, rtsFalse);
1003 cap = *pcap;
1004 }
1005
1006 // don't use a blocking acquire; if the lock is held by
1007 // another thread then just carry on. This seems to avoid
1008 // getting stuck in a message ping-pong situation with other
1009 // processors. We'll check the inbox again later anyway.
1010 //
1011 // We should really use a more efficient queue data structure
1012 // here. The trickiness is that we must ensure a Capability
1013 // never goes idle if the inbox is non-empty, which is why we
1014 // use cap->lock (cap->lock is released as the last thing
1015 // before going idle; see Capability.c:releaseCapability()).
1016 r = TRY_ACQUIRE_LOCK(&cap->lock);
1017 if (r != 0) return;
1018
1019 m = cap->inbox;
1020 cap->inbox = (Message*)END_TSO_QUEUE;
1021
1022 RELEASE_LOCK(&cap->lock);
1023
1024 while (m != (Message*)END_TSO_QUEUE) {
1025 next = m->link;
1026 executeMessage(cap, m);
1027 m = next;
1028 }
1029 }
1030 #endif
1031 }
1032
1033 /* ----------------------------------------------------------------------------
1034 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1035 * ------------------------------------------------------------------------- */
1036
1037 #if defined(THREADED_RTS)
1038 static void
1039 scheduleActivateSpark(Capability *cap)
1040 {
1041 if (anySparks() && !cap->disabled)
1042 {
1043 createSparkThread(cap);
1044 debugTrace(DEBUG_sched, "creating a spark thread");
1045 }
1046 }
1047 #endif // THREADED_RTS
1048
1049 /* ----------------------------------------------------------------------------
1050 * After running a thread...
1051 * ------------------------------------------------------------------------- */
1052
1053 static void
1054 schedulePostRunThread (Capability *cap, StgTSO *t)
1055 {
1056 // We have to be able to catch transactions that are in an
1057 // infinite loop as a result of seeing an inconsistent view of
1058 // memory, e.g.
1059 //
1060 // atomically $ do
1061 // [a,b] <- mapM readTVar [ta,tb]
1062 // when (a == b) loop
1063 //
1064 // and a is never equal to b given a consistent view of memory.
1065 //
1066 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1067 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1068 debugTrace(DEBUG_sched | DEBUG_stm,
1069 "trec %p found wasting its time", t);
1070
1071 // strip the stack back to the
1072 // ATOMICALLY_FRAME, aborting the (nested)
1073 // transaction, and saving the stack of any
1074 // partially-evaluated thunks on the heap.
1075 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1076
1077 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1078 }
1079 }
1080
1081 /* some statistics gathering in the parallel case */
1082 }
1083
1084 /* -----------------------------------------------------------------------------
1085 * Handle a thread that returned to the scheduler with HeapOverflow
1086 * -------------------------------------------------------------------------- */
1087
1088 static rtsBool
1089 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1090 {
1091 // did the task ask for a large block?
1092 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1093 // if so, get one and push it on the front of the nursery.
1094 bdescr *bd;
1095 W_ blocks;
1096
1097 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
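// e.g. with the usual 4k block size, a 10000-byte request rounds up to
// 12288 bytes, i.e. 3 blocks.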
1098
1099 if (blocks > BLOCKS_PER_MBLOCK) {
1100 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1101 }
1102
1103 debugTrace(DEBUG_sched,
1104 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1105 (long)t->id, what_next_strs[t->what_next], blocks);
1106
1107 // don't do this if the nursery is (nearly) full, we'll GC first.
1108 if (cap->r.rCurrentNursery->link != NULL ||
1109 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1110 // if the nursery has only one block.
1111
1112 bd = allocGroup_lock(blocks);
1113 cap->r.rNursery->n_blocks += blocks;
1114
1115 // link the new group into the list
1116 bd->link = cap->r.rCurrentNursery;
1117 bd->u.back = cap->r.rCurrentNursery->u.back;
1118 if (cap->r.rCurrentNursery->u.back != NULL) {
1119 cap->r.rCurrentNursery->u.back->link = bd;
1120 } else {
1121 cap->r.rNursery->blocks = bd;
1122 }
1123 cap->r.rCurrentNursery->u.back = bd;
1124
1125 // initialise it as a nursery block. We initialise the
1126 // step, gen_no, and flags field of *every* sub-block in
1127 // this large block, because this is easier than making
1128 // sure that we always find the block head of a large
1129 // block whenever we call Bdescr() (eg. evacuate() and
1130 // isAlive() in the GC would both have to do this, at
1131 // least).
1132 {
1133 bdescr *x;
1134 for (x = bd; x < bd + blocks; x++) {
1135 initBdescr(x,g0,g0);
1136 x->free = x->start;
1137 x->flags = 0;
1138 }
1139 }
1140
1141 // This assert can be a killer if the app is doing lots
1142 // of large block allocations.
1143 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1144
1145 // now update the nursery to point to the new block
1146 cap->r.rCurrentNursery = bd;
1147
1148 // we might be unlucky and have another thread get on the
1149 // run queue before us and steal the large block, but in that
1150 // case the thread will just end up requesting another large
1151 // block.
1152 pushOnRunQueue(cap,t);
1153 return rtsFalse; /* not actually GC'ing */
1154 }
1155 }
1156
1157 if (cap->r.rHpLim == NULL || cap->context_switch) {
1158 // Sometimes we miss a context switch, e.g. when calling
1159 // primitives in a tight loop, MAYBE_GC() doesn't check the
1160 // context switch flag, and we end up waiting for a GC.
1161 // See #1984, and concurrent/should_run/1984
1162 cap->context_switch = 0;
1163 appendToRunQueue(cap,t);
1164 } else {
1165 pushOnRunQueue(cap,t);
1166 }
1167 return rtsTrue;
1168 /* actual GC is done at the end of the while loop in schedule() */
1169 }
1170
1171 /* -----------------------------------------------------------------------------
1172 * Handle a thread that returned to the scheduler with ThreadYielding
1173 * -------------------------------------------------------------------------- */
1174
1175 static rtsBool
1176 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1177 {
1178 /* put the thread back on the run queue. Then, if we're ready to
1179 * GC, check whether this is the last task to stop. If so, wake
1180 * up the GC thread. getThread will block during a GC until the
1181 * GC is finished.
1182 */
1183
1184 ASSERT(t->_link == END_TSO_QUEUE);
1185
1186 // Shortcut if we're just switching evaluators: don't bother
1187 // doing stack squeezing (which can be expensive), just run the
1188 // thread.
1189 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1190 debugTrace(DEBUG_sched,
1191 "--<< thread %ld (%s) stopped to switch evaluators",
1192 (long)t->id, what_next_strs[t->what_next]);
1193 return rtsTrue;
1194 }
1195
1196 // Reset the context switch flag. We don't do this just before
1197 // running the thread, because that would mean we would lose ticks
1198 // during GC, which can lead to unfair scheduling (a thread hogs
1199 // the CPU because the tick always arrives during GC). This way
1200 // penalises threads that do a lot of allocation, but that seems
1201 // better than the alternative.
1202 if (cap->context_switch != 0) {
1203 cap->context_switch = 0;
1204 appendToRunQueue(cap,t);
1205 } else {
1206 pushOnRunQueue(cap,t);
1207 }
1208
1209 IF_DEBUG(sanity,
1210 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1211 checkTSO(t));
1212
1213 return rtsFalse;
1214 }
1215
1216 /* -----------------------------------------------------------------------------
1217 * Handle a thread that returned to the scheduler with ThreadBlocked
1218 * -------------------------------------------------------------------------- */
1219
1220 static void
1221 scheduleHandleThreadBlocked( StgTSO *t
1222 #if !defined(DEBUG)
1223 STG_UNUSED
1224 #endif
1225 )
1226 {
1227
1228 // We don't need to do anything. The thread is blocked, and it
1229 // has tidied up its stack and placed itself on whatever queue
1230 // it needs to be on.
1231
1232 // ASSERT(t->why_blocked != NotBlocked);
1233 // Not true: for example,
1234 // - the thread may have woken itself up already, because
1235 // threadPaused() might have raised a blocked throwTo
1236 // exception, see maybePerformBlockedException().
1237
1238 #ifdef DEBUG
1239 traceThreadStatus(DEBUG_sched, t);
1240 #endif
1241 }
1242
1243 /* -----------------------------------------------------------------------------
1244 * Handle a thread that returned to the scheduler with ThreadFinished
1245 * -------------------------------------------------------------------------- */
1246
1247 static rtsBool
1248 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1249 {
1250 /* Need to check whether this was a main thread, and if so,
1251 * return with the return value.
1252 *
1253 * We also end up here if the thread kills itself with an
1254 * uncaught exception, see Exception.cmm.
1255 */
1256
1257 // blocked exceptions can now complete, even if the thread was in
1258 // blocked mode (see #2910).
1259 awakenBlockedExceptionQueue (cap, t);
1260
1261 //
1262 // Check whether the thread that just completed was a bound
1263 // thread, and if so return with the result.
1264 //
1265 // There is an assumption here that all thread completion goes
1266 // through this point; we need to make sure that if a thread
1267 // ends up in the ThreadKilled state, that it stays on the run
1268 // queue so it can be dealt with here.
1269 //
1270
1271 if (t->bound) {
1272
1273 if (t->bound != task->incall) {
1274 #if !defined(THREADED_RTS)
1275 // Must be a bound thread that is not the topmost one. Leave
1276 // it on the run queue until the stack has unwound to the
1277 // point where we can deal with this. Leaving it on the run
1278 // queue also ensures that the garbage collector knows about
1279 // this thread and its return value (it gets dropped from the
1280 // step->threads list so there's no other way to find it).
1281 appendToRunQueue(cap,t);
1282 return rtsFalse;
1283 #else
1284 // this cannot happen in the threaded RTS, because a
1285 // bound thread can only be run by the appropriate Task.
1286 barf("finished bound thread that isn't mine");
1287 #endif
1288 }
1289
1290 ASSERT(task->incall->tso == t);
1291
1292 if (t->what_next == ThreadComplete) {
1293 if (task->incall->ret) {
1294 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1295 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1296 }
1297 task->incall->stat = Success;
1298 } else {
1299 if (task->incall->ret) {
1300 *(task->incall->ret) = NULL;
1301 }
1302 if (sched_state >= SCHED_INTERRUPTING) {
1303 if (heap_overflow) {
1304 task->incall->stat = HeapExhausted;
1305 } else {
1306 task->incall->stat = Interrupted;
1307 }
1308 } else {
1309 task->incall->stat = Killed;
1310 }
1311 }
1312 #ifdef DEBUG
1313 removeThreadLabel((StgWord)task->incall->tso->id);
1314 #endif
1315
1316 // We no longer consider this thread and task to be bound to
1317 // each other. The TSO lives on until it is GC'd, but the
1318 // task is about to be released by the caller, and we don't
1319 // want anyone following the pointer from the TSO to the
1320 // defunct task (which might have already been
1321 // re-used). This was a real bug: the GC updated
1322 // tso->bound->tso which led to a deadlock.
1323 t->bound = NULL;
1324 task->incall->tso = NULL;
1325
1326 return rtsTrue; // tells schedule() to return
1327 }
1328
1329 return rtsFalse;
1330 }
1331
1332 /* -----------------------------------------------------------------------------
1333 * Perform a heap census
1334 * -------------------------------------------------------------------------- */
1335
1336 static rtsBool
1337 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1338 {
1339 // When we have +RTS -i0 and we're heap profiling, do a census at
1340 // every GC. This lets us get repeatable runs for debugging.
1341 if (performHeapProfile ||
1342 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1343 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1344 return rtsTrue;
1345 } else {
1346 return rtsFalse;
1347 }
1348 }
1349
1350 /* -----------------------------------------------------------------------------
1351 * Start a synchronisation of all capabilities
1352 * -------------------------------------------------------------------------- */
1353
1354 // Returns:
1355 // 0 if we successfully got a sync
1356 // non-0 if there was another sync request in progress,
1357 // and we yielded to it. The value returned is the
1358 // type of the other sync request.
1359 //
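// sync_type is one of the SYNC_* values used in this file: SYNC_GC_SEQ or
// SYNC_GC_PAR for a garbage collection, SYNC_OTHER for e.g. forkProcess().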
1360 #if defined(THREADED_RTS)
1361 static nat requestSync (Capability **pcap, Task *task, nat sync_type)
1362 {
1363 nat prev_pending_sync;
1364
1365 prev_pending_sync = cas(&pending_sync, 0, sync_type);
1366
1367 if (prev_pending_sync)
1368 {
1369 do {
1370 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1371 prev_pending_sync);
1372 ASSERT(*pcap);
1373 yieldCapability(pcap,task,rtsTrue);
1374 } while (pending_sync);
1375 return prev_pending_sync; // NOTE: task->cap might have changed now
1376 }
1377 else
1378 {
1379 return 0;
1380 }
1381 }
1382
1383 //
1384 // Grab all the capabilities except the one we already hold. Used
1385 // when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1386 // before a fork (SYNC_OTHER).
1387 //
1388 // Only call this after requestSync(), otherwise a deadlock might
1389 // ensue if another thread is trying to synchronise.
1390 //
1391 static void acquireAllCapabilities(Capability *cap, Task *task)
1392 {
1393 Capability *tmpcap;
1394 nat i;
1395
1396 for (i=0; i < n_capabilities; i++) {
1397 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
1398 tmpcap = &capabilities[i];
1399 if (tmpcap != cap) {
1400 // we better hope this task doesn't get migrated to
1401 // another Capability while we're waiting for this one.
1402 // It won't, because load balancing happens while we have
1403 // all the Capabilities, but even so it's a slightly
1404 // unsavoury invariant.
1405 task->cap = tmpcap;
1406 waitForReturnCapability(&tmpcap, task);
1407 if (tmpcap->no != i) {
1408 barf("acquireAllCapabilities: got the wrong capability");
1409 }
1410 }
1411 }
1412 task->cap = cap;
1413 }
1414
1415 static void releaseAllCapabilities(nat n, Capability *cap, Task *task)
1416 {
1417 nat i;
1418
1419 for (i = 0; i < n; i++) {
1420 if (cap->no != i) {
1421 task->cap = &capabilities[i];
1422 releaseCapability(&capabilities[i]);
1423 }
1424 }
1425 task->cap = cap;
1426 }
1427 #endif
1428
1429 /* -----------------------------------------------------------------------------
1430 * Perform a garbage collection if necessary
1431 * -------------------------------------------------------------------------- */
1432
1433 static void
1434 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1435 rtsBool force_major)
1436 {
1437 Capability *cap = *pcap;
1438 rtsBool heap_census;
1439 nat collect_gen;
1440 #ifdef THREADED_RTS
1441 StgWord8 gc_type;
1442 nat i, sync;
1443 StgTSO *tso;
1444 #endif
1445
1446 if (sched_state == SCHED_SHUTTING_DOWN) {
1447 // The final GC has already been done, and the system is
1448 // shutting down. We'll probably deadlock if we try to GC
1449 // now.
1450 return;
1451 }
1452
1453 heap_census = scheduleNeedHeapProfile(rtsTrue);
1454
1455 // Figure out which generation we are collecting, so that we can
1456 // decide whether this is a parallel GC or not.
1457 collect_gen = calcNeeded(force_major || heap_census, NULL);
1458
1459 #ifdef THREADED_RTS
1460 if (sched_state < SCHED_INTERRUPTING
1461 && RtsFlags.ParFlags.parGcEnabled
1462 && collect_gen >= RtsFlags.ParFlags.parGcGen
1463 && ! oldest_gen->mark)
1464 {
1465 gc_type = SYNC_GC_PAR;
1466 } else {
1467 gc_type = SYNC_GC_SEQ;
1468 }
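// SYNC_GC_PAR: every capability's GC thread takes part in the collection;
// SYNC_GC_SEQ: this task grabs all the capabilities and does the GC alone.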
1469
1470 // In order to GC, there must be no threads running Haskell code.
1471 // Therefore, the GC thread needs to hold *all* the capabilities,
1472 // and release them after the GC has completed.
1473 //
1474 // This seems to be the simplest way: previous attempts involved
1475 // making all the threads with capabilities give up their
1476 // capabilities and sleep except for the *last* one, which
1477 // actually did the GC. But it's quite hard to arrange for all
1478 // the other tasks to sleep and stay asleep.
1479 //
1480
1481 /* Other capabilities are prevented from running yet more Haskell
1482 threads if pending_sync is set. Tested inside
1483 yieldCapability() and releaseCapability() in Capability.c */
1484
1485 do {
1486 sync = requestSync(pcap, task, gc_type);
1487 cap = *pcap;
1488 if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
1489 // someone else had a pending sync request for a GC, so
1490 // let's assume GC has been done and we don't need to GC
1491 // again.
1492 return;
1493 }
1494 if (sched_state == SCHED_SHUTTING_DOWN) {
1495 // The scheduler might now be shutting down. We tested
1496 // this above, but it might have become true since then as
1497 // we yielded the capability in requestSync().
1498 return;
1499 }
1500 } while (sync);
1501
1502 // don't declare this until after we have sync'd, because
1503 // n_capabilities may change.
1504 rtsBool idle_cap[n_capabilities];
1505 #ifdef DEBUG
1506 unsigned int old_n_capabilities = n_capabilities;
1507 #endif
1508
1509 interruptAllCapabilities();
1510
1511 // The final shutdown GC is always single-threaded, because it's
1512 // possible that some of the Capabilities have no worker threads.
1513
1514 if (gc_type == SYNC_GC_SEQ)
1515 {
1516 traceEventRequestSeqGc(cap);
1517 }
1518 else
1519 {
1520 traceEventRequestParGc(cap);
1521 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1522 }
1523
1524 if (gc_type == SYNC_GC_SEQ)
1525 {
1526 // single-threaded GC: grab all the capabilities
1527 acquireAllCapabilities(cap,task);
1528 }
1529 else
1530 {
1531 // If we are load-balancing collections in this
1532 // generation, then we require all GC threads to participate
1533 // in the collection. Otherwise, we only require active
1534 // threads to participate, and we set gc_threads[i]->idle for
1535 // any idle capabilities. The rationale here is that waking
1536 // up an idle Capability takes much longer than just doing any
1537 // GC work on its behalf.
1538
1539 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1540 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1541 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
1542 for (i=0; i < n_capabilities; i++) {
1543 if (capabilities[i].disabled) {
1544 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1545 } else {
1546 idle_cap[i] = rtsFalse;
1547 }
1548 }
1549 } else {
1550 for (i=0; i < n_capabilities; i++) {
1551 if (capabilities[i].disabled) {
1552 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1553 } else if (i == cap->no ||
1554 capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1555 idle_cap[i] = rtsFalse;
1556 } else {
1557 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1558 if (!idle_cap[i]) {
1559 n_failed_trygrab_idles++;
1560 } else {
1561 n_idle_caps++;
1562 }
1563 }
1564 }
1565 }
1566
1567 // We set the gc_thread[i]->idle flag if that
1568 // capability/thread is not participating in this collection.
1569 // We also keep a local record of which capabilities are idle
1570 // in idle_cap[], because scheduleDoGC() is re-entrant:
1571 // another thread might start a GC as soon as we've finished
1572 // this one, and thus the gc_thread[]->idle flags are invalid
1573 // as soon as we release any threads after GC. Getting this
1574 // wrong leads to a rare and hard to debug deadlock!
1575
1576 for (i=0; i < n_capabilities; i++) {
1577 gc_threads[i]->idle = idle_cap[i];
1578 capabilities[i].idle++;
1579 }
1580
1581 // For all capabilities participating in this GC, wait until
1582 // they have stopped mutating and are standing by for GC.
1583 waitForGcThreads(cap);
1584
1585 #if defined(THREADED_RTS)
1586 // Stable point where we can do a global check on our spark counters
1587 ASSERT(checkSparkCountInvariant());
1588 #endif
1589 }
1590
1591 #endif
1592
1593 IF_DEBUG(scheduler, printAllThreads());
1594
1595 delete_threads_and_gc:
1596 /*
1597 * We now have all the capabilities; if we're in an interrupting
1598 * state, then we should take the opportunity to delete all the
1599 * threads in the system.
1600 */
1601 if (sched_state == SCHED_INTERRUPTING) {
1602 deleteAllThreads(cap);
1603 #if defined(THREADED_RTS)
1604 // Discard all the sparks from every Capability. Why?
1605 // They'll probably be GC'd anyway since we've killed all the
1606 // threads. It just avoids the GC having to do any work to
1607 // figure out that any remaining sparks are garbage.
1608 for (i = 0; i < n_capabilities; i++) {
1609 capabilities[i].spark_stats.gcd +=
1610 sparkPoolSize(capabilities[i].sparks);
1611 // No race here since all Caps are stopped.
1612 discardSparksCap(&capabilities[i]);
1613 }
1614 #endif
1615 sched_state = SCHED_SHUTTING_DOWN;
1616 }
1617
1618 /*
1619 * When there are disabled capabilities, we want to migrate any
1620 * threads away from them. Normally this happens in the
1621 * scheduler's loop, but only for unbound threads - it's really
1622 * hard for a bound thread to migrate itself. So we have another
1623 * go here.
1624 */
1625 #if defined(THREADED_RTS)
1626 for (i = enabled_capabilities; i < n_capabilities; i++) {
1627 Capability *tmp_cap, *dest_cap;
1628 tmp_cap = &capabilities[i];
1629 ASSERT(tmp_cap->disabled);
1630 if (i != cap->no) {
1631 dest_cap = &capabilities[i % enabled_capabilities];
1632 while (!emptyRunQueue(tmp_cap)) {
1633 tso = popRunQueue(tmp_cap);
1634 migrateThread(tmp_cap, tso, dest_cap);
1635 if (tso->bound) {
1636 traceTaskMigrate(tso->bound->task,
1637 tso->bound->task->cap,
1638 dest_cap);
1639 tso->bound->task->cap = dest_cap;
1640 }
1641 }
1642 }
1643 }
1644 #endif
1645
1646 #if defined(THREADED_RTS)
1647 // reset pending_sync *before* GC, so that when the GC threads
1648 // emerge they don't immediately re-enter the GC.
1649 pending_sync = 0;
1650 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1651 #else
1652 GarbageCollect(collect_gen, heap_census, 0, cap);
1653 #endif
1654
1655 traceSparkCounters(cap);
1656
1657 switch (recent_activity) {
1658 case ACTIVITY_INACTIVE:
1659 if (force_major) {
1660 // We are doing a GC because the system has been idle for a
1661 // timeslice and we need to check for deadlock. Record the
1662 // fact that we've done a GC and turn off the timer signal;
1663 // it will get re-enabled if we run any threads after the GC.
1664 recent_activity = ACTIVITY_DONE_GC;
1665 #ifndef PROFILING
1666 stopTimer();
1667 #endif
1668 break;
1669 }
1670 // fall through...
1671
1672 case ACTIVITY_MAYBE_NO:
1673 // the GC might have taken long enough for the timer to set
1674 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1675 // but we aren't necessarily deadlocked:
1676 recent_activity = ACTIVITY_YES;
1677 break;
1678
1679 case ACTIVITY_DONE_GC:
1680 // If we are actually active, the scheduler will reset the
1681 // recent_activity flag and re-enable the timer.
1682 break;
1683 }
1684
1685 #if defined(THREADED_RTS)
1686 // Stable point where we can do a global check on our spark counters
1687 ASSERT(checkSparkCountInvariant());
1688 #endif
1689
1690 // The heap census itself is done during GarbageCollect().
1691 if (heap_census) {
1692 performHeapProfile = rtsFalse;
1693 }
1694
1695 #if defined(THREADED_RTS)
1696
1697 // If n_capabilities has changed during GC, we're in trouble.
1698 ASSERT(n_capabilities == old_n_capabilities);
1699
1700 if (gc_type == SYNC_GC_PAR)
1701 {
1702 releaseGCThreads(cap);
1703 for (i = 0; i < n_capabilities; i++) {
1704 if (i != cap->no) {
1705 if (idle_cap[i]) {
1706 ASSERT(capabilities[i].running_task == task);
1707 task->cap = &capabilities[i];
1708 releaseCapability(&capabilities[i]);
1709 } else {
1710 ASSERT(capabilities[i].running_task != task);
1711 }
1712 }
1713 }
1714 task->cap = cap;
1715 }
1716 #endif
1717
1718 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1719 // GC set the heap_overflow flag, so we should proceed with
1720 // an orderly shutdown now. Ultimately we want the main
1721 // thread to return to its caller with HeapExhausted, at which
1722 // point the caller should call hs_exit(). The first step is
1723 // to delete all the threads.
1724 //
1725 // Another way to do this would be to raise an exception in
1726 // the main thread, which we really should do because it gives
1727 // the program a chance to clean up. But how do we find the
1728 // main thread? It should presumably be the same one that
1729 // gets ^C exceptions, but that's all done on the Haskell side
1730 // (GHC.TopHandler).
1731 sched_state = SCHED_INTERRUPTING;
1732 goto delete_threads_and_gc;
1733 }
1734
1735 #ifdef SPARKBALANCE
1736 /* JB
1737 Once we are all together... this would be the place to balance all
1738 spark pools. No concurrent stealing or adding of new sparks can
1739 occur. Should be defined in Sparks.c. */
1740 balanceSparkPoolsCaps(n_capabilities, capabilities);
1741 #endif
1742
1743 #if defined(THREADED_RTS)
1744 if (gc_type == SYNC_GC_SEQ) {
1745 // release our stash of capabilities.
1746 releaseAllCapabilities(n_capabilities, cap, task);
1747 }
1748 #endif
1749
1750 return;
1751 }
1752
1753 /* ---------------------------------------------------------------------------
1754 * Singleton fork(). Do not copy any running threads.
1755 * ------------------------------------------------------------------------- */
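/* Illustrative sketch (not part of this file): the Haskell-side wrapper
 * (forkProcess in the unix package -- an assumption about the caller, not
 * something defined here) hands us a stable pointer to the child's IO
 * action.  Conceptually:
 *
 *     pid_t pid = forkProcess(entry);   // entry :: StablePtr (IO ())
 *     // parent: the child's pid is returned
 *     // child : forkProcess never returns; it runs the action and then
 *     //         calls hs_exit() and stg_exit()
 */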
1756
1757 pid_t
1758 forkProcess(HsStablePtr *entry
1759 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1760 STG_UNUSED
1761 #endif
1762 )
1763 {
1764 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1765 pid_t pid;
1766 StgTSO* t,*next;
1767 Capability *cap;
1768 nat g;
1769 Task *task = NULL;
1770 nat i;
1771 #ifdef THREADED_RTS
1772 nat sync;
1773 #endif
1774
1775 debugTrace(DEBUG_sched, "forking!");
1776
1777 task = newBoundTask();
1778
1779 cap = NULL;
1780 waitForReturnCapability(&cap, task);
1781
1782 #ifdef THREADED_RTS
1783 do {
1784 sync = requestSync(&cap, task, SYNC_OTHER);
1785 } while (sync);
1786
1787 acquireAllCapabilities(cap,task);
1788
1789 pending_sync = 0;
1790 #endif
1791
1792     // no funny business: hold locks while we fork; otherwise, if some
1793 // other thread is holding a lock when the fork happens, the data
1794 // structure protected by the lock will forever be in an
1795 // inconsistent state in the child. See also #1391.
1796 ACQUIRE_LOCK(&sched_mutex);
1797 ACQUIRE_LOCK(&sm_mutex);
1798 ACQUIRE_LOCK(&stable_mutex);
1799 ACQUIRE_LOCK(&task->lock);
1800
1801 for (i=0; i < n_capabilities; i++) {
1802 ACQUIRE_LOCK(&capabilities[i].lock);
1803 }
1804
1805 stopTimer(); // See #4074
1806
1807 #if defined(TRACING)
1808     flushEventLog(); // so that the child won't inherit dirty file buffers
1809 #endif
1810
1811 pid = fork();
1812
1813 if (pid) { // parent
1814
1815 startTimer(); // #4074
1816
1817 RELEASE_LOCK(&sched_mutex);
1818 RELEASE_LOCK(&sm_mutex);
1819 RELEASE_LOCK(&stable_mutex);
1820 RELEASE_LOCK(&task->lock);
1821
1822 for (i=0; i < n_capabilities; i++) {
1823 releaseCapability_(&capabilities[i],rtsFalse);
1824 RELEASE_LOCK(&capabilities[i].lock);
1825 }
1826 boundTaskExiting(task);
1827
1828 // just return the pid
1829 return pid;
1830
1831 } else { // child
1832
1833 #if defined(THREADED_RTS)
1834 initMutex(&sched_mutex);
1835 initMutex(&sm_mutex);
1836 initMutex(&stable_mutex);
1837 initMutex(&task->lock);
1838
1839 for (i=0; i < n_capabilities; i++) {
1840 initMutex(&capabilities[i].lock);
1841 }
1842 #endif
1843
1844 #ifdef TRACING
1845 resetTracing();
1846 #endif
1847
1848 // Now, all OS threads except the thread that forked are
1849 // stopped. We need to stop all Haskell threads, including
1850 // those involved in foreign calls. Also we need to delete
1851 // all Tasks, because they correspond to OS threads that are
1852 // now gone.
1853
1854 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1855 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1856 next = t->global_link;
1857 // don't allow threads to catch the ThreadKilled
1858 // exception, but we do want to raiseAsync() because these
1859 // threads may be evaluating thunks that we need later.
1860 deleteThread_(t->cap,t);
1861
1862 // stop the GC from updating the InCall to point to
1863 // the TSO. This is only necessary because the
1864 // OSThread bound to the TSO has been killed, and
1865 // won't get a chance to exit in the usual way (see
1866 // also scheduleHandleThreadFinished).
1867 t->bound = NULL;
1868 }
1869 }
1870
1871 discardTasksExcept(task);
1872
1873 for (i=0; i < n_capabilities; i++) {
1874 cap = &capabilities[i];
1875
1876 // Empty the run queue. It seems tempting to let all the
1877 // killed threads stay on the run queue as zombies to be
1878 // cleaned up later, but some of them may correspond to
1879 // bound threads for which the corresponding Task does not
1880 // exist.
1881 truncateRunQueue(cap);
1882
1883             // Any suspended C-calling Tasks are no more; their OS threads
1884 // don't exist now:
1885 cap->suspended_ccalls = NULL;
1886
1887 #if defined(THREADED_RTS)
1888             // Wipe our spare workers list; they no longer exist. New
1889 // workers will be created if necessary.
1890 cap->spare_workers = NULL;
1891 cap->n_spare_workers = 0;
1892 cap->returning_tasks_hd = NULL;
1893 cap->returning_tasks_tl = NULL;
1894 #endif
1895
1896             // Release all caps except 0; we'll use that for starting
1897 // the IO manager and running the client action below.
1898 if (cap->no != 0) {
1899 task->cap = cap;
1900 releaseCapability(cap);
1901 }
1902 }
1903 cap = &capabilities[0];
1904 task->cap = cap;
1905
1906         // Empty the per-generation thread lists. Otherwise, the garbage
1907 // collector may attempt to resurrect some of these threads.
1908 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1909 generations[g].threads = END_TSO_QUEUE;
1910 }
1911
1912 // On Unix, all timers are reset in the child, so we need to start
1913 // the timer again.
1914 initTimer();
1915 startTimer();
1916
1917 // TODO: need to trace various other things in the child
1918         // like the startup event, capabilities, process info, etc.
1919 traceTaskCreate(task, cap);
1920
1921 #if defined(THREADED_RTS)
1922 ioManagerStartCap(&cap);
1923 #endif
1924
1925 rts_evalStableIO(&cap, entry, NULL); // run the action
1926 rts_checkSchedStatus("forkProcess",cap);
1927
1928 rts_unlock(cap);
1929 hs_exit(); // clean up and exit
1930 stg_exit(EXIT_SUCCESS);
1931 }
1932 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1933 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1934 #endif
1935 }
1936
1937 /* ---------------------------------------------------------------------------
1938 * Changing the number of Capabilities
1939 *
1940 * Changing the number of Capabilities is very tricky! We can only do
1941 * it with the system fully stopped, so we do a full sync with
1942 * requestSync(SYNC_OTHER) and grab all the capabilities.
1943 *
1944 * Then we resize the appropriate data structures, and update all
1945 * references to the old data structures which have now moved.
1946 * Finally we release the Capabilities we are holding, and start
1947 * worker Tasks on the new Capabilities we created.
1948 *
1949 * ------------------------------------------------------------------------- */
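/* Illustrative sketch (not part of this file): Haskell code normally gets
 * here via Control.Concurrent.setNumCapabilities, but a program embedding
 * the RTS could also call this entry point directly once the RTS is up.
 * The surrounding calls below are an assumption about such an embedding:
 *
 *     hs_init(&argc, &argv);      // start the RTS
 *     setNumCapabilities(4);      // grow (or shrink) to 4 capabilities
 *     ...                         // run Haskell work, e.g. via rts_evalIO()
 *     hs_exit();
 */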
1950
1951 void
1952 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
1953 {
1954 #if !defined(THREADED_RTS)
1955 if (new_n_capabilities != 1) {
1956 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
1957 }
1958 return;
1959 #elif defined(NOSMP)
1960 if (new_n_capabilities != 1) {
1961 errorBelch("setNumCapabilities: not supported on this platform");
1962 }
1963 return;
1964 #else
1965 Task *task;
1966 Capability *cap;
1967 nat sync;
1968 StgTSO* t;
1969 nat g, n;
1970 Capability *old_capabilities = NULL;
1971 nat old_n_capabilities = n_capabilities;
1972
1973 if (new_n_capabilities == enabled_capabilities) return;
1974
1975 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
1976 enabled_capabilities, new_n_capabilities);
1977
1978 cap = rts_lock();
1979 task = cap->running_task;
1980
1981 do {
1982 sync = requestSync(&cap, task, SYNC_OTHER);
1983 } while (sync);
1984
1985 acquireAllCapabilities(cap,task);
1986
1987 pending_sync = 0;
1988
1989 if (new_n_capabilities < enabled_capabilities)
1990 {
1991 // Reducing the number of capabilities: we do not actually
1992         // remove the extra capabilities; we just mark them as
1993 // "disabled". This has the following effects:
1994 //
1995 // - threads on a disabled capability are migrated away by the
1996 // scheduler loop
1997 //
1998 // - disabled capabilities do not participate in GC
1999 // (see scheduleDoGC())
2000 //
2001 // - No spark threads are created on this capability
2002 // (see scheduleActivateSpark())
2003 //
2004 // - We do not attempt to migrate threads *to* a disabled
2005 // capability (see schedulePushWork()).
2006 //
2007 // but in other respects, a disabled capability remains
2008 // alive. Threads may be woken up on a disabled capability,
2009 // but they will be immediately migrated away.
2010 //
2011 // This approach is much easier than trying to actually remove
2012 // the capability; we don't have to worry about GC data
2013 // structures, the nursery, etc.
2014 //
2015 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2016 capabilities[n].disabled = rtsTrue;
2017 traceCapDisable(&capabilities[n]);
2018 }
2019 enabled_capabilities = new_n_capabilities;
2020 }
2021 else
2022 {
2023 // Increasing the number of enabled capabilities.
2024 //
2025 // enable any disabled capabilities, up to the required number
2026 for (n = enabled_capabilities;
2027 n < new_n_capabilities && n < n_capabilities; n++) {
2028 capabilities[n].disabled = rtsFalse;
2029 traceCapEnable(&capabilities[n]);
2030 }
2031 enabled_capabilities = n;
2032
2033 if (new_n_capabilities > n_capabilities) {
2034 #if defined(TRACING)
2035 // Allocate eventlog buffers for the new capabilities. Note this
2036 // must be done before calling moreCapabilities(), because that
2037 // will emit events about creating the new capabilities and adding
2038 // them to existing capsets.
2039 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2040 #endif
2041
2042 // Resize the capabilities array
2043 // NB. after this, capabilities points somewhere new. Any pointers
2044 // of type (Capability *) are now invalid.
2045 old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
2046
2047 // update our own cap pointer
2048 cap = &capabilities[cap->no];
2049
2050 // Resize and update storage manager data structures
2051 storageAddCapabilities(n_capabilities, new_n_capabilities);
2052
2053 // Update (Capability *) refs in the Task manager.
2054 updateCapabilityRefs();
2055
2056 // Update (Capability *) refs from TSOs
2057 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2058 for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
2059 t->cap = &capabilities[t->cap->no];
2060 }
2061 }
2062 }
2063 }
2064
2065 // update n_capabilities before things start running
2066 if (new_n_capabilities > n_capabilities) {
2067 n_capabilities = enabled_capabilities = new_n_capabilities;
2068 }
2069
2070 // Start worker tasks on the new Capabilities
2071 startWorkerTasks(old_n_capabilities, new_n_capabilities);
2072
2073 // We're done: release the original Capabilities
2074 releaseAllCapabilities(old_n_capabilities, cap,task);
2075
2076 // We can't free the old array until now, because we access it
2077 // while updating pointers in updateCapabilityRefs().
2078 if (old_capabilities) {
2079 stgFree(old_capabilities);
2080 }
2081
2082 // Notify IO manager that the number of capabilities has changed.
2083 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2084
2085 rts_unlock(cap);
2086
2087 #endif // THREADED_RTS
2088 }
2089
2090
2091
2092 /* ---------------------------------------------------------------------------
2093 * Delete all the threads in the system
2094 * ------------------------------------------------------------------------- */
2095
2096 static void
2097 deleteAllThreads ( Capability *cap )
2098 {
2099 // NOTE: only safe to call if we own all capabilities.
2100
2101 StgTSO* t, *next;
2102 nat g;
2103
2104 debugTrace(DEBUG_sched,"deleting all threads");
2105 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2106 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2107 next = t->global_link;
2108 deleteThread(cap,t);
2109 }
2110 }
2111
2112 // The run queue now contains a bunch of ThreadKilled threads. We
2113 // must not throw these away: the main thread(s) will be in there
2114 // somewhere, and the main scheduler loop has to deal with it.
2115 // Also, the run queue is the only thing keeping these threads from
2116 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2117
2118 #if !defined(THREADED_RTS)
2119 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2120 ASSERT(sleeping_queue == END_TSO_QUEUE);
2121 #endif
2122 }
2123
2124 /* -----------------------------------------------------------------------------
2125 Managing the suspended_ccalls list.
2126 Locks required: sched_mutex
2127 -------------------------------------------------------------------------- */
2128
2129 STATIC_INLINE void
2130 suspendTask (Capability *cap, Task *task)
2131 {
2132 InCall *incall;
2133
2134 incall = task->incall;
2135 ASSERT(incall->next == NULL && incall->prev == NULL);
2136 incall->next = cap->suspended_ccalls;
2137 incall->prev = NULL;
2138 if (cap->suspended_ccalls) {
2139 cap->suspended_ccalls->prev = incall;
2140 }
2141 cap->suspended_ccalls = incall;
2142 }
2143
2144 STATIC_INLINE void
2145 recoverSuspendedTask (Capability *cap, Task *task)
2146 {
2147 InCall *incall;
2148
2149 incall = task->incall;
2150 if (incall->prev) {
2151 incall->prev->next = incall->next;
2152 } else {
2153 ASSERT(cap->suspended_ccalls == incall);
2154 cap->suspended_ccalls = incall->next;
2155 }
2156 if (incall->next) {
2157 incall->next->prev = incall->prev;
2158 }
2159 incall->next = incall->prev = NULL;
2160 }
2161
2162 /* ---------------------------------------------------------------------------
2163 * Suspending & resuming Haskell threads.
2164 *
2165 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2166 * its capability before calling the C function. This allows another
2167 * task to pick up the capability and carry on running Haskell
2168 * threads. It also means that if the C call blocks, it won't lock
2169 * the whole system.
2170 *
2171 * The Haskell thread making the C call is put to sleep for the
2172  * duration of the call, on the Capability's suspended_ccalls list. We
2173 * give out a token to the task, which it can use to resume the thread
2174 * on return from the C function.
2175 *
2176  * If this is an interruptible C call, the FFI call may be
2177  * unceremoniously terminated, so it should be scheduled on an
2178  * unbound worker thread.
2179 * ------------------------------------------------------------------------- */
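/* Illustrative sketch (not part of this file): the code generated for a
 * "safe" foreign call brackets the call roughly as follows (the names are
 * schematic, not the actual generated code):
 *
 *     void *token = suspendThread(reg, interruptible); // give up the Capability
 *     result      = some_c_function(args);             // may block, or call back in
 *     reg         = resumeThread(token);               // re-acquire a Capability
 *
 * The token handed out below is the Task itself; that is all resumeThread()
 * needs to find the suspended TSO again.
 */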
2180
2181 void *
2182 suspendThread (StgRegTable *reg, rtsBool interruptible)
2183 {
2184 Capability *cap;
2185 int saved_errno;
2186 StgTSO *tso;
2187 Task *task;
2188 #if mingw32_HOST_OS
2189 StgWord32 saved_winerror;
2190 #endif
2191
2192 saved_errno = errno;
2193 #if mingw32_HOST_OS
2194 saved_winerror = GetLastError();
2195 #endif
2196
2197     /* assume that reg points to the StgRegTable part of a Capability.
2198 */
2199 cap = regTableToCapability(reg);
2200
2201 task = cap->running_task;
2202 tso = cap->r.rCurrentTSO;
2203
2204 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2205
2206 // XXX this might not be necessary --SDM
2207 tso->what_next = ThreadRunGHC;
2208
2209 threadPaused(cap,tso);
2210
2211 if (interruptible) {
2212 tso->why_blocked = BlockedOnCCall_Interruptible;
2213 } else {
2214 tso->why_blocked = BlockedOnCCall;
2215 }
2216
2217 // Hand back capability
2218 task->incall->suspended_tso = tso;
2219 task->incall->suspended_cap = cap;
2220
2221 ACQUIRE_LOCK(&cap->lock);
2222
2223 suspendTask(cap,task);
2224 cap->in_haskell = rtsFalse;
2225 releaseCapability_(cap,rtsFalse);
2226
2227 RELEASE_LOCK(&cap->lock);
2228
2229 errno = saved_errno;
2230 #if mingw32_HOST_OS
2231 SetLastError(saved_winerror);
2232 #endif
2233 return task;
2234 }
2235
2236 StgRegTable *
2237 resumeThread (void *task_)
2238 {
2239 StgTSO *tso;
2240 InCall *incall;
2241 Capability *cap;
2242 Task *task = task_;
2243 int saved_errno;
2244 #if mingw32_HOST_OS
2245 StgWord32 saved_winerror;
2246 #endif
2247
2248 saved_errno = errno;
2249 #if mingw32_HOST_OS
2250 saved_winerror = GetLastError();
2251 #endif
2252
2253 incall = task->incall;
2254 cap = incall->suspended_cap;
2255 task->cap = cap;
2256
2257 // Wait for permission to re-enter the RTS with the result.
2258 waitForReturnCapability(&cap,task);
2259 // we might be on a different capability now... but if so, our
2260 // entry on the suspended_ccalls list will also have been
2261 // migrated.
2262
2263 // Remove the thread from the suspended list
2264 recoverSuspendedTask(cap,task);
2265
2266 tso = incall->suspended_tso;
2267 incall->suspended_tso = NULL;
2268 incall->suspended_cap = NULL;
2269     tso->_link = END_TSO_QUEUE; // no write barrier required
2270
2271 traceEventRunThread(cap, tso);
2272
2273 /* Reset blocking status */
2274 tso->why_blocked = NotBlocked;
2275
2276 if ((tso->flags & TSO_BLOCKEX) == 0) {
2277 // avoid locking the TSO if we don't have to
2278 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2279 maybePerformBlockedException(cap,tso);
2280 }
2281 }
2282
2283 cap->r.rCurrentTSO = tso;
2284 cap->in_haskell = rtsTrue;
2285 errno = saved_errno;
2286 #if mingw32_HOST_OS
2287 SetLastError(saved_winerror);
2288 #endif
2289
2290     /* We might have GC'd; mark the TSO dirty again */
2291 dirty_TSO(cap,tso);
2292 dirty_STACK(cap,tso->stackobj);
2293
2294 IF_DEBUG(sanity, checkTSO(tso));
2295
2296 return &cap->r;
2297 }
2298
2299 /* ---------------------------------------------------------------------------
2300 * scheduleThread()
2301 *
2302 * scheduleThread puts a thread on the end of the runnable queue.
2303 * This will usually be done immediately after a thread is created.
2304 * The caller of scheduleThread must create the thread using e.g.
2305 * createThread and push an appropriate closure
2306 * on this thread's stack before the scheduler is invoked.
2307 * ------------------------------------------------------------------------ */
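/* Illustrative sketch (not part of this file): a caller typically pairs
 * thread creation with scheduling along these lines (createIOThread is the
 * helper used by the rts_eval* family; treat the exact helper and stack
 * size as assumptions):
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, closure);
 *     scheduleThread(cap, tso);                  // enqueue and carry on
 *     // ...or scheduleWaitThread(tso, &ret, &cap) to block until it finishes
 */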
2308
2309 void
2310 scheduleThread(Capability *cap, StgTSO *tso)
2311 {
2312 // The thread goes at the *end* of the run-queue, to avoid possible
2313 // starvation of any threads already on the queue.
2314 appendToRunQueue(cap,tso);
2315 }
2316
2317 void
2318 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2319 {
2320 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2321 // move this thread from now on.
2322 #if defined(THREADED_RTS)
2323 cpu %= enabled_capabilities;
2324 if (cpu == cap->no) {
2325 appendToRunQueue(cap,tso);
2326 } else {
2327 migrateThread(cap, tso, &capabilities[cpu]);
2328 }
2329 #else
2330 appendToRunQueue(cap,tso);
2331 #endif
2332 }
2333
2334 void
2335 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2336 {
2337 Task *task;
2338 DEBUG_ONLY( StgThreadID id );
2339 Capability *cap;
2340
2341 cap = *pcap;
2342
2343 // We already created/initialised the Task
2344 task = cap->running_task;
2345
2346 // This TSO is now a bound thread; make the Task and TSO
2347 // point to each other.
2348 tso->bound = task->incall;
2349 tso->cap = cap;
2350
2351 task->incall->tso = tso;
2352 task->incall->ret = ret;
2353 task->incall->stat = NoStatus;
2354
2355 appendToRunQueue(cap,tso);
2356
2357 DEBUG_ONLY( id = tso->id );
2358 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2359
2360 cap = schedule(cap,task);
2361
2362 ASSERT(task->incall->stat != NoStatus);
2363 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2364
2365 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2366 *pcap = cap;
2367 }
2368
2369 /* ----------------------------------------------------------------------------
2370 * Starting Tasks
2371 * ------------------------------------------------------------------------- */
2372
2373 #if defined(THREADED_RTS)
2374 void scheduleWorker (Capability *cap, Task *task)
2375 {
2376 // schedule() runs without a lock.
2377 cap = schedule(cap,task);
2378
2379 // On exit from schedule(), we have a Capability, but possibly not
2380 // the same one we started with.
2381
2382 // During shutdown, the requirement is that after all the
2383 // Capabilities are shut down, all workers that are shutting down
2384 // have finished workerTaskStop(). This is why we hold on to
2385 // cap->lock until we've finished workerTaskStop() below.
2386 //
2387 // There may be workers still involved in foreign calls; those
2388 // will just block in waitForReturnCapability() because the
2389 // Capability has been shut down.
2390 //
2391 ACQUIRE_LOCK(&cap->lock);
2392 releaseCapability_(cap,rtsFalse);
2393 workerTaskStop(task);
2394 RELEASE_LOCK(&cap->lock);
2395 }
2396 #endif
2397
2398 /* ---------------------------------------------------------------------------
2399  * Start new worker tasks on Capabilities in the range [from, to)
2400 * -------------------------------------------------------------------------- */
2401
2402 static void
2403 startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
2404 {
2405 #if defined(THREADED_RTS)
2406 nat i;
2407 Capability *cap;
2408
2409 for (i = from; i < to; i++) {
2410 cap = &capabilities[i];
2411 ACQUIRE_LOCK(&cap->lock);
2412 startWorkerTask(cap);
2413 RELEASE_LOCK(&cap->lock);
2414 }
2415 #endif
2416 }
2417
2418 /* ---------------------------------------------------------------------------
2419 * initScheduler()
2420 *
2421 * Initialise the scheduler. This resets all the queues - if the
2422 * queues contained any threads, they'll be garbage collected at the
2423 * next pass.
2424 *
2425 * ------------------------------------------------------------------------ */
2426
2427 void
2428 initScheduler(void)
2429 {
2430 #if !defined(THREADED_RTS)
2431 blocked_queue_hd = END_TSO_QUEUE;
2432 blocked_queue_tl = END_TSO_QUEUE;
2433 sleeping_queue = END_TSO_QUEUE;
2434 #endif
2435
2436 sched_state = SCHED_RUNNING;
2437 recent_activity = ACTIVITY_YES;
2438
2439 #if defined(THREADED_RTS)
2440     /* Initialise the mutex used by the scheduler. */
2442 initMutex(&sched_mutex);
2443 #endif
2444
2445 ACQUIRE_LOCK(&sched_mutex);
2446
2447 /* A capability holds the state a native thread needs in
2448 * order to execute STG code. At least one capability is
2449 * floating around (only THREADED_RTS builds have more than one).
2450 */
2451 initCapabilities();
2452
2453 initTaskManager();
2454
2455 /*
2456 * Eagerly start one worker to run each Capability, except for
2457 * Capability 0. The idea is that we're probably going to start a
2458 * bound thread on Capability 0 pretty soon, so we don't want a
2459 * worker task hogging it.
2460 */
2461 startWorkerTasks(1, n_capabilities);
2462
2463 RELEASE_LOCK(&sched_mutex);
2464
2465 }
2466
2467 void
2468 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2469 /* see Capability.c, shutdownCapability() */
2470 {
2471 Task *task = NULL;
2472
2473 task = newBoundTask();
2474
2475 // If we haven't killed all the threads yet, do it now.
2476 if (sched_state < SCHED_SHUTTING_DOWN) {
2477 sched_state = SCHED_INTERRUPTING;
2478 Capability *cap = task->cap;
2479 waitForReturnCapability(&cap,task);
2480 scheduleDoGC(&cap,task,rtsTrue);
2481 ASSERT(task->incall->tso == NULL);
2482 releaseCapability(cap);
2483 }
2484 sched_state = SCHED_SHUTTING_DOWN;
2485
2486 shutdownCapabilities(task, wait_foreign);
2487
2488 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2489 // n_failed_trygrab_idles, n_idle_caps);
2490
2491 boundTaskExiting(task);
2492 }
2493
2494 void
2495 freeScheduler( void )
2496 {
2497 nat still_running;
2498
2499 ACQUIRE_LOCK(&sched_mutex);
2500 still_running = freeTaskManager();
2501 // We can only free the Capabilities if there are no Tasks still
2502 // running. We might have a Task about to return from a foreign
2503 // call into waitForReturnCapability(), for example (actually,
2504 // this should be the *only* thing that a still-running Task can
2505 // do at this point, and it will block waiting for the
2506 // Capability).
2507 if (still_running == 0) {
2508 freeCapabilities();
2509 if (n_capabilities != 1) {
2510 stgFree(capabilities);
2511 }
2512 }
2513 RELEASE_LOCK(&sched_mutex);
2514 #if defined(THREADED_RTS)
2515 closeMutex(&sched_mutex);
2516 #endif
2517 }
2518
2519 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2520 void *user USED_IF_NOT_THREADS)
2521 {
2522 #if !defined(THREADED_RTS)
2523 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2524 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2525 evac(user, (StgClosure **)(void *)&sleeping_queue);
2526 #endif
2527 }
2528
2529 /* -----------------------------------------------------------------------------
2530 performGC
2531
2532 This is the interface to the garbage collector from Haskell land.
2533 We provide this so that external C code can allocate and garbage
2534 collect when called from Haskell via _ccall_GC.
2535 -------------------------------------------------------------------------- */
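/* Illustrative sketch (not part of this file): foreign C code that has been
 * entered from Haskell (or an embedding with the RTS initialised) can ask
 * for a collection directly; which Haskell wrappers end up here is left to
 * the libraries:
 *
 *     performGC();        // collect, letting the RTS choose what to collect
 *     performMajorGC();   // force a major collection of all generations
 */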
2536
2537 static void
2538 performGC_(rtsBool force_major)
2539 {
2540 Task *task;
2541 Capability *cap = NULL;
2542
2543 // We must grab a new Task here, because the existing Task may be
2544 // associated with a particular Capability, and chained onto the
2545 // suspended_ccalls queue.
2546 task = newBoundTask();
2547
2548 // TODO: do we need to traceTask*() here?
2549
2550 waitForReturnCapability(&cap,task);
2551 scheduleDoGC(&cap,task,force_major);
2552 releaseCapability(cap);
2553 boundTaskExiting(task);
2554 }
2555
2556 void
2557 performGC(void)
2558 {
2559 performGC_(rtsFalse);
2560 }
2561
2562 void
2563 performMajorGC(void)
2564 {
2565 performGC_(rtsTrue);
2566 }
2567
2568 /* ---------------------------------------------------------------------------
2569 Interrupt execution
2570 - usually called inside a signal handler so it mustn't do anything fancy.
2571 ------------------------------------------------------------------------ */
2572
2573 void
2574 interruptStgRts(void)
2575 {
2576 sched_state = SCHED_INTERRUPTING;
2577 interruptAllCapabilities();
2578 #if defined(THREADED_RTS)
2579 wakeUpRts();
2580 #endif
2581 }
2582
2583 /* -----------------------------------------------------------------------------
2584 Wake up the RTS
2585
2586 This function causes at least one OS thread to wake up and run the
2587 scheduler loop. It is invoked when the RTS might be deadlocked, or
2588 an external event has arrived that may need servicing (eg. a
2589     an external event has arrived that may need servicing (e.g. a
2590
2591 In the single-threaded RTS we don't do anything here; we only have
2592 one thread anyway, and the event that caused us to want to wake up
2593     will have interrupted any blocking system call in progress.
2594 -------------------------------------------------------------------------- */
2595
2596 #if defined(THREADED_RTS)
2597 void wakeUpRts(void)
2598 {
2599     // This forces the IO Manager thread to wake up, which will
2600 // in turn ensure that some OS thread wakes up and runs the
2601 // scheduler loop, which will cause a GC and deadlock check.
2602 ioManagerWakeup();
2603 }
2604 #endif
2605
2606 /* -----------------------------------------------------------------------------
2607 Deleting threads
2608
2609 This is used for interruption (^C) and forking, and corresponds to
2610 raising an exception but without letting the thread catch the
2611 exception.
2612 -------------------------------------------------------------------------- */
2613
2614 static void
2615 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2616 {
2617 // NOTE: must only be called on a TSO that we have exclusive
2618 // access to, because we will call throwToSingleThreaded() below.
2619 // The TSO must be on the run queue of the Capability we own, or
2620 // we must own all Capabilities.
2621
2622 if (tso->why_blocked != BlockedOnCCall &&
2623 tso->why_blocked != BlockedOnCCall_Interruptible) {
2624 throwToSingleThreaded(tso->cap,tso,NULL);
2625 }
2626 }
2627
2628 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2629 static void
2630 deleteThread_(Capability *cap, StgTSO *tso)
2631 { // for forkProcess only:
2632 // like deleteThread(), but we delete threads in foreign calls, too.
2633
2634 if (tso->why_blocked == BlockedOnCCall ||
2635 tso->why_blocked == BlockedOnCCall_Interruptible) {
2636 tso->what_next = ThreadKilled;
2637 appendToRunQueue(tso->cap, tso);
2638 } else {
2639 deleteThread(cap,tso);
2640 }
2641 }
2642 #endif
2643
2644 /* -----------------------------------------------------------------------------
2645 raiseExceptionHelper
2646
2647     This function is called by the raise# primitive, just so that we can
2648     move some of the tricky bits of raising an exception from C-- into
2649     C. Who knows, it might be a useful reusable thing here too.
2650 -------------------------------------------------------------------------- */
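/* Illustrative sketch (not part of this file): schematic C for how the
 * primop code uses the result (the real caller is written in Cmm):
 *
 *     StgWord frame_type = raiseExceptionHelper(reg, tso, exception);
 *     // frame_type is one of CATCH_FRAME, CATCH_STM_FRAME, ATOMICALLY_FRAME
 *     // or STOP_FRAME, and tso->stackobj->sp now points at that frame;
 *     // the caller decides how to unwind or handle from there.
 */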
2651
2652 StgWord
2653 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2654 {
2655 Capability *cap = regTableToCapability(reg);
2656 StgThunk *raise_closure = NULL;
2657 StgPtr p, next;
2658 StgRetInfoTable *info;
2659 //
2660 // This closure represents the expression 'raise# E' where E
2661     // is the exception being raised. It is used to overwrite all the
2662     // thunks which are currently under evaluation.
2663 //
2664
2665 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2666 // LDV profiling: stg_raise_info has THUNK as its closure
2667 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2668     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2669 // 1 does not cause any problem unless profiling is performed.
2670 // However, when LDV profiling goes on, we need to linearly scan
2671 // small object pool, where raise_closure is stored, so we should
2672 // use MIN_UPD_SIZE.
2673 //
2674 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2675 // sizeofW(StgClosure)+1);
2676 //
2677
2678 //
2679 // Walk up the stack, looking for the catch frame. On the way,
2680 // we update any closures pointed to from update frames with the
2681 // raise closure that we just built.
2682 //
2683 p = tso->stackobj->sp;
2684 while(1) {
2685 info = get_ret_itbl((StgClosure *)p);
2686 next = p + stack_frame_sizeW((StgClosure *)p);
2687 switch (info->i.type) {
2688
2689 case UPDATE_FRAME:
2690 // Only create raise_closure if we need to.
2691 if (raise_closure == NULL) {
2692 raise_closure =
2693 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2694 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2695 raise_closure->payload[0] = exception;
2696 }
2697 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2698 (StgClosure *)raise_closure);
2699 p = next;
2700 continue;
2701
2702 case ATOMICALLY_FRAME:
2703 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2704 tso->stackobj->sp = p;
2705 return ATOMICALLY_FRAME;
2706
2707 case CATCH_FRAME:
2708 tso->stackobj->sp = p;
2709 return CATCH_FRAME;
2710
2711 case CATCH_STM_FRAME:
2712 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2713 tso->stackobj->sp = p;
2714 return CATCH_STM_FRAME;
2715
2716 case UNDERFLOW_FRAME:
2717 tso->stackobj->sp = p;
2718 threadStackUnderflow(cap,tso);
2719 p = tso->stackobj->sp;
2720 continue;
2721
2722 case STOP_FRAME:
2723 tso->stackobj->sp = p;
2724 return STOP_FRAME;
2725
2726 case CATCH_RETRY_FRAME: {
2727 StgTRecHeader *trec = tso -> trec;
2728 StgTRecHeader *outer = trec -> enclosing_trec;
2729 debugTrace(DEBUG_stm,
2730 "found CATCH_RETRY_FRAME at %p during raise", p);
2731 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2732 stmAbortTransaction(cap, trec);
2733 stmFreeAbortedTRec(cap, trec);
2734 tso -> trec = outer;
2735 p = next;
2736 continue;
2737 }
2738
2739 default:
2740 p = next;
2741 continue;
2742 }
2743 }
2744 }
2745
2746
2747 /* -----------------------------------------------------------------------------
2748 findRetryFrameHelper
2749
2750 This function is called by the retry# primitive. It traverses the stack
2751 leaving tso->sp referring to the frame which should handle the retry.
2752
2753 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2754     or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2755
2756 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2757 create) because retries are not considered to be exceptions, despite the
2758 similar implementation.
2759
2760 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2761 not be created within memory transactions.
2762 -------------------------------------------------------------------------- */
2763
2764 StgWord
2765 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2766 {
2767 StgPtr p, next;
2768 StgRetInfoTable *info;
2769
2770 p = tso->stackobj->sp;
2771 while (1) {
2772 info = get_ret_itbl((StgClosure *)p);
2773 next = p + stack_frame_sizeW((StgClosure *)p);
2774 switch (info->i.type) {
2775
2776 case ATOMICALLY_FRAME:
2777 debugTrace(DEBUG_stm,
2778 "found ATOMICALLY_FRAME at %p during retry", p);
2779 tso->stackobj->sp = p;
2780 return ATOMICALLY_FRAME;
2781
2782 case CATCH_RETRY_FRAME:
2783 debugTrace(DEBUG_stm,
2784 "found CATCH_RETRY_FRAME at %p during retry", p);
2785 tso->stackobj->sp = p;
2786 return CATCH_RETRY_FRAME;
2787
2788 case CATCH_STM_FRAME: {
2789 StgTRecHeader *trec = tso -> trec;
2790 StgTRecHeader *outer = trec -> enclosing_trec;
2791 debugTrace(DEBUG_stm,
2792 "found CATCH_STM_FRAME at %p during retry", p);
2793 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2794 stmAbortTransaction(cap, trec);
2795 stmFreeAbortedTRec(cap, trec);
2796 tso -> trec = outer;
2797 p = next;
2798 continue;
2799 }
2800
2801 case UNDERFLOW_FRAME:
2802 tso->stackobj->sp = p;
2803 threadStackUnderflow(cap,tso);
2804 p = tso->stackobj->sp;
2805 continue;
2806
2807 default:
2808 ASSERT(info->i.type != CATCH_FRAME);
2809 ASSERT(info->i.type != STOP_FRAME);
2810 p = next;
2811 continue;
2812 }
2813 }
2814 }
2815
2816 /* -----------------------------------------------------------------------------
2817 resurrectThreads is called after garbage collection on the list of
2818 threads found to be garbage. Each of these threads will be woken
2819     up and sent an exception: BlockedIndefinitelyOnMVar if it was blocked
2820     on an MVar, BlockedIndefinitelyOnSTM if it was blocked on an STM
2821     transaction, or NonTermination if it was blocked on a black hole.
2822
2823 Locks: assumes we hold *all* the capabilities.
2824 -------------------------------------------------------------------------- */
2825
2826 void
2827 resurrectThreads (StgTSO *threads)
2828 {
2829 StgTSO *tso, *next;
2830 Capability *cap;
2831 generation *gen;
2832
2833 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2834 next = tso->global_link;
2835
2836 gen = Bdescr((P_)tso)->gen;
2837 tso->global_link = gen->threads;
2838 gen->threads = tso;
2839
2840 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2841
2842 // Wake up the thread on the Capability it was last on
2843 cap = tso->cap;
2844
2845 switch (tso->why_blocked) {
2846 case BlockedOnMVar:
2847 case BlockedOnMVarRead:
2848 /* Called by GC - sched_mutex lock is currently held. */
2849 throwToSingleThreaded(cap, tso,
2850 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2851 break;
2852 case BlockedOnBlackHole:
2853 throwToSingleThreaded(cap, tso,
2854 (StgClosure *)nonTermination_closure);
2855 break;
2856 case BlockedOnSTM:
2857 throwToSingleThreaded(cap, tso,
2858 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2859 break;
2860 case NotBlocked:
2861 /* This might happen if the thread was blocked on a black hole
2862 * belonging to a thread that we've just woken up (raiseAsync
2863 * can wake up threads, remember...).
2864 */
2865 continue;
2866 case BlockedOnMsgThrowTo:
2867 // This can happen if the target is masking, blocks on a
2868 // black hole, and then is found to be unreachable. In
2869 // this case, we want to let the target wake up and carry
2870 // on, and do nothing to this thread.
2871 continue;
2872 default:
2873 barf("resurrectThreads: thread blocked in a strange way: %d",
2874 tso->why_blocked);
2875 }
2876 }
2877 }