[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
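// (Added summary, based on the uses in this file: the timer handler moves
//  this flag from ACTIVITY_YES to ACTIVITY_MAYBE_NO and then to
//  ACTIVITY_INACTIVE when nothing has run; scheduleDetectDeadlock() then
//  forces a major GC, which sets ACTIVITY_DONE_GC and stops the timer;
//  running any thread resets the flag to ACTIVITY_YES and restarts the
//  timer.)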
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
95 * exists - earlier gccs apparently didn't.
96 * -= chak
97 */
98 StgTSO dummy_tso;
99
100 /*
101 * This mutex protects most of the global scheduler data in
102 * the THREADED_RTS runtime.
103 */
104 #if defined(THREADED_RTS)
105 Mutex sched_mutex;
106 #endif
107
108 #if !defined(mingw32_HOST_OS)
109 #define FORKPROCESS_PRIMOP_SUPPORTED
110 #endif
111
112 // Local stats
113 #ifdef THREADED_RTS
114 static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
115 #endif
116
117 /* -----------------------------------------------------------------------------
118 * static function prototypes
119 * -------------------------------------------------------------------------- */
120
121 static Capability *schedule (Capability *initialCapability, Task *task);
122
123 //
124 // These functions all encapsulate parts of the scheduler loop, and are
125 // abstracted only to make the structure and control flow of the
126 // scheduler clearer.
127 //
128 static void schedulePreLoop (void);
129 static void scheduleFindWork (Capability **pcap);
130 #if defined(THREADED_RTS)
131 static void scheduleYield (Capability **pcap, Task *task);
132 #endif
133 #if defined(THREADED_RTS)
134 static nat requestSync (Capability **pcap, Task *task, nat sync_type);
135 static void acquireAllCapabilities(Capability *cap, Task *task);
136 static void releaseAllCapabilities(nat n, Capability *cap, Task *task);
137 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
138 #endif
139 static void scheduleStartSignalHandlers (Capability *cap);
140 static void scheduleCheckBlockedThreads (Capability *cap);
141 static void scheduleProcessInbox(Capability **cap);
142 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
143 static void schedulePushWork(Capability *cap, Task *task);
144 #if defined(THREADED_RTS)
145 static void scheduleActivateSpark(Capability *cap);
146 #endif
147 static void schedulePostRunThread(Capability *cap, StgTSO *t);
148 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
149 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
150 nat prev_what_next );
151 static void scheduleHandleThreadBlocked( StgTSO *t );
152 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
153 StgTSO *t );
154 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
155 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
156
157 static void deleteThread (Capability *cap, StgTSO *tso);
158 static void deleteAllThreads (Capability *cap);
159
160 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
161 static void deleteThread_(Capability *cap, StgTSO *tso);
162 #endif
163
164 /* ---------------------------------------------------------------------------
165 Main scheduling loop.
166
167 We use round-robin scheduling, each thread returning to the
168 scheduler loop when one of these conditions is detected:
169
170 * out of heap space
171 * timer expires (thread yields)
172 * thread blocks
173 * thread ends
174 * stack overflow
175
176 ------------------------------------------------------------------------ */
177
178 static Capability *
179 schedule (Capability *initialCapability, Task *task)
180 {
181 StgTSO *t;
182 Capability *cap;
183 StgThreadReturnCode ret;
184 nat prev_what_next;
185 rtsBool ready_to_gc;
186 #if defined(THREADED_RTS)
187 rtsBool first = rtsTrue;
188 #endif
189
190 cap = initialCapability;
191
192 // Pre-condition: this task owns initialCapability.
193 // The sched_mutex is *NOT* held
194 // NB. on return, we still hold a capability.
195
196 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
197
198 schedulePreLoop();
199
200 // -----------------------------------------------------------
201 // Scheduler loop starts here:
202
203 while (1) {
204
205 // Check whether we have re-entered the RTS from Haskell without
206 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
207 // call).
208 if (cap->in_haskell) {
209 errorBelch("schedule: re-entered unsafely.\n"
210 " Perhaps a 'foreign import unsafe' should be 'safe'?");
211 stg_exit(EXIT_FAILURE);
212 }
213
214 // The interruption / shutdown sequence.
215 //
216 // In order to cleanly shut down the runtime, we want to:
217 // * make sure that all main threads return to their callers
218 // with the state 'Interrupted'.
219 // * clean up all OS threads associated with the runtime
220 // * free all memory etc.
221 //
222 // So the sequence for ^C goes like this:
223 //
224 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
225 // arranges for some Capability to wake up
226 //
227 // * all threads in the system are halted, and the zombies are
228 // placed on the run queue for cleaning up. We acquire all
229 // the capabilities in order to delete the threads; this is
230 // done by scheduleDoGC() for convenience (because GC already
231 // needs to acquire all the capabilities). We can't kill
232 // threads involved in foreign calls.
233 //
234 // * somebody calls shutdownHaskell(), which calls exitScheduler()
235 //
236 // * sched_state := SCHED_SHUTTING_DOWN
237 //
238 // * all workers exit when the run queue on their capability
239 // drains. All main threads will also exit when their TSO
240 // reaches the head of the run queue and they can return.
241 //
242 // * eventually all Capabilities will shut down, and the RTS can
243 // exit.
244 //
245 // * We might be left with threads blocked in foreign calls,
246 // we should really attempt to kill these somehow (TODO);
247
248 switch (sched_state) {
249 case SCHED_RUNNING:
250 break;
251 case SCHED_INTERRUPTING:
252 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
253 /* scheduleDoGC() deletes all the threads */
254 scheduleDoGC(&cap,task,rtsTrue);
255
256 // after scheduleDoGC(), we must be shutting down. Either some
257 // other Capability did the final GC, or we did it above,
258 // either way we can fall through to the SCHED_SHUTTING_DOWN
259 // case now.
260 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
261 // fall through
262
263 case SCHED_SHUTTING_DOWN:
264 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
265 // If we are a worker, just exit. If we're a bound thread
266 // then we will exit below when we've removed our TSO from
267 // the run queue.
268 if (!isBoundTask(task) && emptyRunQueue(cap)) {
269 return cap;
270 }
271 break;
272 default:
273 barf("sched_state: %d", sched_state);
274 }
275
276 scheduleFindWork(&cap);
277
278 /* work pushing, currently relevant only for THREADED_RTS:
279 (pushes threads, wakes up idle capabilities for stealing) */
280 schedulePushWork(cap,task);
281
282 scheduleDetectDeadlock(&cap,task);
283
284 // Normally, the only way we can get here with no threads to
285 // run is if a keyboard interrupt was received during
286 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
287 // Additionally, it is not fatal for the
288 // threaded RTS to reach here with no threads to run.
289 //
290 // win32: might be here due to awaitEvent() being abandoned
291 // as a result of a console event having been delivered.
292
293 #if defined(THREADED_RTS)
294 if (first)
295 {
296 // XXX: ToDo
297 // // don't yield the first time, we want a chance to run this
298 // // thread for a bit, even if there are others banging at the
299 // // door.
300 // first = rtsFalse;
301 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
302 }
303
304 scheduleYield(&cap,task);
305
306 if (emptyRunQueue(cap)) continue; // look for work again
307 #endif
308
309 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
310 if ( emptyRunQueue(cap) ) {
311 ASSERT(sched_state >= SCHED_INTERRUPTING);
312 }
313 #endif
314
315 //
316 // Get a thread to run
317 //
318 t = popRunQueue(cap);
319
320 // Sanity check the thread we're about to run. This can be
321 // expensive if there is lots of thread switching going on...
322 IF_DEBUG(sanity,checkTSO(t));
323
324 #if defined(THREADED_RTS)
325 // Check whether we can run this thread in the current task.
326 // If not, we have to pass our capability to the right task.
327 {
328 InCall *bound = t->bound;
329
330 if (bound) {
331 if (bound->task == task) {
332 // yes, the Haskell thread is bound to the current native thread
333 } else {
334 debugTrace(DEBUG_sched,
335 "thread %lu bound to another OS thread",
336 (unsigned long)t->id);
337 // no, bound to a different Haskell thread: pass to that thread
338 pushOnRunQueue(cap,t);
339 continue;
340 }
341 } else {
342 // The thread we want to run is unbound.
343 if (task->incall->tso) {
344 debugTrace(DEBUG_sched,
345 "this OS thread cannot run thread %lu",
346 (unsigned long)t->id);
347 // no, the current native thread is bound to a different
348 // Haskell thread, so pass it to any worker thread
349 pushOnRunQueue(cap,t);
350 continue;
351 }
352 }
353 }
354 #endif
355
356 // If we're shutting down, and this thread has not yet been
357 // killed, kill it now. This sometimes happens when a finalizer
358 // thread is created by the final GC, or a thread previously
359 // in a foreign call returns.
360 if (sched_state >= SCHED_INTERRUPTING &&
361 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
362 deleteThread(cap,t);
363 }
364
365 // If this capability is disabled, migrate the thread away rather
366 // than running it. NB. but not if the thread is bound: it is
367 // really hard for a bound thread to migrate itself. Believe me,
368 // I tried several ways and couldn't find a way to do it.
369 // Instead, when everything is stopped for GC, we migrate all the
370 // threads on the run queue then (see scheduleDoGC()).
371 //
372 // ToDo: what about TSO_LOCKED? Currently we're migrating those
373 // when the number of capabilities drops, but we never migrate
374 // them back if it rises again. Presumably we should, but after
375 // the thread has been migrated we no longer know what capability
376 // it was originally on.
377 #ifdef THREADED_RTS
378 if (cap->disabled && !t->bound) {
379 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
380 migrateThread(cap, t, dest_cap);
381 continue;
382 }
383 #endif
384
385 /* context switches are initiated by the timer signal, unless
386 * the user specified "context switch as often as possible", with
387 * +RTS -C0
388 */
389 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
390 && !emptyThreadQueues(cap)) {
391 cap->context_switch = 1;
392 }
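// (Added note: for example, running the program as `./prog +RTS -C0 -RTS`
// selects "context switch as often as possible", which is what makes
// ctxtSwitchTicks zero here.)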
393
394 run_thread:
395
396 // CurrentTSO is the thread to run. It might be different if we
397 // loop back to run_thread, so make sure to set CurrentTSO after
398 // that.
399 cap->r.rCurrentTSO = t;
400
401 startHeapProfTimer();
402
403 // ----------------------------------------------------------------------
404 // Run the current thread
405
406 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
407 ASSERT(t->cap == cap);
408 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
409
410 prev_what_next = t->what_next;
411
412 errno = t->saved_errno;
413 #if mingw32_HOST_OS
414 SetLastError(t->saved_winerror);
415 #endif
416
417 // reset the interrupt flag before running Haskell code
418 cap->interrupt = 0;
419
420 cap->in_haskell = rtsTrue;
421 cap->idle = 0;
422
423 dirty_TSO(cap,t);
424 dirty_STACK(cap,t->stackobj);
425
426 switch (recent_activity)
427 {
428 case ACTIVITY_DONE_GC: {
429 // ACTIVITY_DONE_GC means we turned off the timer signal to
430 // conserve power (see #1623). Re-enable it here.
431 nat prev;
432 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
433 if (prev == ACTIVITY_DONE_GC) {
434 #ifndef PROFILING
435 startTimer();
436 #endif
437 }
438 break;
439 }
440 case ACTIVITY_INACTIVE:
441 // If we reached ACTIVITY_INACTIVE, then don't reset it until
442 // we've done the GC. The thread running here might just be
443 // the IO manager thread that handle_tick() woke up via
444 // wakeUpRts().
445 break;
446 default:
447 recent_activity = ACTIVITY_YES;
448 }
449
450 traceEventRunThread(cap, t);
451
452 switch (prev_what_next) {
453
454 case ThreadKilled:
455 case ThreadComplete:
456 /* Thread already finished, return to scheduler. */
457 ret = ThreadFinished;
458 break;
459
460 case ThreadRunGHC:
461 {
462 StgRegTable *r;
463 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
464 cap = regTableToCapability(r);
465 ret = r->rRet;
466 break;
467 }
468
469 case ThreadInterpret:
470 cap = interpretBCO(cap);
471 ret = cap->r.rRet;
472 break;
473
474 default:
475 barf("schedule: invalid what_next field");
476 }
477
478 cap->in_haskell = rtsFalse;
479
480 // The TSO might have moved, eg. if it re-entered the RTS and a GC
481 // happened. So find the new location:
482 t = cap->r.rCurrentTSO;
483
484 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
485 // don't want it set when not running a Haskell thread.
486 cap->r.rCurrentTSO = NULL;
487
488 // And save the current errno in this thread.
489 // XXX: possibly bogus for SMP because this thread might already
490 // be running again, see code below.
491 t->saved_errno = errno;
492 #if mingw32_HOST_OS
493 // Similarly for Windows error code
494 t->saved_winerror = GetLastError();
495 #endif
496
497 if (ret == ThreadBlocked) {
498 if (t->why_blocked == BlockedOnBlackHole) {
499 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
500 traceEventStopThread(cap, t, t->why_blocked + 6,
501 owner != NULL ? owner->id : 0);
502 } else {
503 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
504 }
505 } else {
506 traceEventStopThread(cap, t, ret, 0);
507 }
508
509 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
510 ASSERT(t->cap == cap);
511
512 // ----------------------------------------------------------------------
513
514 // Costs for the scheduler are assigned to CCS_SYSTEM
515 stopHeapProfTimer();
516 #if defined(PROFILING)
517 cap->r.rCCCS = CCS_SYSTEM;
518 #endif
519
520 schedulePostRunThread(cap,t);
521
522 ready_to_gc = rtsFalse;
523
524 switch (ret) {
525 case HeapOverflow:
526 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
527 break;
528
529 case StackOverflow:
530 // just adjust the stack for this thread, then push it back
531 // on the run queue.
532 threadStackOverflow(cap, t);
533 pushOnRunQueue(cap,t);
534 break;
535
536 case ThreadYielding:
537 if (scheduleHandleYield(cap, t, prev_what_next)) {
538 // shortcut for switching between compiler/interpreter:
539 goto run_thread;
540 }
541 break;
542
543 case ThreadBlocked:
544 scheduleHandleThreadBlocked(t);
545 break;
546
547 case ThreadFinished:
548 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
549 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
550 break;
551
552 default:
553 barf("schedule: invalid thread return code %d", (int)ret);
554 }
555
556 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
557 scheduleDoGC(&cap,task,rtsFalse);
558 }
559 } /* end of while() */
560 }
561
562 /* -----------------------------------------------------------------------------
563 * Run queue operations
564 * -------------------------------------------------------------------------- */
565
566 void
567 removeFromRunQueue (Capability *cap, StgTSO *tso)
568 {
569 if (tso->block_info.prev == END_TSO_QUEUE) {
570 ASSERT(cap->run_queue_hd == tso);
571 cap->run_queue_hd = tso->_link;
572 } else {
573 setTSOLink(cap, tso->block_info.prev, tso->_link);
574 }
575 if (tso->_link == END_TSO_QUEUE) {
576 ASSERT(cap->run_queue_tl == tso);
577 cap->run_queue_tl = tso->block_info.prev;
578 } else {
579 setTSOPrev(cap, tso->_link, tso->block_info.prev);
580 }
581 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
582
583 IF_DEBUG(sanity, checkRunQueue(cap));
584 }
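
// (Added for clarity: the run queue is a doubly-linked list threaded through
//  the TSOs themselves,
//
//      run_queue_hd -> tso1 <-> tso2 <-> tso3 <- run_queue_tl
//
//  with tso->_link as the forward pointer and tso->block_info.prev as the
//  backward pointer, both END_TSO_QUEUE-terminated. removeFromRunQueue()
//  above unlinks 'tso' from either end or from the middle, and resets both
//  of its pointers to END_TSO_QUEUE.)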
585
586 void
587 promoteInRunQueue (Capability *cap, StgTSO *tso)
588 {
589 removeFromRunQueue(cap, tso);
590 pushOnRunQueue(cap, tso);
591 }
592
593 /* ----------------------------------------------------------------------------
594 * Setting up the scheduler loop
595 * ------------------------------------------------------------------------- */
596
597 static void
598 schedulePreLoop(void)
599 {
600 // initialisation for scheduler - what cannot go into initScheduler()
601
602 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
603 win32AllocStack();
604 #endif
605 }
606
607 /* -----------------------------------------------------------------------------
608 * scheduleFindWork()
609 *
610 * Search for work to do, and handle messages from elsewhere.
611 * -------------------------------------------------------------------------- */
612
613 static void
614 scheduleFindWork (Capability **pcap)
615 {
616 scheduleStartSignalHandlers(*pcap);
617
618 scheduleProcessInbox(pcap);
619
620 scheduleCheckBlockedThreads(*pcap);
621
622 #if defined(THREADED_RTS)
623 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
624 #endif
625 }
626
627 #if defined(THREADED_RTS)
628 STATIC_INLINE rtsBool
629 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
630 {
631 // we need to yield this capability to someone else if..
632 // - another thread is initiating a GC, and we didn't just do a GC
633 // (see Note [GC livelock])
634 // - another Task is returning from a foreign call
635 // - the thread at the head of the run queue cannot be run
636 // by this Task (it is bound to another Task, or it is unbound
637 // and this task is bound).
638 //
639 // Note [GC livelock]
640 //
641 // If we are interrupted to do a GC, then we do not immediately do
642 // another one. This avoids a starvation situation where one
643 // Capability keeps forcing a GC and the other Capabilities make no
644 // progress at all.
645
646 return ((pending_sync && !didGcLast) ||
647 cap->returning_tasks_hd != NULL ||
648 (!emptyRunQueue(cap) && (task->incall->tso == NULL
649 ? peekRunQueue(cap)->bound != NULL
650 : peekRunQueue(cap)->bound != task->incall)));
651 }
652
653 // This is the single place where a Task goes to sleep. There are
654 // two reasons it might need to sleep:
655 // - there are no threads to run
656 // - we need to yield this Capability to someone else
657 // (see shouldYieldCapability())
658 //
659 // Careful: the scheduler loop is quite delicate. Make sure you run
660 // the tests in testsuite/concurrent (all ways) after modifying this,
661 // and also check the benchmarks in nofib/parallel for regressions.
662
663 static void
664 scheduleYield (Capability **pcap, Task *task)
665 {
666 Capability *cap = *pcap;
667 int didGcLast = rtsFalse;
668
669 // if we have work, and we don't need to give up the Capability, continue.
670 //
671 if (!shouldYieldCapability(cap,task,rtsFalse) &&
672 (!emptyRunQueue(cap) ||
673 !emptyInbox(cap) ||
674 sched_state >= SCHED_INTERRUPTING)) {
675 return;
676 }
677
678 // otherwise yield (sleep), and keep yielding if necessary.
679 do {
680 didGcLast = yieldCapability(&cap,task, !didGcLast);
681 }
682 while (shouldYieldCapability(cap,task,didGcLast));
683
684 // note there may still be no threads on the run queue at this
685 // point, the caller has to check.
686
687 *pcap = cap;
688 return;
689 }
690 #endif
691
692 /* -----------------------------------------------------------------------------
693 * schedulePushWork()
694 *
695 * Push work to other Capabilities if we have some.
696 * -------------------------------------------------------------------------- */
697
698 static void
699 schedulePushWork(Capability *cap USED_IF_THREADS,
700 Task *task USED_IF_THREADS)
701 {
702 /* The following code is not for PARALLEL_HASKELL. I kept the call general;
703 future GUM versions might use pushing in a distributed setup */
704 #if defined(THREADED_RTS)
705
706 Capability *free_caps[n_capabilities], *cap0;
707 nat i, n_free_caps;
708
709 // migration can be turned off with +RTS -qm
710 if (!RtsFlags.ParFlags.migrate) return;
711
712 // Check whether we have more threads on our run queue, or sparks
713 // in our pool, that we could hand to another Capability.
714 if (emptyRunQueue(cap)) {
715 if (sparkPoolSizeCap(cap) < 2) return;
716 } else {
717 if (singletonRunQueue(cap) &&
718 sparkPoolSizeCap(cap) < 1) return;
719 }
720
721 // First grab as many free Capabilities as we can.
722 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
723 cap0 = capabilities[i];
724 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
725 if (!emptyRunQueue(cap0)
726 || cap0->returning_tasks_hd != NULL
727 || cap0->inbox != (Message*)END_TSO_QUEUE) {
728 // it already has some work, we just grabbed it at
729 // the wrong moment. Or maybe it's deadlocked!
730 releaseCapability(cap0);
731 } else {
732 free_caps[n_free_caps++] = cap0;
733 }
734 }
735 }
736
737 // we now have n_free_caps free capabilities stashed in
738 // free_caps[]. Share our run queue equally with them. This is
739 // probably the simplest thing we could do; improvements we might
740 // want to do include:
741 //
742 // - giving high priority to moving relatively new threads, on
743 // the grounds that they haven't had time to build up a
744 // working set in the cache on this CPU/Capability.
745 //
746 // - giving low priority to moving long-lived threads
747
748 if (n_free_caps > 0) {
749 StgTSO *prev, *t, *next;
750 #ifdef SPARK_PUSHING
751 rtsBool pushed_to_all;
752 #endif
753
754 debugTrace(DEBUG_sched,
755 "cap %d: %s and %d free capabilities, sharing...",
756 cap->no,
757 (!emptyRunQueue(cap) && !singletonRunQueue(cap))?
758 "excess threads on run queue":"sparks to share (>=2)",
759 n_free_caps);
760
761 i = 0;
762 #ifdef SPARK_PUSHING
763 pushed_to_all = rtsFalse;
764 #endif
765
766 if (cap->run_queue_hd != END_TSO_QUEUE) {
767 prev = cap->run_queue_hd;
768 t = prev->_link;
769 prev->_link = END_TSO_QUEUE;
770 for (; t != END_TSO_QUEUE; t = next) {
771 next = t->_link;
772 t->_link = END_TSO_QUEUE;
773 if (t->bound == task->incall // don't move my bound thread
774 || tsoLocked(t)) { // don't move a locked thread
775 setTSOLink(cap, prev, t);
776 setTSOPrev(cap, t, prev);
777 prev = t;
778 } else if (i == n_free_caps) {
779 #ifdef SPARK_PUSHING
780 pushed_to_all = rtsTrue;
781 #endif
782 i = 0;
783 // keep one for us
784 setTSOLink(cap, prev, t);
785 setTSOPrev(cap, t, prev);
786 prev = t;
787 } else {
788 appendToRunQueue(free_caps[i],t);
789
790 traceEventMigrateThread (cap, t, free_caps[i]->no);
791
792 if (t->bound) { t->bound->task->cap = free_caps[i]; }
793 t->cap = free_caps[i];
794 i++;
795 }
796 }
797 cap->run_queue_tl = prev;
798
799 IF_DEBUG(sanity, checkRunQueue(cap));
800 }
801
802 #ifdef SPARK_PUSHING
803 /* JB I left this code in place, it would work but is not necessary */
804
805 // If there are some free capabilities that we didn't push any
806 // threads to, then try to push a spark to each one.
807 if (!pushed_to_all) {
808 StgClosure *spark;
809 // i is the next free capability to push to
810 for (; i < n_free_caps; i++) {
811 if (emptySparkPoolCap(free_caps[i])) {
812 spark = tryStealSpark(cap->sparks);
813 if (spark != NULL) {
814 /* TODO: if anyone wants to re-enable this code then
815 * they must consider the fizzledSpark(spark) case
816 * and update the per-cap spark statistics.
817 */
818 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
819
820 traceEventStealSpark(free_caps[i], t, cap->no);
821
822 newSpark(&(free_caps[i]->r), spark);
823 }
824 }
825 }
826 }
827 #endif /* SPARK_PUSHING */
828
829 // release the capabilities
830 for (i = 0; i < n_free_caps; i++) {
831 task->cap = free_caps[i];
832 releaseAndWakeupCapability(free_caps[i]);
833 }
834 }
835 task->cap = cap; // reset to point to our Capability.
836
837 #endif /* THREADED_RTS */
838
839 }
840
841 /* ----------------------------------------------------------------------------
842 * Start any pending signal handlers
843 * ------------------------------------------------------------------------- */
844
845 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
846 static void
847 scheduleStartSignalHandlers(Capability *cap)
848 {
849 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
850 // safe outside the lock
851 startSignalHandlers(cap);
852 }
853 }
854 #else
855 static void
856 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
857 {
858 }
859 #endif
860
861 /* ----------------------------------------------------------------------------
862 * Check for blocked threads that can be woken up.
863 * ------------------------------------------------------------------------- */
864
865 static void
866 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
867 {
868 #if !defined(THREADED_RTS)
869 //
870 // Check whether any waiting threads need to be woken up. If the
871 // run queue is empty, and there are no other tasks running, we
872 // can wait indefinitely for something to happen.
873 //
874 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
875 {
876 awaitEvent (emptyRunQueue(cap));
877 }
878 #endif
879 }
880
881 /* ----------------------------------------------------------------------------
882 * Detect deadlock conditions and attempt to resolve them.
883 * ------------------------------------------------------------------------- */
884
885 static void
886 scheduleDetectDeadlock (Capability **pcap, Task *task)
887 {
888 Capability *cap = *pcap;
889 /*
890 * Detect deadlock: when we have no threads to run, there are no
891 * threads blocked, waiting for I/O, or sleeping, and all the
892 * other tasks are waiting for work, we must have a deadlock of
893 * some description.
894 */
895 if ( emptyThreadQueues(cap) )
896 {
897 #if defined(THREADED_RTS)
898 /*
899 * In the threaded RTS, we only check for deadlock if there
900 * has been no activity in a complete timeslice. This means
901 * we won't eagerly start a full GC just because we don't have
902 * any threads to run currently.
903 */
904 if (recent_activity != ACTIVITY_INACTIVE) return;
905 #endif
906
907 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
908
909 // Garbage collection can release some new threads due to
910 // either (a) finalizers or (b) threads resurrected because
911 // they are unreachable and will therefore be sent an
912 // exception. Any threads thus released will be immediately
913 // runnable.
914 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
915 cap = *pcap;
916 // when force_major == rtsTrue, scheduleDoGC sets
917 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
918 // signal.
919
920 if ( !emptyRunQueue(cap) ) return;
921
922 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
923 /* If we have user-installed signal handlers, then wait
924 * for signals to arrive rather than bombing out with a
925 * deadlock.
926 */
927 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
928 debugTrace(DEBUG_sched,
929 "still deadlocked, waiting for signals...");
930
931 awaitUserSignals();
932
933 if (signals_pending()) {
934 startSignalHandlers(cap);
935 }
936
937 // either we have threads to run, or we were interrupted:
938 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
939
940 return;
941 }
942 #endif
943
944 #if !defined(THREADED_RTS)
945 /* Probably a real deadlock. Send the current main thread the
946 * Deadlock exception (concretely, the NonTermination exception via
*    nonTermination_closure below).
947 */
948 if (task->incall->tso) {
949 switch (task->incall->tso->why_blocked) {
950 case BlockedOnSTM:
951 case BlockedOnBlackHole:
952 case BlockedOnMsgThrowTo:
953 case BlockedOnMVar:
954 case BlockedOnMVarRead:
955 throwToSingleThreaded(cap, task->incall->tso,
956 (StgClosure *)nonTermination_closure);
957 return;
958 default:
959 barf("deadlock: main thread blocked in a strange way");
960 }
961 }
962 return;
963 #endif
964 }
965 }
966
967
968 /* ----------------------------------------------------------------------------
969 * Process message in the current Capability's inbox
970 * ------------------------------------------------------------------------- */
971
972 static void
973 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
974 {
975 #if defined(THREADED_RTS)
976 Message *m, *next;
977 int r;
978 Capability *cap = *pcap;
979
980 while (!emptyInbox(cap)) {
981 if (cap->r.rCurrentNursery->link == NULL ||
982 g0->n_new_large_words >= large_alloc_lim) {
983 scheduleDoGC(pcap, cap->running_task, rtsFalse);
984 cap = *pcap;
985 }
986
987 // don't use a blocking acquire; if the lock is held by
988 // another thread then just carry on. This seems to avoid
989 // getting stuck in a message ping-pong situation with other
990 // processors. We'll check the inbox again later anyway.
991 //
992 // We should really use a more efficient queue data structure
993 // here. The trickiness is that we must ensure a Capability
994 // never goes idle if the inbox is non-empty, which is why we
995 // use cap->lock (cap->lock is released as the last thing
996 // before going idle; see Capability.c:releaseCapability()).
997 r = TRY_ACQUIRE_LOCK(&cap->lock);
998 if (r != 0) return;
999
1000 m = cap->inbox;
1001 cap->inbox = (Message*)END_TSO_QUEUE;
1002
1003 RELEASE_LOCK(&cap->lock);
1004
1005 while (m != (Message*)END_TSO_QUEUE) {
1006 next = m->link;
1007 executeMessage(cap, m);
1008 m = next;
1009 }
1010 }
1011 #endif
1012 }
1013
1014 /* ----------------------------------------------------------------------------
1015 * Activate spark threads (THREADED_RTS)
1016 * ------------------------------------------------------------------------- */
1017
1018 #if defined(THREADED_RTS)
1019 static void
1020 scheduleActivateSpark(Capability *cap)
1021 {
1022 if (anySparks() && !cap->disabled)
1023 {
1024 createSparkThread(cap);
1025 debugTrace(DEBUG_sched, "creating a spark thread");
1026 }
1027 }
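
// (Added note: sparks are created on the Haskell side with `par` from
//  GHC.Conc / Control.Parallel, or `rpar` from Control.Parallel.Strategies;
//  the spark thread created above simply evaluates sparks from the pool
//  until none remain.)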
1028 #endif // THREADED_RTS
1029
1030 /* ----------------------------------------------------------------------------
1031 * After running a thread...
1032 * ------------------------------------------------------------------------- */
1033
1034 static void
1035 schedulePostRunThread (Capability *cap, StgTSO *t)
1036 {
1037 // We have to be able to catch transactions that are in an
1038 // infinite loop as a result of seeing an inconsistent view of
1039 // memory, e.g.
1040 //
1041 // atomically $ do
1042 // [a,b] <- mapM readTVar [ta,tb]
1043 // when (a == b) loop
1044 //
1045 // and a is never equal to b given a consistent view of memory.
1046 //
1047 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1048 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1049 debugTrace(DEBUG_sched | DEBUG_stm,
1050 "trec %p found wasting its time", t);
1051
1052 // strip the stack back to the
1053 // ATOMICALLY_FRAME, aborting the (nested)
1054 // transaction, and saving the stack of any
1055 // partially-evaluated thunks on the heap.
1056 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1057
1058 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1059 }
1060 }
1061
1062 //
1063 // If the current thread's allocation limit has run out, send it
1064 // the AllocationLimitExceeded exception.
1065
1066 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1067 // Use a throwToSelf rather than a throwToSingleThreaded, because
1068 // it correctly handles the case where the thread is currently
1069 // inside mask. Also the thread might be blocked (e.g. on an
1070 // MVar), and throwToSingleThreaded doesn't unblock it
1071 // correctly in that case.
1072 throwToSelf(cap, t, allocationLimitExceeded_closure);
1073 ASSIGN_Int64((W_*)&(t->alloc_limit),
1074 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1075 }
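
// (Added note: the Haskell-side interface to this limit is, in recent base
//  versions, GHC.Conc.setAllocationCounter and GHC.Conc.enableAllocationLimit,
//  which arm the TSO_ALLOC_LIMIT flag tested above; the exception surfaces in
//  Haskell as AllocationLimitExceeded.)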
1076
1077 /* some statistics gathering in the parallel case */
1078 }
1079
1080 /* -----------------------------------------------------------------------------
1081 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1082 * -------------------------------------------------------------------------- */
1083
1084 static rtsBool
1085 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1086 {
1087 if (cap->r.rHpLim == NULL || cap->context_switch) {
1088 // Sometimes we miss a context switch, e.g. when calling
1089 // primitives in a tight loop, MAYBE_GC() doesn't check the
1090 // context switch flag, and we end up waiting for a GC.
1091 // See #1984, and concurrent/should_run/1984
1092 cap->context_switch = 0;
1093 appendToRunQueue(cap,t);
1094 } else {
1095 pushOnRunQueue(cap,t);
1096 }
1097
1098 // did the task ask for a large block?
1099 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1100 // if so, get one and push it on the front of the nursery.
1101 bdescr *bd;
1102 W_ blocks;
1103
1104 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1105
1106 if (blocks > BLOCKS_PER_MBLOCK) {
1107 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1108 }
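
// Worked example (added for clarity, assuming the usual 4096-byte BLOCK_SIZE):
// a request of rHpAlloc = 10000 bytes is rounded up by BLOCK_ROUND_UP to
// 12288, so blocks = 3; three contiguous blocks are then allocated and
// linked into the nursery below.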
1109
1110 debugTrace(DEBUG_sched,
1111 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1112 (long)t->id, what_next_strs[t->what_next], blocks);
1113
1114 // don't do this if the nursery is (nearly) full, we'll GC first.
1115 if (cap->r.rCurrentNursery->link != NULL ||
1116 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1117 // infinite loop if the
1118 // nursery has only one
1119 // block.
1120
1121 bd = allocGroup_lock(blocks);
1122 cap->r.rNursery->n_blocks += blocks;
1123
1124 // link the new group after CurrentNursery
1125 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1126
1127 // initialise it as a nursery block. We initialise the
1128 // step, gen_no, and flags field of *every* sub-block in
1129 // this large block, because this is easier than making
1130 // sure that we always find the block head of a large
1131 // block whenever we call Bdescr() (eg. evacuate() and
1132 // isAlive() in the GC would both have to do this, at
1133 // least).
1134 {
1135 bdescr *x;
1136 for (x = bd; x < bd + blocks; x++) {
1137 initBdescr(x,g0,g0);
1138 x->free = x->start;
1139 x->flags = 0;
1140 }
1141 }
1142
1143 // This assert can be a killer if the app is doing lots
1144 // of large block allocations.
1145 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1146
1147 // now update the nursery to point to the new block
1148 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1149 cap->r.rCurrentNursery = bd;
1150
1151 // we might be unlucky and have another thread get on the
1152 // run queue before us and steal the large block, but in that
1153 // case the thread will just end up requesting another large
1154 // block.
1155 return rtsFalse; /* not actually GC'ing */
1156 }
1157 }
1158
1159 // if we got here because we exceeded large_alloc_lim, then
1160 // proceed straight to GC.
1161 if (g0->n_new_large_words >= large_alloc_lim) {
1162 return rtsTrue;
1163 }
1164
1165 // Otherwise, we just ran out of space in the current nursery.
1166 // Grab another nursery if we can.
1167 if (getNewNursery(cap)) {
1168 debugTrace(DEBUG_sched, "thread %ld got a new nursery", t->id);
1169 return rtsFalse;
1170 }
1171
1172 return rtsTrue;
1173 /* actual GC is done at the end of the while loop in schedule() */
1174 }
1175
1176 /* -----------------------------------------------------------------------------
1177 * Handle a thread that returned to the scheduler with ThreadYielding
1178 * -------------------------------------------------------------------------- */
1179
1180 static rtsBool
1181 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1182 {
1183 /* put the thread back on the run queue. Then, if we're ready to
1184 * GC, check whether this is the last task to stop. If so, wake
1185 * up the GC thread. getThread will block during a GC until the
1186 * GC is finished.
1187 */
1188
1189 ASSERT(t->_link == END_TSO_QUEUE);
1190
1191 // Shortcut if we're just switching evaluators: don't bother
1192 // doing stack squeezing (which can be expensive), just run the
1193 // thread.
1194 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1195 debugTrace(DEBUG_sched,
1196 "--<< thread %ld (%s) stopped to switch evaluators",
1197 (long)t->id, what_next_strs[t->what_next]);
1198 return rtsTrue;
1199 }
1200
1201 // Reset the context switch flag. We don't do this just before
1202 // running the thread, because that would mean we would lose ticks
1203 // during GC, which can lead to unfair scheduling (a thread hogs
1204 // the CPU because the tick always arrives during GC). This way
1205 // penalises threads that do a lot of allocation, but that seems
1206 // better than the alternative.
1207 if (cap->context_switch != 0) {
1208 cap->context_switch = 0;
1209 appendToRunQueue(cap,t);
1210 } else {
1211 pushOnRunQueue(cap,t);
1212 }
1213
1214 IF_DEBUG(sanity,
1215 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1216 checkTSO(t));
1217
1218 return rtsFalse;
1219 }
1220
1221 /* -----------------------------------------------------------------------------
1222 * Handle a thread that returned to the scheduler with ThreadBlocked
1223 * -------------------------------------------------------------------------- */
1224
1225 static void
1226 scheduleHandleThreadBlocked( StgTSO *t
1227 #if !defined(DEBUG)
1228 STG_UNUSED
1229 #endif
1230 )
1231 {
1232
1233 // We don't need to do anything. The thread is blocked, and it
1234 // has tidied up its stack and placed itself on whatever queue
1235 // it needs to be on.
1236
1237 // ASSERT(t->why_blocked != NotBlocked);
1238 // Not true: for example,
1239 // - the thread may have woken itself up already, because
1240 // threadPaused() might have raised a blocked throwTo
1241 // exception, see maybePerformBlockedException().
1242
1243 #ifdef DEBUG
1244 traceThreadStatus(DEBUG_sched, t);
1245 #endif
1246 }
1247
1248 /* -----------------------------------------------------------------------------
1249 * Handle a thread that returned to the scheduler with ThreadFinished
1250 * -------------------------------------------------------------------------- */
1251
1252 static rtsBool
1253 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1254 {
1255 /* Need to check whether this was a main thread, and if so,
1256 * return with the return value.
1257 *
1258 * We also end up here if the thread kills itself with an
1259 * uncaught exception, see Exception.cmm.
1260 */
1261
1262 // blocked exceptions can now complete, even if the thread was in
1263 // blocked mode (see #2910).
1264 awakenBlockedExceptionQueue (cap, t);
1265
1266 //
1267 // Check whether the thread that just completed was a bound
1268 // thread, and if so return with the result.
1269 //
1270 // There is an assumption here that all thread completion goes
1271 // through this point; we need to make sure that if a thread
1272 // ends up in the ThreadKilled state, that it stays on the run
1273 // queue so it can be dealt with here.
1274 //
1275
1276 if (t->bound) {
1277
1278 if (t->bound != task->incall) {
1279 #if !defined(THREADED_RTS)
1280 // Must be a bound thread that is not the topmost one. Leave
1281 // it on the run queue until the stack has unwound to the
1282 // point where we can deal with this. Leaving it on the run
1283 // queue also ensures that the garbage collector knows about
1284 // this thread and its return value (it gets dropped from the
1285 // step->threads list so there's no other way to find it).
1286 appendToRunQueue(cap,t);
1287 return rtsFalse;
1288 #else
1289 // this cannot happen in the threaded RTS, because a
1290 // bound thread can only be run by the appropriate Task.
1291 barf("finished bound thread that isn't mine");
1292 #endif
1293 }
1294
1295 ASSERT(task->incall->tso == t);
1296
1297 if (t->what_next == ThreadComplete) {
1298 if (task->incall->ret) {
1299 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1300 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1301 }
1302 task->incall->stat = Success;
1303 } else {
1304 if (task->incall->ret) {
1305 *(task->incall->ret) = NULL;
1306 }
1307 if (sched_state >= SCHED_INTERRUPTING) {
1308 if (heap_overflow) {
1309 task->incall->stat = HeapExhausted;
1310 } else {
1311 task->incall->stat = Interrupted;
1312 }
1313 } else {
1314 task->incall->stat = Killed;
1315 }
1316 }
1317 #ifdef DEBUG
1318 removeThreadLabel((StgWord)task->incall->tso->id);
1319 #endif
1320
1321 // We no longer consider this thread and task to be bound to
1322 // each other. The TSO lives on until it is GC'd, but the
1323 // task is about to be released by the caller, and we don't
1324 // want anyone following the pointer from the TSO to the
1325 // defunct task (which might have already been
1326 // re-used). This was a real bug: the GC updated
1327 // tso->bound->tso, which led to a deadlock.
1328 t->bound = NULL;
1329 task->incall->tso = NULL;
1330
1331 return rtsTrue; // tells schedule() to return
1332 }
1333
1334 return rtsFalse;
1335 }
1336
1337 /* -----------------------------------------------------------------------------
1338 * Perform a heap census
1339 * -------------------------------------------------------------------------- */
1340
1341 static rtsBool
1342 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1343 {
1344 // When we have +RTS -i0 and we're heap profiling, do a census at
1345 // every GC. This lets us get repeatable runs for debugging.
1346 if (performHeapProfile ||
1347 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1348 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1349 return rtsTrue;
1350 } else {
1351 return rtsFalse;
1352 }
1353 }
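
// (Added usage note: a program built with -prof and run as
//  `./prog +RTS -hc -i0 -RTS` takes a heap census at every GC, which is the
//  case the RtsFlags test above is detecting.)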
1354
1355 /* -----------------------------------------------------------------------------
1356 * Start a synchronisation of all capabilities
1357 * -------------------------------------------------------------------------- */
1358
1359 // Returns:
1360 // 0 if we successfully got a sync
1361 // non-0 if there was another sync request in progress,
1362 // and we yielded to it. The value returned is the
1363 // type of the other sync request.
1364 //
1365 #if defined(THREADED_RTS)
1366 static nat requestSync (Capability **pcap, Task *task, nat sync_type)
1367 {
1368 nat prev_pending_sync;
1369
1370 prev_pending_sync = cas(&pending_sync, 0, sync_type);
1371
1372 if (prev_pending_sync)
1373 {
1374 do {
1375 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1376 prev_pending_sync);
1377 ASSERT(*pcap);
1378 yieldCapability(pcap,task,rtsTrue);
1379 } while (pending_sync);
1380 return prev_pending_sync; // NOTE: task->cap might have changed now
1381 }
1382 else
1383 {
1384 return 0;
1385 }
1386 }
1387
1388 //
1389 // Grab all the capabilities except the one we already hold. Used
1390 // when synchronising before a single-threaded GC (SYNC_GC_SEQ), and
1391 // before a fork (SYNC_OTHER).
1392 //
1393 // Only call this after requestSync(), otherwise a deadlock might
1394 // ensue if another thread is trying to synchronise.
1395 //
1396 static void acquireAllCapabilities(Capability *cap, Task *task)
1397 {
1398 Capability *tmpcap;
1399 nat i;
1400
1401 for (i=0; i < n_capabilities; i++) {
1402 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
1403 tmpcap = capabilities[i];
1404 if (tmpcap != cap) {
1405 // we better hope this task doesn't get migrated to
1406 // another Capability while we're waiting for this one.
1407 // It won't, because load balancing happens while we have
1408 // all the Capabilities, but even so it's a slightly
1409 // unsavoury invariant.
1410 task->cap = tmpcap;
1411 waitForCapability(&tmpcap, task);
1412 if (tmpcap->no != i) {
1413 barf("acquireAllCapabilities: got the wrong capability");
1414 }
1415 }
1416 }
1417 task->cap = cap;
1418 }
1419
1420 static void releaseAllCapabilities(nat n, Capability *cap, Task *task)
1421 {
1422 nat i;
1423
1424 for (i = 0; i < n; i++) {
1425 if (cap->no != i) {
1426 task->cap = capabilities[i];
1427 releaseCapability(capabilities[i]);
1428 }
1429 }
1430 task->cap = cap;
1431 }
1432 #endif
1433
1434 /* -----------------------------------------------------------------------------
1435 * Perform a garbage collection if necessary
1436 * -------------------------------------------------------------------------- */
1437
1438 static void
1439 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1440 rtsBool force_major)
1441 {
1442 Capability *cap = *pcap;
1443 rtsBool heap_census;
1444 nat collect_gen;
1445 rtsBool major_gc;
1446 #ifdef THREADED_RTS
1447 nat gc_type;
1448 nat i, sync;
1449 StgTSO *tso;
1450 #endif
1451
1452 if (sched_state == SCHED_SHUTTING_DOWN) {
1453 // The final GC has already been done, and the system is
1454 // shutting down. We'll probably deadlock if we try to GC
1455 // now.
1456 return;
1457 }
1458
1459 heap_census = scheduleNeedHeapProfile(rtsTrue);
1460
1461 // Figure out which generation we are collecting, so that we can
1462 // decide whether this is a parallel GC or not.
1463 collect_gen = calcNeeded(force_major || heap_census, NULL);
1464 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1465
1466 #ifdef THREADED_RTS
1467 if (sched_state < SCHED_INTERRUPTING
1468 && RtsFlags.ParFlags.parGcEnabled
1469 && collect_gen >= RtsFlags.ParFlags.parGcGen
1470 && ! oldest_gen->mark)
1471 {
1472 gc_type = SYNC_GC_PAR;
1473 } else {
1474 gc_type = SYNC_GC_SEQ;
1475 }
1476
1477 // In order to GC, there must be no threads running Haskell code.
1478 // Therefore, the GC thread needs to hold *all* the capabilities,
1479 // and release them after the GC has completed.
1480 //
1481 // This seems to be the simplest way: previous attempts involved
1482 // making all the threads with capabilities give up their
1483 // capabilities and sleep except for the *last* one, which
1484 // actually did the GC. But it's quite hard to arrange for all
1485 // the other tasks to sleep and stay asleep.
1486 //
1487
1488 /* Other capabilities are prevented from running yet more Haskell
1489 threads if pending_sync is set. Tested inside
1490 yieldCapability() and releaseCapability() in Capability.c */
1491
1492 do {
1493 sync = requestSync(pcap, task, gc_type);
1494 cap = *pcap;
1495 if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
1496 // someone else had a pending sync request for a GC, so
1497 // let's assume GC has been done and we don't need to GC
1498 // again.
1499 return;
1500 }
1501 if (sched_state == SCHED_SHUTTING_DOWN) {
1502 // The scheduler might now be shutting down. We tested
1503 // this above, but it might have become true since then as
1504 // we yielded the capability in requestSync().
1505 return;
1506 }
1507 } while (sync);
1508
1509 // don't declare this until after we have sync'd, because
1510 // n_capabilities may change.
1511 rtsBool idle_cap[n_capabilities];
1512 #ifdef DEBUG
1513 unsigned int old_n_capabilities = n_capabilities;
1514 #endif
1515
1516 interruptAllCapabilities();
1517
1518 // The final shutdown GC is always single-threaded, because it's
1519 // possible that some of the Capabilities have no worker threads.
1520
1521 if (gc_type == SYNC_GC_SEQ)
1522 {
1523 traceEventRequestSeqGc(cap);
1524 }
1525 else
1526 {
1527 traceEventRequestParGc(cap);
1528 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1529 }
1530
1531 if (gc_type == SYNC_GC_SEQ)
1532 {
1533 // single-threaded GC: grab all the capabilities
1534 acquireAllCapabilities(cap,task);
1535 }
1536 else
1537 {
1538 // If we are load-balancing collections in this
1539 // generation, then we require all GC threads to participate
1540 // in the collection. Otherwise, we only require active
1541 // threads to participate, and we set gc_threads[i]->idle for
1542 // any idle capabilities. The rationale here is that waking
1543 // up an idle Capability takes much longer than just doing any
1544 // GC work on its behalf.
1545
1546 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1547 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1548 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
1549 for (i=0; i < n_capabilities; i++) {
1550 if (capabilities[i]->disabled) {
1551 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1552 } else {
1553 idle_cap[i] = rtsFalse;
1554 }
1555 }
1556 } else {
1557 for (i=0; i < n_capabilities; i++) {
1558 if (capabilities[i]->disabled) {
1559 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1560 } else if (i == cap->no ||
1561 capabilities[i]->idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1562 idle_cap[i] = rtsFalse;
1563 } else {
1564 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1565 if (!idle_cap[i]) {
1566 n_failed_trygrab_idles++;
1567 } else {
1568 n_idle_caps++;
1569 }
1570 }
1571 }
1572 }
1573
1574 // We set the gc_thread[i]->idle flag if that
1575 // capability/thread is not participating in this collection.
1576 // We also keep a local record of which capabilities are idle
1577 // in idle_cap[], because scheduleDoGC() is re-entrant:
1578 // another thread might start a GC as soon as we've finished
1579 // this one, and thus the gc_thread[]->idle flags are invalid
1580 // as soon as we release any threads after GC. Getting this
1581 // wrong leads to a rare and hard to debug deadlock!
1582
1583 for (i=0; i < n_capabilities; i++) {
1584 gc_threads[i]->idle = idle_cap[i];
1585 capabilities[i]->idle++;
1586 }
1587
1588 // For all capabilities participating in this GC, wait until
1589 // they have stopped mutating and are standing by for GC.
1590 waitForGcThreads(cap);
1591
1592 #if defined(THREADED_RTS)
1593 // Stable point where we can do a global check on our spark counters
1594 ASSERT(checkSparkCountInvariant());
1595 #endif
1596 }
1597
1598 #endif
1599
1600 IF_DEBUG(scheduler, printAllThreads());
1601
1602 delete_threads_and_gc:
1603 /*
1604 * We now have all the capabilities; if we're in an interrupting
1605 * state, then we should take the opportunity to delete all the
1606 * threads in the system.
1607 * Checking for major_gc ensures that the last GC is major.
1608 */
1609 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1610 deleteAllThreads(cap);
1611 #if defined(THREADED_RTS)
1612 // Discard all the sparks from every Capability. Why?
1613 // They'll probably be GC'd anyway since we've killed all the
1614 // threads. It just avoids the GC having to do any work to
1615 // figure out that any remaining sparks are garbage.
1616 for (i = 0; i < n_capabilities; i++) {
1617 capabilities[i]->spark_stats.gcd +=
1618 sparkPoolSize(capabilities[i]->sparks);
1619 // No race here since all Caps are stopped.
1620 discardSparksCap(capabilities[i]);
1621 }
1622 #endif
1623 sched_state = SCHED_SHUTTING_DOWN;
1624 }
1625
1626 /*
1627 * When there are disabled capabilities, we want to migrate any
1628 * threads away from them. Normally this happens in the
1629 * scheduler's loop, but only for unbound threads - it's really
1630 * hard for a bound thread to migrate itself. So we have another
1631 * go here.
1632 */
1633 #if defined(THREADED_RTS)
1634 for (i = enabled_capabilities; i < n_capabilities; i++) {
1635 Capability *tmp_cap, *dest_cap;
1636 tmp_cap = capabilities[i];
1637 ASSERT(tmp_cap->disabled);
1638 if (i != cap->no) {
1639 dest_cap = capabilities[i % enabled_capabilities];
1640 while (!emptyRunQueue(tmp_cap)) {
1641 tso = popRunQueue(tmp_cap);
1642 migrateThread(tmp_cap, tso, dest_cap);
1643 if (tso->bound) {
1644 traceTaskMigrate(tso->bound->task,
1645 tso->bound->task->cap,
1646 dest_cap);
1647 tso->bound->task->cap = dest_cap;
1648 }
1649 }
1650 }
1651 }
1652 #endif
1653
1654 #if defined(THREADED_RTS)
1655 // reset pending_sync *before* GC, so that when the GC threads
1656 // emerge they don't immediately re-enter the GC.
1657 pending_sync = 0;
1658 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1659 #else
1660 GarbageCollect(collect_gen, heap_census, 0, cap);
1661 #endif
1662
1663 traceSparkCounters(cap);
1664
1665 switch (recent_activity) {
1666 case ACTIVITY_INACTIVE:
1667 if (force_major) {
1668 // We are doing a GC because the system has been idle for a
1669 // timeslice and we need to check for deadlock. Record the
1670 // fact that we've done a GC and turn off the timer signal;
1671 // it will get re-enabled if we run any threads after the GC.
1672 recent_activity = ACTIVITY_DONE_GC;
1673 #ifndef PROFILING
1674 stopTimer();
1675 #endif
1676 break;
1677 }
1678 // fall through...
1679
1680 case ACTIVITY_MAYBE_NO:
1681 // the GC might have taken long enough for the timer to set
1682 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1683 // but we aren't necessarily deadlocked:
1684 recent_activity = ACTIVITY_YES;
1685 break;
1686
1687 case ACTIVITY_DONE_GC:
1688 // If we are actually active, the scheduler will reset the
1689 // recent_activity flag and re-enable the timer.
1690 break;
1691 }
1692
1693 #if defined(THREADED_RTS)
1694 // Stable point where we can do a global check on our spark counters
1695 ASSERT(checkSparkCountInvariant());
1696 #endif
1697
1698 // The heap census itself is done during GarbageCollect().
1699 if (heap_census) {
1700 performHeapProfile = rtsFalse;
1701 }
1702
1703 #if defined(THREADED_RTS)
1704
1705 // If n_capabilities has changed during GC, we're in trouble.
1706 ASSERT(n_capabilities == old_n_capabilities);
1707
1708 if (gc_type == SYNC_GC_PAR)
1709 {
1710 releaseGCThreads(cap);
1711 for (i = 0; i < n_capabilities; i++) {
1712 if (i != cap->no) {
1713 if (idle_cap[i]) {
1714 ASSERT(capabilities[i]->running_task == task);
1715 task->cap = capabilities[i];
1716 releaseCapability(capabilities[i]);
1717 } else {
1718 ASSERT(capabilities[i]->running_task != task);
1719 }
1720 }
1721 }
1722 task->cap = cap;
1723 }
1724 #endif
1725
1726 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1727 // GC set the heap_overflow flag, so we should proceed with
1728 // an orderly shutdown now. Ultimately we want the main
1729 // thread to return to its caller with HeapExhausted, at which
1730 // point the caller should call hs_exit(). The first step is
1731 // to delete all the threads.
1732 //
1733 // Another way to do this would be to raise an exception in
1734 // the main thread, which we really should do because it gives
1735 // the program a chance to clean up. But how do we find the
1736 // main thread? It should presumably be the same one that
1737 // gets ^C exceptions, but that's all done on the Haskell side
1738 // (GHC.TopHandler).
1739 sched_state = SCHED_INTERRUPTING;
1740 goto delete_threads_and_gc;
1741 }
1742
1743 #ifdef SPARKBALANCE
1744 /* JB
1745 Once we are all together... this would be the place to balance all
1746 spark pools. No concurrent stealing or adding of new sparks can
1747 occur. Should be defined in Sparks.c. */
1748 balanceSparkPoolsCaps(n_capabilities, capabilities);
1749 #endif
1750
1751 #if defined(THREADED_RTS)
1752 if (gc_type == SYNC_GC_SEQ) {
1753 // release our stash of capabilities.
1754 releaseAllCapabilities(n_capabilities, cap, task);
1755 }
1756 #endif
1757
1758 return;
1759 }
1760
1761 /* ---------------------------------------------------------------------------
1762 * Singleton fork(). Do not copy any running threads.
1763 * ------------------------------------------------------------------------- */
1764
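/* A hedged sketch of how this entry point is reached: the Haskell-side
 * forkProcess (in the unix package) wraps the child's IO action in a
 * StablePtr and passes it down via a foreign call, roughly as below
 * (childAction is illustrative, not a real RTS name):
 *
 *   HsStablePtr entry = getStablePtr((StgPtr)childAction);
 *   pid_t pid = forkProcess(&entry);
 *   // parent: carries on with the child's pid
 *   // child: never returns here; it runs the action on Capability 0 and
 *   //        then exits via shutdownHaskellAndExit()
 */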
1765 pid_t
1766 forkProcess(HsStablePtr *entry
1767 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1768 STG_UNUSED
1769 #endif
1770 )
1771 {
1772 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1773 pid_t pid;
1774 StgTSO* t,*next;
1775 Capability *cap;
1776 nat g;
1777 Task *task = NULL;
1778 nat i;
1779 #ifdef THREADED_RTS
1780 nat sync;
1781 #endif
1782
1783 debugTrace(DEBUG_sched, "forking!");
1784
1785 task = newBoundTask();
1786
1787 cap = NULL;
1788 waitForCapability(&cap, task);
1789
1790 #ifdef THREADED_RTS
1791 do {
1792 sync = requestSync(&cap, task, SYNC_OTHER);
1793 } while (sync);
1794
1795 acquireAllCapabilities(cap,task);
1796
1797 pending_sync = 0;
1798 #endif
1799
1800 // no funny business: hold locks while we fork, otherwise if some
1801 // other thread is holding a lock when the fork happens, the data
1802 // structure protected by the lock will forever be in an
1803 // inconsistent state in the child. See also #1391.
1804 ACQUIRE_LOCK(&sched_mutex);
1805 ACQUIRE_LOCK(&sm_mutex);
1806 ACQUIRE_LOCK(&stable_mutex);
1807 ACQUIRE_LOCK(&task->lock);
1808
1809 for (i=0; i < n_capabilities; i++) {
1810 ACQUIRE_LOCK(&capabilities[i]->lock);
1811 }
1812
1813 #ifdef THREADED_RTS
1814 ACQUIRE_LOCK(&all_tasks_mutex);
1815 #endif
1816
1817 stopTimer(); // See #4074
1818
1819 #if defined(TRACING)
1820 flushEventLog(); // so that child won't inherit dirty file buffers
1821 #endif
1822
1823 pid = fork();
1824
1825 if (pid) { // parent
1826
1827 startTimer(); // #4074
1828
1829 RELEASE_LOCK(&sched_mutex);
1830 RELEASE_LOCK(&sm_mutex);
1831 RELEASE_LOCK(&stable_mutex);
1832 RELEASE_LOCK(&task->lock);
1833
1834 for (i=0; i < n_capabilities; i++) {
1835 releaseCapability_(capabilities[i],rtsFalse);
1836 RELEASE_LOCK(&capabilities[i]->lock);
1837 }
1838
1839 #ifdef THREADED_RTS
1840 RELEASE_LOCK(&all_tasks_mutex);
1841 #endif
1842
1843 boundTaskExiting(task);
1844
1845 // just return the pid
1846 return pid;
1847
1848 } else { // child
1849
1850 #if defined(THREADED_RTS)
1851 initMutex(&sched_mutex);
1852 initMutex(&sm_mutex);
1853 initMutex(&stable_mutex);
1854 initMutex(&task->lock);
1855
1856 for (i=0; i < n_capabilities; i++) {
1857 initMutex(&capabilities[i]->lock);
1858 }
1859
1860 initMutex(&all_tasks_mutex);
1861 #endif
1862
1863 #ifdef TRACING
1864 resetTracing();
1865 #endif
1866
1867 // Now, all OS threads except the thread that forked are
1868 // stopped. We need to stop all Haskell threads, including
1869 // those involved in foreign calls. Also we need to delete
1870 // all Tasks, because they correspond to OS threads that are
1871 // now gone.
1872
1873 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1874 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1875 next = t->global_link;
1876 // don't allow threads to catch the ThreadKilled
1877 // exception, but we do want to raiseAsync() because these
1878 // threads may be evaluating thunks that we need later.
1879 deleteThread_(t->cap,t);
1880
1881 // stop the GC from updating the InCall to point to
1882 // the TSO. This is only necessary because the
1883 // OSThread bound to the TSO has been killed, and
1884 // won't get a chance to exit in the usual way (see
1885 // also scheduleHandleThreadFinished).
1886 t->bound = NULL;
1887 }
1888 }
1889
1890 discardTasksExcept(task);
1891
1892 for (i=0; i < n_capabilities; i++) {
1893 cap = capabilities[i];
1894
1895 // Empty the run queue. It seems tempting to let all the
1896 // killed threads stay on the run queue as zombies to be
1897 // cleaned up later, but some of them may correspond to
1898 // bound threads for which the corresponding Task does not
1899 // exist.
1900 truncateRunQueue(cap);
1901
1902             // Any suspended C-calling Tasks are gone; their OS threads
1903             // no longer exist:
1904 cap->suspended_ccalls = NULL;
1905
1906 #if defined(THREADED_RTS)
1907 // Wipe our spare workers list, they no longer exist. New
1908 // workers will be created if necessary.
1909 cap->spare_workers = NULL;
1910 cap->n_spare_workers = 0;
1911 cap->returning_tasks_hd = NULL;
1912 cap->returning_tasks_tl = NULL;
1913 #endif
1914
1915             // Release all caps except 0; we'll use that for starting
1916             // the IO manager and running the client action below.
1917 if (cap->no != 0) {
1918 task->cap = cap;
1919 releaseCapability(cap);
1920 }
1921 }
1922 cap = capabilities[0];
1923 task->cap = cap;
1924
1925 // Empty the threads lists. Otherwise, the garbage
1926 // collector may attempt to resurrect some of these threads.
1927 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1928 generations[g].threads = END_TSO_QUEUE;
1929 }
1930
1931 // On Unix, all timers are reset in the child, so we need to start
1932 // the timer again.
1933 initTimer();
1934 startTimer();
1935
1936 // TODO: need to trace various other things in the child
1937 // like startup event, capabilities, process info etc
1938 traceTaskCreate(task, cap);
1939
1940 #if defined(THREADED_RTS)
1941 ioManagerStartCap(&cap);
1942 #endif
1943
1944 rts_evalStableIO(&cap, entry, NULL); // run the action
1945 rts_checkSchedStatus("forkProcess",cap);
1946
1947 rts_unlock(cap);
1948 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
1949 }
1950 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1951 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1952 #endif
1953 }
1954
1955 /* ---------------------------------------------------------------------------
1956 * Changing the number of Capabilities
1957 *
1958 * Changing the number of Capabilities is very tricky! We can only do
1959 * it with the system fully stopped, so we do a full sync with
1960 * requestSync(SYNC_OTHER) and grab all the capabilities.
1961 *
1962 * Then we resize the appropriate data structures, and update all
1963 * references to the old data structures which have now moved.
1964 * Finally we release the Capabilities we are holding, and start
1965 * worker Tasks on the new Capabilities we created.
1966 *
1967 * ------------------------------------------------------------------------- */
1968
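/* A minimal usage sketch (hedged; assumes a threaded RTS brought up with
 * hs_init()).  From Haskell this is normally reached through
 * Control.Concurrent.setNumCapabilities; from C code embedding the RTS it
 * can be called directly once the RTS is running:
 *
 *   #include "Rts.h"              // declares setNumCapabilities()
 *
 *   hs_init(&argc, &argv);
 *   setNumCapabilities(4);        // stop-the-world sync, then resize to 4
 *   ...
 *   hs_exit();
 */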
1969 void
1970 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
1971 {
1972 #if !defined(THREADED_RTS)
1973 if (new_n_capabilities != 1) {
1974 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
1975 }
1976 return;
1977 #elif defined(NOSMP)
1978 if (new_n_capabilities != 1) {
1979 errorBelch("setNumCapabilities: not supported on this platform");
1980 }
1981 return;
1982 #else
1983 Task *task;
1984 Capability *cap;
1985 nat sync;
1986 nat n;
1987 Capability *old_capabilities = NULL;
1988 nat old_n_capabilities = n_capabilities;
1989
1990 if (new_n_capabilities == enabled_capabilities) return;
1991
1992 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
1993 enabled_capabilities, new_n_capabilities);
1994
1995 cap = rts_lock();
1996 task = cap->running_task;
1997
1998 do {
1999 sync = requestSync(&cap, task, SYNC_OTHER);
2000 } while (sync);
2001
2002 acquireAllCapabilities(cap,task);
2003
2004 pending_sync = 0;
2005
2006 if (new_n_capabilities < enabled_capabilities)
2007 {
2008 // Reducing the number of capabilities: we do not actually
2009 // remove the extra capabilities, we just mark them as
2010 // "disabled". This has the following effects:
2011 //
2012 // - threads on a disabled capability are migrated away by the
2013 // scheduler loop
2014 //
2015 // - disabled capabilities do not participate in GC
2016 // (see scheduleDoGC())
2017 //
2018 // - No spark threads are created on this capability
2019 // (see scheduleActivateSpark())
2020 //
2021 // - We do not attempt to migrate threads *to* a disabled
2022 // capability (see schedulePushWork()).
2023 //
2024 // but in other respects, a disabled capability remains
2025 // alive. Threads may be woken up on a disabled capability,
2026 // but they will be immediately migrated away.
2027 //
2028 // This approach is much easier than trying to actually remove
2029 // the capability; we don't have to worry about GC data
2030 // structures, the nursery, etc.
2031 //
2032 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2033 capabilities[n]->disabled = rtsTrue;
2034 traceCapDisable(capabilities[n]);
2035 }
2036 enabled_capabilities = new_n_capabilities;
2037 }
2038 else
2039 {
2040 // Increasing the number of enabled capabilities.
2041 //
2042 // enable any disabled capabilities, up to the required number
2043 for (n = enabled_capabilities;
2044 n < new_n_capabilities && n < n_capabilities; n++) {
2045 capabilities[n]->disabled = rtsFalse;
2046 traceCapEnable(capabilities[n]);
2047 }
2048 enabled_capabilities = n;
2049
2050 if (new_n_capabilities > n_capabilities) {
2051 #if defined(TRACING)
2052 // Allocate eventlog buffers for the new capabilities. Note this
2053 // must be done before calling moreCapabilities(), because that
2054 // will emit events about creating the new capabilities and adding
2055 // them to existing capsets.
2056 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2057 #endif
2058
2059 // Resize the capabilities array
2060 // NB. after this, capabilities points somewhere new. Any pointers
2061 // of type (Capability *) are now invalid.
2062 moreCapabilities(n_capabilities, new_n_capabilities);
2063
2064 // Resize and update storage manager data structures
2065 storageAddCapabilities(n_capabilities, new_n_capabilities);
2066 }
2067 }
2068
2069 // update n_capabilities before things start running
2070 if (new_n_capabilities > n_capabilities) {
2071 n_capabilities = enabled_capabilities = new_n_capabilities;
2072 }
2073
2074 // Start worker tasks on the new Capabilities
2075 startWorkerTasks(old_n_capabilities, new_n_capabilities);
2076
2077 // We're done: release the original Capabilities
2078 releaseAllCapabilities(old_n_capabilities, cap,task);
2079
2080 // We can't free the old array until now, because we access it
2081 // while updating pointers in updateCapabilityRefs().
2082 if (old_capabilities) {
2083 stgFree(old_capabilities);
2084 }
2085
2086 // Notify IO manager that the number of capabilities has changed.
2087 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2088
2089 rts_unlock(cap);
2090
2091 #endif // THREADED_RTS
2092 }
2093
2094
2095
2096 /* ---------------------------------------------------------------------------
2097 * Delete all the threads in the system
2098 * ------------------------------------------------------------------------- */
2099
2100 static void
2101 deleteAllThreads ( Capability *cap )
2102 {
2103 // NOTE: only safe to call if we own all capabilities.
2104
2105 StgTSO* t, *next;
2106 nat g;
2107
2108 debugTrace(DEBUG_sched,"deleting all threads");
2109 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2110 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2111 next = t->global_link;
2112 deleteThread(cap,t);
2113 }
2114 }
2115
2116 // The run queue now contains a bunch of ThreadKilled threads. We
2117 // must not throw these away: the main thread(s) will be in there
2118 // somewhere, and the main scheduler loop has to deal with it.
2119 // Also, the run queue is the only thing keeping these threads from
2120 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2121
2122 #if !defined(THREADED_RTS)
2123 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2124 ASSERT(sleeping_queue == END_TSO_QUEUE);
2125 #endif
2126 }
2127
2128 /* -----------------------------------------------------------------------------
2129 Managing the suspended_ccalls list.
2130 Locks required: sched_mutex
2131 -------------------------------------------------------------------------- */
2132
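/* For reference, the list shape maintained by the two helpers below
 * (a sketch, not a new invariant):
 *
 *   cap->suspended_ccalls --> InCall <--> InCall <--> ... --> NULL
 *
 * suspendTask() links task->incall onto the front of this doubly-linked
 * list; recoverSuspendedTask() unlinks it from wherever it currently sits.
 */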
2133 STATIC_INLINE void
2134 suspendTask (Capability *cap, Task *task)
2135 {
2136 InCall *incall;
2137
2138 incall = task->incall;
2139 ASSERT(incall->next == NULL && incall->prev == NULL);
2140 incall->next = cap->suspended_ccalls;
2141 incall->prev = NULL;
2142 if (cap->suspended_ccalls) {
2143 cap->suspended_ccalls->prev = incall;
2144 }
2145 cap->suspended_ccalls = incall;
2146 }
2147
2148 STATIC_INLINE void
2149 recoverSuspendedTask (Capability *cap, Task *task)
2150 {
2151 InCall *incall;
2152
2153 incall = task->incall;
2154 if (incall->prev) {
2155 incall->prev->next = incall->next;
2156 } else {
2157 ASSERT(cap->suspended_ccalls == incall);
2158 cap->suspended_ccalls = incall->next;
2159 }
2160 if (incall->next) {
2161 incall->next->prev = incall->prev;
2162 }
2163 incall->next = incall->prev = NULL;
2164 }
2165
2166 /* ---------------------------------------------------------------------------
2167 * Suspending & resuming Haskell threads.
2168 *
2169 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2170 * its capability before calling the C function. This allows another
2171 * task to pick up the capability and carry on running Haskell
2172 * threads. It also means that if the C call blocks, it won't lock
2173 * the whole system.
2174 *
2175  * The Haskell thread making the C call is put to sleep for the
2176  * duration of the call, on the capability's suspended_ccalls list. We
2177 * give out a token to the task, which it can use to resume the thread
2178 * on return from the C function.
2179 *
2180 * If this is an interruptible C call, this means that the FFI call may be
2181 * unceremoniously terminated and should be scheduled on an
2182 * unbound worker thread.
2183 * ------------------------------------------------------------------------- */
2184
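/* A hedged sketch of the calling convention: the code generated for a
 * "safe" foreign call does, in effect, the following (names are
 * illustrative, not the exact generated code):
 *
 *   void *tok = suspendThread(BaseReg, interruptible);
 *   result = some_c_function(args);     // runs without holding a Capability
 *   BaseReg = resumeThread(tok);        // re-acquire a Capability, wake TSO
 *
 * The token is really the Task, which records which TSO and Capability to
 * resume on return.
 */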
2185 void *
2186 suspendThread (StgRegTable *reg, rtsBool interruptible)
2187 {
2188 Capability *cap;
2189 int saved_errno;
2190 StgTSO *tso;
2191 Task *task;
2192 #if mingw32_HOST_OS
2193 StgWord32 saved_winerror;
2194 #endif
2195
2196 saved_errno = errno;
2197 #if mingw32_HOST_OS
2198 saved_winerror = GetLastError();
2199 #endif
2200
2201     /* assume that reg points to the StgRegTable part of a Capability.
2202 */
2203 cap = regTableToCapability(reg);
2204
2205 task = cap->running_task;
2206 tso = cap->r.rCurrentTSO;
2207
2208 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2209
2210 // XXX this might not be necessary --SDM
2211 tso->what_next = ThreadRunGHC;
2212
2213 threadPaused(cap,tso);
2214
2215 if (interruptible) {
2216 tso->why_blocked = BlockedOnCCall_Interruptible;
2217 } else {
2218 tso->why_blocked = BlockedOnCCall;
2219 }
2220
2221 // Hand back capability
2222 task->incall->suspended_tso = tso;
2223 task->incall->suspended_cap = cap;
2224
2225 // Otherwise allocate() will write to invalid memory.
2226 cap->r.rCurrentTSO = NULL;
2227
2228 ACQUIRE_LOCK(&cap->lock);
2229
2230 suspendTask(cap,task);
2231 cap->in_haskell = rtsFalse;
2232 releaseCapability_(cap,rtsFalse);
2233
2234 RELEASE_LOCK(&cap->lock);
2235
2236 errno = saved_errno;
2237 #if mingw32_HOST_OS
2238 SetLastError(saved_winerror);
2239 #endif
2240 return task;
2241 }
2242
2243 StgRegTable *
2244 resumeThread (void *task_)
2245 {
2246 StgTSO *tso;
2247 InCall *incall;
2248 Capability *cap;
2249 Task *task = task_;
2250 int saved_errno;
2251 #if mingw32_HOST_OS
2252 StgWord32 saved_winerror;
2253 #endif
2254
2255 saved_errno = errno;
2256 #if mingw32_HOST_OS
2257 saved_winerror = GetLastError();
2258 #endif
2259
2260 incall = task->incall;
2261 cap = incall->suspended_cap;
2262 task->cap = cap;
2263
2264 // Wait for permission to re-enter the RTS with the result.
2265 waitForCapability(&cap,task);
2266 // we might be on a different capability now... but if so, our
2267 // entry on the suspended_ccalls list will also have been
2268 // migrated.
2269
2270 // Remove the thread from the suspended list
2271 recoverSuspendedTask(cap,task);
2272
2273 tso = incall->suspended_tso;
2274 incall->suspended_tso = NULL;
2275 incall->suspended_cap = NULL;
2276 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2277
2278 traceEventRunThread(cap, tso);
2279
2280 /* Reset blocking status */
2281 tso->why_blocked = NotBlocked;
2282
2283 if ((tso->flags & TSO_BLOCKEX) == 0) {
2284 // avoid locking the TSO if we don't have to
2285 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2286 maybePerformBlockedException(cap,tso);
2287 }
2288 }
2289
2290 cap->r.rCurrentTSO = tso;
2291 cap->in_haskell = rtsTrue;
2292 errno = saved_errno;
2293 #if mingw32_HOST_OS
2294 SetLastError(saved_winerror);
2295 #endif
2296
2297 /* We might have GC'd, mark the TSO dirty again */
2298 dirty_TSO(cap,tso);
2299 dirty_STACK(cap,tso->stackobj);
2300
2301 IF_DEBUG(sanity, checkTSO(tso));
2302
2303 return &cap->r;
2304 }
2305
2306 /* ---------------------------------------------------------------------------
2307 * scheduleThread()
2308 *
2309 * scheduleThread puts a thread on the end of the runnable queue.
2310 * This will usually be done immediately after a thread is created.
2311 * The caller of scheduleThread must create the thread using e.g.
2312 * createThread and push an appropriate closure
2313 * on this thread's stack before the scheduler is invoked.
2314 * ------------------------------------------------------------------------ */
2315
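/* For illustration, a hedged sketch of the expected calling sequence; this
 * is roughly what helpers such as createIOThread()/rts_evalIO() do, and the
 * exact stack layout shown here is an assumption, not a specification:
 *
 *   StgTSO *tso = createThread(cap, RtsFlags.GcFlags.initialStkSize);
 *   pushClosure(tso, (W_)&stg_ap_v_info);   // apply the action to RealWorld
 *   pushClosure(tso, (W_)closure);          // the closure to evaluate
 *   pushClosure(tso, (W_)&stg_enter_info);  // enter it when the thread runs
 *   scheduleThread(cap, tso);
 */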
2316 void
2317 scheduleThread(Capability *cap, StgTSO *tso)
2318 {
2319 // The thread goes at the *end* of the run-queue, to avoid possible
2320 // starvation of any threads already on the queue.
2321 appendToRunQueue(cap,tso);
2322 }
2323
2324 void
2325 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2326 {
2327 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2328 // move this thread from now on.
2329 #if defined(THREADED_RTS)
2330 cpu %= enabled_capabilities;
2331 if (cpu == cap->no) {
2332 appendToRunQueue(cap,tso);
2333 } else {
2334 migrateThread(cap, tso, capabilities[cpu]);
2335 }
2336 #else
2337 appendToRunQueue(cap,tso);
2338 #endif
2339 }
2340
2341 void
2342 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2343 {
2344 Task *task;
2345 DEBUG_ONLY( StgThreadID id );
2346 Capability *cap;
2347
2348 cap = *pcap;
2349
2350 // We already created/initialised the Task
2351 task = cap->running_task;
2352
2353 // This TSO is now a bound thread; make the Task and TSO
2354 // point to each other.
2355 tso->bound = task->incall;
2356 tso->cap = cap;
2357
2358 task->incall->tso = tso;
2359 task->incall->ret = ret;
2360 task->incall->stat = NoStatus;
2361
2362 appendToRunQueue(cap,tso);
2363
2364 DEBUG_ONLY( id = tso->id );
2365 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2366
2367 cap = schedule(cap,task);
2368
2369 ASSERT(task->incall->stat != NoStatus);
2370 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2371
2372 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2373 *pcap = cap;
2374 }
2375
2376 /* ----------------------------------------------------------------------------
2377 * Starting Tasks
2378 * ------------------------------------------------------------------------- */
2379
2380 #if defined(THREADED_RTS)
2381 void scheduleWorker (Capability *cap, Task *task)
2382 {
2383 // schedule() runs without a lock.
2384 cap = schedule(cap,task);
2385
2386 // On exit from schedule(), we have a Capability, but possibly not
2387 // the same one we started with.
2388
2389 // During shutdown, the requirement is that after all the
2390 // Capabilities are shut down, all workers that are shutting down
2391 // have finished workerTaskStop(). This is why we hold on to
2392 // cap->lock until we've finished workerTaskStop() below.
2393 //
2394 // There may be workers still involved in foreign calls; those
2395 // will just block in waitForCapability() because the
2396 // Capability has been shut down.
2397 //
2398 ACQUIRE_LOCK(&cap->lock);
2399 releaseCapability_(cap,rtsFalse);
2400 workerTaskStop(task);
2401 RELEASE_LOCK(&cap->lock);
2402 }
2403 #endif
2404
2405 /* ---------------------------------------------------------------------------
2406 * Start new worker tasks on Capabilities from--to
2407 * -------------------------------------------------------------------------- */
2408
2409 static void
2410 startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
2411 {
2412 #if defined(THREADED_RTS)
2413 nat i;
2414 Capability *cap;
2415
2416 for (i = from; i < to; i++) {
2417 cap = capabilities[i];
2418 ACQUIRE_LOCK(&cap->lock);
2419 startWorkerTask(cap);
2420 RELEASE_LOCK(&cap->lock);
2421 }
2422 #endif
2423 }
2424
2425 /* ---------------------------------------------------------------------------
2426 * initScheduler()
2427 *
2428 * Initialise the scheduler. This resets all the queues - if the
2429 * queues contained any threads, they'll be garbage collected at the
2430 * next pass.
2431 *
2432 * ------------------------------------------------------------------------ */
2433
2434 void
2435 initScheduler(void)
2436 {
2437 #if !defined(THREADED_RTS)
2438 blocked_queue_hd = END_TSO_QUEUE;
2439 blocked_queue_tl = END_TSO_QUEUE;
2440 sleeping_queue = END_TSO_QUEUE;
2441 #endif
2442
2443 sched_state = SCHED_RUNNING;
2444 recent_activity = ACTIVITY_YES;
2445
2446 #if defined(THREADED_RTS)
2447 /* Initialise the mutex and condition variables used by
2448 * the scheduler. */
2449 initMutex(&sched_mutex);
2450 #endif
2451
2452 ACQUIRE_LOCK(&sched_mutex);
2453
2454 /* A capability holds the state a native thread needs in
2455 * order to execute STG code. At least one capability is
2456 * floating around (only THREADED_RTS builds have more than one).
2457 */
2458 initCapabilities();
2459
2460 initTaskManager();
2461
2462 /*
2463 * Eagerly start one worker to run each Capability, except for
2464 * Capability 0. The idea is that we're probably going to start a
2465 * bound thread on Capability 0 pretty soon, so we don't want a
2466 * worker task hogging it.
2467 */
2468 startWorkerTasks(1, n_capabilities);
2469
2470 RELEASE_LOCK(&sched_mutex);
2471
2472 }
2473
2474 void
2475 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2476 /* see Capability.c, shutdownCapability() */
2477 {
2478 Task *task = NULL;
2479
2480 task = newBoundTask();
2481
2482 // If we haven't killed all the threads yet, do it now.
2483 if (sched_state < SCHED_SHUTTING_DOWN) {
2484 sched_state = SCHED_INTERRUPTING;
2485 Capability *cap = task->cap;
2486 waitForCapability(&cap,task);
2487 scheduleDoGC(&cap,task,rtsTrue);
2488 ASSERT(task->incall->tso == NULL);
2489 releaseCapability(cap);
2490 }
2491 sched_state = SCHED_SHUTTING_DOWN;
2492
2493 shutdownCapabilities(task, wait_foreign);
2494
2495 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2496 // n_failed_trygrab_idles, n_idle_caps);
2497
2498 boundTaskExiting(task);
2499 }
2500
2501 void
2502 freeScheduler( void )
2503 {
2504 nat still_running;
2505
2506 ACQUIRE_LOCK(&sched_mutex);
2507 still_running = freeTaskManager();
2508 // We can only free the Capabilities if there are no Tasks still
2509 // running. We might have a Task about to return from a foreign
2510 // call into waitForCapability(), for example (actually,
2511 // this should be the *only* thing that a still-running Task can
2512 // do at this point, and it will block waiting for the
2513 // Capability).
2514 if (still_running == 0) {
2515 freeCapabilities();
2516 }
2517 RELEASE_LOCK(&sched_mutex);
2518 #if defined(THREADED_RTS)
2519 closeMutex(&sched_mutex);
2520 #endif
2521 }
2522
2523 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2524 void *user USED_IF_NOT_THREADS)
2525 {
2526 #if !defined(THREADED_RTS)
2527 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2528 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2529 evac(user, (StgClosure **)(void *)&sleeping_queue);
2530 #endif
2531 }
2532
2533 /* -----------------------------------------------------------------------------
2534 performGC
2535
2536 This is the interface to the garbage collector from Haskell land.
2537 We provide this so that external C code can allocate and garbage
2538 collect when called from Haskell via _ccall_GC.
2539 -------------------------------------------------------------------------- */
2540
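/* A small hedged sketch: C code entered from Haskell via a safe foreign
 * call (and therefore not holding a Capability) can force a collection
 * like so; my_c_helper() and do_some_work() are placeholders, not real
 * RTS functions:
 *
 *   void my_c_helper(void) {
 *       do_some_work();
 *       performMajorGC();     // full, stop-the-world collection
 *   }
 */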
2541 static void
2542 performGC_(rtsBool force_major)
2543 {
2544 Task *task;
2545 Capability *cap = NULL;
2546
2547 // We must grab a new Task here, because the existing Task may be
2548 // associated with a particular Capability, and chained onto the
2549 // suspended_ccalls queue.
2550 task = newBoundTask();
2551
2552 // TODO: do we need to traceTask*() here?
2553
2554 waitForCapability(&cap,task);
2555 scheduleDoGC(&cap,task,force_major);
2556 releaseCapability(cap);
2557 boundTaskExiting(task);
2558 }
2559
2560 void
2561 performGC(void)
2562 {
2563 performGC_(rtsFalse);
2564 }
2565
2566 void
2567 performMajorGC(void)
2568 {
2569 performGC_(rtsTrue);
2570 }
2571
2572 /* ---------------------------------------------------------------------------
2573 Interrupt execution
2574 - usually called inside a signal handler so it mustn't do anything fancy.
2575 ------------------------------------------------------------------------ */
2576
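/* A hedged sketch of typical use (the real handlers live elsewhere, e.g.
 * posix/Signals.c; this is illustrative only):
 *
 *   static void ctrlc_handler(int sig STG_UNUSED) {
 *       interruptStgRts();    // just sets flags and pokes the scheduler
 *   }
 */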
2577 void
2578 interruptStgRts(void)
2579 {
2580 sched_state = SCHED_INTERRUPTING;
2581 interruptAllCapabilities();
2582 #if defined(THREADED_RTS)
2583 wakeUpRts();
2584 #endif
2585 }
2586
2587 /* -----------------------------------------------------------------------------
2588 Wake up the RTS
2589
2590 This function causes at least one OS thread to wake up and run the
2591 scheduler loop. It is invoked when the RTS might be deadlocked, or
2592 an external event has arrived that may need servicing (eg. a
2593 keyboard interrupt).
2594
2595     In the single-threaded RTS we don't do anything here; we only have
2596     one thread, and the event that caused us to want to wake up will
2597     already have interrupted any blocking system call in progress.
2598 -------------------------------------------------------------------------- */
2599
2600 #if defined(THREADED_RTS)
2601 void wakeUpRts(void)
2602 {
2603 // This forces the IO Manager thread to wakeup, which will
2604 // in turn ensure that some OS thread wakes up and runs the
2605 // scheduler loop, which will cause a GC and deadlock check.
2606 ioManagerWakeup();
2607 }
2608 #endif
2609
2610 /* -----------------------------------------------------------------------------
2611 Deleting threads
2612
2613 This is used for interruption (^C) and forking, and corresponds to
2614 raising an exception but without letting the thread catch the
2615 exception.
2616 -------------------------------------------------------------------------- */
2617
2618 static void
2619 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2620 {
2621 // NOTE: must only be called on a TSO that we have exclusive
2622 // access to, because we will call throwToSingleThreaded() below.
2623 // The TSO must be on the run queue of the Capability we own, or
2624 // we must own all Capabilities.
2625
2626 if (tso->why_blocked != BlockedOnCCall &&
2627 tso->why_blocked != BlockedOnCCall_Interruptible) {
2628 throwToSingleThreaded(tso->cap,tso,NULL);
2629 }
2630 }
2631
2632 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2633 static void
2634 deleteThread_(Capability *cap, StgTSO *tso)
2635 { // for forkProcess only:
2636 // like deleteThread(), but we delete threads in foreign calls, too.
2637
2638 if (tso->why_blocked == BlockedOnCCall ||
2639 tso->why_blocked == BlockedOnCCall_Interruptible) {
2640 tso->what_next = ThreadKilled;
2641 appendToRunQueue(tso->cap, tso);
2642 } else {
2643 deleteThread(cap,tso);
2644 }
2645 }
2646 #endif
2647
2648 /* -----------------------------------------------------------------------------
2649 raiseExceptionHelper
2650
2651     This function is called by the raise# primitive, just so that we can
2652     move some of the tricky bits of raising an exception from C-- into
2653     C. Who knows, it might be a useful reusable thing here too.
2654 -------------------------------------------------------------------------- */
2655
2656 StgWord
2657 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2658 {
2659 Capability *cap = regTableToCapability(reg);
2660 StgThunk *raise_closure = NULL;
2661 StgPtr p, next;
2662 StgRetInfoTable *info;
2663 //
2664     // This closure represents the expression 'raise# E' where E
2665     // is the exception being raised. It is used to overwrite all the
2666     // thunks which are currently under evaluation.
2667 //
2668
2669 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2670 // LDV profiling: stg_raise_info has THUNK as its closure
2671 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2672     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2673 // 1 does not cause any problem unless profiling is performed.
2674 // However, when LDV profiling goes on, we need to linearly scan
2675 // small object pool, where raise_closure is stored, so we should
2676 // use MIN_UPD_SIZE.
2677 //
2678 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2679 // sizeofW(StgClosure)+1);
2680 //
2681
2682 //
2683 // Walk up the stack, looking for the catch frame. On the way,
2684 // we update any closures pointed to from update frames with the
2685 // raise closure that we just built.
2686 //
2687 p = tso->stackobj->sp;
2688 while(1) {
2689 info = get_ret_itbl((StgClosure *)p);
2690 next = p + stack_frame_sizeW((StgClosure *)p);
2691 switch (info->i.type) {
2692
2693 case UPDATE_FRAME:
2694 // Only create raise_closure if we need to.
2695 if (raise_closure == NULL) {
2696 raise_closure =
2697 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2698 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2699 raise_closure->payload[0] = exception;
2700 }
2701 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2702 (StgClosure *)raise_closure);
2703 p = next;
2704 continue;
2705
2706 case ATOMICALLY_FRAME:
2707 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2708 tso->stackobj->sp = p;
2709 return ATOMICALLY_FRAME;
2710
2711 case CATCH_FRAME:
2712 tso->stackobj->sp = p;
2713 return CATCH_FRAME;
2714
2715 case CATCH_STM_FRAME:
2716 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2717 tso->stackobj->sp = p;
2718 return CATCH_STM_FRAME;
2719
2720 case UNDERFLOW_FRAME:
2721 tso->stackobj->sp = p;
2722 threadStackUnderflow(cap,tso);
2723 p = tso->stackobj->sp;
2724 continue;
2725
2726 case STOP_FRAME:
2727 tso->stackobj->sp = p;
2728 return STOP_FRAME;
2729
2730 case CATCH_RETRY_FRAME: {
2731 StgTRecHeader *trec = tso -> trec;
2732 StgTRecHeader *outer = trec -> enclosing_trec;
2733 debugTrace(DEBUG_stm,
2734 "found CATCH_RETRY_FRAME at %p during raise", p);
2735 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2736 stmAbortTransaction(cap, trec);
2737 stmFreeAbortedTRec(cap, trec);
2738 tso -> trec = outer;
2739 p = next;
2740 continue;
2741 }
2742
2743 default:
2744 p = next;
2745 continue;
2746 }
2747 }
2748 }
2749
2750
2751 /* -----------------------------------------------------------------------------
2752 findRetryFrameHelper
2753
2754 This function is called by the retry# primitive. It traverses the stack
2755 leaving tso->sp referring to the frame which should handle the retry.
2756
2757 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2758     or an ATOMICALLY_FRAME (if the retry# reaches the top level).
2759
2760 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2761 create) because retries are not considered to be exceptions, despite the
2762 similar implementation.
2763
2764 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2765 not be created within memory transactions.
2766 -------------------------------------------------------------------------- */
2767
2768 StgWord
2769 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2770 {
2771 StgPtr p, next;
2772 StgRetInfoTable *info;
2773
2774 p = tso->stackobj->sp;
2775 while (1) {
2776 info = get_ret_itbl((StgClosure *)p);
2777 next = p + stack_frame_sizeW((StgClosure *)p);
2778 switch (info->i.type) {
2779
2780 case ATOMICALLY_FRAME:
2781 debugTrace(DEBUG_stm,
2782 "found ATOMICALLY_FRAME at %p during retry", p);
2783 tso->stackobj->sp = p;
2784 return ATOMICALLY_FRAME;
2785
2786 case CATCH_RETRY_FRAME:
2787 debugTrace(DEBUG_stm,
2788 "found CATCH_RETRY_FRAME at %p during retry", p);
2789 tso->stackobj->sp = p;
2790 return CATCH_RETRY_FRAME;
2791
2792 case CATCH_STM_FRAME: {
2793 StgTRecHeader *trec = tso -> trec;
2794 StgTRecHeader *outer = trec -> enclosing_trec;
2795 debugTrace(DEBUG_stm,
2796 "found CATCH_STM_FRAME at %p during retry", p);
2797 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2798 stmAbortTransaction(cap, trec);
2799 stmFreeAbortedTRec(cap, trec);
2800 tso -> trec = outer;
2801 p = next;
2802 continue;
2803 }
2804
2805 case UNDERFLOW_FRAME:
2806 tso->stackobj->sp = p;
2807 threadStackUnderflow(cap,tso);
2808 p = tso->stackobj->sp;
2809 continue;
2810
2811 default:
2812 ASSERT(info->i.type != CATCH_FRAME);
2813 ASSERT(info->i.type != STOP_FRAME);
2814 p = next;
2815 continue;
2816 }
2817 }
2818 }
2819
2820 /* -----------------------------------------------------------------------------
2821 resurrectThreads is called after garbage collection on the list of
2822 threads found to be garbage. Each of these threads will be woken
2823 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2824 on an MVar, or NonTermination if the thread was blocked on a Black
2825 Hole.
2826
2827 Locks: assumes we hold *all* the capabilities.
2828 -------------------------------------------------------------------------- */
2829
2830 void
2831 resurrectThreads (StgTSO *threads)
2832 {
2833 StgTSO *tso, *next;
2834 Capability *cap;
2835 generation *gen;
2836
2837 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2838 next = tso->global_link;
2839
2840 gen = Bdescr((P_)tso)->gen;
2841 tso->global_link = gen->threads;
2842 gen->threads = tso;
2843
2844 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2845
2846 // Wake up the thread on the Capability it was last on
2847 cap = tso->cap;
2848
2849 switch (tso->why_blocked) {
2850 case BlockedOnMVar:
2851 case BlockedOnMVarRead:
2852 /* Called by GC - sched_mutex lock is currently held. */
2853 throwToSingleThreaded(cap, tso,
2854 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2855 break;
2856 case BlockedOnBlackHole:
2857 throwToSingleThreaded(cap, tso,
2858 (StgClosure *)nonTermination_closure);
2859 break;
2860 case BlockedOnSTM:
2861 throwToSingleThreaded(cap, tso,
2862 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2863 break;
2864 case NotBlocked:
2865 /* This might happen if the thread was blocked on a black hole
2866 * belonging to a thread that we've just woken up (raiseAsync
2867 * can wake up threads, remember...).
2868 */
2869 continue;
2870 case BlockedOnMsgThrowTo:
2871 // This can happen if the target is masking, blocks on a
2872 // black hole, and then is found to be unreachable. In
2873 // this case, we want to let the target wake up and carry
2874 // on, and do nothing to this thread.
2875 continue;
2876 default:
2877 barf("resurrectThreads: thread blocked in a strange way: %d",
2878 tso->why_blocked);
2879 }
2880 }
2881 }