[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
95 * exists - earlier gccs apparently didn't.
96 * -= chak
97 */
98 StgTSO dummy_tso;
99
100 /*
101 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
102 * in an MT setting, needed to signal that a worker thread shouldn't hang around
103 * in the scheduler when it is out of work.
104 */
105 rtsBool shutting_down_scheduler = rtsFalse;
106
107 /*
108 * This mutex protects most of the global scheduler data in
109 * the THREADED_RTS runtime.
110 */
111 #if defined(THREADED_RTS)
112 Mutex sched_mutex;
113 #endif
114
115 #if !defined(mingw32_HOST_OS)
116 #define FORKPROCESS_PRIMOP_SUPPORTED
117 #endif
118
119 // Local stats
120 #ifdef THREADED_RTS
121 static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
122 #endif
123
124 /* -----------------------------------------------------------------------------
125 * static function prototypes
126 * -------------------------------------------------------------------------- */
127
128 static Capability *schedule (Capability *initialCapability, Task *task);
129
130 //
131 // These functions all encapsulate parts of the scheduler loop, and are
132 // abstracted only to make the structure and control flow of the
133 // scheduler clearer.
134 //
135 static void schedulePreLoop (void);
136 static void scheduleFindWork (Capability **pcap);
137 #if defined(THREADED_RTS)
138 static void scheduleYield (Capability **pcap, Task *task);
139 #endif
140 #if defined(THREADED_RTS)
141 static nat requestSync (Capability **pcap, Task *task, nat sync_type);
142 static void acquireAllCapabilities(Capability *cap, Task *task);
143 static void releaseAllCapabilities(Capability *cap, Task *task);
144 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
145 #endif
146 static void scheduleStartSignalHandlers (Capability *cap);
147 static void scheduleCheckBlockedThreads (Capability *cap);
148 static void scheduleProcessInbox(Capability **cap);
149 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
150 static void schedulePushWork(Capability *cap, Task *task);
151 #if defined(THREADED_RTS)
152 static void scheduleActivateSpark(Capability *cap);
153 #endif
154 static void schedulePostRunThread(Capability *cap, StgTSO *t);
155 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
156 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
157 nat prev_what_next );
158 static void scheduleHandleThreadBlocked( StgTSO *t );
159 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
160 StgTSO *t );
161 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
162 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
163
164 static void deleteThread (Capability *cap, StgTSO *tso);
165 static void deleteAllThreads (Capability *cap);
166
167 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
168 static void deleteThread_(Capability *cap, StgTSO *tso);
169 #endif
170
171 /* ---------------------------------------------------------------------------
172 Main scheduling loop.
173
174 We use round-robin scheduling, each thread returning to the
175 scheduler loop when one of these conditions is detected:
176
177 * out of heap space
178 * timer expires (thread yields)
179 * thread blocks
180 * thread ends
181 * stack overflow
182
183 GRAN version:
184    In a GranSim setup this loop iterates over the global event queue,
185    which determines what to do next. Therefore, it's more complicated
186    than either the concurrent or the parallel (GUM) setup.
187 
188 This version has been entirely removed (JB 2008/08).
189
190 GUM version:
191 GUM iterates over incoming messages.
192 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
193 and sends out a fish whenever it has nothing to do; in-between
194 doing the actual reductions (shared code below) it processes the
195 incoming messages and deals with delayed operations
196 (see PendingFetches).
197 This is not the ugliest code you could imagine, but it's bloody close.
198
199 (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
200 now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
201 as well as future GUM versions. This file has been refurbished to
202    contain only valid code which is, however, incomplete and refers to
203    invalid includes etc.
204
205 ------------------------------------------------------------------------ */
206
207 static Capability *
208 schedule (Capability *initialCapability, Task *task)
209 {
210 StgTSO *t;
211 Capability *cap;
212 StgThreadReturnCode ret;
213 nat prev_what_next;
214 rtsBool ready_to_gc;
215 #if defined(THREADED_RTS)
216 rtsBool first = rtsTrue;
217 #endif
218
219 cap = initialCapability;
220
221 // Pre-condition: this task owns initialCapability.
222 // The sched_mutex is *NOT* held
223 // NB. on return, we still hold a capability.
224
225 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
226
227 schedulePreLoop();
228
229 // -----------------------------------------------------------
230 // Scheduler loop starts here:
231
232 while (1) {
233
234 // Check whether we have re-entered the RTS from Haskell without
235 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
236 // call).
237 if (cap->in_haskell) {
238 errorBelch("schedule: re-entered unsafely.\n"
239 " Perhaps a 'foreign import unsafe' should be 'safe'?");
240 stg_exit(EXIT_FAILURE);
241 }
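    // A hedged illustration (not code from this file; the names are made
    // up): the usual way to trip this check is an unsafe foreign call
    // that calls back into Haskell through a foreign export, e.g.
    //
    //   foreign import ccall unsafe "run_callback" runCallback :: IO ()
    //
    // where run_callback() invokes a foreign-exported Haskell function.
    // An unsafe call does not go through suspendThread()/resumeThread(),
    // so cap->in_haskell is still set when we re-enter here.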
242
243 // The interruption / shutdown sequence.
244 //
245 // In order to cleanly shut down the runtime, we want to:
246 // * make sure that all main threads return to their callers
247 // with the state 'Interrupted'.
248 //    * clean up all OS threads associated with the runtime
249 // * free all memory etc.
250 //
251 // So the sequence for ^C goes like this:
252 //
253 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
254 // arranges for some Capability to wake up
255 //
256 // * all threads in the system are halted, and the zombies are
257 // placed on the run queue for cleaning up. We acquire all
258 // the capabilities in order to delete the threads, this is
259 // done by scheduleDoGC() for convenience (because GC already
260 // needs to acquire all the capabilities). We can't kill
261 // threads involved in foreign calls.
262 //
263 // * somebody calls shutdownHaskell(), which calls exitScheduler()
264 //
265 // * sched_state := SCHED_SHUTTING_DOWN
266 //
267 // * all workers exit when the run queue on their capability
268 // drains. All main threads will also exit when their TSO
269 // reaches the head of the run queue and they can return.
270 //
271 // * eventually all Capabilities will shut down, and the RTS can
272 // exit.
273 //
274 // * We might be left with threads blocked in foreign calls,
275 //      we should really attempt to kill these somehow (TODO).
276
277 switch (sched_state) {
278 case SCHED_RUNNING:
279 break;
280 case SCHED_INTERRUPTING:
281 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
282 /* scheduleDoGC() deletes all the threads */
283 scheduleDoGC(&cap,task,rtsFalse);
284
285 // after scheduleDoGC(), we must be shutting down. Either some
286 // other Capability did the final GC, or we did it above,
287 // either way we can fall through to the SCHED_SHUTTING_DOWN
288 // case now.
289 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
290 // fall through
291
292 case SCHED_SHUTTING_DOWN:
293 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
294 // If we are a worker, just exit. If we're a bound thread
295 // then we will exit below when we've removed our TSO from
296 // the run queue.
297 if (!isBoundTask(task) && emptyRunQueue(cap)) {
298 return cap;
299 }
300 break;
301 default:
302 barf("sched_state: %d", sched_state);
303 }
304
305 scheduleFindWork(&cap);
306
307 /* work pushing, currently relevant only for THREADED_RTS:
308 (pushes threads, wakes up idle capabilities for stealing) */
309 schedulePushWork(cap,task);
310
311 scheduleDetectDeadlock(&cap,task);
312
313 // Normally, the only way we can get here with no threads to
314     // run is if a keyboard interrupt was received during
315 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
316 // Additionally, it is not fatal for the
317 // threaded RTS to reach here with no threads to run.
318 //
319 // win32: might be here due to awaitEvent() being abandoned
320 // as a result of a console event having been delivered.
321
322 #if defined(THREADED_RTS)
323 if (first)
324 {
325 // XXX: ToDo
326 // // don't yield the first time, we want a chance to run this
327 // // thread for a bit, even if there are others banging at the
328 // // door.
329 // first = rtsFalse;
330 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
331 }
332
333 scheduleYield(&cap,task);
334
335 if (emptyRunQueue(cap)) continue; // look for work again
336 #endif
337
338 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
339 if ( emptyRunQueue(cap) ) {
340 ASSERT(sched_state >= SCHED_INTERRUPTING);
341 }
342 #endif
343
344 //
345 // Get a thread to run
346 //
347 t = popRunQueue(cap);
348
349 // Sanity check the thread we're about to run. This can be
350 // expensive if there is lots of thread switching going on...
351 IF_DEBUG(sanity,checkTSO(t));
352
353 #if defined(THREADED_RTS)
354 // Check whether we can run this thread in the current task.
355 // If not, we have to pass our capability to the right task.
356 {
357 InCall *bound = t->bound;
358
359 if (bound) {
360 if (bound->task == task) {
361 // yes, the Haskell thread is bound to the current native thread
362 } else {
363 debugTrace(DEBUG_sched,
364 "thread %lu bound to another OS thread",
365 (unsigned long)t->id);
366 // no, bound to a different Haskell thread: pass to that thread
367 pushOnRunQueue(cap,t);
368 continue;
369 }
370 } else {
371 // The thread we want to run is unbound.
372 if (task->incall->tso) {
373 debugTrace(DEBUG_sched,
374 "this OS thread cannot run thread %lu",
375 (unsigned long)t->id);
376 // no, the current native thread is bound to a different
377 // Haskell thread, so pass it to any worker thread
378 pushOnRunQueue(cap,t);
379 continue;
380 }
381 }
382 }
383 #endif
384
385 // If we're shutting down, and this thread has not yet been
386 // killed, kill it now. This sometimes happens when a finalizer
387 // thread is created by the final GC, or a thread previously
388 // in a foreign call returns.
389 if (sched_state >= SCHED_INTERRUPTING &&
390 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
391 deleteThread(cap,t);
392 }
393
394 // If this capability is disabled, migrate the thread away rather
395 // than running it. NB. but not if the thread is bound: it is
396 // really hard for a bound thread to migrate itself. Believe me,
397 // I tried several ways and couldn't find a way to do it.
398 // Instead, when everything is stopped for GC, we migrate all the
399 // threads on the run queue then (see scheduleDoGC()).
400 //
401 // ToDo: what about TSO_LOCKED? Currently we're migrating those
402 // when the number of capabilities drops, but we never migrate
403 // them back if it rises again. Presumably we should, but after
404 // the thread has been migrated we no longer know what capability
405 // it was originally on.
406 #ifdef THREADED_RTS
407 if (cap->disabled && !t->bound) {
408 Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
409 migrateThread(cap, t, dest_cap);
410 continue;
411 }
412 #endif
413
414 /* context switches are initiated by the timer signal, unless
415 * the user specified "context switch as often as possible", with
416 * +RTS -C0
417 */
418 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
419 && !emptyThreadQueues(cap)) {
420 cap->context_switch = 1;
421 }
422
423 run_thread:
424
425 // CurrentTSO is the thread to run. t might be different if we
426 // loop back to run_thread, so make sure to set CurrentTSO after
427 // that.
428 cap->r.rCurrentTSO = t;
429
430 startHeapProfTimer();
431
432 // ----------------------------------------------------------------------
433 // Run the current thread
434
435 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
436 ASSERT(t->cap == cap);
437 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
438
439 prev_what_next = t->what_next;
440
441 errno = t->saved_errno;
442 #if mingw32_HOST_OS
443 SetLastError(t->saved_winerror);
444 #endif
445
446 // reset the interrupt flag before running Haskell code
447 cap->interrupt = 0;
448
449 cap->in_haskell = rtsTrue;
450 cap->idle = 0;
451
452 dirty_TSO(cap,t);
453 dirty_STACK(cap,t->stackobj);
454
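    // Rough lifecycle of recent_activity (the timer-driven transitions
    // live in handle_tick() in Timer.c, not in this file): the timer
    // demotes ACTIVITY_YES -> ACTIVITY_MAYBE_NO -> ACTIVITY_INACTIVE
    // when it sees no progress; the deadlock-detection GC then sets
    // ACTIVITY_DONE_GC and stops the timer; running a thread here resets
    // it to ACTIVITY_YES and, below, restarts the timer.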
455 switch (recent_activity)
456 {
457 case ACTIVITY_DONE_GC: {
458 // ACTIVITY_DONE_GC means we turned off the timer signal to
459 // conserve power (see #1623). Re-enable it here.
460 nat prev;
461 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
462 #ifndef PROFILING
463 if (prev == ACTIVITY_DONE_GC) {
464 startTimer();
465 }
466 #endif
467 break;
468 }
469 case ACTIVITY_INACTIVE:
470 // If we reached ACTIVITY_INACTIVE, then don't reset it until
471 // we've done the GC. The thread running here might just be
472 // the IO manager thread that handle_tick() woke up via
473 // wakeUpRts().
474 break;
475 default:
476 recent_activity = ACTIVITY_YES;
477 }
478
479 traceEventRunThread(cap, t);
480
481 switch (prev_what_next) {
482
483 case ThreadKilled:
484 case ThreadComplete:
485 /* Thread already finished, return to scheduler. */
486 ret = ThreadFinished;
487 break;
488
489 case ThreadRunGHC:
490 {
491 StgRegTable *r;
492 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
493 cap = regTableToCapability(r);
494 ret = r->rRet;
495 break;
496 }
497
498 case ThreadInterpret:
499 cap = interpretBCO(cap);
500 ret = cap->r.rRet;
501 break;
502
503 default:
504 barf("schedule: invalid what_next field");
505 }
506
507 cap->in_haskell = rtsFalse;
508
509 // The TSO might have moved, eg. if it re-entered the RTS and a GC
510 // happened. So find the new location:
511 t = cap->r.rCurrentTSO;
512
513 // And save the current errno in this thread.
514 // XXX: possibly bogus for SMP because this thread might already
515 // be running again, see code below.
516 t->saved_errno = errno;
517 #if mingw32_HOST_OS
518 // Similarly for Windows error code
519 t->saved_winerror = GetLastError();
520 #endif
521
522 if (ret == ThreadBlocked) {
523 if (t->why_blocked == BlockedOnBlackHole) {
524 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
525 traceEventStopThread(cap, t, t->why_blocked + 6,
526 owner != NULL ? owner->id : 0);
527 } else {
528 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
529 }
530 } else {
531 traceEventStopThread(cap, t, ret, 0);
532 }
533
534 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
535 ASSERT(t->cap == cap);
536
537 // ----------------------------------------------------------------------
538
539 // Costs for the scheduler are assigned to CCS_SYSTEM
540 stopHeapProfTimer();
541 #if defined(PROFILING)
542 cap->r.rCCCS = CCS_SYSTEM;
543 #endif
544
545 schedulePostRunThread(cap,t);
546
547 ready_to_gc = rtsFalse;
548
549 switch (ret) {
550 case HeapOverflow:
551 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
552 break;
553
554 case StackOverflow:
555 // just adjust the stack for this thread, then pop it back
556 // on the run queue.
557 threadStackOverflow(cap, t);
558 pushOnRunQueue(cap,t);
559 break;
560
561 case ThreadYielding:
562 if (scheduleHandleYield(cap, t, prev_what_next)) {
563 // shortcut for switching between compiler/interpreter:
564 goto run_thread;
565 }
566 break;
567
568 case ThreadBlocked:
569 scheduleHandleThreadBlocked(t);
570 break;
571
572 case ThreadFinished:
573 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
574 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
575 break;
576
577 default:
578 barf("schedule: invalid thread return code %d", (int)ret);
579 }
580
581 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
582 scheduleDoGC(&cap,task,rtsFalse);
583 }
584 } /* end of while() */
585 }
586
587 /* -----------------------------------------------------------------------------
588 * Run queue operations
589 * -------------------------------------------------------------------------- */
590
591 void
592 removeFromRunQueue (Capability *cap, StgTSO *tso)
593 {
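    // The run queue is doubly linked: tso->_link is the forward pointer
    // and tso->block_info.prev is reused as the backward pointer while a
    // TSO is on the queue, so unlinking has to patch both neighbours and
    // the queue's hd/tl as appropriate.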
594 if (tso->block_info.prev == END_TSO_QUEUE) {
595 ASSERT(cap->run_queue_hd == tso);
596 cap->run_queue_hd = tso->_link;
597 } else {
598 setTSOLink(cap, tso->block_info.prev, tso->_link);
599 }
600 if (tso->_link == END_TSO_QUEUE) {
601 ASSERT(cap->run_queue_tl == tso);
602 cap->run_queue_tl = tso->block_info.prev;
603 } else {
604 setTSOPrev(cap, tso->_link, tso->block_info.prev);
605 }
606 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
607
608 IF_DEBUG(sanity, checkRunQueue(cap));
609 }
610
611 /* ----------------------------------------------------------------------------
612 * Setting up the scheduler loop
613 * ------------------------------------------------------------------------- */
614
615 static void
616 schedulePreLoop(void)
617 {
618 // initialisation for scheduler - what cannot go into initScheduler()
619
620 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
621 win32AllocStack();
622 #endif
623 }
624
625 /* -----------------------------------------------------------------------------
626 * scheduleFindWork()
627 *
628 * Search for work to do, and handle messages from elsewhere.
629 * -------------------------------------------------------------------------- */
630
631 static void
632 scheduleFindWork (Capability **pcap)
633 {
634 scheduleStartSignalHandlers(*pcap);
635
636 scheduleProcessInbox(pcap);
637
638 scheduleCheckBlockedThreads(*pcap);
639
640 #if defined(THREADED_RTS)
641 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
642 #endif
643 }
644
645 #if defined(THREADED_RTS)
646 STATIC_INLINE rtsBool
647 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
648 {
649 // we need to yield this capability to someone else if..
650 // - another thread is initiating a GC, and we didn't just do a GC
651 // (see Note [GC livelock])
652 // - another Task is returning from a foreign call
653 // - the thread at the head of the run queue cannot be run
654 // by this Task (it is bound to another Task, or it is unbound
655   //      and this task is bound).
656 //
657 // Note [GC livelock]
658 //
659 // If we are interrupted to do a GC, then we do not immediately do
660 // another one. This avoids a starvation situation where one
661 // Capability keeps forcing a GC and the other Capabilities make no
662 // progress at all.
663
664 return ((pending_sync && !didGcLast) ||
665 cap->returning_tasks_hd != NULL ||
666 (!emptyRunQueue(cap) && (task->incall->tso == NULL
667 ? cap->run_queue_hd->bound != NULL
668 : cap->run_queue_hd->bound != task->incall)));
669 }
670
671 // This is the single place where a Task goes to sleep. There are
672 // two reasons it might need to sleep:
673 // - there are no threads to run
674 // - we need to yield this Capability to someone else
675 // (see shouldYieldCapability())
676 //
677 // Careful: the scheduler loop is quite delicate. Make sure you run
678 // the tests in testsuite/concurrent (all ways) after modifying this,
679 // and also check the benchmarks in nofib/parallel for regressions.
680
681 static void
682 scheduleYield (Capability **pcap, Task *task)
683 {
684 Capability *cap = *pcap;
685 int didGcLast = rtsFalse;
686
687 // if we have work, and we don't need to give up the Capability, continue.
688 //
689 if (!shouldYieldCapability(cap,task,rtsFalse) &&
690 (!emptyRunQueue(cap) ||
691 !emptyInbox(cap) ||
692 sched_state >= SCHED_INTERRUPTING)) {
693 return;
694 }
695
696 // otherwise yield (sleep), and keep yielding if necessary.
697 do {
698 didGcLast = yieldCapability(&cap,task, !didGcLast);
699 }
700 while (shouldYieldCapability(cap,task,didGcLast));
701
702 // note there may still be no threads on the run queue at this
703     // point; the caller has to check.
704
705 *pcap = cap;
706 return;
707 }
708 #endif
709
710 /* -----------------------------------------------------------------------------
711 * schedulePushWork()
712 *
713 * Push work to other Capabilities if we have some.
714 * -------------------------------------------------------------------------- */
715
716 static void
717 schedulePushWork(Capability *cap USED_IF_THREADS,
718 Task *task USED_IF_THREADS)
719 {
720 /* following code not for PARALLEL_HASKELL. I kept the call general,
721 future GUM versions might use pushing in a distributed setup */
722 #if defined(THREADED_RTS)
723
724 Capability *free_caps[n_capabilities], *cap0;
725 nat i, n_free_caps;
726
727 // migration can be turned off with +RTS -qm
728 if (!RtsFlags.ParFlags.migrate) return;
729
730 // Check whether we have more threads on our run queue, or sparks
731 // in our pool, that we could hand to another Capability.
732 if (cap->run_queue_hd == END_TSO_QUEUE) {
733 if (sparkPoolSizeCap(cap) < 2) return;
734 } else {
735 if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
736 sparkPoolSizeCap(cap) < 1) return;
737 }
738
739 // First grab as many free Capabilities as we can.
740 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
741 cap0 = &capabilities[i];
742 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
743 if (!emptyRunQueue(cap0)
744 || cap0->returning_tasks_hd != NULL
745 || cap0->inbox != (Message*)END_TSO_QUEUE) {
746 // it already has some work, we just grabbed it at
747 // the wrong moment. Or maybe it's deadlocked!
748 releaseCapability(cap0);
749 } else {
750 free_caps[n_free_caps++] = cap0;
751 }
752 }
753 }
754
755 // we now have n_free_caps free capabilities stashed in
756 // free_caps[]. Share our run queue equally with them. This is
757 // probably the simplest thing we could do; improvements we might
758 // want to do include:
759 //
760 // - giving high priority to moving relatively new threads, on
761     //   the grounds that they haven't had time to build up a
762 // working set in the cache on this CPU/Capability.
763 //
764 // - giving low priority to moving long-lived threads
765
766 if (n_free_caps > 0) {
767 StgTSO *prev, *t, *next;
768 #ifdef SPARK_PUSHING
769 rtsBool pushed_to_all;
770 #endif
771
772 debugTrace(DEBUG_sched,
773 "cap %d: %s and %d free capabilities, sharing...",
774 cap->no,
775 (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
776 "excess threads on run queue":"sparks to share (>=2)",
777 n_free_caps);
778
779 i = 0;
780 #ifdef SPARK_PUSHING
781 pushed_to_all = rtsFalse;
782 #endif
783
784 if (cap->run_queue_hd != END_TSO_QUEUE) {
785 prev = cap->run_queue_hd;
786 t = prev->_link;
787 prev->_link = END_TSO_QUEUE;
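            // Deal the remaining threads out to the free capabilities in
            // turn, keeping any thread that is bound to this task or
            // locked to this capability, plus one thread per full round,
            // for ourselves.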
788 for (; t != END_TSO_QUEUE; t = next) {
789 next = t->_link;
790 t->_link = END_TSO_QUEUE;
791 if (t->bound == task->incall // don't move my bound thread
792 || tsoLocked(t)) { // don't move a locked thread
793 setTSOLink(cap, prev, t);
794 setTSOPrev(cap, t, prev);
795 prev = t;
796 } else if (i == n_free_caps) {
797 #ifdef SPARK_PUSHING
798 pushed_to_all = rtsTrue;
799 #endif
800 i = 0;
801 // keep one for us
802 setTSOLink(cap, prev, t);
803 setTSOPrev(cap, t, prev);
804 prev = t;
805 } else {
806 appendToRunQueue(free_caps[i],t);
807
808 traceEventMigrateThread (cap, t, free_caps[i]->no);
809
810 if (t->bound) { t->bound->task->cap = free_caps[i]; }
811 t->cap = free_caps[i];
812 i++;
813 }
814 }
815 cap->run_queue_tl = prev;
816
817 IF_DEBUG(sanity, checkRunQueue(cap));
818 }
819
820 #ifdef SPARK_PUSHING
821 /* JB I left this code in place, it would work but is not necessary */
822
823 // If there are some free capabilities that we didn't push any
824 // threads to, then try to push a spark to each one.
825 if (!pushed_to_all) {
826 StgClosure *spark;
827 // i is the next free capability to push to
828 for (; i < n_free_caps; i++) {
829 if (emptySparkPoolCap(free_caps[i])) {
830 spark = tryStealSpark(cap->sparks);
831 if (spark != NULL) {
832 /* TODO: if anyone wants to re-enable this code then
833 * they must consider the fizzledSpark(spark) case
834 * and update the per-cap spark statistics.
835 */
836 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
837
838 traceEventStealSpark(free_caps[i], t, cap->no);
839
840 newSpark(&(free_caps[i]->r), spark);
841 }
842 }
843 }
844 }
845 #endif /* SPARK_PUSHING */
846
847 // release the capabilities
848 for (i = 0; i < n_free_caps; i++) {
849 task->cap = free_caps[i];
850 releaseAndWakeupCapability(free_caps[i]);
851 }
852 }
853 task->cap = cap; // reset to point to our Capability.
854
855 #endif /* THREADED_RTS */
856
857 }
858
859 /* ----------------------------------------------------------------------------
860 * Start any pending signal handlers
861 * ------------------------------------------------------------------------- */
862
863 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
864 static void
865 scheduleStartSignalHandlers(Capability *cap)
866 {
867 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
868 // safe outside the lock
869 startSignalHandlers(cap);
870 }
871 }
872 #else
873 static void
874 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
875 {
876 }
877 #endif
878
879 /* ----------------------------------------------------------------------------
880 * Check for blocked threads that can be woken up.
881 * ------------------------------------------------------------------------- */
882
883 static void
884 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
885 {
886 #if !defined(THREADED_RTS)
887 //
888 // Check whether any waiting threads need to be woken up. If the
889 // run queue is empty, and there are no other tasks running, we
890 // can wait indefinitely for something to happen.
891 //
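    // awaitEvent(wait): when 'wait' is rtsTrue (here: the run queue is
    // empty) it blocks until an I/O event or timer expiry makes some
    // thread runnable; otherwise it just polls. The implementation is
    // platform-specific (see e.g. posix/Select.c or the win32 IOManager).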
892 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
893 {
894 awaitEvent (emptyRunQueue(cap));
895 }
896 #endif
897 }
898
899 /* ----------------------------------------------------------------------------
900 * Detect deadlock conditions and attempt to resolve them.
901 * ------------------------------------------------------------------------- */
902
903 static void
904 scheduleDetectDeadlock (Capability **pcap, Task *task)
905 {
906 Capability *cap = *pcap;
907 /*
908 * Detect deadlock: when we have no threads to run, there are no
909 * threads blocked, waiting for I/O, or sleeping, and all the
910 * other tasks are waiting for work, we must have a deadlock of
911 * some description.
912 */
913 if ( emptyThreadQueues(cap) )
914 {
915 #if defined(THREADED_RTS)
916 /*
917 * In the threaded RTS, we only check for deadlock if there
918 * has been no activity in a complete timeslice. This means
919 * we won't eagerly start a full GC just because we don't have
920 * any threads to run currently.
921 */
922 if (recent_activity != ACTIVITY_INACTIVE) return;
923 #endif
924
925 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
926
927 // Garbage collection can release some new threads due to
928 // either (a) finalizers or (b) threads resurrected because
929 // they are unreachable and will therefore be sent an
930 // exception. Any threads thus released will be immediately
931 // runnable.
932 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
933 cap = *pcap;
934         // when force_major == rtsTrue, scheduleDoGC sets
935 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
936 // signal.
937
938 if ( !emptyRunQueue(cap) ) return;
939
940 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
941 /* If we have user-installed signal handlers, then wait
942          * for signals to arrive rather than bombing out with a
943 * deadlock.
944 */
945 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
946 debugTrace(DEBUG_sched,
947 "still deadlocked, waiting for signals...");
948
949 awaitUserSignals();
950
951 if (signals_pending()) {
952 startSignalHandlers(cap);
953 }
954
955 // either we have threads to run, or we were interrupted:
956 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
957
958 return;
959 }
960 #endif
961
962 #if !defined(THREADED_RTS)
963 /* Probably a real deadlock. Send the current main thread the
964 * Deadlock exception.
965 */
966 if (task->incall->tso) {
967 switch (task->incall->tso->why_blocked) {
968 case BlockedOnSTM:
969 case BlockedOnBlackHole:
970 case BlockedOnMsgThrowTo:
971 case BlockedOnMVar:
972 throwToSingleThreaded(cap, task->incall->tso,
973 (StgClosure *)nonTermination_closure);
974 return;
975 default:
976 barf("deadlock: main thread blocked in a strange way");
977 }
978 }
979 return;
980 #endif
981 }
982 }
983
984
985 /* ----------------------------------------------------------------------------
986 * Send pending messages (PARALLEL_HASKELL only)
987 * ------------------------------------------------------------------------- */
988
989 #if defined(PARALLEL_HASKELL)
990 static void
991 scheduleSendPendingMessages(void)
992 {
993
994 # if defined(PAR) // global Mem.Mgmt., omit for now
995 if (PendingFetches != END_BF_QUEUE) {
996 processFetches();
997 }
998 # endif
999
1000 if (RtsFlags.ParFlags.BufferTime) {
1001 // if we use message buffering, we must send away all message
1002 // packets which have become too old...
1003 sendOldBuffers();
1004 }
1005 }
1006 #endif
1007
1008 /* ----------------------------------------------------------------------------
1009 * Process message in the current Capability's inbox
1010 * ------------------------------------------------------------------------- */
1011
1012 static void
1013 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
1014 {
1015 #if defined(THREADED_RTS)
1016 Message *m, *next;
1017 int r;
1018 Capability *cap = *pcap;
1019
1020 while (!emptyInbox(cap)) {
1021 if (cap->r.rCurrentNursery->link == NULL ||
1022 g0->n_new_large_words >= large_alloc_lim) {
1023 scheduleDoGC(pcap, cap->running_task, rtsFalse);
1024 cap = *pcap;
1025 }
1026
1027 // don't use a blocking acquire; if the lock is held by
1028 // another thread then just carry on. This seems to avoid
1029 // getting stuck in a message ping-pong situation with other
1030 // processors. We'll check the inbox again later anyway.
1031 //
1032 // We should really use a more efficient queue data structure
1033 // here. The trickiness is that we must ensure a Capability
1034 // never goes idle if the inbox is non-empty, which is why we
1035 // use cap->lock (cap->lock is released as the last thing
1036 // before going idle; see Capability.c:releaseCapability()).
1037 r = TRY_ACQUIRE_LOCK(&cap->lock);
1038 if (r != 0) return;
1039
1040 m = cap->inbox;
1041 cap->inbox = (Message*)END_TSO_QUEUE;
1042
1043 RELEASE_LOCK(&cap->lock);
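        // We detached the entire inbox above, so these messages can be
        // executed without holding cap->lock; anything arriving in the
        // meantime lands on cap->inbox and is picked up on the next
        // iteration of the enclosing while loop.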
1044
1045 while (m != (Message*)END_TSO_QUEUE) {
1046 next = m->link;
1047 executeMessage(cap, m);
1048 m = next;
1049 }
1050 }
1051 #endif
1052 }
1053
1054 /* ----------------------------------------------------------------------------
1055 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1056 * ------------------------------------------------------------------------- */
1057
1058 #if defined(THREADED_RTS)
1059 static void
1060 scheduleActivateSpark(Capability *cap)
1061 {
1062 if (anySparks() && !cap->disabled)
1063 {
1064 createSparkThread(cap);
1065 debugTrace(DEBUG_sched, "creating a spark thread");
1066 }
1067 }
1068 #endif // THREADED_RTS
1069
1070 /* ----------------------------------------------------------------------------
1071 * After running a thread...
1072 * ------------------------------------------------------------------------- */
1073
1074 static void
1075 schedulePostRunThread (Capability *cap, StgTSO *t)
1076 {
1077 // We have to be able to catch transactions that are in an
1078 // infinite loop as a result of seeing an inconsistent view of
1079 // memory, e.g.
1080 //
1081 // atomically $ do
1082 // [a,b] <- mapM readTVar [ta,tb]
1083 // when (a == b) loop
1084 //
1085 // and a is never equal to b given a consistent view of memory.
1086 //
1087 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1088 if (!stmValidateNestOfTransactions (t -> trec)) {
1089 debugTrace(DEBUG_sched | DEBUG_stm,
1090 "trec %p found wasting its time", t);
1091
1092 // strip the stack back to the
1093 // ATOMICALLY_FRAME, aborting the (nested)
1094 // transaction, and saving the stack of any
1095 // partially-evaluated thunks on the heap.
1096 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1097
1098 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1099 }
1100 }
1101
1102 /* some statistics gathering in the parallel case */
1103 }
1104
1105 /* -----------------------------------------------------------------------------
1106  * Handle a thread that returned to the scheduler with HeapOverflow
1107 * -------------------------------------------------------------------------- */
1108
1109 static rtsBool
1110 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1111 {
1112 // did the task ask for a large block?
1113 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1114 // if so, get one and push it on the front of the nursery.
1115 bdescr *bd;
1116 W_ blocks;
1117
1118 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1119
1120 if (blocks > BLOCKS_PER_MBLOCK) {
1121 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1122 }
1123
1124 debugTrace(DEBUG_sched,
1125 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1126 (long)t->id, what_next_strs[t->what_next], blocks);
1127
1128 // don't do this if the nursery is (nearly) full, we'll GC first.
1129 if (cap->r.rCurrentNursery->link != NULL ||
1130 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1131 // if the nursery has only one block.
1132
1133 bd = allocGroup_lock(blocks);
1134 cap->r.rNursery->n_blocks += blocks;
1135
1136 // link the new group into the list
1137 bd->link = cap->r.rCurrentNursery;
1138 bd->u.back = cap->r.rCurrentNursery->u.back;
1139 if (cap->r.rCurrentNursery->u.back != NULL) {
1140 cap->r.rCurrentNursery->u.back->link = bd;
1141 } else {
1142 cap->r.rNursery->blocks = bd;
1143 }
1144 cap->r.rCurrentNursery->u.back = bd;
1145
1146 // initialise it as a nursery block. We initialise the
1147 // step, gen_no, and flags field of *every* sub-block in
1148 // this large block, because this is easier than making
1149 // sure that we always find the block head of a large
1150 // block whenever we call Bdescr() (eg. evacuate() and
1151 // isAlive() in the GC would both have to do this, at
1152 // least).
1153 {
1154 bdescr *x;
1155 for (x = bd; x < bd + blocks; x++) {
1156 initBdescr(x,g0,g0);
1157 x->free = x->start;
1158 x->flags = 0;
1159 }
1160 }
1161
1162 // This assert can be a killer if the app is doing lots
1163 // of large block allocations.
1164 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1165
1166 // now update the nursery to point to the new block
1167 cap->r.rCurrentNursery = bd;
1168
1169 // we might be unlucky and have another thread get on the
1170 // run queue before us and steal the large block, but in that
1171 // case the thread will just end up requesting another large
1172 // block.
1173 pushOnRunQueue(cap,t);
1174 return rtsFalse; /* not actually GC'ing */
1175 }
1176 }
1177
1178 if (cap->r.rHpLim == NULL || cap->context_switch) {
1179 // Sometimes we miss a context switch, e.g. when calling
1180 // primitives in a tight loop, MAYBE_GC() doesn't check the
1181 // context switch flag, and we end up waiting for a GC.
1182 // See #1984, and concurrent/should_run/1984
1183 cap->context_switch = 0;
1184 appendToRunQueue(cap,t);
1185 } else {
1186 pushOnRunQueue(cap,t);
1187 }
1188 return rtsTrue;
1189 /* actual GC is done at the end of the while loop in schedule() */
1190 }
1191
1192 /* -----------------------------------------------------------------------------
1193 * Handle a thread that returned to the scheduler with ThreadYielding
1194 * -------------------------------------------------------------------------- */
1195
1196 static rtsBool
1197 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1198 {
1199 /* put the thread back on the run queue. Then, if we're ready to
1200 * GC, check whether this is the last task to stop. If so, wake
1201 * up the GC thread. getThread will block during a GC until the
1202 * GC is finished.
1203 */
1204
1205 ASSERT(t->_link == END_TSO_QUEUE);
1206
1207 // Shortcut if we're just switching evaluators: don't bother
1208 // doing stack squeezing (which can be expensive), just run the
1209 // thread.
1210 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1211 debugTrace(DEBUG_sched,
1212 "--<< thread %ld (%s) stopped to switch evaluators",
1213 (long)t->id, what_next_strs[t->what_next]);
1214 return rtsTrue;
1215 }
1216
1217 // Reset the context switch flag. We don't do this just before
1218 // running the thread, because that would mean we would lose ticks
1219 // during GC, which can lead to unfair scheduling (a thread hogs
1220 // the CPU because the tick always arrives during GC). This way
1221 // penalises threads that do a lot of allocation, but that seems
1222 // better than the alternative.
1223 if (cap->context_switch != 0) {
1224 cap->context_switch = 0;
1225 appendToRunQueue(cap,t);
1226 } else {
1227 pushOnRunQueue(cap,t);
1228 }
1229
1230 IF_DEBUG(sanity,
1231 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1232 checkTSO(t));
1233
1234 return rtsFalse;
1235 }
1236
1237 /* -----------------------------------------------------------------------------
1238 * Handle a thread that returned to the scheduler with ThreadBlocked
1239 * -------------------------------------------------------------------------- */
1240
1241 static void
1242 scheduleHandleThreadBlocked( StgTSO *t
1243 #if !defined(DEBUG)
1244 STG_UNUSED
1245 #endif
1246 )
1247 {
1248
1249 // We don't need to do anything. The thread is blocked, and it
1250 // has tidied up its stack and placed itself on whatever queue
1251 // it needs to be on.
1252
1253 // ASSERT(t->why_blocked != NotBlocked);
1254 // Not true: for example,
1255 // - the thread may have woken itself up already, because
1256 // threadPaused() might have raised a blocked throwTo
1257 // exception, see maybePerformBlockedException().
1258
1259 #ifdef DEBUG
1260 traceThreadStatus(DEBUG_sched, t);
1261 #endif
1262 }
1263
1264 /* -----------------------------------------------------------------------------
1265 * Handle a thread that returned to the scheduler with ThreadFinished
1266 * -------------------------------------------------------------------------- */
1267
1268 static rtsBool
1269 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1270 {
1271 /* Need to check whether this was a main thread, and if so,
1272 * return with the return value.
1273 *
1274 * We also end up here if the thread kills itself with an
1275 * uncaught exception, see Exception.cmm.
1276 */
1277
1278 // blocked exceptions can now complete, even if the thread was in
1279 // blocked mode (see #2910).
1280 awakenBlockedExceptionQueue (cap, t);
1281
1282 //
1283 // Check whether the thread that just completed was a bound
1284 // thread, and if so return with the result.
1285 //
1286 // There is an assumption here that all thread completion goes
1287 // through this point; we need to make sure that if a thread
1288 // ends up in the ThreadKilled state, that it stays on the run
1289 // queue so it can be dealt with here.
1290 //
1291
1292 if (t->bound) {
1293
1294 if (t->bound != task->incall) {
1295 #if !defined(THREADED_RTS)
1296 // Must be a bound thread that is not the topmost one. Leave
1297 // it on the run queue until the stack has unwound to the
1298 // point where we can deal with this. Leaving it on the run
1299 // queue also ensures that the garbage collector knows about
1300 // this thread and its return value (it gets dropped from the
1301 // step->threads list so there's no other way to find it).
1302 appendToRunQueue(cap,t);
1303 return rtsFalse;
1304 #else
1305 // this cannot happen in the threaded RTS, because a
1306 // bound thread can only be run by the appropriate Task.
1307 barf("finished bound thread that isn't mine");
1308 #endif
1309 }
1310
1311 ASSERT(task->incall->tso == t);
1312
1313 if (t->what_next == ThreadComplete) {
1314 if (task->incall->ret) {
1315 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1316 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1317 }
1318 task->incall->stat = Success;
1319 } else {
1320 if (task->incall->ret) {
1321 *(task->incall->ret) = NULL;
1322 }
1323 if (sched_state >= SCHED_INTERRUPTING) {
1324 if (heap_overflow) {
1325 task->incall->stat = HeapExhausted;
1326 } else {
1327 task->incall->stat = Interrupted;
1328 }
1329 } else {
1330 task->incall->stat = Killed;
1331 }
1332 }
1333 #ifdef DEBUG
1334 removeThreadLabel((StgWord)task->incall->tso->id);
1335 #endif
1336
1337 // We no longer consider this thread and task to be bound to
1338 // each other. The TSO lives on until it is GC'd, but the
1339 // task is about to be released by the caller, and we don't
1340 // want anyone following the pointer from the TSO to the
1341 // defunct task (which might have already been
1342 // re-used). This was a real bug: the GC updated
1343         // tso->bound->tso which led to a deadlock.
1344 t->bound = NULL;
1345 task->incall->tso = NULL;
1346
1347 return rtsTrue; // tells schedule() to return
1348 }
1349
1350 return rtsFalse;
1351 }
1352
1353 /* -----------------------------------------------------------------------------
1354 * Perform a heap census
1355 * -------------------------------------------------------------------------- */
1356
1357 static rtsBool
1358 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1359 {
1360 // When we have +RTS -i0 and we're heap profiling, do a census at
1361 // every GC. This lets us get repeatable runs for debugging.
1362 if (performHeapProfile ||
1363 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1364 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1365 return rtsTrue;
1366 } else {
1367 return rtsFalse;
1368 }
1369 }
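// Usage note (a sketch; the flags are standard RTS options, not defined
// here): in a profiling build, running the program with "+RTS -hc -i0"
// sets heapProfileInterval to 0, so the test above requests a census at
// every GC, which is what gives repeatable runs.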
1370
1371 /* -----------------------------------------------------------------------------
1372 * Start a synchronisation of all capabilities
1373 * -------------------------------------------------------------------------- */
1374
1375 // Returns:
1376 // 0 if we successfully got a sync
1377 // non-0 if there was another sync request in progress,
1378 // and we yielded to it. The value returned is the
1379 // type of the other sync request.
1380 //
1381 #if defined(THREADED_RTS)
1382 static nat requestSync (Capability **pcap, Task *task, nat sync_type)
1383 {
1384 nat prev_pending_sync;
1385
1386 prev_pending_sync = cas(&pending_sync, 0, sync_type);
1387
1388 if (prev_pending_sync)
1389 {
1390 do {
1391 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1392 prev_pending_sync);
1393 ASSERT(*pcap);
1394 yieldCapability(pcap,task,rtsTrue);
1395 } while (pending_sync);
1396 return prev_pending_sync; // NOTE: task->cap might have changed now
1397 }
1398 else
1399 {
1400 return 0;
1401 }
1402 }
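// Typical call pattern (as used by scheduleDoGC() and forkProcess()
// below): retry until our own request wins, yielding to any sync that
// is already in progress, e.g.
//
//     do {
//         sync = requestSync(&cap, task, SYNC_GC_SEQ);
//     } while (sync);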
1403
1404 //
1405 // Grab all the capabilities except the one we already hold. Used
1406 // when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1407 // before a fork (SYNC_OTHER).
1408 //
1409 // Only call this after requestSync(), otherwise a deadlock might
1410 // ensue if another thread is trying to synchronise.
1411 //
1412 static void acquireAllCapabilities(Capability *cap, Task *task)
1413 {
1414 Capability *tmpcap;
1415 nat i;
1416
1417 for (i=0; i < n_capabilities; i++) {
1418         debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
1419 tmpcap = &capabilities[i];
1420 if (tmpcap != cap) {
1421 // we better hope this task doesn't get migrated to
1422 // another Capability while we're waiting for this one.
1423 // It won't, because load balancing happens while we have
1424 // all the Capabilities, but even so it's a slightly
1425 // unsavoury invariant.
1426 task->cap = tmpcap;
1427 waitForReturnCapability(&tmpcap, task);
1428 if (tmpcap->no != i) {
1429 barf("acquireAllCapabilities: got the wrong capability");
1430 }
1431 }
1432 }
1433 task->cap = cap;
1434 }
1435
1436 static void releaseAllCapabilities(Capability *cap, Task *task)
1437 {
1438 nat i;
1439
1440 for (i = 0; i < n_capabilities; i++) {
1441 if (cap->no != i) {
1442 task->cap = &capabilities[i];
1443 releaseCapability(&capabilities[i]);
1444 }
1445 }
1446 task->cap = cap;
1447 }
1448 #endif
1449
1450 /* -----------------------------------------------------------------------------
1451 * Perform a garbage collection if necessary
1452 * -------------------------------------------------------------------------- */
1453
1454 static void
1455 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1456 rtsBool force_major)
1457 {
1458 Capability *cap = *pcap;
1459 rtsBool heap_census;
1460 nat collect_gen;
1461 #ifdef THREADED_RTS
1462 rtsBool idle_cap[n_capabilities];
1463 rtsBool gc_type;
1464 nat i, sync;
1465 StgTSO *tso;
1466 #endif
1467
1468 if (sched_state == SCHED_SHUTTING_DOWN) {
1469 // The final GC has already been done, and the system is
1470 // shutting down. We'll probably deadlock if we try to GC
1471 // now.
1472 return;
1473 }
1474
1475 heap_census = scheduleNeedHeapProfile(rtsTrue);
1476
1477 // Figure out which generation we are collecting, so that we can
1478 // decide whether this is a parallel GC or not.
1479 collect_gen = calcNeeded(force_major || heap_census, NULL);
1480
1481 #ifdef THREADED_RTS
1482 if (sched_state < SCHED_INTERRUPTING
1483 && RtsFlags.ParFlags.parGcEnabled
1484 && collect_gen >= RtsFlags.ParFlags.parGcGen
1485 && ! oldest_gen->mark)
1486 {
1487 gc_type = SYNC_GC_PAR;
1488 } else {
1489 gc_type = SYNC_GC_SEQ;
1490 }
1491
1492 // In order to GC, there must be no threads running Haskell code.
1493 // Therefore, the GC thread needs to hold *all* the capabilities,
1494 // and release them after the GC has completed.
1495 //
1496 // This seems to be the simplest way: previous attempts involved
1497 // making all the threads with capabilities give up their
1498 // capabilities and sleep except for the *last* one, which
1499 // actually did the GC. But it's quite hard to arrange for all
1500 // the other tasks to sleep and stay asleep.
1501 //
1502
1503 /* Other capabilities are prevented from running yet more Haskell
1504 threads if pending_sync is set. Tested inside
1505 yieldCapability() and releaseCapability() in Capability.c */
1506
1507 do {
1508 sync = requestSync(pcap, task, gc_type);
1509 cap = *pcap;
1510 if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
1511 // someone else had a pending sync request for a GC, so
1512 // let's assume GC has been done and we don't need to GC
1513 // again.
1514 return;
1515 }
1516 if (sched_state == SCHED_SHUTTING_DOWN) {
1517 // The scheduler might now be shutting down. We tested
1518 // this above, but it might have become true since then as
1519 // we yielded the capability in requestSync().
1520 return;
1521 }
1522 } while (sync);
1523
1524 interruptAllCapabilities();
1525
1526 // The final shutdown GC is always single-threaded, because it's
1527 // possible that some of the Capabilities have no worker threads.
1528
1529 if (gc_type == SYNC_GC_SEQ)
1530 {
1531 traceEventRequestSeqGc(cap);
1532 }
1533 else
1534 {
1535 traceEventRequestParGc(cap);
1536 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1537 }
1538
1539 if (gc_type == SYNC_GC_SEQ)
1540 {
1541 // single-threaded GC: grab all the capabilities
1542 acquireAllCapabilities(cap,task);
1543 }
1544 else
1545 {
1546 // If we are load-balancing collections in this
1547 // generation, then we require all GC threads to participate
1548 // in the collection. Otherwise, we only require active
1549 // threads to participate, and we set gc_threads[i]->idle for
1550 // any idle capabilities. The rationale here is that waking
1551 // up an idle Capability takes much longer than just doing any
1552 // GC work on its behalf.
1553
1554 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1555 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1556 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
1557 for (i=0; i < n_capabilities; i++) {
1558 if (capabilities[i].disabled) {
1559 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1560 } else {
1561 idle_cap[i] = rtsFalse;
1562 }
1563 }
1564 } else {
1565 for (i=0; i < n_capabilities; i++) {
1566 if (capabilities[i].disabled) {
1567 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1568 } else if (i == cap->no ||
1569 capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1570 idle_cap[i] = rtsFalse;
1571 } else {
1572 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1573 if (!idle_cap[i]) {
1574 n_failed_trygrab_idles++;
1575 } else {
1576 n_idle_caps++;
1577 }
1578 }
1579 }
1580 }
1581
1582 // We set the gc_thread[i]->idle flag if that
1583 // capability/thread is not participating in this collection.
1584 // We also keep a local record of which capabilities are idle
1585 // in idle_cap[], because scheduleDoGC() is re-entrant:
1586 // another thread might start a GC as soon as we've finished
1587 // this one, and thus the gc_thread[]->idle flags are invalid
1588 // as soon as we release any threads after GC. Getting this
1589 // wrong leads to a rare and hard to debug deadlock!
1590
1591 for (i=0; i < n_capabilities; i++) {
1592 gc_threads[i]->idle = idle_cap[i];
1593 capabilities[i].idle++;
1594 }
1595
1596 // For all capabilities participating in this GC, wait until
1597 // they have stopped mutating and are standing by for GC.
1598 waitForGcThreads(cap);
1599
1600 #if defined(THREADED_RTS)
1601 // Stable point where we can do a global check on our spark counters
1602 ASSERT(checkSparkCountInvariant());
1603 #endif
1604 }
1605
1606 #endif
1607
1608 IF_DEBUG(scheduler, printAllThreads());
1609
1610 delete_threads_and_gc:
1611 /*
1612 * We now have all the capabilities; if we're in an interrupting
1613 * state, then we should take the opportunity to delete all the
1614 * threads in the system.
1615 */
1616 if (sched_state == SCHED_INTERRUPTING) {
1617 deleteAllThreads(cap);
1618 #if defined(THREADED_RTS)
1619 // Discard all the sparks from every Capability. Why?
1620 // They'll probably be GC'd anyway since we've killed all the
1621 // threads. It just avoids the GC having to do any work to
1622 // figure out that any remaining sparks are garbage.
1623 for (i = 0; i < n_capabilities; i++) {
1624 capabilities[i].spark_stats.gcd +=
1625 sparkPoolSize(capabilities[i].sparks);
1626 // No race here since all Caps are stopped.
1627 discardSparksCap(&capabilities[i]);
1628 }
1629 #endif
1630 sched_state = SCHED_SHUTTING_DOWN;
1631 }
1632
1633 /*
1634 * When there are disabled capabilities, we want to migrate any
1635 * threads away from them. Normally this happens in the
1636 * scheduler's loop, but only for unbound threads - it's really
1637 * hard for a bound thread to migrate itself. So we have another
1638 * go here.
1639 */
1640 #if defined(THREADED_RTS)
1641 for (i = enabled_capabilities; i < n_capabilities; i++) {
1642 Capability *tmp_cap, *dest_cap;
1643 tmp_cap = &capabilities[i];
1644 ASSERT(tmp_cap->disabled);
1645 if (i != cap->no) {
1646 dest_cap = &capabilities[i % enabled_capabilities];
1647 while (!emptyRunQueue(tmp_cap)) {
1648 tso = popRunQueue(tmp_cap);
1649 migrateThread(tmp_cap, tso, dest_cap);
1650 if (tso->bound) {
1651 traceTaskMigrate(tso->bound->task,
1652 tso->bound->task->cap,
1653 dest_cap);
1654 tso->bound->task->cap = dest_cap;
1655 }
1656 }
1657 }
1658 }
1659 #endif
1660
1661 #if defined(THREADED_RTS)
1662 // reset pending_sync *before* GC, so that when the GC threads
1663 // emerge they don't immediately re-enter the GC.
1664 pending_sync = 0;
1665 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1666 #else
1667 GarbageCollect(collect_gen, heap_census, 0, cap);
1668 #endif
1669
1670 traceSparkCounters(cap);
1671
1672 switch (recent_activity) {
1673 case ACTIVITY_INACTIVE:
1674 if (force_major) {
1675 // We are doing a GC because the system has been idle for a
1676 // timeslice and we need to check for deadlock. Record the
1677 // fact that we've done a GC and turn off the timer signal;
1678 // it will get re-enabled if we run any threads after the GC.
1679 recent_activity = ACTIVITY_DONE_GC;
1680 #ifndef PROFILING
1681 stopTimer();
1682 #endif
1683 break;
1684 }
1685 // fall through...
1686
1687 case ACTIVITY_MAYBE_NO:
1688 // the GC might have taken long enough for the timer to set
1689 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1690 // but we aren't necessarily deadlocked:
1691 recent_activity = ACTIVITY_YES;
1692 break;
1693
1694 case ACTIVITY_DONE_GC:
1695 // If we are actually active, the scheduler will reset the
1696 // recent_activity flag and re-enable the timer.
1697 break;
1698 }
1699
1700 #if defined(THREADED_RTS)
1701 // Stable point where we can do a global check on our spark counters
1702 ASSERT(checkSparkCountInvariant());
1703 #endif
1704
1705 // The heap census itself is done during GarbageCollect().
1706 if (heap_census) {
1707 performHeapProfile = rtsFalse;
1708 }
1709
1710 #if defined(THREADED_RTS)
1711 if (gc_type == SYNC_GC_PAR)
1712 {
1713 releaseGCThreads(cap);
1714 for (i = 0; i < n_capabilities; i++) {
1715 if (i != cap->no) {
1716 if (idle_cap[i]) {
1717 ASSERT(capabilities[i].running_task == task);
1718 task->cap = &capabilities[i];
1719 releaseCapability(&capabilities[i]);
1720 } else {
1721 ASSERT(capabilities[i].running_task != task);
1722 }
1723 }
1724 }
1725 task->cap = cap;
1726 }
1727 #endif
1728
1729 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1730 // GC set the heap_overflow flag, so we should proceed with
1731 // an orderly shutdown now. Ultimately we want the main
1732 // thread to return to its caller with HeapExhausted, at which
1733 // point the caller should call hs_exit(). The first step is
1734 // to delete all the threads.
1735 //
1736 // Another way to do this would be to raise an exception in
1737 // the main thread, which we really should do because it gives
1738 // the program a chance to clean up. But how do we find the
1739 // main thread? It should presumably be the same one that
1740 // gets ^C exceptions, but that's all done on the Haskell side
1741 // (GHC.TopHandler).
1742 sched_state = SCHED_INTERRUPTING;
1743 goto delete_threads_and_gc;
1744 }
1745
1746 #ifdef SPARKBALANCE
1747 /* JB
1748 Once we are all together... this would be the place to balance all
1749 spark pools. No concurrent stealing or adding of new sparks can
1750 occur. Should be defined in Sparks.c. */
1751 balanceSparkPoolsCaps(n_capabilities, capabilities);
1752 #endif
1753
1754 #if defined(THREADED_RTS)
1755 if (gc_type == SYNC_GC_SEQ) {
1756 // release our stash of capabilities.
1757 releaseAllCapabilities(cap, task);
1758 }
1759 #endif
1760
1761 return;
1762 }
1763
1764 /* ---------------------------------------------------------------------------
1765 * Singleton fork(). Do not copy any running threads.
1766 * ------------------------------------------------------------------------- */
1767
1768 pid_t
1769 forkProcess(HsStablePtr *entry
1770 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1771 STG_UNUSED
1772 #endif
1773 )
1774 {
1775 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1776 pid_t pid;
1777 StgTSO* t,*next;
1778 Capability *cap;
1779 nat g;
1780 Task *task = NULL;
1781 nat i;
1782 #ifdef THREADED_RTS
1783 nat sync;
1784 #endif
1785
1786 debugTrace(DEBUG_sched, "forking!");
1787
1788 task = newBoundTask();
1789
1790 cap = NULL;
1791 waitForReturnCapability(&cap, task);
1792
1793 #ifdef THREADED_RTS
1794 do {
1795 sync = requestSync(&cap, task, SYNC_OTHER);
1796 } while (sync);
1797
1798 acquireAllCapabilities(cap,task);
1799
1800 pending_sync = 0;
1801 #endif
1802
1803 // no funny business: hold locks while we fork, otherwise if some
1804 // other thread is holding a lock when the fork happens, the data
1805 // structure protected by the lock will forever be in an
1806 // inconsistent state in the child. See also #1391.
1807 ACQUIRE_LOCK(&sched_mutex);
1808 ACQUIRE_LOCK(&sm_mutex);
1809 ACQUIRE_LOCK(&stable_mutex);
1810 ACQUIRE_LOCK(&task->lock);
1811
1812 for (i=0; i < n_capabilities; i++) {
1813 ACQUIRE_LOCK(&capabilities[i].lock);
1814 }
1815
1816 stopTimer(); // See #4074
1817
1818 #if defined(TRACING)
1819 flushEventLog(); // so that child won't inherit dirty file buffers
1820 #endif
1821
1822 pid = fork();
1823
1824 if (pid) { // parent
1825
1826 startTimer(); // #4074
1827
1828 RELEASE_LOCK(&sched_mutex);
1829 RELEASE_LOCK(&sm_mutex);
1830 RELEASE_LOCK(&stable_mutex);
1831 RELEASE_LOCK(&task->lock);
1832
1833 for (i=0; i < n_capabilities; i++) {
1834 releaseCapability_(&capabilities[i],rtsFalse);
1835 RELEASE_LOCK(&capabilities[i].lock);
1836 }
1837 boundTaskExiting(task);
1838
1839 // just return the pid
1840 return pid;
1841
1842 } else { // child
1843
1844 #if defined(THREADED_RTS)
1845 initMutex(&sched_mutex);
1846 initMutex(&sm_mutex);
1847 initMutex(&stable_mutex);
1848 initMutex(&task->lock);
1849
1850 for (i=0; i < n_capabilities; i++) {
1851 initMutex(&capabilities[i].lock);
1852 }
1853 #endif
1854
1855 #ifdef TRACING
1856 resetTracing();
1857 #endif
1858
1859 // Now, all OS threads except the thread that forked are
1860 // stopped. We need to stop all Haskell threads, including
1861 // those involved in foreign calls. Also we need to delete
1862 // all Tasks, because they correspond to OS threads that are
1863 // now gone.
1864
1865 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1866 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1867 next = t->global_link;
1868 // don't allow threads to catch the ThreadKilled
1869 // exception, but we do want to raiseAsync() because these
1870 // threads may be evaluating thunks that we need later.
1871 deleteThread_(t->cap,t);
1872
1873 // stop the GC from updating the InCall to point to
1874 // the TSO. This is only necessary because the
1875 // OSThread bound to the TSO has been killed, and
1876 // won't get a chance to exit in the usual way (see
1877 // also scheduleHandleThreadFinished).
1878 t->bound = NULL;
1879 }
1880 }
1881
1882 discardTasksExcept(task);
1883
1884 for (i=0; i < n_capabilities; i++) {
1885 cap = &capabilities[i];
1886
1887 // Empty the run queue. It seems tempting to let all the
1888 // killed threads stay on the run queue as zombies to be
1889 // cleaned up later, but some of them may correspond to
1890 // bound threads for which the corresponding Task does not
1891 // exist.
1892 cap->run_queue_hd = END_TSO_QUEUE;
1893 cap->run_queue_tl = END_TSO_QUEUE;
1894
1895 // Any suspended C-calling Tasks are no more; their OS threads
1896 // don't exist now:
1897 cap->suspended_ccalls = NULL;
1898
1899 #if defined(THREADED_RTS)
1900 // Wipe our spare workers list, they no longer exist. New
1901 // workers will be created if necessary.
1902 cap->spare_workers = NULL;
1903 cap->n_spare_workers = 0;
1904 cap->returning_tasks_hd = NULL;
1905 cap->returning_tasks_tl = NULL;
1906 #endif
1907
1908 // Release all caps except 0, we'll use that for starting
1909 // the IO manager and running the client action below.
1910 if (cap->no != 0) {
1911 task->cap = cap;
1912 releaseCapability(cap);
1913 }
1914 }
1915 cap = &capabilities[0];
1916 task->cap = cap;
1917
1918 // Empty the threads lists. Otherwise, the garbage
1919 // collector may attempt to resurrect some of these threads.
1920 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1921 generations[g].threads = END_TSO_QUEUE;
1922 }
1923
1924 // On Unix, all timers are reset in the child, so we need to start
1925 // the timer again.
1926 initTimer();
1927 startTimer();
1928
1929 // TODO: need to trace various other things in the child
1930 // like startup event, capabilities, process info etc
1931 traceTaskCreate(task, cap);
1932
1933 #if defined(THREADED_RTS)
1934 ioManagerStartCap(&cap);
1935 #endif
1936
1937 rts_evalStableIO(&cap, entry, NULL); // run the action
1938 rts_checkSchedStatus("forkProcess",cap);
1939
1940 rts_unlock(cap);
1941 hs_exit(); // clean up and exit
1942 stg_exit(EXIT_SUCCESS);
1943 }
1944 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1945 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1946 #endif
1947 }
1948
1949 /* ---------------------------------------------------------------------------
1950 * Changing the number of Capabilities
1951 *
1952 * Changing the number of Capabilities is very tricky! We can only do
1953 * it with the system fully stopped, so we do a full sync with
1954 * requestSync(SYNC_OTHER) and grab all the capabilities.
1955 *
1956 * Then we resize the appropriate data structures, and update all
1957 * references to the old data structures which have now moved.
1958 * Finally we release the Capabilities we are holding, and start
1959 * worker Tasks on the new Capabilities we created.
1960 *
1961 * ------------------------------------------------------------------------- */
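/* A minimal usage sketch (illustrative only; it assumes the declaration
 * of setNumCapabilities() is in scope, e.g. via Rts.h):
 *
 *     setNumCapabilities(2);  // shrink: caps 2..n-1 become "disabled"
 *     ...
 *     setNumCapabilities(6);  // grow: disabled caps are re-enabled and,
 *                             // if needed, capabilities[] is resized and
 *                             // new worker Tasks are started
 */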
1962
1963 void
1964 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
1965 {
1966 #if !defined(THREADED_RTS)
1967 if (new_n_capabilities != 1) {
1968 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
1969 }
1970 return;
1971 #elif defined(NOSMP)
1972 if (new_n_capabilities != 1) {
1973 errorBelch("setNumCapabilities: not supported on this platform");
1974 }
1975 return;
1976 #else
1977 Task *task;
1978 Capability *cap;
1979 nat sync;
1980 StgTSO* t;
1981 nat g, n;
1982 Capability *old_capabilities = NULL;
1983
1984 if (new_n_capabilities == enabled_capabilities) return;
1985
1986 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
1987 enabled_capabilities, new_n_capabilities);
1988
1989 cap = rts_lock();
1990 task = cap->running_task;
1991
1992 do {
1993 sync = requestSync(&cap, task, SYNC_OTHER);
1994 } while (sync);
1995
1996 acquireAllCapabilities(cap,task);
1997
1998 pending_sync = 0;
1999
2000 if (new_n_capabilities < enabled_capabilities)
2001 {
2002 // Reducing the number of capabilities: we do not actually
2003 // remove the extra capabilities, we just mark them as
2004 // "disabled". This has the following effects:
2005 //
2006 // - threads on a disabled capability are migrated away by the
2007 // scheduler loop
2008 //
2009 // - disabled capabilities do not participate in GC
2010 // (see scheduleDoGC())
2011 //
2012 // - No spark threads are created on this capability
2013 // (see scheduleActivateSpark())
2014 //
2015 // - We do not attempt to migrate threads *to* a disabled
2016 // capability (see schedulePushWork()).
2017 //
2018 // but in other respects, a disabled capability remains
2019 // alive. Threads may be woken up on a disabled capability,
2020 // but they will be immediately migrated away.
2021 //
2022 // This approach is much easier than trying to actually remove
2023 // the capability; we don't have to worry about GC data
2024 // structures, the nursery, etc.
2025 //
2026 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2027 capabilities[n].disabled = rtsTrue;
2028 traceCapDisable(&capabilities[n]);
2029 }
2030 enabled_capabilities = new_n_capabilities;
2031 }
2032 else
2033 {
2034 // Increasing the number of enabled capabilities.
2035 //
2036 // enable any disabled capabilities, up to the required number
2037 for (n = enabled_capabilities;
2038 n < new_n_capabilities && n < n_capabilities; n++) {
2039 capabilities[n].disabled = rtsFalse;
2040 traceCapEnable(&capabilities[n]);
2041 }
2042 enabled_capabilities = n;
2043
2044 if (new_n_capabilities > n_capabilities) {
2045 #if defined(TRACING)
2046 // Allocate eventlog buffers for the new capabilities. Note this
2047 // must be done before calling moreCapabilities(), because that
2048 // will emit events about creating the new capabilities and adding
2049 // them to existing capsets.
2050 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2051 #endif
2052
2053 // Resize the capabilities array
2054 // NB. after this, capabilities points somewhere new. Any pointers
2055 // of type (Capability *) are now invalid.
2056 old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
2057
2058 // update our own cap pointer
2059 cap = &capabilities[cap->no];
2060
2061 // Resize and update storage manager data structures
2062 storageAddCapabilities(n_capabilities, new_n_capabilities);
2063
2064 // Update (Capability *) refs in the Task manager.
2065 updateCapabilityRefs();
2066
2067 // Update (Capability *) refs from TSOs
2068 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2069 for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
2070 t->cap = &capabilities[t->cap->no];
2071 }
2072 }
2073 }
2074 }
2075
2076 // We're done: release the original Capabilities
2077 releaseAllCapabilities(cap,task);
2078
2079 // Start worker tasks on the new Capabilities
2080 startWorkerTasks(n_capabilities, new_n_capabilities);
2081
2082 // finally, update n_capabilities
2083 if (new_n_capabilities > n_capabilities) {
2084 n_capabilities = enabled_capabilities = new_n_capabilities;
2085 }
2086
2087 // We can't free the old array until now, because we access it
2088 // while updating pointers in updateCapabilityRefs().
2089 if (old_capabilities) {
2090 stgFree(old_capabilities);
2091 }
2092
2093 rts_unlock(cap);
2094
2095 #endif // THREADED_RTS
2096 }
2097
2098
2099
2100 /* ---------------------------------------------------------------------------
2101 * Delete all the threads in the system
2102 * ------------------------------------------------------------------------- */
2103
2104 static void
2105 deleteAllThreads ( Capability *cap )
2106 {
2107 // NOTE: only safe to call if we own all capabilities.
2108
2109 StgTSO* t, *next;
2110 nat g;
2111
2112 debugTrace(DEBUG_sched,"deleting all threads");
2113 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2114 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2115 next = t->global_link;
2116 deleteThread(cap,t);
2117 }
2118 }
2119
2120 // The run queue now contains a bunch of ThreadKilled threads. We
2121 // must not throw these away: the main thread(s) will be in there
2122 // somewhere, and the main scheduler loop has to deal with it.
2123 // Also, the run queue is the only thing keeping these threads from
2124 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2125
2126 #if !defined(THREADED_RTS)
2127 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2128 ASSERT(sleeping_queue == END_TSO_QUEUE);
2129 #endif
2130 }
2131
2132 /* -----------------------------------------------------------------------------
2133 Managing the suspended_ccalls list.
2134 Locks required: sched_mutex
2135 -------------------------------------------------------------------------- */
2136
2137 STATIC_INLINE void
2138 suspendTask (Capability *cap, Task *task)
2139 {
2140 InCall *incall;
2141
2142 incall = task->incall;
2143 ASSERT(incall->next == NULL && incall->prev == NULL);
2144 incall->next = cap->suspended_ccalls;
2145 incall->prev = NULL;
2146 if (cap->suspended_ccalls) {
2147 cap->suspended_ccalls->prev = incall;
2148 }
2149 cap->suspended_ccalls = incall;
2150 }
2151
2152 STATIC_INLINE void
2153 recoverSuspendedTask (Capability *cap, Task *task)
2154 {
2155 InCall *incall;
2156
2157 incall = task->incall;
2158 if (incall->prev) {
2159 incall->prev->next = incall->next;
2160 } else {
2161 ASSERT(cap->suspended_ccalls == incall);
2162 cap->suspended_ccalls = incall->next;
2163 }
2164 if (incall->next) {
2165 incall->next->prev = incall->prev;
2166 }
2167 incall->next = incall->prev = NULL;
2168 }
2169
2170 /* ---------------------------------------------------------------------------
2171 * Suspending & resuming Haskell threads.
2172 *
2173 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2174 * its capability before calling the C function. This allows another
2175 * task to pick up the capability and carry on running Haskell
2176 * threads. It also means that if the C call blocks, it won't lock
2177 * the whole system.
2178 *
2179 * The Haskell thread making the C call is put to sleep for the
2180 * duration of the call, on the suspended_ccalling_threads queue. We
2181 * give out a token to the task, which it can use to resume the thread
2182 * on return from the C function.
2183 *
2184 * If this is an interruptible C call, this means that the FFI call may be
2185 * unceremoniously terminated and should be scheduled on an
2186 * unbound worker thread.
2187 * ------------------------------------------------------------------------- */
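/* As a hedged sketch of the protocol (roughly what the compiled code
 * around a "safe" foreign call does; the wrapper and C function shown
 * here are invented for illustration, only suspendThread() and
 * resumeThread() below are the real entry points):
 *
 *     void *tok = suspendThread(reg, rtsFalse); // give up the Capability
 *     r = some_blocking_c_function(args);       // RTS runs other threads
 *     reg = resumeThread(tok);                  // re-acquire a Capability
 *
 * The token handed out by suspendThread() is really the Task, and must be
 * passed back unchanged to resumeThread().
 */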
2188
2189 void *
2190 suspendThread (StgRegTable *reg, rtsBool interruptible)
2191 {
2192 Capability *cap;
2193 int saved_errno;
2194 StgTSO *tso;
2195 Task *task;
2196 #if mingw32_HOST_OS
2197 StgWord32 saved_winerror;
2198 #endif
2199
2200 saved_errno = errno;
2201 #if mingw32_HOST_OS
2202 saved_winerror = GetLastError();
2203 #endif
2204
2205 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2206 */
2207 cap = regTableToCapability(reg);
2208
2209 task = cap->running_task;
2210 tso = cap->r.rCurrentTSO;
2211
2212 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2213
2214 // XXX this might not be necessary --SDM
2215 tso->what_next = ThreadRunGHC;
2216
2217 threadPaused(cap,tso);
2218
2219 if (interruptible) {
2220 tso->why_blocked = BlockedOnCCall_Interruptible;
2221 } else {
2222 tso->why_blocked = BlockedOnCCall;
2223 }
2224
2225 // Hand back capability
2226 task->incall->suspended_tso = tso;
2227 task->incall->suspended_cap = cap;
2228
2229 ACQUIRE_LOCK(&cap->lock);
2230
2231 suspendTask(cap,task);
2232 cap->in_haskell = rtsFalse;
2233 releaseCapability_(cap,rtsFalse);
2234
2235 RELEASE_LOCK(&cap->lock);
2236
2237 errno = saved_errno;
2238 #if mingw32_HOST_OS
2239 SetLastError(saved_winerror);
2240 #endif
2241 return task;
2242 }
2243
2244 StgRegTable *
2245 resumeThread (void *task_)
2246 {
2247 StgTSO *tso;
2248 InCall *incall;
2249 Capability *cap;
2250 Task *task = task_;
2251 int saved_errno;
2252 #if mingw32_HOST_OS
2253 StgWord32 saved_winerror;
2254 #endif
2255
2256 saved_errno = errno;
2257 #if mingw32_HOST_OS
2258 saved_winerror = GetLastError();
2259 #endif
2260
2261 incall = task->incall;
2262 cap = incall->suspended_cap;
2263 task->cap = cap;
2264
2265 // Wait for permission to re-enter the RTS with the result.
2266 waitForReturnCapability(&cap,task);
2267 // we might be on a different capability now... but if so, our
2268 // entry on the suspended_ccalls list will also have been
2269 // migrated.
2270
2271 // Remove the thread from the suspended list
2272 recoverSuspendedTask(cap,task);
2273
2274 tso = incall->suspended_tso;
2275 incall->suspended_tso = NULL;
2276 incall->suspended_cap = NULL;
2277 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2278
2279 traceEventRunThread(cap, tso);
2280
2281 /* Reset blocking status */
2282 tso->why_blocked = NotBlocked;
2283
2284 if ((tso->flags & TSO_BLOCKEX) == 0) {
2285 // avoid locking the TSO if we don't have to
2286 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2287 maybePerformBlockedException(cap,tso);
2288 }
2289 }
2290
2291 cap->r.rCurrentTSO = tso;
2292 cap->in_haskell = rtsTrue;
2293 errno = saved_errno;
2294 #if mingw32_HOST_OS
2295 SetLastError(saved_winerror);
2296 #endif
2297
2298 /* We might have GC'd, mark the TSO dirty again */
2299 dirty_TSO(cap,tso);
2300 dirty_STACK(cap,tso->stackobj);
2301
2302 IF_DEBUG(sanity, checkTSO(tso));
2303
2304 return &cap->r;
2305 }
2306
2307 /* ---------------------------------------------------------------------------
2308 * scheduleThread()
2309 *
2310 * scheduleThread puts a thread on the end of the runnable queue.
2311 * This will usually be done immediately after a thread is created.
2312 * The caller of scheduleThread must create the thread using e.g.
2313 * createThread and push an appropriate closure
2314 * on this thread's stack before the scheduler is invoked.
2315 * ------------------------------------------------------------------------ */
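/* A minimal sketch of the expected calling sequence (hedged: the
 * stack-setup step is elided; in practice helpers such as RtsAPI.c's
 * createIOThread() push the closure and a suitable entry frame):
 *
 *     StgTSO *tso = createThread(cap, RtsFlags.GcFlags.initialStkSize);
 *     // ... push the closure to evaluate onto tso's stack ...
 *     scheduleThread(cap, tso);
 */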
2316
2317 void
2318 scheduleThread(Capability *cap, StgTSO *tso)
2319 {
2320 // The thread goes at the *end* of the run-queue, to avoid possible
2321 // starvation of any threads already on the queue.
2322 appendToRunQueue(cap,tso);
2323 }
2324
2325 void
2326 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2327 {
2328 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2329 // move this thread from now on.
2330 #if defined(THREADED_RTS)
2331 cpu %= enabled_capabilities;
2332 if (cpu == cap->no) {
2333 appendToRunQueue(cap,tso);
2334 } else {
2335 migrateThread(cap, tso, &capabilities[cpu]);
2336 }
2337 #else
2338 appendToRunQueue(cap,tso);
2339 #endif
2340 }
2341
2342 void
2343 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2344 {
2345 Task *task;
2346 DEBUG_ONLY( StgThreadID id );
2347 Capability *cap;
2348
2349 cap = *pcap;
2350
2351 // We already created/initialised the Task
2352 task = cap->running_task;
2353
2354 // This TSO is now a bound thread; make the Task and TSO
2355 // point to each other.
2356 tso->bound = task->incall;
2357 tso->cap = cap;
2358
2359 task->incall->tso = tso;
2360 task->incall->ret = ret;
2361 task->incall->stat = NoStatus;
2362
2363 appendToRunQueue(cap,tso);
2364
2365 DEBUG_ONLY( id = tso->id );
2366 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2367
2368 cap = schedule(cap,task);
2369
2370 ASSERT(task->incall->stat != NoStatus);
2371 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2372
2373 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2374 *pcap = cap;
2375 }
2376
2377 /* ----------------------------------------------------------------------------
2378 * Starting Tasks
2379 * ------------------------------------------------------------------------- */
2380
2381 #if defined(THREADED_RTS)
2382 void scheduleWorker (Capability *cap, Task *task)
2383 {
2384 // schedule() runs without a lock.
2385 cap = schedule(cap,task);
2386
2387 // On exit from schedule(), we have a Capability, but possibly not
2388 // the same one we started with.
2389
2390 // During shutdown, the requirement is that after all the
2391 // Capabilities are shut down, all workers that are shutting down
2392 // have finished workerTaskStop(). This is why we hold on to
2393 // cap->lock until we've finished workerTaskStop() below.
2394 //
2395 // There may be workers still involved in foreign calls; those
2396 // will just block in waitForReturnCapability() because the
2397 // Capability has been shut down.
2398 //
2399 ACQUIRE_LOCK(&cap->lock);
2400 releaseCapability_(cap,rtsFalse);
2401 workerTaskStop(task);
2402 RELEASE_LOCK(&cap->lock);
2403 }
2404 #endif
2405
2406 /* ---------------------------------------------------------------------------
2407 * Start new worker tasks on Capabilities from--to
2408 * -------------------------------------------------------------------------- */
2409
2410 static void
2411 startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
2412 {
2413 #if defined(THREADED_RTS)
2414 nat i;
2415 Capability *cap;
2416
2417 for (i = from; i < to; i++) {
2418 cap = &capabilities[i];
2419 ACQUIRE_LOCK(&cap->lock);
2420 startWorkerTask(cap);
2421 RELEASE_LOCK(&cap->lock);
2422 }
2423 #endif
2424 }
2425
2426 /* ---------------------------------------------------------------------------
2427 * initScheduler()
2428 *
2429 * Initialise the scheduler. This resets all the queues - if the
2430 * queues contained any threads, they'll be garbage collected at the
2431 * next pass.
2432 *
2433 * ------------------------------------------------------------------------ */
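/* Not called directly by client code: it runs once during RTS start-up,
 * e.g. (illustrative):
 *
 *     hs_init(&argc, &argv);  // RtsStartup.c eventually calls initScheduler()
 */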
2434
2435 void
2436 initScheduler(void)
2437 {
2438 #if !defined(THREADED_RTS)
2439 blocked_queue_hd = END_TSO_QUEUE;
2440 blocked_queue_tl = END_TSO_QUEUE;
2441 sleeping_queue = END_TSO_QUEUE;
2442 #endif
2443
2444 sched_state = SCHED_RUNNING;
2445 recent_activity = ACTIVITY_YES;
2446
2447 #if defined(THREADED_RTS)
2448 /* Initialise the mutex and condition variables used by
2449 * the scheduler. */
2450 initMutex(&sched_mutex);
2451 #endif
2452
2453 ACQUIRE_LOCK(&sched_mutex);
2454
2455 /* A capability holds the state a native thread needs in
2456 * order to execute STG code. At least one capability is
2457 * floating around (only THREADED_RTS builds have more than one).
2458 */
2459 initCapabilities();
2460
2461 initTaskManager();
2462
2463 /*
2464 * Eagerly start one worker to run each Capability, except for
2465 * Capability 0. The idea is that we're probably going to start a
2466 * bound thread on Capability 0 pretty soon, so we don't want a
2467 * worker task hogging it.
2468 */
2469 startWorkerTasks(1, n_capabilities);
2470
2471 RELEASE_LOCK(&sched_mutex);
2472
2473 }
2474
2475 void
2476 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2477 /* see Capability.c, shutdownCapability() */
2478 {
2479 Task *task = NULL;
2480
2481 task = newBoundTask();
2482
2483 // If we haven't killed all the threads yet, do it now.
2484 if (sched_state < SCHED_SHUTTING_DOWN) {
2485 sched_state = SCHED_INTERRUPTING;
2486 Capability *cap = task->cap;
2487 waitForReturnCapability(&cap,task);
2488 scheduleDoGC(&cap,task,rtsTrue);
2489 ASSERT(task->incall->tso == NULL);
2490 releaseCapability(cap);
2491 }
2492 sched_state = SCHED_SHUTTING_DOWN;
2493
2494 shutdownCapabilities(task, wait_foreign);
2495
2496 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2497 // n_failed_trygrab_idles, n_idle_caps);
2498
2499 boundTaskExiting(task);
2500 }
2501
2502 void
2503 freeScheduler( void )
2504 {
2505 nat still_running;
2506
2507 ACQUIRE_LOCK(&sched_mutex);
2508 still_running = freeTaskManager();
2509 // We can only free the Capabilities if there are no Tasks still
2510 // running. We might have a Task about to return from a foreign
2511 // call into waitForReturnCapability(), for example (actually,
2512 // this should be the *only* thing that a still-running Task can
2513 // do at this point, and it will block waiting for the
2514 // Capability).
2515 if (still_running == 0) {
2516 freeCapabilities();
2517 if (n_capabilities != 1) {
2518 stgFree(capabilities);
2519 }
2520 }
2521 RELEASE_LOCK(&sched_mutex);
2522 #if defined(THREADED_RTS)
2523 closeMutex(&sched_mutex);
2524 #endif
2525 }
2526
2527 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2528 void *user USED_IF_NOT_THREADS)
2529 {
2530 #if !defined(THREADED_RTS)
2531 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2532 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2533 evac(user, (StgClosure **)(void *)&sleeping_queue);
2534 #endif
2535 }
2536
2537 /* -----------------------------------------------------------------------------
2538 performGC
2539
2540 This is the interface to the garbage collector from Haskell land.
2541 We provide this so that external C code can allocate and garbage
2542 collect when called from Haskell via _ccall_GC.
2543 -------------------------------------------------------------------------- */
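/* For example (a hedged sketch; the declarations are assumed to be
 * visible via Rts.h/RtsAPI.h), C code running inside a safe foreign call
 * can request a collection directly:
 *
 *     performGC();       // let the GC choose which generation to collect
 *     performMajorGC();  // force a collection of the oldest generation
 */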
2544
2545 static void
2546 performGC_(rtsBool force_major)
2547 {
2548 Task *task;
2549 Capability *cap = NULL;
2550
2551 // We must grab a new Task here, because the existing Task may be
2552 // associated with a particular Capability, and chained onto the
2553 // suspended_ccalls queue.
2554 task = newBoundTask();
2555
2556 // TODO: do we need to traceTask*() here?
2557
2558 waitForReturnCapability(&cap,task);
2559 scheduleDoGC(&cap,task,force_major);
2560 releaseCapability(cap);
2561 boundTaskExiting(task);
2562 }
2563
2564 void
2565 performGC(void)
2566 {
2567 performGC_(rtsFalse);
2568 }
2569
2570 void
2571 performMajorGC(void)
2572 {
2573 performGC_(rtsTrue);
2574 }
2575
2576 /* ---------------------------------------------------------------------------
2577 Interrupt execution
2578 - usually called inside a signal handler so it mustn't do anything fancy.
2579 ------------------------------------------------------------------------ */
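/* A hedged sketch of typical use (the handler below is invented for
 * illustration; the real wiring lives in posix/Signals.c and
 * win32/ConsoleHandler.c):
 *
 *     static void sigint_handler(int sig STG_UNUSED)
 *     {
 *         interruptStgRts();  // just sets flags and pokes the IO manager
 *     }
 */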
2580
2581 void
2582 interruptStgRts(void)
2583 {
2584 sched_state = SCHED_INTERRUPTING;
2585 interruptAllCapabilities();
2586 #if defined(THREADED_RTS)
2587 wakeUpRts();
2588 #endif
2589 }
2590
2591 /* -----------------------------------------------------------------------------
2592 Wake up the RTS
2593
2594 This function causes at least one OS thread to wake up and run the
2595 scheduler loop. It is invoked when the RTS might be deadlocked, or
2596 an external event has arrived that may need servicing (eg. a
2597 keyboard interrupt).
2598
2599 In the single-threaded RTS we don't do anything here; we only have
2600 one thread anyway, and the event that caused us to want to wake up
2601 will have interrupted any blocking system call in progress anyway.
2602 -------------------------------------------------------------------------- */
2603
2604 #if defined(THREADED_RTS)
2605 void wakeUpRts(void)
2606 {
2607 // This forces the IO Manager thread to wakeup, which will
2608 // in turn ensure that some OS thread wakes up and runs the
2609 // scheduler loop, which will cause a GC and deadlock check.
2610 ioManagerWakeup();
2611 }
2612 #endif
2613
2614 /* -----------------------------------------------------------------------------
2615 Deleting threads
2616
2617 This is used for interruption (^C) and forking, and corresponds to
2618 raising an exception but without letting the thread catch the
2619 exception.
2620 -------------------------------------------------------------------------- */
2621
2622 static void
2623 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2624 {
2625 // NOTE: must only be called on a TSO that we have exclusive
2626 // access to, because we will call throwToSingleThreaded() below.
2627 // The TSO must be on the run queue of the Capability we own, or
2628 // we must own all Capabilities.
2629
2630 if (tso->why_blocked != BlockedOnCCall &&
2631 tso->why_blocked != BlockedOnCCall_Interruptible) {
2632 throwToSingleThreaded(tso->cap,tso,NULL);
2633 }
2634 }
2635
2636 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2637 static void
2638 deleteThread_(Capability *cap, StgTSO *tso)
2639 { // for forkProcess only:
2640 // like deleteThread(), but we delete threads in foreign calls, too.
2641
2642 if (tso->why_blocked == BlockedOnCCall ||
2643 tso->why_blocked == BlockedOnCCall_Interruptible) {
2644 tso->what_next = ThreadKilled;
2645 appendToRunQueue(tso->cap, tso);
2646 } else {
2647 deleteThread(cap,tso);
2648 }
2649 }
2650 #endif
2651
2652 /* -----------------------------------------------------------------------------
2653 raiseExceptionHelper
2654
2655 This function is called by the raise# primitive, just so that we can
2656 move some of the tricky bits of raising an exception from C-- into
2657 C. Who knows, it might be a useful reusable thing here too.
2658 -------------------------------------------------------------------------- */
2659
2660 StgWord
2661 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2662 {
2663 Capability *cap = regTableToCapability(reg);
2664 StgThunk *raise_closure = NULL;
2665 StgPtr p, next;
2666 StgRetInfoTable *info;
2667 //
2668 // This closure represents the expression 'raise# E' where E
2669 // is the exception raised. It is used to overwrite all the
2670 // thunks which are currently under evaluation.
2671 //
2672
2673 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2674 // LDV profiling: stg_raise_info has THUNK as its closure
2675 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2676 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2677 // 1 does not cause any problem unless profiling is performed.
2678 // However, when LDV profiling goes on, we need to linearly scan
2679 // small object pool, where raise_closure is stored, so we should
2680 // use MIN_UPD_SIZE.
2681 //
2682 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2683 // sizeofW(StgClosure)+1);
2684 //
2685
2686 //
2687 // Walk up the stack, looking for the catch frame. On the way,
2688 // we update any closures pointed to from update frames with the
2689 // raise closure that we just built.
2690 //
2691 p = tso->stackobj->sp;
2692 while(1) {
2693 info = get_ret_itbl((StgClosure *)p);
2694 next = p + stack_frame_sizeW((StgClosure *)p);
2695 switch (info->i.type) {
2696
2697 case UPDATE_FRAME:
2698 // Only create raise_closure if we need to.
2699 if (raise_closure == NULL) {
2700 raise_closure =
2701 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2702 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2703 raise_closure->payload[0] = exception;
2704 }
2705 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2706 (StgClosure *)raise_closure);
2707 p = next;
2708 continue;
2709
2710 case ATOMICALLY_FRAME:
2711 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2712 tso->stackobj->sp = p;
2713 return ATOMICALLY_FRAME;
2714
2715 case CATCH_FRAME:
2716 tso->stackobj->sp = p;
2717 return CATCH_FRAME;
2718
2719 case CATCH_STM_FRAME:
2720 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2721 tso->stackobj->sp = p;
2722 return CATCH_STM_FRAME;
2723
2724 case UNDERFLOW_FRAME:
2725 tso->stackobj->sp = p;
2726 threadStackUnderflow(cap,tso);
2727 p = tso->stackobj->sp;
2728 continue;
2729
2730 case STOP_FRAME:
2731 tso->stackobj->sp = p;
2732 return STOP_FRAME;
2733
2734 case CATCH_RETRY_FRAME:
2735 default:
2736 p = next;
2737 continue;
2738 }
2739 }
2740 }
2741
2742
2743 /* -----------------------------------------------------------------------------
2744 findRetryFrameHelper
2745
2746 This function is called by the retry# primitive. It traverses the stack
2747 leaving tso->stackobj->sp referring to the frame which should handle the retry.
2748
2749 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2750 or an ATOMICALLY_FRAME (if the retry# reaches the top level).
2751
2752 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2753 create) because retries are not considered to be exceptions, despite the
2754 similar implementation.
2755
2756 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2757 not be created within memory transactions.
2758 -------------------------------------------------------------------------- */
2759
2760 StgWord
2761 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2762 {
2763 StgPtr p, next;
2764 StgRetInfoTable *info;
2765
2766 p = tso->stackobj->sp;
2767 while (1) {
2768 info = get_ret_itbl((StgClosure *)p);
2769 next = p + stack_frame_sizeW((StgClosure *)p);
2770 switch (info->i.type) {
2771
2772 case ATOMICALLY_FRAME:
2773 debugTrace(DEBUG_stm,
2774 "found ATOMICALLY_FRAME at %p during retry", p);
2775 tso->stackobj->sp = p;
2776 return ATOMICALLY_FRAME;
2777
2778 case CATCH_RETRY_FRAME:
2779 debugTrace(DEBUG_stm,
2780 "found CATCH_RETRY_FRAME at %p during retrry", p);
2781 tso->stackobj->sp = p;
2782 return CATCH_RETRY_FRAME;
2783
2784 case CATCH_STM_FRAME: {
2785 StgTRecHeader *trec = tso -> trec;
2786 StgTRecHeader *outer = trec -> enclosing_trec;
2787 debugTrace(DEBUG_stm,
2788 "found CATCH_STM_FRAME at %p during retry", p);
2789 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2790 stmAbortTransaction(cap, trec);
2791 stmFreeAbortedTRec(cap, trec);
2792 tso -> trec = outer;
2793 p = next;
2794 continue;
2795 }
2796
2797 case UNDERFLOW_FRAME:
2798 threadStackUnderflow(cap,tso);
2799 p = tso->stackobj->sp;
2800 continue;
2801
2802 default:
2803 ASSERT(info->i.type != CATCH_FRAME);
2804 ASSERT(info->i.type != STOP_FRAME);
2805 p = next;
2806 continue;
2807 }
2808 }
2809 }
2810
2811 /* -----------------------------------------------------------------------------
2812 resurrectThreads is called after garbage collection on the list of
2813 threads found to be garbage. Each of these threads will be woken
2814 up and sent an exception: BlockedIndefinitelyOnMVar if it was blocked
2815 on an MVar, BlockedIndefinitelyOnSTM if it was blocked on an STM
2816 transaction, or NonTermination if it was blocked on a Black Hole.
2817
2818 Locks: assumes we hold *all* the capabilities.
2819 -------------------------------------------------------------------------- */
2820
2821 void
2822 resurrectThreads (StgTSO *threads)
2823 {
2824 StgTSO *tso, *next;
2825 Capability *cap;
2826 generation *gen;
2827
2828 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2829 next = tso->global_link;
2830
2831 gen = Bdescr((P_)tso)->gen;
2832 tso->global_link = gen->threads;
2833 gen->threads = tso;
2834
2835 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2836
2837 // Wake up the thread on the Capability it was last on
2838 cap = tso->cap;
2839
2840 switch (tso->why_blocked) {
2841 case BlockedOnMVar:
2842 /* Called by GC - sched_mutex lock is currently held. */
2843 throwToSingleThreaded(cap, tso,
2844 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2845 break;
2846 case BlockedOnBlackHole:
2847 throwToSingleThreaded(cap, tso,
2848 (StgClosure *)nonTermination_closure);
2849 break;
2850 case BlockedOnSTM:
2851 throwToSingleThreaded(cap, tso,
2852 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2853 break;
2854 case NotBlocked:
2855 /* This might happen if the thread was blocked on a black hole
2856 * belonging to a thread that we've just woken up (raiseAsync
2857 * can wake up threads, remember...).
2858 */
2859 continue;
2860 case BlockedOnMsgThrowTo:
2861 // This can happen if the target is masking, blocks on a
2862 // black hole, and then is found to be unreachable. In
2863 // this case, we want to let the target wake up and carry
2864 // on, and do nothing to this thread.
2865 continue;
2866 default:
2867 barf("resurrectThreads: thread blocked in a strange way: %d",
2868 tso->why_blocked);
2869 }
2870 }
2871 }