1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
95 * exists - earlier gccs apparently didn't.
96 * -= chak
97 */
98 StgTSO dummy_tso;
99
100 /*
101 * This mutex protects most of the global scheduler data in
102 * the THREADED_RTS runtime.
103 */
104 #if defined(THREADED_RTS)
105 Mutex sched_mutex;
106 #endif
107
108 #if !defined(mingw32_HOST_OS)
109 #define FORKPROCESS_PRIMOP_SUPPORTED
110 #endif
111
112 // Local stats
113 #ifdef THREADED_RTS
114 static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
115 #endif
116
117 /* -----------------------------------------------------------------------------
118 * static function prototypes
119 * -------------------------------------------------------------------------- */
120
121 static Capability *schedule (Capability *initialCapability, Task *task);
122
123 //
124 // These functions all encapsulate parts of the scheduler loop, and are
125 // abstracted only to make the structure and control flow of the
126 // scheduler clearer.
127 //
128 static void schedulePreLoop (void);
129 static void scheduleFindWork (Capability **pcap);
130 #if defined(THREADED_RTS)
131 static void scheduleYield (Capability **pcap, Task *task);
132 #endif
133 #if defined(THREADED_RTS)
134 static nat requestSync (Capability **pcap, Task *task, nat sync_type);
135 static void acquireAllCapabilities(Capability *cap, Task *task);
136 static void releaseAllCapabilities(Capability *cap, Task *task);
137 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
138 #endif
139 static void scheduleStartSignalHandlers (Capability *cap);
140 static void scheduleCheckBlockedThreads (Capability *cap);
141 static void scheduleProcessInbox(Capability **cap);
142 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
143 static void schedulePushWork(Capability *cap, Task *task);
144 #if defined(THREADED_RTS)
145 static void scheduleActivateSpark(Capability *cap);
146 #endif
147 static void schedulePostRunThread(Capability *cap, StgTSO *t);
148 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
149 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
150 nat prev_what_next );
151 static void scheduleHandleThreadBlocked( StgTSO *t );
152 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
153 StgTSO *t );
154 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
155 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
156
157 static void deleteThread (Capability *cap, StgTSO *tso);
158 static void deleteAllThreads (Capability *cap);
159
160 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
161 static void deleteThread_(Capability *cap, StgTSO *tso);
162 #endif
163
164 /* ---------------------------------------------------------------------------
165 Main scheduling loop.
166
167 We use round-robin scheduling, each thread returning to the
168 scheduler loop when one of these conditions is detected:
169
170 * out of heap space
171 * timer expires (thread yields)
172 * thread blocks
173 * thread ends
174 * stack overflow
175
176 GRAN version:
177 In a GranSim setup this loop iterates over the global event queue.
178 This revolves around the global event queue, which determines what
179 to do next. Therefore, it's more complicated than either the
180 concurrent or the parallel (GUM) setup.
181 This version has been entirely removed (JB 2008/08).
182
183 GUM version:
184 GUM iterates over incoming messages.
185 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
186 and sends out a fish whenever it has nothing to do; in-between
187 doing the actual reductions (shared code below) it processes the
188 incoming messages and deals with delayed operations
189 (see PendingFetches).
190 This is not the ugliest code you could imagine, but it's bloody close.
191
192 (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
193 now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
194 as well as future GUM versions. This file has been refurbished to
195 only contain valid code, which is however incomplete, refers to
196 invalid includes etc.
197
198 ------------------------------------------------------------------------ */
199
200 static Capability *
201 schedule (Capability *initialCapability, Task *task)
202 {
203 StgTSO *t;
204 Capability *cap;
205 StgThreadReturnCode ret;
206 nat prev_what_next;
207 rtsBool ready_to_gc;
208 #if defined(THREADED_RTS)
209 rtsBool first = rtsTrue;
210 #endif
211
212 cap = initialCapability;
213
214 // Pre-condition: this task owns initialCapability.
215 // The sched_mutex is *NOT* held
216 // NB. on return, we still hold a capability.
217
218 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
219
220 schedulePreLoop();
221
222 // -----------------------------------------------------------
223 // Scheduler loop starts here:
224
225 while (1) {
226
227 // Check whether we have re-entered the RTS from Haskell without
228 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
229 // call).
230 if (cap->in_haskell) {
231 errorBelch("schedule: re-entered unsafely.\n"
232 " Perhaps a 'foreign import unsafe' should be 'safe'?");
233 stg_exit(EXIT_FAILURE);
234 }
235
236 // The interruption / shutdown sequence.
237 //
238 // In order to cleanly shut down the runtime, we want to:
239 // * make sure that all main threads return to their callers
240 // with the state 'Interrupted'.
241 // * clean up all OS threads associated with the runtime
242 // * free all memory etc.
243 //
244 // So the sequence for ^C goes like this:
245 //
246 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
247 // arranges for some Capability to wake up
248 //
249 // * all threads in the system are halted, and the zombies are
250 // placed on the run queue for cleaning up. We acquire all
251 // the capabilities in order to delete the threads, this is
252 // done by scheduleDoGC() for convenience (because GC already
253 // needs to acquire all the capabilities). We can't kill
254 // threads involved in foreign calls.
255 //
256 // * somebody calls shutdownHaskell(), which calls exitScheduler()
257 //
258 // * sched_state := SCHED_SHUTTING_DOWN
259 //
260 // * all workers exit when the run queue on their capability
261 // drains. All main threads will also exit when their TSO
262 // reaches the head of the run queue and they can return.
263 //
264 // * eventually all Capabilities will shut down, and the RTS can
265 // exit.
266 //
267 // * We might be left with threads blocked in foreign calls,
268 // we should really attempt to kill these somehow (TODO);
269
270 switch (sched_state) {
271 case SCHED_RUNNING:
272 break;
273 case SCHED_INTERRUPTING:
274 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
275 /* scheduleDoGC() deletes all the threads */
276 scheduleDoGC(&cap,task,rtsFalse);
277
278 // after scheduleDoGC(), we must be shutting down. Either some
279 // other Capability did the final GC, or we did it above,
280 // either way we can fall through to the SCHED_SHUTTING_DOWN
281 // case now.
282 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
283 // fall through
284
285 case SCHED_SHUTTING_DOWN:
286 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
287 // If we are a worker, just exit. If we're a bound thread
288 // then we will exit below when we've removed our TSO from
289 // the run queue.
290 if (!isBoundTask(task) && emptyRunQueue(cap)) {
291 return cap;
292 }
293 break;
294 default:
295 barf("sched_state: %d", sched_state);
296 }
297
298 scheduleFindWork(&cap);
299
300 /* work pushing, currently relevant only for THREADED_RTS:
301 (pushes threads, wakes up idle capabilities for stealing) */
302 schedulePushWork(cap,task);
303
304 scheduleDetectDeadlock(&cap,task);
305
306 // Normally, the only way we can get here with no threads to
307 // run is if a keyboard interrupt was received during
308 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
309 // Additionally, it is not fatal for the
310 // threaded RTS to reach here with no threads to run.
311 //
312 // win32: might be here due to awaitEvent() being abandoned
313 // as a result of a console event having been delivered.
314
315 #if defined(THREADED_RTS)
316 if (first)
317 {
318 // XXX: ToDo
319 // // don't yield the first time, we want a chance to run this
320 // // thread for a bit, even if there are others banging at the
321 // // door.
322 // first = rtsFalse;
323 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
324 }
325
326 scheduleYield(&cap,task);
327
328 if (emptyRunQueue(cap)) continue; // look for work again
329 #endif
330
331 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
332 if ( emptyRunQueue(cap) ) {
333 ASSERT(sched_state >= SCHED_INTERRUPTING);
334 }
335 #endif
336
337 //
338 // Get a thread to run
339 //
340 t = popRunQueue(cap);
341
342 // Sanity check the thread we're about to run. This can be
343 // expensive if there is lots of thread switching going on...
344 IF_DEBUG(sanity,checkTSO(t));
345
346 #if defined(THREADED_RTS)
347 // Check whether we can run this thread in the current task.
348 // If not, we have to pass our capability to the right task.
349 {
350 InCall *bound = t->bound;
351
352 if (bound) {
353 if (bound->task == task) {
354 // yes, the Haskell thread is bound to the current native thread
355 } else {
356 debugTrace(DEBUG_sched,
357 "thread %lu bound to another OS thread",
358 (unsigned long)t->id);
359 // no, bound to a different Haskell thread: pass to that thread
360 pushOnRunQueue(cap,t);
361 continue;
362 }
363 } else {
364 // The thread we want to run is unbound.
365 if (task->incall->tso) {
366 debugTrace(DEBUG_sched,
367 "this OS thread cannot run thread %lu",
368 (unsigned long)t->id);
369 // no, the current native thread is bound to a different
370 // Haskell thread, so pass it to any worker thread
371 pushOnRunQueue(cap,t);
372 continue;
373 }
374 }
375 }
376 #endif
377
378 // If we're shutting down, and this thread has not yet been
379 // killed, kill it now. This sometimes happens when a finalizer
380 // thread is created by the final GC, or a thread previously
381 // in a foreign call returns.
382 if (sched_state >= SCHED_INTERRUPTING &&
383 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
384 deleteThread(cap,t);
385 }
386
387 // If this capability is disabled, migrate the thread away rather
388 // than running it. NB. but not if the thread is bound: it is
389 // really hard for a bound thread to migrate itself. Believe me,
390 // I tried several ways and couldn't find a way to do it.
391 // Instead, when everything is stopped for GC, we migrate all the
392 // threads on the run queue then (see scheduleDoGC()).
393 //
394 // ToDo: what about TSO_LOCKED? Currently we're migrating those
395 // when the number of capabilities drops, but we never migrate
396 // them back if it rises again. Presumably we should, but after
397 // the thread has been migrated we no longer know what capability
398 // it was originally on.
399 #ifdef THREADED_RTS
400 if (cap->disabled && !t->bound) {
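        // Choose a destination by folding this capability's number onto
        // the range of still-enabled capabilities, so that the threads of
        // disabled capabilities are spread deterministically across the
        // enabled ones.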
401 Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
402 migrateThread(cap, t, dest_cap);
403 continue;
404 }
405 #endif
406
407 /* context switches are initiated by the timer signal, unless
408 * the user specified "context switch as often as possible", with
409 * +RTS -C0
410 */
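    // For example (an illustrative invocation, not taken from this file):
    //
    //     ./MyProgram +RTS -C0 -RTS
    //
    // sets ctxtSwitchTicks to 0, so the test below marks the capability
    // for a context switch before every thread run, i.e. the finest
    // possible round-robin interleaving.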
411 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
412 && !emptyThreadQueues(cap)) {
413 cap->context_switch = 1;
414 }
415
416 run_thread:
417
418 // CurrentTSO is the thread to run. t might be different if we
419 // loop back to run_thread, so make sure to set CurrentTSO after
420 // that.
421 cap->r.rCurrentTSO = t;
422
423 startHeapProfTimer();
424
425 // ----------------------------------------------------------------------
426 // Run the current thread
427
428 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
429 ASSERT(t->cap == cap);
430 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
431
432 prev_what_next = t->what_next;
433
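    // Each Haskell thread carries its own saved errno (and, on Windows,
    // its own last-error code), so restore it before running the thread
    // and save it again afterwards; otherwise a context switch could let
    // one thread observe another thread's C library error state.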
434 errno = t->saved_errno;
435 #if mingw32_HOST_OS
436 SetLastError(t->saved_winerror);
437 #endif
438
439 // reset the interrupt flag before running Haskell code
440 cap->interrupt = 0;
441
442 cap->in_haskell = rtsTrue;
443 cap->idle = 0;
444
445 dirty_TSO(cap,t);
446 dirty_STACK(cap,t->stackobj);
447
448 switch (recent_activity)
449 {
450 case ACTIVITY_DONE_GC: {
451 // ACTIVITY_DONE_GC means we turned off the timer signal to
452 // conserve power (see #1623). Re-enable it here.
453 nat prev;
454 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
455 if (prev == ACTIVITY_DONE_GC) {
456 #ifndef PROFILING
457 startTimer();
458 #endif
459 }
460 break;
461 }
462 case ACTIVITY_INACTIVE:
463 // If we reached ACTIVITY_INACTIVE, then don't reset it until
464 // we've done the GC. The thread running here might just be
465 // the IO manager thread that handle_tick() woke up via
466 // wakeUpRts().
467 break;
468 default:
469 recent_activity = ACTIVITY_YES;
470 }
471
472 traceEventRunThread(cap, t);
473
474 switch (prev_what_next) {
475
476 case ThreadKilled:
477 case ThreadComplete:
478 /* Thread already finished, return to scheduler. */
479 ret = ThreadFinished;
480 break;
481
482 case ThreadRunGHC:
483 {
484 StgRegTable *r;
485 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
486 cap = regTableToCapability(r);
487 ret = r->rRet;
488 break;
489 }
490
491 case ThreadInterpret:
492 cap = interpretBCO(cap);
493 ret = cap->r.rRet;
494 break;
495
496 default:
497 barf("schedule: invalid what_next field");
498 }
499
500 cap->in_haskell = rtsFalse;
501
502 // The TSO might have moved, eg. if it re-entered the RTS and a GC
503 // happened. So find the new location:
504 t = cap->r.rCurrentTSO;
505
506 // And save the current errno in this thread.
507 // XXX: possibly bogus for SMP because this thread might already
508 // be running again, see code below.
509 t->saved_errno = errno;
510 #if mingw32_HOST_OS
511 // Similarly for Windows error code
512 t->saved_winerror = GetLastError();
513 #endif
514
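    // Emit a "stop thread" event for the tracer. The '+ 6' offsets the
    // why_blocked value so that it does not collide with the plain
    // StgThreadReturnCode values used in the else branch; the
    // authoritative mapping lives in the tracing/eventlog code. For
    // black holes we additionally record the owner's thread id, if known.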
515 if (ret == ThreadBlocked) {
516 if (t->why_blocked == BlockedOnBlackHole) {
517 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
518 traceEventStopThread(cap, t, t->why_blocked + 6,
519 owner != NULL ? owner->id : 0);
520 } else {
521 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
522 }
523 } else {
524 traceEventStopThread(cap, t, ret, 0);
525 }
526
527 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
528 ASSERT(t->cap == cap);
529
530 // ----------------------------------------------------------------------
531
532 // Costs for the scheduler are assigned to CCS_SYSTEM
533 stopHeapProfTimer();
534 #if defined(PROFILING)
535 cap->r.rCCCS = CCS_SYSTEM;
536 #endif
537
538 schedulePostRunThread(cap,t);
539
540 ready_to_gc = rtsFalse;
541
542 switch (ret) {
543 case HeapOverflow:
544 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
545 break;
546
547 case StackOverflow:
548 // just adjust the stack for this thread, then pop it back
549 // on the run queue.
550 threadStackOverflow(cap, t);
551 pushOnRunQueue(cap,t);
552 break;
553
554 case ThreadYielding:
555 if (scheduleHandleYield(cap, t, prev_what_next)) {
556 // shortcut for switching between compiler/interpreter:
557 goto run_thread;
558 }
559 break;
560
561 case ThreadBlocked:
562 scheduleHandleThreadBlocked(t);
563 break;
564
565 case ThreadFinished:
566 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
567 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
568 break;
569
570 default:
571 barf("schedule: invalid thread return code %d", (int)ret);
572 }
573
574 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
575 scheduleDoGC(&cap,task,rtsFalse);
576 }
577 } /* end of while() */
578 }
579
580 /* -----------------------------------------------------------------------------
581 * Run queue operations
582 * -------------------------------------------------------------------------- */
583
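// The run queue is a doubly-linked list threaded through the TSOs
// themselves: tso->_link points at the next thread and
// tso->block_info.prev at the previous one, with END_TSO_QUEUE marking
// both ends. removeFromRunQueue() therefore unlinks a thread from any
// position in the queue in constant time, fixing up run_queue_hd/tl as
// needed.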
584 void
585 removeFromRunQueue (Capability *cap, StgTSO *tso)
586 {
587 if (tso->block_info.prev == END_TSO_QUEUE) {
588 ASSERT(cap->run_queue_hd == tso);
589 cap->run_queue_hd = tso->_link;
590 } else {
591 setTSOLink(cap, tso->block_info.prev, tso->_link);
592 }
593 if (tso->_link == END_TSO_QUEUE) {
594 ASSERT(cap->run_queue_tl == tso);
595 cap->run_queue_tl = tso->block_info.prev;
596 } else {
597 setTSOPrev(cap, tso->_link, tso->block_info.prev);
598 }
599 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
600
601 IF_DEBUG(sanity, checkRunQueue(cap));
602 }
603
604 /* ----------------------------------------------------------------------------
605 * Setting up the scheduler loop
606 * ------------------------------------------------------------------------- */
607
608 static void
609 schedulePreLoop(void)
610 {
611 // initialisation for scheduler - what cannot go into initScheduler()
612
613 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
614 win32AllocStack();
615 #endif
616 }
617
618 /* -----------------------------------------------------------------------------
619 * scheduleFindWork()
620 *
621 * Search for work to do, and handle messages from elsewhere.
622 * -------------------------------------------------------------------------- */
623
624 static void
625 scheduleFindWork (Capability **pcap)
626 {
627 scheduleStartSignalHandlers(*pcap);
628
629 scheduleProcessInbox(pcap);
630
631 scheduleCheckBlockedThreads(*pcap);
632
633 #if defined(THREADED_RTS)
634 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
635 #endif
636 }
637
638 #if defined(THREADED_RTS)
639 STATIC_INLINE rtsBool
640 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
641 {
642 // we need to yield this capability to someone else if..
643 // - another thread is initiating a GC, and we didn't just do a GC
644 // (see Note [GC livelock])
645 // - another Task is returning from a foreign call
646 // - the thread at the head of the run queue cannot be run
647 // by this Task (it is bound to another Task, or it is unbound
648 // and this task is bound).
649 //
650 // Note [GC livelock]
651 //
652 // If we are interrupted to do a GC, then we do not immediately do
653 // another one. This avoids a starvation situation where one
654 // Capability keeps forcing a GC and the other Capabilities make no
655 // progress at all.
656
657 return ((pending_sync && !didGcLast) ||
658 cap->returning_tasks_hd != NULL ||
659 (!emptyRunQueue(cap) && (task->incall->tso == NULL
660 ? cap->run_queue_hd->bound != NULL
661 : cap->run_queue_hd->bound != task->incall)));
662 }
663
664 // This is the single place where a Task goes to sleep. There are
665 // two reasons it might need to sleep:
666 // - there are no threads to run
667 // - we need to yield this Capability to someone else
668 // (see shouldYieldCapability())
669 //
670 // Careful: the scheduler loop is quite delicate. Make sure you run
671 // the tests in testsuite/concurrent (all ways) after modifying this,
672 // and also check the benchmarks in nofib/parallel for regressions.
673
674 static void
675 scheduleYield (Capability **pcap, Task *task)
676 {
677 Capability *cap = *pcap;
678 int didGcLast = rtsFalse;
679
680 // if we have work, and we don't need to give up the Capability, continue.
681 //
682 if (!shouldYieldCapability(cap,task,rtsFalse) &&
683 (!emptyRunQueue(cap) ||
684 !emptyInbox(cap) ||
685 sched_state >= SCHED_INTERRUPTING)) {
686 return;
687 }
688
689 // otherwise yield (sleep), and keep yielding if necessary.
690 do {
691 didGcLast = yieldCapability(&cap,task, !didGcLast);
692 }
693 while (shouldYieldCapability(cap,task,didGcLast));
694
695 // note there may still be no threads on the run queue at this
696 // point, the caller has to check.
697
698 *pcap = cap;
699 return;
700 }
701 #endif
702
703 /* -----------------------------------------------------------------------------
704 * schedulePushWork()
705 *
706 * Push work to other Capabilities if we have some.
707 * -------------------------------------------------------------------------- */
708
709 static void
710 schedulePushWork(Capability *cap USED_IF_THREADS,
711 Task *task USED_IF_THREADS)
712 {
713 /* following code not for PARALLEL_HASKELL. I kept the call general,
714 future GUM versions might use pushing in a distributed setup */
715 #if defined(THREADED_RTS)
716
717 Capability *free_caps[n_capabilities], *cap0;
718 nat i, n_free_caps;
719
720 // migration can be turned off with +RTS -qm
721 if (!RtsFlags.ParFlags.migrate) return;
722
723 // Check whether we have more threads on our run queue, or sparks
724 // in our pool, that we could hand to another Capability.
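    // That is, only push if something would be left for us afterwards:
    // with an empty run queue we need at least two sparks (keep one,
    // share the rest); with exactly one runnable thread we also need at
    // least one spark; two or more runnable threads are always worth
    // sharing.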
725 if (cap->run_queue_hd == END_TSO_QUEUE) {
726 if (sparkPoolSizeCap(cap) < 2) return;
727 } else {
728 if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
729 sparkPoolSizeCap(cap) < 1) return;
730 }
731
732 // First grab as many free Capabilities as we can.
733 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
734 cap0 = &capabilities[i];
735 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
736 if (!emptyRunQueue(cap0)
737 || cap0->returning_tasks_hd != NULL
738 || cap0->inbox != (Message*)END_TSO_QUEUE) {
739 // it already has some work, we just grabbed it at
740 // the wrong moment. Or maybe it's deadlocked!
741 releaseCapability(cap0);
742 } else {
743 free_caps[n_free_caps++] = cap0;
744 }
745 }
746 }
747
748 // we now have n_free_caps free capabilities stashed in
749 // free_caps[]. Share our run queue equally with them. This is
750 // probably the simplest thing we could do; improvements we might
751 // want to do include:
752 //
753 // - giving high priority to moving relatively new threads, on
754 // the grounds that they haven't had time to build up a
755 // working set in the cache on this CPU/Capability.
756 //
757 // - giving low priority to moving long-lived threads
758
759 if (n_free_caps > 0) {
760 StgTSO *prev, *t, *next;
761 #ifdef SPARK_PUSHING
762 rtsBool pushed_to_all;
763 #endif
764
765 debugTrace(DEBUG_sched,
766 "cap %d: %s and %d free capabilities, sharing...",
767 cap->no,
768 (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
769 "excess threads on run queue":"sparks to share (>=2)",
770 n_free_caps);
771
772 i = 0;
773 #ifdef SPARK_PUSHING
774 pushed_to_all = rtsFalse;
775 #endif
776
777 if (cap->run_queue_hd != END_TSO_QUEUE) {
778 prev = cap->run_queue_hd;
779 t = prev->_link;
780 prev->_link = END_TSO_QUEUE;
781 for (; t != END_TSO_QUEUE; t = next) {
782 next = t->_link;
783 t->_link = END_TSO_QUEUE;
784 if (t->bound == task->incall // don't move my bound thread
785 || tsoLocked(t)) { // don't move a locked thread
786 setTSOLink(cap, prev, t);
787 setTSOPrev(cap, t, prev);
788 prev = t;
789 } else if (i == n_free_caps) {
790 #ifdef SPARK_PUSHING
791 pushed_to_all = rtsTrue;
792 #endif
793 i = 0;
794 // keep one for us
795 setTSOLink(cap, prev, t);
796 setTSOPrev(cap, t, prev);
797 prev = t;
798 } else {
799 appendToRunQueue(free_caps[i],t);
800
801 traceEventMigrateThread (cap, t, free_caps[i]->no);
802
803 if (t->bound) { t->bound->task->cap = free_caps[i]; }
804 t->cap = free_caps[i];
805 i++;
806 }
807 }
808 cap->run_queue_tl = prev;
809
810 IF_DEBUG(sanity, checkRunQueue(cap));
811 }
812
813 #ifdef SPARK_PUSHING
814 /* JB I left this code in place, it would work but is not necessary */
815
816 // If there are some free capabilities that we didn't push any
817 // threads to, then try to push a spark to each one.
818 if (!pushed_to_all) {
819 StgClosure *spark;
820 // i is the next free capability to push to
821 for (; i < n_free_caps; i++) {
822 if (emptySparkPoolCap(free_caps[i])) {
823 spark = tryStealSpark(cap->sparks);
824 if (spark != NULL) {
825 /* TODO: if anyone wants to re-enable this code then
826 * they must consider the fizzledSpark(spark) case
827 * and update the per-cap spark statistics.
828 */
829 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
830
831 traceEventStealSpark(free_caps[i], t, cap->no);
832
833 newSpark(&(free_caps[i]->r), spark);
834 }
835 }
836 }
837 }
838 #endif /* SPARK_PUSHING */
839
840 // release the capabilities
841 for (i = 0; i < n_free_caps; i++) {
842 task->cap = free_caps[i];
843 releaseAndWakeupCapability(free_caps[i]);
844 }
845 }
846 task->cap = cap; // reset to point to our Capability.
847
848 #endif /* THREADED_RTS */
849
850 }
851
852 /* ----------------------------------------------------------------------------
853 * Start any pending signal handlers
854 * ------------------------------------------------------------------------- */
855
856 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
857 static void
858 scheduleStartSignalHandlers(Capability *cap)
859 {
860 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
861 // safe outside the lock
862 startSignalHandlers(cap);
863 }
864 }
865 #else
866 static void
867 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
868 {
869 }
870 #endif
871
872 /* ----------------------------------------------------------------------------
873 * Check for blocked threads that can be woken up.
874 * ------------------------------------------------------------------------- */
875
876 static void
877 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
878 {
879 #if !defined(THREADED_RTS)
880 //
881 // Check whether any waiting threads need to be woken up. If the
882 // run queue is empty, and there are no other tasks running, we
883 // can wait indefinitely for something to happen.
884 //
885 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
886 {
887 awaitEvent (emptyRunQueue(cap));
888 }
889 #endif
890 }
891
892 /* ----------------------------------------------------------------------------
893 * Detect deadlock conditions and attempt to resolve them.
894 * ------------------------------------------------------------------------- */
895
896 static void
897 scheduleDetectDeadlock (Capability **pcap, Task *task)
898 {
899 Capability *cap = *pcap;
900 /*
901 * Detect deadlock: when we have no threads to run, there are no
902 * threads blocked, waiting for I/O, or sleeping, and all the
903 * other tasks are waiting for work, we must have a deadlock of
904 * some description.
905 */
906 if ( emptyThreadQueues(cap) )
907 {
908 #if defined(THREADED_RTS)
909 /*
910 * In the threaded RTS, we only check for deadlock if there
911 * has been no activity in a complete timeslice. This means
912 * we won't eagerly start a full GC just because we don't have
913 * any threads to run currently.
914 */
915 if (recent_activity != ACTIVITY_INACTIVE) return;
916 #endif
917
918 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
919
920 // Garbage collection can release some new threads due to
921 // either (a) finalizers or (b) threads resurrected because
922 // they are unreachable and will therefore be sent an
923 // exception. Any threads thus released will be immediately
924 // runnable.
925 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
926 cap = *pcap;
927 // when force_major == rtsTrue, scheduleDoGC sets
928 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
929 // signal.
930
931 if ( !emptyRunQueue(cap) ) return;
932
933 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
934 /* If we have user-installed signal handlers, then wait
935 * for signals to arrive rather than bombing out with a
936 * deadlock.
937 */
938 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
939 debugTrace(DEBUG_sched,
940 "still deadlocked, waiting for signals...");
941
942 awaitUserSignals();
943
944 if (signals_pending()) {
945 startSignalHandlers(cap);
946 }
947
948 // either we have threads to run, or we were interrupted:
949 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
950
951 return;
952 }
953 #endif
954
955 #if !defined(THREADED_RTS)
956 /* Probably a real deadlock. Send the current main thread the
957 * Deadlock exception.
958 */
959 if (task->incall->tso) {
960 switch (task->incall->tso->why_blocked) {
961 case BlockedOnSTM:
962 case BlockedOnBlackHole:
963 case BlockedOnMsgThrowTo:
964 case BlockedOnMVar:
965 throwToSingleThreaded(cap, task->incall->tso,
966 (StgClosure *)nonTermination_closure);
967 return;
968 default:
969 barf("deadlock: main thread blocked in a strange way");
970 }
971 }
972 return;
973 #endif
974 }
975 }
976
977
978 /* ----------------------------------------------------------------------------
979 * Send pending messages (PARALLEL_HASKELL only)
980 * ------------------------------------------------------------------------- */
981
982 #if defined(PARALLEL_HASKELL)
983 static void
984 scheduleSendPendingMessages(void)
985 {
986
987 # if defined(PAR) // global Mem.Mgmt., omit for now
988 if (PendingFetches != END_BF_QUEUE) {
989 processFetches();
990 }
991 # endif
992
993 if (RtsFlags.ParFlags.BufferTime) {
994 // if we use message buffering, we must send away all message
995 // packets which have become too old...
996 sendOldBuffers();
997 }
998 }
999 #endif
1000
1001 /* ----------------------------------------------------------------------------
1002 * Process message in the current Capability's inbox
1003 * ------------------------------------------------------------------------- */
1004
1005 static void
1006 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
1007 {
1008 #if defined(THREADED_RTS)
1009 Message *m, *next;
1010 int r;
1011 Capability *cap = *pcap;
1012
1013 while (!emptyInbox(cap)) {
1014 if (cap->r.rCurrentNursery->link == NULL ||
1015 g0->n_new_large_words >= large_alloc_lim) {
1016 scheduleDoGC(pcap, cap->running_task, rtsFalse);
1017 cap = *pcap;
1018 }
1019
1020 // don't use a blocking acquire; if the lock is held by
1021 // another thread then just carry on. This seems to avoid
1022 // getting stuck in a message ping-pong situation with other
1023 // processors. We'll check the inbox again later anyway.
1024 //
1025 // We should really use a more efficient queue data structure
1026 // here. The trickiness is that we must ensure a Capability
1027 // never goes idle if the inbox is non-empty, which is why we
1028 // use cap->lock (cap->lock is released as the last thing
1029 // before going idle; see Capability.c:releaseCapability()).
1030 r = TRY_ACQUIRE_LOCK(&cap->lock);
1031 if (r != 0) return;
1032
1033 m = cap->inbox;
1034 cap->inbox = (Message*)END_TSO_QUEUE;
1035
1036 RELEASE_LOCK(&cap->lock);
1037
1038 while (m != (Message*)END_TSO_QUEUE) {
1039 next = m->link;
1040 executeMessage(cap, m);
1041 m = next;
1042 }
1043 }
1044 #endif
1045 }
1046
1047 /* ----------------------------------------------------------------------------
1048 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1049 * ------------------------------------------------------------------------- */
1050
1051 #if defined(THREADED_RTS)
1052 static void
1053 scheduleActivateSpark(Capability *cap)
1054 {
1055 if (anySparks() && !cap->disabled)
1056 {
1057 createSparkThread(cap);
1058 debugTrace(DEBUG_sched, "creating a spark thread");
1059 }
1060 }
1061 #endif // THREADED_RTS
1062
1063 /* ----------------------------------------------------------------------------
1064 * After running a thread...
1065 * ------------------------------------------------------------------------- */
1066
1067 static void
1068 schedulePostRunThread (Capability *cap, StgTSO *t)
1069 {
1070 // We have to be able to catch transactions that are in an
1071 // infinite loop as a result of seeing an inconsistent view of
1072 // memory, e.g.
1073 //
1074 // atomically $ do
1075 // [a,b] <- mapM readTVar [ta,tb]
1076 // when (a == b) loop
1077 //
1078 // and a is never equal to b given a consistent view of memory.
1079 //
1080 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1081 if (!stmValidateNestOfTransactions (t -> trec)) {
1082 debugTrace(DEBUG_sched | DEBUG_stm,
1083 "trec %p found wasting its time", t);
1084
1085 // strip the stack back to the
1086 // ATOMICALLY_FRAME, aborting the (nested)
1087 // transaction, and saving the stack of any
1088 // partially-evaluated thunks on the heap.
1089 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1090
1091 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1092 }
1093 }
1094
1095 /* some statistics gathering in the parallel case */
1096 }
1097
1098 /* -----------------------------------------------------------------------------
1099 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1100 * -------------------------------------------------------------------------- */
1101
1102 static rtsBool
1103 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1104 {
1105 // did the task ask for a large block?
1106 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1107 // if so, get one and push it on the front of the nursery.
1108 bdescr *bd;
1109 W_ blocks;
1110
1111 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1112
1113 if (blocks > BLOCKS_PER_MBLOCK) {
1114 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1115 }
1116
1117 debugTrace(DEBUG_sched,
1118 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1119 (long)t->id, what_next_strs[t->what_next], blocks);
1120
1121 // don't do this if the nursery is (nearly) full, we'll GC first.
1122 if (cap->r.rCurrentNursery->link != NULL ||
1123 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1124 // if the nursery has only one block.
1125
1126 bd = allocGroup_lock(blocks);
1127 cap->r.rNursery->n_blocks += blocks;
1128
1129 // link the new group into the list
1130 bd->link = cap->r.rCurrentNursery;
1131 bd->u.back = cap->r.rCurrentNursery->u.back;
1132 if (cap->r.rCurrentNursery->u.back != NULL) {
1133 cap->r.rCurrentNursery->u.back->link = bd;
1134 } else {
1135 cap->r.rNursery->blocks = bd;
1136 }
1137 cap->r.rCurrentNursery->u.back = bd;
1138
1139 // initialise it as a nursery block. We initialise the
1140 // step, gen_no, and flags field of *every* sub-block in
1141 // this large block, because this is easier than making
1142 // sure that we always find the block head of a large
1143 // block whenever we call Bdescr() (eg. evacuate() and
1144 // isAlive() in the GC would both have to do this, at
1145 // least).
1146 {
1147 bdescr *x;
1148 for (x = bd; x < bd + blocks; x++) {
1149 initBdescr(x,g0,g0);
1150 x->free = x->start;
1151 x->flags = 0;
1152 }
1153 }
1154
1155 // This assert can be a killer if the app is doing lots
1156 // of large block allocations.
1157 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1158
1159 // now update the nursery to point to the new block
1160 cap->r.rCurrentNursery = bd;
1161
1162 // we might be unlucky and have another thread get on the
1163 // run queue before us and steal the large block, but in that
1164 // case the thread will just end up requesting another large
1165 // block.
1166 pushOnRunQueue(cap,t);
1167 return rtsFalse; /* not actually GC'ing */
1168 }
1169 }
1170
1171 if (cap->r.rHpLim == NULL || cap->context_switch) {
1172 // Sometimes we miss a context switch, e.g. when calling
1173 // primitives in a tight loop, MAYBE_GC() doesn't check the
1174 // context switch flag, and we end up waiting for a GC.
1175 // See #1984, and concurrent/should_run/1984
1176 cap->context_switch = 0;
1177 appendToRunQueue(cap,t);
1178 } else {
1179 pushOnRunQueue(cap,t);
1180 }
1181 return rtsTrue;
1182 /* actual GC is done at the end of the while loop in schedule() */
1183 }
1184
1185 /* -----------------------------------------------------------------------------
1186 * Handle a thread that returned to the scheduler with ThreadYielding
1187 * -------------------------------------------------------------------------- */
1188
1189 static rtsBool
1190 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1191 {
1192 /* put the thread back on the run queue. Then, if we're ready to
1193 * GC, check whether this is the last task to stop. If so, wake
1194 * up the GC thread. getThread will block during a GC until the
1195 * GC is finished.
1196 */
1197
1198 ASSERT(t->_link == END_TSO_QUEUE);
1199
1200 // Shortcut if we're just switching evaluators: don't bother
1201 // doing stack squeezing (which can be expensive), just run the
1202 // thread.
1203 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1204 debugTrace(DEBUG_sched,
1205 "--<< thread %ld (%s) stopped to switch evaluators",
1206 (long)t->id, what_next_strs[t->what_next]);
1207 return rtsTrue;
1208 }
1209
1210 // Reset the context switch flag. We don't do this just before
1211 // running the thread, because that would mean we would lose ticks
1212 // during GC, which can lead to unfair scheduling (a thread hogs
1213 // the CPU because the tick always arrives during GC). This way
1214 // penalises threads that do a lot of allocation, but that seems
1215 // better than the alternative.
1216 if (cap->context_switch != 0) {
1217 cap->context_switch = 0;
1218 appendToRunQueue(cap,t);
1219 } else {
1220 pushOnRunQueue(cap,t);
1221 }
1222
1223 IF_DEBUG(sanity,
1224 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1225 checkTSO(t));
1226
1227 return rtsFalse;
1228 }
1229
1230 /* -----------------------------------------------------------------------------
1231 * Handle a thread that returned to the scheduler with ThreadBlocked
1232 * -------------------------------------------------------------------------- */
1233
1234 static void
1235 scheduleHandleThreadBlocked( StgTSO *t
1236 #if !defined(DEBUG)
1237 STG_UNUSED
1238 #endif
1239 )
1240 {
1241
1242 // We don't need to do anything. The thread is blocked, and it
1243 // has tidied up its stack and placed itself on whatever queue
1244 // it needs to be on.
1245
1246 // ASSERT(t->why_blocked != NotBlocked);
1247 // Not true: for example,
1248 // - the thread may have woken itself up already, because
1249 // threadPaused() might have raised a blocked throwTo
1250 // exception, see maybePerformBlockedException().
1251
1252 #ifdef DEBUG
1253 traceThreadStatus(DEBUG_sched, t);
1254 #endif
1255 }
1256
1257 /* -----------------------------------------------------------------------------
1258 * Handle a thread that returned to the scheduler with ThreadFinished
1259 * -------------------------------------------------------------------------- */
1260
1261 static rtsBool
1262 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1263 {
1264 /* Need to check whether this was a main thread, and if so,
1265 * return with the return value.
1266 *
1267 * We also end up here if the thread kills itself with an
1268 * uncaught exception, see Exception.cmm.
1269 */
1270
1271 // blocked exceptions can now complete, even if the thread was in
1272 // blocked mode (see #2910).
1273 awakenBlockedExceptionQueue (cap, t);
1274
1275 //
1276 // Check whether the thread that just completed was a bound
1277 // thread, and if so return with the result.
1278 //
1279 // There is an assumption here that all thread completion goes
1280 // through this point; we need to make sure that if a thread
1281 // ends up in the ThreadKilled state, that it stays on the run
1282 // queue so it can be dealt with here.
1283 //
1284
1285 if (t->bound) {
1286
1287 if (t->bound != task->incall) {
1288 #if !defined(THREADED_RTS)
1289 // Must be a bound thread that is not the topmost one. Leave
1290 // it on the run queue until the stack has unwound to the
1291 // point where we can deal with this. Leaving it on the run
1292 // queue also ensures that the garbage collector knows about
1293 // this thread and its return value (it gets dropped from the
1294 // step->threads list so there's no other way to find it).
1295 appendToRunQueue(cap,t);
1296 return rtsFalse;
1297 #else
1298 // this cannot happen in the threaded RTS, because a
1299 // bound thread can only be run by the appropriate Task.
1300 barf("finished bound thread that isn't mine");
1301 #endif
1302 }
1303
1304 ASSERT(task->incall->tso == t);
1305
1306 if (t->what_next == ThreadComplete) {
1307 if (task->incall->ret) {
1308 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1309 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1310 }
1311 task->incall->stat = Success;
1312 } else {
1313 if (task->incall->ret) {
1314 *(task->incall->ret) = NULL;
1315 }
1316 if (sched_state >= SCHED_INTERRUPTING) {
1317 if (heap_overflow) {
1318 task->incall->stat = HeapExhausted;
1319 } else {
1320 task->incall->stat = Interrupted;
1321 }
1322 } else {
1323 task->incall->stat = Killed;
1324 }
1325 }
1326 #ifdef DEBUG
1327 removeThreadLabel((StgWord)task->incall->tso->id);
1328 #endif
1329
1330 // We no longer consider this thread and task to be bound to
1331 // each other. The TSO lives on until it is GC'd, but the
1332 // task is about to be released by the caller, and we don't
1333 // want anyone following the pointer from the TSO to the
1334 // defunct task (which might have already been
1335 // re-used). This was a real bug: the GC updated
1336 // tso->bound->tso which led to a deadlock.
1337 t->bound = NULL;
1338 task->incall->tso = NULL;
1339
1340 return rtsTrue; // tells schedule() to return
1341 }
1342
1343 return rtsFalse;
1344 }
1345
1346 /* -----------------------------------------------------------------------------
1347 * Perform a heap census
1348 * -------------------------------------------------------------------------- */
1349
1350 static rtsBool
1351 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1352 {
1353 // When we have +RTS -i0 and we're heap profiling, do a census at
1354 // every GC. This lets us get repeatable runs for debugging.
1355 if (performHeapProfile ||
1356 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1357 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1358 return rtsTrue;
1359 } else {
1360 return rtsFalse;
1361 }
1362 }
1363
1364 /* -----------------------------------------------------------------------------
1365 * Start a synchronisation of all capabilities
1366 * -------------------------------------------------------------------------- */
1367
1368 // Returns:
1369 // 0 if we successfully got a sync
1370 // non-0 if there was another sync request in progress,
1371 // and we yielded to it. The value returned is the
1372 // type of the other sync request.
1373 //
1374 #if defined(THREADED_RTS)
1375 static nat requestSync (Capability **pcap, Task *task, nat sync_type)
1376 {
1377 nat prev_pending_sync;
1378
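    // cas() returns the value that pending_sync held beforehand: zero
    // means we successfully installed our own sync_type; non-zero means
    // some other sync request is already in progress (the returned value
    // is its type) and we yield to it below.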
1379 prev_pending_sync = cas(&pending_sync, 0, sync_type);
1380
1381 if (prev_pending_sync)
1382 {
1383 do {
1384 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1385 prev_pending_sync);
1386 ASSERT(*pcap);
1387 yieldCapability(pcap,task,rtsTrue);
1388 } while (pending_sync);
1389 return prev_pending_sync; // NOTE: task->cap might have changed now
1390 }
1391 else
1392 {
1393 return 0;
1394 }
1395 }
1396
1397 //
1398 // Grab all the capabilities except the one we already hold. Used
1399 // when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1400 // before a fork (SYNC_OTHER).
1401 //
1402 // Only call this after requestSync(), otherwise a deadlock might
1403 // ensue if another thread is trying to synchronise.
1404 //
1405 static void acquireAllCapabilities(Capability *cap, Task *task)
1406 {
1407 Capability *tmpcap;
1408 nat i;
1409
1410 for (i=0; i < n_capabilities; i++) {
1411 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
1412 tmpcap = &capabilities[i];
1413 if (tmpcap != cap) {
1414 // we better hope this task doesn't get migrated to
1415 // another Capability while we're waiting for this one.
1416 // It won't, because load balancing happens while we have
1417 // all the Capabilities, but even so it's a slightly
1418 // unsavoury invariant.
1419 task->cap = tmpcap;
1420 waitForReturnCapability(&tmpcap, task);
1421 if (tmpcap->no != i) {
1422 barf("acquireAllCapabilities: got the wrong capability");
1423 }
1424 }
1425 }
1426 task->cap = cap;
1427 }
1428
1429 static void releaseAllCapabilities(Capability *cap, Task *task)
1430 {
1431 nat i;
1432
1433 for (i = 0; i < n_capabilities; i++) {
1434 if (cap->no != i) {
1435 task->cap = &capabilities[i];
1436 releaseCapability(&capabilities[i]);
1437 }
1438 }
1439 task->cap = cap;
1440 }
1441 #endif
1442
1443 /* -----------------------------------------------------------------------------
1444 * Perform a garbage collection if necessary
1445 * -------------------------------------------------------------------------- */
1446
1447 static void
1448 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1449 rtsBool force_major)
1450 {
1451 Capability *cap = *pcap;
1452 rtsBool heap_census;
1453 nat collect_gen;
1454 #ifdef THREADED_RTS
1455 rtsBool idle_cap[n_capabilities];
1456 rtsBool gc_type;
1457 nat i, sync;
1458 StgTSO *tso;
1459 #endif
1460
1461 if (sched_state == SCHED_SHUTTING_DOWN) {
1462 // The final GC has already been done, and the system is
1463 // shutting down. We'll probably deadlock if we try to GC
1464 // now.
1465 return;
1466 }
1467
1468 heap_census = scheduleNeedHeapProfile(rtsTrue);
1469
1470 // Figure out which generation we are collecting, so that we can
1471 // decide whether this is a parallel GC or not.
1472 collect_gen = calcNeeded(force_major || heap_census, NULL);
1473
1474 #ifdef THREADED_RTS
1475 if (sched_state < SCHED_INTERRUPTING
1476 && RtsFlags.ParFlags.parGcEnabled
1477 && collect_gen >= RtsFlags.ParFlags.parGcGen
1478 && ! oldest_gen->mark)
1479 {
1480 gc_type = SYNC_GC_PAR;
1481 } else {
1482 gc_type = SYNC_GC_SEQ;
1483 }
1484
1485 // In order to GC, there must be no threads running Haskell code.
1486 // Therefore, the GC thread needs to hold *all* the capabilities,
1487 // and release them after the GC has completed.
1488 //
1489 // This seems to be the simplest way: previous attempts involved
1490 // making all the threads with capabilities give up their
1491 // capabilities and sleep except for the *last* one, which
1492 // actually did the GC. But it's quite hard to arrange for all
1493 // the other tasks to sleep and stay asleep.
1494 //
1495
1496 /* Other capabilities are prevented from running yet more Haskell
1497 threads if pending_sync is set. Tested inside
1498 yieldCapability() and releaseCapability() in Capability.c */
1499
1500 do {
1501 sync = requestSync(pcap, task, gc_type);
1502 cap = *pcap;
1503 if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
1504 // someone else had a pending sync request for a GC, so
1505 // let's assume GC has been done and we don't need to GC
1506 // again.
1507 return;
1508 }
1509 if (sched_state == SCHED_SHUTTING_DOWN) {
1510 // The scheduler might now be shutting down. We tested
1511 // this above, but it might have become true since then as
1512 // we yielded the capability in requestSync().
1513 return;
1514 }
1515 } while (sync);
1516
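    // Our sync request is now the pending one. Interrupt all the
    // capabilities so that any Haskell threads currently running return
    // to their schedulers promptly; those schedulers will then see
    // pending_sync and yield.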
1517 interruptAllCapabilities();
1518
1519 // The final shutdown GC is always single-threaded, because it's
1520 // possible that some of the Capabilities have no worker threads.
1521
1522 if (gc_type == SYNC_GC_SEQ)
1523 {
1524 traceEventRequestSeqGc(cap);
1525 }
1526 else
1527 {
1528 traceEventRequestParGc(cap);
1529 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1530 }
1531
1532 if (gc_type == SYNC_GC_SEQ)
1533 {
1534 // single-threaded GC: grab all the capabilities
1535 acquireAllCapabilities(cap,task);
1536 }
1537 else
1538 {
1539 // If we are load-balancing collections in this
1540 // generation, then we require all GC threads to participate
1541 // in the collection. Otherwise, we only require active
1542 // threads to participate, and we set gc_threads[i]->idle for
1543 // any idle capabilities. The rationale here is that waking
1544 // up an idle Capability takes much longer than just doing any
1545 // GC work on its behalf.
1546
1547 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1548 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1549 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
1550 for (i=0; i < n_capabilities; i++) {
1551 if (capabilities[i].disabled) {
1552 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1553 } else {
1554 idle_cap[i] = rtsFalse;
1555 }
1556 }
1557 } else {
1558 for (i=0; i < n_capabilities; i++) {
1559 if (capabilities[i].disabled) {
1560 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1561 } else if (i == cap->no ||
1562 capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1563 idle_cap[i] = rtsFalse;
1564 } else {
1565 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1566 if (!idle_cap[i]) {
1567 n_failed_trygrab_idles++;
1568 } else {
1569 n_idle_caps++;
1570 }
1571 }
1572 }
1573 }
1574
1575 // We set the gc_thread[i]->idle flag if that
1576 // capability/thread is not participating in this collection.
1577 // We also keep a local record of which capabilities are idle
1578 // in idle_cap[], because scheduleDoGC() is re-entrant:
1579 // another thread might start a GC as soon as we've finished
1580 // this one, and thus the gc_thread[]->idle flags are invalid
1581 // as soon as we release any threads after GC. Getting this
1582 // wrong leads to a rare and hard to debug deadlock!
1583
1584 for (i=0; i < n_capabilities; i++) {
1585 gc_threads[i]->idle = idle_cap[i];
1586 capabilities[i].idle++;
1587 }
1588
1589 // For all capabilities participating in this GC, wait until
1590 // they have stopped mutating and are standing by for GC.
1591 waitForGcThreads(cap);
1592
1593 #if defined(THREADED_RTS)
1594 // Stable point where we can do a global check on our spark counters
1595 ASSERT(checkSparkCountInvariant());
1596 #endif
1597 }
1598
1599 #endif
1600
1601 IF_DEBUG(scheduler, printAllThreads());
1602
1603 delete_threads_and_gc:
1604 /*
1605 * We now have all the capabilities; if we're in an interrupting
1606 * state, then we should take the opportunity to delete all the
1607 * threads in the system.
1608 */
1609 if (sched_state == SCHED_INTERRUPTING) {
1610 deleteAllThreads(cap);
1611 #if defined(THREADED_RTS)
1612 // Discard all the sparks from every Capability. Why?
1613 // They'll probably be GC'd anyway since we've killed all the
1614 // threads. It just avoids the GC having to do any work to
1615 // figure out that any remaining sparks are garbage.
1616 for (i = 0; i < n_capabilities; i++) {
1617 capabilities[i].spark_stats.gcd +=
1618 sparkPoolSize(capabilities[i].sparks);
1619 // No race here since all Caps are stopped.
1620 discardSparksCap(&capabilities[i]);
1621 }
1622 #endif
1623 sched_state = SCHED_SHUTTING_DOWN;
1624 }
1625
1626 /*
1627 * When there are disabled capabilities, we want to migrate any
1628 * threads away from them. Normally this happens in the
1629 * scheduler's loop, but only for unbound threads - it's really
1630 * hard for a bound thread to migrate itself. So we have another
1631 * go here.
1632 */
1633 #if defined(THREADED_RTS)
1634 for (i = enabled_capabilities; i < n_capabilities; i++) {
1635 Capability *tmp_cap, *dest_cap;
1636 tmp_cap = &capabilities[i];
1637 ASSERT(tmp_cap->disabled);
1638 if (i != cap->no) {
1639 dest_cap = &capabilities[i % enabled_capabilities];
1640 while (!emptyRunQueue(tmp_cap)) {
1641 tso = popRunQueue(tmp_cap);
1642 migrateThread(tmp_cap, tso, dest_cap);
1643 if (tso->bound) {
1644 traceTaskMigrate(tso->bound->task,
1645 tso->bound->task->cap,
1646 dest_cap);
1647 tso->bound->task->cap = dest_cap;
1648 }
1649 }
1650 }
1651 }
1652 #endif
1653
1654 #if defined(THREADED_RTS)
1655 // reset pending_sync *before* GC, so that when the GC threads
1656 // emerge they don't immediately re-enter the GC.
1657 pending_sync = 0;
1658 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1659 #else
1660 GarbageCollect(collect_gen, heap_census, 0, cap);
1661 #endif
1662
1663 traceSparkCounters(cap);
1664
1665 switch (recent_activity) {
1666 case ACTIVITY_INACTIVE:
1667 if (force_major) {
1668 // We are doing a GC because the system has been idle for a
1669 // timeslice and we need to check for deadlock. Record the
1670 // fact that we've done a GC and turn off the timer signal;
1671 // it will get re-enabled if we run any threads after the GC.
1672 recent_activity = ACTIVITY_DONE_GC;
1673 #ifndef PROFILING
1674 stopTimer();
1675 #endif
1676 break;
1677 }
1678 // fall through...
1679
1680 case ACTIVITY_MAYBE_NO:
1681 // the GC might have taken long enough for the timer to set
1682 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1683 // but we aren't necessarily deadlocked:
1684 recent_activity = ACTIVITY_YES;
1685 break;
1686
1687 case ACTIVITY_DONE_GC:
1688 // If we are actually active, the scheduler will reset the
1689 // recent_activity flag and re-enable the timer.
1690 break;
1691 }
1692
1693 #if defined(THREADED_RTS)
1694 // Stable point where we can do a global check on our spark counters
1695 ASSERT(checkSparkCountInvariant());
1696 #endif
1697
1698 // The heap census itself is done during GarbageCollect().
1699 if (heap_census) {
1700 performHeapProfile = rtsFalse;
1701 }
1702
1703 #if defined(THREADED_RTS)
1704 if (gc_type == SYNC_GC_PAR)
1705 {
1706 releaseGCThreads(cap);
1707 for (i = 0; i < n_capabilities; i++) {
1708 if (i != cap->no) {
1709 if (idle_cap[i]) {
1710 ASSERT(capabilities[i].running_task == task);
1711 task->cap = &capabilities[i];
1712 releaseCapability(&capabilities[i]);
1713 } else {
1714 ASSERT(capabilities[i].running_task != task);
1715 }
1716 }
1717 }
1718 task->cap = cap;
1719 }
1720 #endif
1721
1722 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1723 // GC set the heap_overflow flag, so we should proceed with
1724 // an orderly shutdown now. Ultimately we want the main
1725 // thread to return to its caller with HeapExhausted, at which
1726 // point the caller should call hs_exit(). The first step is
1727 // to delete all the threads.
1728 //
1729 // Another way to do this would be to raise an exception in
1730 // the main thread, which we really should do because it gives
1731 // the program a chance to clean up. But how do we find the
1732 // main thread? It should presumably be the same one that
1733 // gets ^C exceptions, but that's all done on the Haskell side
1734 // (GHC.TopHandler).
1735 sched_state = SCHED_INTERRUPTING;
1736 goto delete_threads_and_gc;
1737 }
1738
1739 #ifdef SPARKBALANCE
1740 /* JB
1741 Once we are all together... this would be the place to balance all
1742 spark pools. No concurrent stealing or adding of new sparks can
1743 occur. Should be defined in Sparks.c. */
1744 balanceSparkPoolsCaps(n_capabilities, capabilities);
1745 #endif
1746
1747 #if defined(THREADED_RTS)
1748 if (gc_type == SYNC_GC_SEQ) {
1749 // release our stash of capabilities.
1750 releaseAllCapabilities(cap, task);
1751 }
1752 #endif
1753
1754 return;
1755 }
1756
1757 /* ---------------------------------------------------------------------------
1758 * Singleton fork(). Do not copy any running threads.
1759 * ------------------------------------------------------------------------- */
1760
1761 pid_t
1762 forkProcess(HsStablePtr *entry
1763 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1764 STG_UNUSED
1765 #endif
1766 )
1767 {
1768 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1769 pid_t pid;
1770 StgTSO* t,*next;
1771 Capability *cap;
1772 nat g;
1773 Task *task = NULL;
1774 nat i;
1775 #ifdef THREADED_RTS
1776 nat sync;
1777 #endif
1778
1779 debugTrace(DEBUG_sched, "forking!");
1780
1781 task = newBoundTask();
1782
1783 cap = NULL;
1784 waitForReturnCapability(&cap, task);
1785
1786 #ifdef THREADED_RTS
1787 do {
1788 sync = requestSync(&cap, task, SYNC_OTHER);
1789 } while (sync);
1790
1791 acquireAllCapabilities(cap,task);
1792
1793 pending_sync = 0;
1794 #endif
1795
1796 // no funny business: hold locks while we fork, otherwise if some
1797 // other thread is holding a lock when the fork happens, the data
1798 // structure protected by the lock will forever be in an
1799 // inconsistent state in the child. See also #1391.
1800 ACQUIRE_LOCK(&sched_mutex);
1801 ACQUIRE_LOCK(&sm_mutex);
1802 ACQUIRE_LOCK(&stable_mutex);
1803 ACQUIRE_LOCK(&task->lock);
1804
1805 for (i=0; i < n_capabilities; i++) {
1806 ACQUIRE_LOCK(&capabilities[i].lock);
1807 }
1808
1809 stopTimer(); // See #4074
1810
1811 #if defined(TRACING)
1812 flushEventLog(); // so that child won't inherit dirty file buffers
1813 #endif
1814
1815 pid = fork();
1816
1817 if (pid) { // parent
1818
1819 startTimer(); // #4074
1820
1821 RELEASE_LOCK(&sched_mutex);
1822 RELEASE_LOCK(&sm_mutex);
1823 RELEASE_LOCK(&stable_mutex);
1824 RELEASE_LOCK(&task->lock);
1825
1826 for (i=0; i < n_capabilities; i++) {
1827 releaseCapability_(&capabilities[i],rtsFalse);
1828 RELEASE_LOCK(&capabilities[i].lock);
1829 }
1830 boundTaskExiting(task);
1831
1832 // just return the pid
1833 return pid;
1834
1835 } else { // child
1836
1837 #if defined(THREADED_RTS)
1838 initMutex(&sched_mutex);
1839 initMutex(&sm_mutex);
1840 initMutex(&stable_mutex);
1841 initMutex(&task->lock);
1842
1843 for (i=0; i < n_capabilities; i++) {
1844 initMutex(&capabilities[i].lock);
1845 }
1846 #endif
1847
1848 #ifdef TRACING
1849 resetTracing();
1850 #endif
1851
1852 // Now, all OS threads except the thread that forked are
1853 // stopped. We need to stop all Haskell threads, including
1854 // those involved in foreign calls. Also we need to delete
1855 // all Tasks, because they correspond to OS threads that are
1856 // now gone.
1857
1858 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1859 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1860 next = t->global_link;
1861 // don't allow threads to catch the ThreadKilled
1862 // exception, but we do want to raiseAsync() because these
1863 // threads may be evaluating thunks that we need later.
1864 deleteThread_(t->cap,t);
1865
1866 // stop the GC from updating the InCall to point to
1867 // the TSO. This is only necessary because the
1868 // OSThread bound to the TSO has been killed, and
1869 // won't get a chance to exit in the usual way (see
1870 // also scheduleHandleThreadFinished).
1871 t->bound = NULL;
1872 }
1873 }
1874
1875 discardTasksExcept(task);
1876
1877 for (i=0; i < n_capabilities; i++) {
1878 cap = &capabilities[i];
1879
1880 // Empty the run queue. It seems tempting to let all the
1881 // killed threads stay on the run queue as zombies to be
1882 // cleaned up later, but some of them may correspond to
1883 // bound threads for which the corresponding Task does not
1884 // exist.
1885 cap->run_queue_hd = END_TSO_QUEUE;
1886 cap->run_queue_tl = END_TSO_QUEUE;
1887
1888             // Any suspended C-calling Tasks are gone; their OS threads
1889             // no longer exist now:
1890 cap->suspended_ccalls = NULL;
1891
1892 #if defined(THREADED_RTS)
1893 // Wipe our spare workers list, they no longer exist. New
1894 // workers will be created if necessary.
1895 cap->spare_workers = NULL;
1896 cap->n_spare_workers = 0;
1897 cap->returning_tasks_hd = NULL;
1898 cap->returning_tasks_tl = NULL;
1899 #endif
1900
1901 // Release all caps except 0, we'll use that for starting
1902 // the IO manager and running the client action below.
1903 if (cap->no != 0) {
1904 task->cap = cap;
1905 releaseCapability(cap);
1906 }
1907 }
1908 cap = &capabilities[0];
1909 task->cap = cap;
1910
1911 // Empty the threads lists. Otherwise, the garbage
1912 // collector may attempt to resurrect some of these threads.
1913 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1914 generations[g].threads = END_TSO_QUEUE;
1915 }
1916
1917 // On Unix, all timers are reset in the child, so we need to start
1918 // the timer again.
1919 initTimer();
1920 startTimer();
1921
1922 // TODO: need to trace various other things in the child
1923 // like startup event, capabilities, process info etc
1924 traceTaskCreate(task, cap);
1925
1926 #if defined(THREADED_RTS)
1927 ioManagerStartCap(&cap);
1928 #endif
1929
1930 rts_evalStableIO(&cap, entry, NULL); // run the action
1931 rts_checkSchedStatus("forkProcess",cap);
1932
1933 rts_unlock(cap);
1934 hs_exit(); // clean up and exit
1935 stg_exit(EXIT_SUCCESS);
1936 }
1937 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1938 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1939 #endif
1940 }
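
/* Note (an assumption about callers, not checked here): forkProcess() is the
 * RTS entry point behind System.Posix.Process.forkProcess; `entry` is a
 * stable pointer to the IO action the child should run.  In the child that
 * action is the only Haskell computation executed; when it finishes we call
 * hs_exit()/stg_exit() above instead of returning to the caller.
 */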
1941
1942 /* ---------------------------------------------------------------------------
1943 * Changing the number of Capabilities
1944 *
1945 * Changing the number of Capabilities is very tricky! We can only do
1946 * it with the system fully stopped, so we do a full sync with
1947 * requestSync(SYNC_OTHER) and grab all the capabilities.
1948 *
1949 * Then we resize the appropriate data structures, and update all
1950 * references to the old data structures which have now moved.
1951 * Finally we release the Capabilities we are holding, and start
1952 * worker Tasks on the new Capabilities we created.
1953 *
1954 * ------------------------------------------------------------------------- */
1955
1956 void
1957 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
1958 {
1959 #if !defined(THREADED_RTS)
1960 if (new_n_capabilities != 1) {
1961 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
1962 }
1963 return;
1964 #elif defined(NOSMP)
1965 if (new_n_capabilities != 1) {
1966 errorBelch("setNumCapabilities: not supported on this platform");
1967 }
1968 return;
1969 #else
1970 Task *task;
1971 Capability *cap;
1972 nat sync;
1973 StgTSO* t;
1974 nat g, n;
1975 Capability *old_capabilities = NULL;
1976
1977 if (new_n_capabilities == enabled_capabilities) return;
1978
1979 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
1980 enabled_capabilities, new_n_capabilities);
1981
1982 cap = rts_lock();
1983 task = cap->running_task;
1984
1985 do {
1986 sync = requestSync(&cap, task, SYNC_OTHER);
1987 } while (sync);
1988
1989 acquireAllCapabilities(cap,task);
1990
1991 pending_sync = 0;
1992
1993 if (new_n_capabilities < enabled_capabilities)
1994 {
1995 // Reducing the number of capabilities: we do not actually
1996 // remove the extra capabilities, we just mark them as
1997 // "disabled". This has the following effects:
1998 //
1999 // - threads on a disabled capability are migrated away by the
2000 // scheduler loop
2001 //
2002 // - disabled capabilities do not participate in GC
2003 // (see scheduleDoGC())
2004 //
2005 // - No spark threads are created on this capability
2006 // (see scheduleActivateSpark())
2007 //
2008 // - We do not attempt to migrate threads *to* a disabled
2009 // capability (see schedulePushWork()).
2010 //
2011 // but in other respects, a disabled capability remains
2012 // alive. Threads may be woken up on a disabled capability,
2013 // but they will be immediately migrated away.
2014 //
2015 // This approach is much easier than trying to actually remove
2016 // the capability; we don't have to worry about GC data
2017 // structures, the nursery, etc.
2018 //
2019 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2020 capabilities[n].disabled = rtsTrue;
2021 traceCapDisable(&capabilities[n]);
2022 }
2023 enabled_capabilities = new_n_capabilities;
2024 }
2025 else
2026 {
2027 // Increasing the number of enabled capabilities.
2028 //
2029 // enable any disabled capabilities, up to the required number
2030 for (n = enabled_capabilities;
2031 n < new_n_capabilities && n < n_capabilities; n++) {
2032 capabilities[n].disabled = rtsFalse;
2033 traceCapEnable(&capabilities[n]);
2034 }
2035 enabled_capabilities = n;
2036
2037 if (new_n_capabilities > n_capabilities) {
2038 #if defined(TRACING)
2039 // Allocate eventlog buffers for the new capabilities. Note this
2040 // must be done before calling moreCapabilities(), because that
2041 // will emit events about creating the new capabilities and adding
2042 // them to existing capsets.
2043 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2044 #endif
2045
2046 // Resize the capabilities array
2047 // NB. after this, capabilities points somewhere new. Any pointers
2048 // of type (Capability *) are now invalid.
2049 old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
2050
2051 // update our own cap pointer
2052 cap = &capabilities[cap->no];
2053
2054 // Resize and update storage manager data structures
2055 storageAddCapabilities(n_capabilities, new_n_capabilities);
2056
2057 // Update (Capability *) refs in the Task manager.
2058 updateCapabilityRefs();
2059
2060 // Update (Capability *) refs from TSOs
2061 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2062 for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
2063 t->cap = &capabilities[t->cap->no];
2064 }
2065 }
2066 }
2067 }
2068
2069 // We're done: release the original Capabilities
2070 releaseAllCapabilities(cap,task);
2071
2072 // Start worker tasks on the new Capabilities
2073 startWorkerTasks(n_capabilities, new_n_capabilities);
2074
2075 // finally, update n_capabilities
2076 if (new_n_capabilities > n_capabilities) {
2077 n_capabilities = enabled_capabilities = new_n_capabilities;
2078 }
2079
2080 // We can't free the old array until now, because we access it
2081 // while updating pointers in updateCapabilityRefs().
2082 if (old_capabilities) {
2083 stgFree(old_capabilities);
2084 }
2085
2086 rts_unlock(cap);
2087
2088 #endif // THREADED_RTS
2089 }
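
/* A minimal usage sketch, not part of this file: it assumes an embedding C
 * program that has already initialised the RTS and that the prototype is in
 * scope via Rts.h.  Haskell code would normally call
 * GHC.Conc.setNumCapabilities instead.
 *
 *     #include "Rts.h"
 *
 *     static void growToFourCaps(void)
 *     {
 *         setNumCapabilities(4);   // full sync, then resize the caps array
 *     }
 */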
2090
2091
2092
2093 /* ---------------------------------------------------------------------------
2094 * Delete all the threads in the system
2095 * ------------------------------------------------------------------------- */
2096
2097 static void
2098 deleteAllThreads ( Capability *cap )
2099 {
2100 // NOTE: only safe to call if we own all capabilities.
2101
2102 StgTSO* t, *next;
2103 nat g;
2104
2105 debugTrace(DEBUG_sched,"deleting all threads");
2106 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2107 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2108 next = t->global_link;
2109 deleteThread(cap,t);
2110 }
2111 }
2112
2113 // The run queue now contains a bunch of ThreadKilled threads. We
2114 // must not throw these away: the main thread(s) will be in there
2115 // somewhere, and the main scheduler loop has to deal with it.
2116 // Also, the run queue is the only thing keeping these threads from
2117 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2118
2119 #if !defined(THREADED_RTS)
2120 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2121 ASSERT(sleeping_queue == END_TSO_QUEUE);
2122 #endif
2123 }
2124
2125 /* -----------------------------------------------------------------------------
2126 Managing the suspended_ccalls list.
2127 Locks required: sched_mutex
2128 -------------------------------------------------------------------------- */
2129
2130 STATIC_INLINE void
2131 suspendTask (Capability *cap, Task *task)
2132 {
2133 InCall *incall;
2134
2135 incall = task->incall;
2136 ASSERT(incall->next == NULL && incall->prev == NULL);
2137 incall->next = cap->suspended_ccalls;
2138 incall->prev = NULL;
2139 if (cap->suspended_ccalls) {
2140 cap->suspended_ccalls->prev = incall;
2141 }
2142 cap->suspended_ccalls = incall;
2143 }
2144
2145 STATIC_INLINE void
2146 recoverSuspendedTask (Capability *cap, Task *task)
2147 {
2148 InCall *incall;
2149
2150 incall = task->incall;
2151 if (incall->prev) {
2152 incall->prev->next = incall->next;
2153 } else {
2154 ASSERT(cap->suspended_ccalls == incall);
2155 cap->suspended_ccalls = incall->next;
2156 }
2157 if (incall->next) {
2158 incall->next->prev = incall->prev;
2159 }
2160 incall->next = incall->prev = NULL;
2161 }
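
/* The two helpers above maintain cap->suspended_ccalls as a doubly-linked
 * list of InCall records, threaded through incall->prev/incall->next, with
 * the most recently suspended call at the head: suspendTask() pushes the
 * current task's InCall, and recoverSuspendedTask() unlinks it again when
 * the foreign call returns. */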
2162
2163 /* ---------------------------------------------------------------------------
2164 * Suspending & resuming Haskell threads.
2165 *
2166 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2167 * its capability before calling the C function. This allows another
2168 * task to pick up the capability and carry on running Haskell
2169 * threads. It also means that if the C call blocks, it won't lock
2170 * the whole system.
2171 *
2172 * The Haskell thread making the C call is put to sleep for the
2173 * duration of the call, on the suspended_ccalling_threads queue. We
2174 * give out a token to the task, which it can use to resume the thread
2175 * on return from the C function.
2176 *
2177 * If this is an interruptible C call, this means that the FFI call may be
2178 * unceremoniously terminated and should be scheduled on an
2179 * unbound worker thread.
2180 * ------------------------------------------------------------------------- */
2181
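/* Illustrative sketch only (the real calling sequence is emitted by the code
 * generator, not written by hand): a safe foreign call is expected to be
 * bracketed by this pair, roughly
 *
 *     token   = suspendThread(BaseReg, interruptible);
 *     result  = foreign_fn(args...);       // runs without a Capability
 *     BaseReg = resumeThread(token);
 *
 * The opaque token is in fact the Task (see the return statement below),
 * but callers must not rely on that.
 */
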
2182 void *
2183 suspendThread (StgRegTable *reg, rtsBool interruptible)
2184 {
2185 Capability *cap;
2186 int saved_errno;
2187 StgTSO *tso;
2188 Task *task;
2189 #if mingw32_HOST_OS
2190 StgWord32 saved_winerror;
2191 #endif
2192
2193 saved_errno = errno;
2194 #if mingw32_HOST_OS
2195 saved_winerror = GetLastError();
2196 #endif
2197
2198 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2199 */
2200 cap = regTableToCapability(reg);
2201
2202 task = cap->running_task;
2203 tso = cap->r.rCurrentTSO;
2204
2205 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2206
2207 // XXX this might not be necessary --SDM
2208 tso->what_next = ThreadRunGHC;
2209
2210 threadPaused(cap,tso);
2211
2212 if (interruptible) {
2213 tso->why_blocked = BlockedOnCCall_Interruptible;
2214 } else {
2215 tso->why_blocked = BlockedOnCCall;
2216 }
2217
2218 // Hand back capability
2219 task->incall->suspended_tso = tso;
2220 task->incall->suspended_cap = cap;
2221
2222 ACQUIRE_LOCK(&cap->lock);
2223
2224 suspendTask(cap,task);
2225 cap->in_haskell = rtsFalse;
2226 releaseCapability_(cap,rtsFalse);
2227
2228 RELEASE_LOCK(&cap->lock);
2229
2230 errno = saved_errno;
2231 #if mingw32_HOST_OS
2232 SetLastError(saved_winerror);
2233 #endif
2234 return task;
2235 }
2236
2237 StgRegTable *
2238 resumeThread (void *task_)
2239 {
2240 StgTSO *tso;
2241 InCall *incall;
2242 Capability *cap;
2243 Task *task = task_;
2244 int saved_errno;
2245 #if mingw32_HOST_OS
2246 StgWord32 saved_winerror;
2247 #endif
2248
2249 saved_errno = errno;
2250 #if mingw32_HOST_OS
2251 saved_winerror = GetLastError();
2252 #endif
2253
2254 incall = task->incall;
2255 cap = incall->suspended_cap;
2256 task->cap = cap;
2257
2258 // Wait for permission to re-enter the RTS with the result.
2259 waitForReturnCapability(&cap,task);
2260 // we might be on a different capability now... but if so, our
2261 // entry on the suspended_ccalls list will also have been
2262 // migrated.
2263
2264 // Remove the thread from the suspended list
2265 recoverSuspendedTask(cap,task);
2266
2267 tso = incall->suspended_tso;
2268 incall->suspended_tso = NULL;
2269 incall->suspended_cap = NULL;
2270 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2271
2272 traceEventRunThread(cap, tso);
2273
2274 /* Reset blocking status */
2275 tso->why_blocked = NotBlocked;
2276
2277 if ((tso->flags & TSO_BLOCKEX) == 0) {
2278 // avoid locking the TSO if we don't have to
2279 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2280 maybePerformBlockedException(cap,tso);
2281 }
2282 }
2283
2284 cap->r.rCurrentTSO = tso;
2285 cap->in_haskell = rtsTrue;
2286 errno = saved_errno;
2287 #if mingw32_HOST_OS
2288 SetLastError(saved_winerror);
2289 #endif
2290
2291 /* We might have GC'd, mark the TSO dirty again */
2292 dirty_TSO(cap,tso);
2293 dirty_STACK(cap,tso->stackobj);
2294
2295 IF_DEBUG(sanity, checkTSO(tso));
2296
2297 return &cap->r;
2298 }
2299
2300 /* ---------------------------------------------------------------------------
2301 * scheduleThread()
2302 *
2303 * scheduleThread puts a thread on the end of the runnable queue.
2304 * This will usually be done immediately after a thread is created.
2305 * The caller of scheduleThread must create the thread using e.g.
2306 * createThread and push an appropriate closure
2307 * on this thread's stack before the scheduler is invoked.
2308 * ------------------------------------------------------------------------ */
2309
2310 void
2311 scheduleThread(Capability *cap, StgTSO *tso)
2312 {
2313 // The thread goes at the *end* of the run-queue, to avoid possible
2314 // starvation of any threads already on the queue.
2315 appendToRunQueue(cap,tso);
2316 }
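
/* A minimal sketch of the calling pattern described above (it assumes the
 * caller already owns `cap`; createIOThread is one of the thread-creation
 * helpers in RtsAPI.c):
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  closure);
 *     scheduleThread(cap, tso);
 */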
2317
2318 void
2319 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2320 {
2321 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2322 // move this thread from now on.
2323 #if defined(THREADED_RTS)
2324 cpu %= enabled_capabilities;
2325 if (cpu == cap->no) {
2326 appendToRunQueue(cap,tso);
2327 } else {
2328 migrateThread(cap, tso, &capabilities[cpu]);
2329 }
2330 #else
2331 appendToRunQueue(cap,tso);
2332 #endif
2333 }
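
/* Note: the requested cpu is reduced modulo enabled_capabilities above, so an
 * out-of-range affinity request wraps around rather than failing.  This is
 * the path taken by forkOn from GHC.Conc (an assumption about the caller,
 * not something enforced here). */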
2334
2335 void
2336 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2337 {
2338 Task *task;
2339 DEBUG_ONLY( StgThreadID id );
2340 Capability *cap;
2341
2342 cap = *pcap;
2343
2344 // We already created/initialised the Task
2345 task = cap->running_task;
2346
2347 // This TSO is now a bound thread; make the Task and TSO
2348 // point to each other.
2349 tso->bound = task->incall;
2350 tso->cap = cap;
2351
2352 task->incall->tso = tso;
2353 task->incall->ret = ret;
2354 task->incall->stat = NoStatus;
2355
2356 appendToRunQueue(cap,tso);
2357
2358 DEBUG_ONLY( id = tso->id );
2359 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2360
2361 cap = schedule(cap,task);
2362
2363 ASSERT(task->incall->stat != NoStatus);
2364 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2365
2366 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2367 *pcap = cap;
2368 }
2369
2370 /* ----------------------------------------------------------------------------
2371 * Starting Tasks
2372 * ------------------------------------------------------------------------- */
2373
2374 #if defined(THREADED_RTS)
2375 void scheduleWorker (Capability *cap, Task *task)
2376 {
2377 // schedule() runs without a lock.
2378 cap = schedule(cap,task);
2379
2380 // On exit from schedule(), we have a Capability, but possibly not
2381 // the same one we started with.
2382
2383 // During shutdown, the requirement is that after all the
2384 // Capabilities are shut down, all workers that are shutting down
2385 // have finished workerTaskStop(). This is why we hold on to
2386 // cap->lock until we've finished workerTaskStop() below.
2387 //
2388 // There may be workers still involved in foreign calls; those
2389 // will just block in waitForReturnCapability() because the
2390 // Capability has been shut down.
2391 //
2392 ACQUIRE_LOCK(&cap->lock);
2393 releaseCapability_(cap,rtsFalse);
2394 workerTaskStop(task);
2395 RELEASE_LOCK(&cap->lock);
2396 }
2397 #endif
2398
2399 /* ---------------------------------------------------------------------------
2400  * Start new worker tasks on Capabilities in the half-open range [from, to)
2401 * -------------------------------------------------------------------------- */
2402
2403 static void
2404 startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
2405 {
2406 #if defined(THREADED_RTS)
2407 nat i;
2408 Capability *cap;
2409
2410 for (i = from; i < to; i++) {
2411 cap = &capabilities[i];
2412 ACQUIRE_LOCK(&cap->lock);
2413 startWorkerTask(cap);
2414 RELEASE_LOCK(&cap->lock);
2415 }
2416 #endif
2417 }
2418
2419 /* ---------------------------------------------------------------------------
2420 * initScheduler()
2421 *
2422 * Initialise the scheduler. This resets all the queues - if the
2423 * queues contained any threads, they'll be garbage collected at the
2424 * next pass.
2425 *
2426 * ------------------------------------------------------------------------ */
2427
2428 void
2429 initScheduler(void)
2430 {
2431 #if !defined(THREADED_RTS)
2432 blocked_queue_hd = END_TSO_QUEUE;
2433 blocked_queue_tl = END_TSO_QUEUE;
2434 sleeping_queue = END_TSO_QUEUE;
2435 #endif
2436
2437 sched_state = SCHED_RUNNING;
2438 recent_activity = ACTIVITY_YES;
2439
2440 #if defined(THREADED_RTS)
2441 /* Initialise the mutex and condition variables used by
2442 * the scheduler. */
2443 initMutex(&sched_mutex);
2444 #endif
2445
2446 ACQUIRE_LOCK(&sched_mutex);
2447
2448 /* A capability holds the state a native thread needs in
2449 * order to execute STG code. At least one capability is
2450 * floating around (only THREADED_RTS builds have more than one).
2451 */
2452 initCapabilities();
2453
2454 initTaskManager();
2455
2456 /*
2457 * Eagerly start one worker to run each Capability, except for
2458 * Capability 0. The idea is that we're probably going to start a
2459 * bound thread on Capability 0 pretty soon, so we don't want a
2460 * worker task hogging it.
2461 */
2462 startWorkerTasks(1, n_capabilities);
2463
2464 RELEASE_LOCK(&sched_mutex);
2465
2466 }
2467
2468 void
2469 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2470 /* see Capability.c, shutdownCapability() */
2471 {
2472 Task *task = NULL;
2473
2474 task = newBoundTask();
2475
2476 // If we haven't killed all the threads yet, do it now.
2477 if (sched_state < SCHED_SHUTTING_DOWN) {
2478 sched_state = SCHED_INTERRUPTING;
2479 Capability *cap = task->cap;
2480 waitForReturnCapability(&cap,task);
2481 scheduleDoGC(&cap,task,rtsTrue);
2482 ASSERT(task->incall->tso == NULL);
2483 releaseCapability(cap);
2484 }
2485 sched_state = SCHED_SHUTTING_DOWN;
2486
2487 shutdownCapabilities(task, wait_foreign);
2488
2489 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2490 // n_failed_trygrab_idles, n_idle_caps);
2491
2492 boundTaskExiting(task);
2493 }
2494
2495 void
2496 freeScheduler( void )
2497 {
2498 nat still_running;
2499
2500 ACQUIRE_LOCK(&sched_mutex);
2501 still_running = freeTaskManager();
2502 // We can only free the Capabilities if there are no Tasks still
2503 // running. We might have a Task about to return from a foreign
2504 // call into waitForReturnCapability(), for example (actually,
2505 // this should be the *only* thing that a still-running Task can
2506 // do at this point, and it will block waiting for the
2507 // Capability).
2508 if (still_running == 0) {
2509 freeCapabilities();
2510 if (n_capabilities != 1) {
2511 stgFree(capabilities);
2512 }
2513 }
2514 RELEASE_LOCK(&sched_mutex);
2515 #if defined(THREADED_RTS)
2516 closeMutex(&sched_mutex);
2517 #endif
2518 }
2519
2520 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2521 void *user USED_IF_NOT_THREADS)
2522 {
2523 #if !defined(THREADED_RTS)
2524 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2525 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2526 evac(user, (StgClosure **)(void *)&sleeping_queue);
2527 #endif
2528 }
2529
2530 /* -----------------------------------------------------------------------------
2531 performGC
2532
2533 This is the interface to the garbage collector from Haskell land.
2534 We provide this so that external C code can allocate and garbage
2535 collect when called from Haskell via _ccall_GC.
2536 -------------------------------------------------------------------------- */
2537
2538 static void
2539 performGC_(rtsBool force_major)
2540 {
2541 Task *task;
2542 Capability *cap = NULL;
2543
2544 // We must grab a new Task here, because the existing Task may be
2545 // associated with a particular Capability, and chained onto the
2546 // suspended_ccalls queue.
2547 task = newBoundTask();
2548
2549 // TODO: do we need to traceTask*() here?
2550
2551 waitForReturnCapability(&cap,task);
2552 scheduleDoGC(&cap,task,force_major);
2553 releaseCapability(cap);
2554 boundTaskExiting(task);
2555 }
2556
2557 void
2558 performGC(void)
2559 {
2560 performGC_(rtsFalse);
2561 }
2562
2563 void
2564 performMajorGC(void)
2565 {
2566 performGC_(rtsTrue);
2567 }
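
/* A minimal usage sketch, not part of this file: external C code reached from
 * Haskell via a safe foreign call may force a collection directly, assuming
 * the prototypes are in scope via Rts.h.  The freeLargeBuffer() helper below
 * is purely hypothetical:
 *
 *     #include <stdlib.h>
 *     #include "Rts.h"
 *
 *     void freeLargeBuffer(void *buf)
 *     {
 *         free(buf);
 *         performMajorGC();   // ask the RTS for a full collection now
 *     }
 */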
2568
2569 /* ---------------------------------------------------------------------------
2570 Interrupt execution
2571 - usually called inside a signal handler so it mustn't do anything fancy.
2572 ------------------------------------------------------------------------ */
2573
2574 void
2575 interruptStgRts(void)
2576 {
2577 sched_state = SCHED_INTERRUPTING;
2578 interruptAllCapabilities();
2579 #if defined(THREADED_RTS)
2580 wakeUpRts();
2581 #endif
2582 }
2583
2584 /* -----------------------------------------------------------------------------
2585 Wake up the RTS
2586
2587 This function causes at least one OS thread to wake up and run the
2588 scheduler loop. It is invoked when the RTS might be deadlocked, or
2589 an external event has arrived that may need servicing (eg. a
2590    an external event has arrived that may need servicing (e.g. a
2591
2592 In the single-threaded RTS we don't do anything here; we only have
2593 one thread anyway, and the event that caused us to want to wake up
2594 will have interrupted any blocking system call in progress anyway.
2595 -------------------------------------------------------------------------- */
2596
2597 #if defined(THREADED_RTS)
2598 void wakeUpRts(void)
2599 {
2600 // This forces the IO Manager thread to wakeup, which will
2601 // in turn ensure that some OS thread wakes up and runs the
2602 // scheduler loop, which will cause a GC and deadlock check.
2603 ioManagerWakeup();
2604 }
2605 #endif
2606
2607 /* -----------------------------------------------------------------------------
2608 Deleting threads
2609
2610 This is used for interruption (^C) and forking, and corresponds to
2611 raising an exception but without letting the thread catch the
2612 exception.
2613 -------------------------------------------------------------------------- */
2614
2615 static void
2616 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2617 {
2618 // NOTE: must only be called on a TSO that we have exclusive
2619 // access to, because we will call throwToSingleThreaded() below.
2620 // The TSO must be on the run queue of the Capability we own, or
2621 // we must own all Capabilities.
2622
2623 if (tso->why_blocked != BlockedOnCCall &&
2624 tso->why_blocked != BlockedOnCCall_Interruptible) {
2625 throwToSingleThreaded(tso->cap,tso,NULL);
2626 }
2627 }
2628
2629 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2630 static void
2631 deleteThread_(Capability *cap, StgTSO *tso)
2632 { // for forkProcess only:
2633 // like deleteThread(), but we delete threads in foreign calls, too.
2634
2635 if (tso->why_blocked == BlockedOnCCall ||
2636 tso->why_blocked == BlockedOnCCall_Interruptible) {
2637 tso->what_next = ThreadKilled;
2638 appendToRunQueue(tso->cap, tso);
2639 } else {
2640 deleteThread(cap,tso);
2641 }
2642 }
2643 #endif
2644
2645 /* -----------------------------------------------------------------------------
2646 raiseExceptionHelper
2647
2648    This function is called by the raise# primitive, just so that we can
2649    move some of the tricky bits of raising an exception from C-- into
2650    C. Who knows, it might be a useful re-usable thing here too.
2651 -------------------------------------------------------------------------- */
2652
2653 StgWord
2654 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2655 {
2656 Capability *cap = regTableToCapability(reg);
2657 StgThunk *raise_closure = NULL;
2658 StgPtr p, next;
2659 StgRetInfoTable *info;
2660 //
2661     // This closure represents the expression 'raise# E' where E
2662     // is the exception to raise. It is used to overwrite all the
2663     // thunks which are currently under evaluation.
2664 //
2665
2666 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2667 // LDV profiling: stg_raise_info has THUNK as its closure
2668 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2669     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2670 // 1 does not cause any problem unless profiling is performed.
2671 // However, when LDV profiling goes on, we need to linearly scan
2672 // small object pool, where raise_closure is stored, so we should
2673 // use MIN_UPD_SIZE.
2674 //
2675 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2676 // sizeofW(StgClosure)+1);
2677 //
2678
2679 //
2680 // Walk up the stack, looking for the catch frame. On the way,
2681 // we update any closures pointed to from update frames with the
2682 // raise closure that we just built.
2683 //
2684 p = tso->stackobj->sp;
2685 while(1) {
2686 info = get_ret_itbl((StgClosure *)p);
2687 next = p + stack_frame_sizeW((StgClosure *)p);
2688 switch (info->i.type) {
2689
2690 case UPDATE_FRAME:
2691 // Only create raise_closure if we need to.
2692 if (raise_closure == NULL) {
2693 raise_closure =
2694 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2695 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2696 raise_closure->payload[0] = exception;
2697 }
2698 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2699 (StgClosure *)raise_closure);
2700 p = next;
2701 continue;
2702
2703 case ATOMICALLY_FRAME:
2704 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2705 tso->stackobj->sp = p;
2706 return ATOMICALLY_FRAME;
2707
2708 case CATCH_FRAME:
2709 tso->stackobj->sp = p;
2710 return CATCH_FRAME;
2711
2712 case CATCH_STM_FRAME:
2713 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2714 tso->stackobj->sp = p;
2715 return CATCH_STM_FRAME;
2716
2717 case UNDERFLOW_FRAME:
2718 tso->stackobj->sp = p;
2719 threadStackUnderflow(cap,tso);
2720 p = tso->stackobj->sp;
2721 continue;
2722
2723 case STOP_FRAME:
2724 tso->stackobj->sp = p;
2725 return STOP_FRAME;
2726
2727 case CATCH_RETRY_FRAME:
2728 default:
2729 p = next;
2730 continue;
2731 }
2732 }
2733 }
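
/* On return, tso->stackobj->sp has been left pointing at the frame whose type
 * is returned (ATOMICALLY_FRAME, CATCH_FRAME, CATCH_STM_FRAME or STOP_FRAME),
 * and the updatee of every update frame passed on the way up has already been
 * overwritten with the `raise# E` thunk built above.  The C-- caller switches
 * on the returned frame type to decide how to continue. */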
2734
2735
2736 /* -----------------------------------------------------------------------------
2737 findRetryFrameHelper
2738
2739 This function is called by the retry# primitive. It traverses the stack
2740 leaving tso->sp referring to the frame which should handle the retry.
2741    leaving tso->stackobj->sp referring to the frame which should handle the retry.
2742 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2743    or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2744
2745 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2746 create) because retries are not considered to be exceptions, despite the
2747 similar implementation.
2748
2749 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2750 not be created within memory transactions.
2751 -------------------------------------------------------------------------- */
2752
2753 StgWord
2754 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2755 {
2756 StgPtr p, next;
2757 StgRetInfoTable *info;
2758
2759 p = tso->stackobj->sp;
2760 while (1) {
2761 info = get_ret_itbl((StgClosure *)p);
2762 next = p + stack_frame_sizeW((StgClosure *)p);
2763 switch (info->i.type) {
2764
2765 case ATOMICALLY_FRAME:
2766 debugTrace(DEBUG_stm,
2767 "found ATOMICALLY_FRAME at %p during retry", p);
2768 tso->stackobj->sp = p;
2769 return ATOMICALLY_FRAME;
2770
2771 case CATCH_RETRY_FRAME:
2772 debugTrace(DEBUG_stm,
2773 "found CATCH_RETRY_FRAME at %p during retry", p);
2774 tso->stackobj->sp = p;
2775 return CATCH_RETRY_FRAME;
2776
2777 case CATCH_STM_FRAME: {
2778 StgTRecHeader *trec = tso -> trec;
2779 StgTRecHeader *outer = trec -> enclosing_trec;
2780 debugTrace(DEBUG_stm,
2781 "found CATCH_STM_FRAME at %p during retry", p);
2782 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2783 stmAbortTransaction(cap, trec);
2784 stmFreeAbortedTRec(cap, trec);
2785 tso -> trec = outer;
2786 p = next;
2787 continue;
2788 }
2789
2790 case UNDERFLOW_FRAME:
2791 threadStackUnderflow(cap,tso);
2792 p = tso->stackobj->sp;
2793 continue;
2794
2795 default:
2796 ASSERT(info->i.type != CATCH_FRAME);
2797 ASSERT(info->i.type != STOP_FRAME);
2798 p = next;
2799 continue;
2800 }
2801 }
2802 }
2803
2804 /* -----------------------------------------------------------------------------
2805 resurrectThreads is called after garbage collection on the list of
2806 threads found to be garbage. Each of these threads will be woken
2807    up and sent a signal: BlockedIndefinitelyOnMVar if the thread was blocked
2808    on an MVar, BlockedIndefinitelyOnSTM if it was blocked on an STM
2809    transaction, or NonTermination if the thread was blocked on a Black Hole.
2810
2811 Locks: assumes we hold *all* the capabilities.
2812 -------------------------------------------------------------------------- */
2813
2814 void
2815 resurrectThreads (StgTSO *threads)
2816 {
2817 StgTSO *tso, *next;
2818 Capability *cap;
2819 generation *gen;
2820
2821 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2822 next = tso->global_link;
2823
2824 gen = Bdescr((P_)tso)->gen;
2825 tso->global_link = gen->threads;
2826 gen->threads = tso;
2827
2828 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2829
2830 // Wake up the thread on the Capability it was last on
2831 cap = tso->cap;
2832
2833 switch (tso->why_blocked) {
2834 case BlockedOnMVar:
2835 /* Called by GC - sched_mutex lock is currently held. */
2836 throwToSingleThreaded(cap, tso,
2837 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2838 break;
2839 case BlockedOnBlackHole:
2840 throwToSingleThreaded(cap, tso,
2841 (StgClosure *)nonTermination_closure);
2842 break;
2843 case BlockedOnSTM:
2844 throwToSingleThreaded(cap, tso,
2845 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2846 break;
2847 case NotBlocked:
2848 /* This might happen if the thread was blocked on a black hole
2849 * belonging to a thread that we've just woken up (raiseAsync
2850 * can wake up threads, remember...).
2851 */
2852 continue;
2853 case BlockedOnMsgThrowTo:
2854 // This can happen if the target is masking, blocks on a
2855 // black hole, and then is found to be unreachable. In
2856 // this case, we want to let the target wake up and carry
2857 // on, and do nothing to this thread.
2858 continue;
2859 default:
2860 barf("resurrectThreads: thread blocked in a strange way: %d",
2861 tso->why_blocked);
2862 }
2863 }
2864 }