Allow limiting the number of GC threads (+RTS -qn<n>)
[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /*
95 * This mutex protects most of the global scheduler data in
96 * the THREADED_RTS runtime.
97 */
98 #if defined(THREADED_RTS)
99 Mutex sched_mutex;
100 #endif
101
102 #if !defined(mingw32_HOST_OS)
103 #define FORKPROCESS_PRIMOP_SUPPORTED
104 #endif
105
106 /* -----------------------------------------------------------------------------
107 * static function prototypes
108 * -------------------------------------------------------------------------- */
109
110 static Capability *schedule (Capability *initialCapability, Task *task);
111
112 //
113 // These functions all encapsulate parts of the scheduler loop, and are
114 // abstracted only to make the structure and control flow of the
115 // scheduler clearer.
116 //
117 static void schedulePreLoop (void);
118 static void scheduleFindWork (Capability **pcap);
119 #if defined(THREADED_RTS)
120 static void scheduleYield (Capability **pcap, Task *task);
121 #endif
122 #if defined(THREADED_RTS)
123 static rtsBool requestSync (Capability **pcap, Task *task,
124 PendingSync *sync_type, SyncType *prev_sync_type);
125 static void acquireAllCapabilities(Capability *cap, Task *task);
126 static void releaseAllCapabilities(nat n, Capability *cap, Task *task);
127 static void stopAllCapabilities(Capability **pCap, Task *task);
128 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
129 #endif
130 static void scheduleStartSignalHandlers (Capability *cap);
131 static void scheduleCheckBlockedThreads (Capability *cap);
132 static void scheduleProcessInbox(Capability **cap);
133 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
134 static void schedulePushWork(Capability *cap, Task *task);
135 #if defined(THREADED_RTS)
136 static void scheduleActivateSpark(Capability *cap);
137 #endif
138 static void schedulePostRunThread(Capability *cap, StgTSO *t);
139 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
140 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
141 nat prev_what_next );
142 static void scheduleHandleThreadBlocked( StgTSO *t );
143 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
144 StgTSO *t );
145 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
146 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
147
148 static void deleteThread (Capability *cap, StgTSO *tso);
149 static void deleteAllThreads (Capability *cap);
150
151 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
152 static void deleteThread_(Capability *cap, StgTSO *tso);
153 #endif
154
155 /* ---------------------------------------------------------------------------
156 Main scheduling loop.
157
158 We use round-robin scheduling, each thread returning to the
159 scheduler loop when one of these conditions is detected:
160
161 * out of heap space
162 * timer expires (thread yields)
163 * thread blocks
164 * thread ends
165 * stack overflow
166
167 ------------------------------------------------------------------------ */
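/* An editorial, condensed sketch of one iteration of the loop in schedule()
 * below (not compiled; shutdown handling, tracing and several THREADED_RTS-only
 * steps are omitted -- the code that follows is the real control flow):
 */
#if 0
    while (1) {
        scheduleFindWork(&cap);              // signal handlers, inbox, sparks
        schedulePushWork(cap, task);         // share surplus work with idle caps
        scheduleDetectDeadlock(&cap, task);  // may force a major GC
        t = popRunQueue(cap);                // pick the next thread
        ret = /* run t via StgRun() or interpretBCO() */;
        switch (ret) {
        case HeapOverflow:   ready_to_gc = scheduleHandleHeapOverflow(cap, t); break;
        case StackOverflow:  threadStackOverflow(cap, t); pushOnRunQueue(cap, t); break;
        case ThreadYielding: scheduleHandleYield(cap, t, prev_what_next); break;
        case ThreadBlocked:  scheduleHandleThreadBlocked(t); break;
        case ThreadFinished: if (scheduleHandleThreadFinished(cap, task, t)) return cap;
                             break;
        }
        if (ready_to_gc) scheduleDoGC(&cap, task, rtsFalse);
    }
#endif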
168
169 static Capability *
170 schedule (Capability *initialCapability, Task *task)
171 {
172 StgTSO *t;
173 Capability *cap;
174 StgThreadReturnCode ret;
175 nat prev_what_next;
176 rtsBool ready_to_gc;
177 #if defined(THREADED_RTS)
178 rtsBool first = rtsTrue;
179 #endif
180
181 cap = initialCapability;
182
183 // Pre-condition: this task owns initialCapability.
184 // The sched_mutex is *NOT* held
185 // NB. on return, we still hold a capability.
186
187 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
188
189 schedulePreLoop();
190
191 // -----------------------------------------------------------
192 // Scheduler loop starts here:
193
194 while (1) {
195
196 // Check whether we have re-entered the RTS from Haskell without
197         // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
198 // call).
199 if (cap->in_haskell) {
200 errorBelch("schedule: re-entered unsafely.\n"
201 " Perhaps a 'foreign import unsafe' should be 'safe'?");
202 stg_exit(EXIT_FAILURE);
203 }
204
205 // Note [shutdown]: The interruption / shutdown sequence.
206 //
207 // In order to cleanly shut down the runtime, we want to:
208 // * make sure that all main threads return to their callers
209 // with the state 'Interrupted'.
210         //   * clean up all OS threads associated with the runtime
211 // * free all memory etc.
212 //
213 // So the sequence goes like this:
214 //
215 // * The shutdown sequence is initiated by calling hs_exit(),
216 // interruptStgRts(), or running out of memory in the GC.
217 //
218 // * Set sched_state = SCHED_INTERRUPTING
219 //
220 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
221 // scheduleDoGC(), which halts the whole runtime by acquiring all the
222 // capabilities, does a GC and then calls deleteAllThreads() to kill all
223 // the remaining threads. The zombies are left on the run queue for
224 // cleaning up. We can't kill threads involved in foreign calls.
225 //
226 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
227 //
228 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
229 // enforced by the scheduler, which won't run any Haskell code when
230 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
231 // other capabilities by doing the GC earlier.
232 //
233 // * all workers exit when the run queue on their capability
234 // drains. All main threads will also exit when their TSO
235 // reaches the head of the run queue and they can return.
236 //
237 // * eventually all Capabilities will shut down, and the RTS can
238 // exit.
239 //
240 // * We might be left with threads blocked in foreign calls,
241 // we should really attempt to kill these somehow (TODO).
242
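        // (Editorial note) The checks in this file rely on the numeric ordering
        // of the states: comparisons such as sched_state >= SCHED_INTERRUPTING
        // assume SCHED_RUNNING < SCHED_INTERRUPTING < SCHED_SHUTTING_DOWN, so
        // the state only ever moves "upwards" towards shutdown (see the
        // "changes monotonically" remark on sched_state above).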
243 switch (sched_state) {
244 case SCHED_RUNNING:
245 break;
246 case SCHED_INTERRUPTING:
247 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
248 /* scheduleDoGC() deletes all the threads */
249 scheduleDoGC(&cap,task,rtsTrue);
250
251 // after scheduleDoGC(), we must be shutting down. Either some
252 // other Capability did the final GC, or we did it above,
253 // either way we can fall through to the SCHED_SHUTTING_DOWN
254 // case now.
255 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
256 // fall through
257
258 case SCHED_SHUTTING_DOWN:
259 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
260 // If we are a worker, just exit. If we're a bound thread
261 // then we will exit below when we've removed our TSO from
262 // the run queue.
263 if (!isBoundTask(task) && emptyRunQueue(cap)) {
264 return cap;
265 }
266 break;
267 default:
268 barf("sched_state: %d", sched_state);
269 }
270
271 scheduleFindWork(&cap);
272
273 /* work pushing, currently relevant only for THREADED_RTS:
274 (pushes threads, wakes up idle capabilities for stealing) */
275 schedulePushWork(cap,task);
276
277 scheduleDetectDeadlock(&cap,task);
278
279 // Normally, the only way we can get here with no threads to
280         // run is if a keyboard interrupt was received during
281 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
282 // Additionally, it is not fatal for the
283 // threaded RTS to reach here with no threads to run.
284 //
285 // win32: might be here due to awaitEvent() being abandoned
286 // as a result of a console event having been delivered.
287
288 #if defined(THREADED_RTS)
289 if (first)
290 {
291 // XXX: ToDo
292 // // don't yield the first time, we want a chance to run this
293 // // thread for a bit, even if there are others banging at the
294 // // door.
295 // first = rtsFalse;
296 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
297 }
298
299 scheduleYield(&cap,task);
300
301 if (emptyRunQueue(cap)) continue; // look for work again
302 #endif
303
304 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
305 if ( emptyRunQueue(cap) ) {
306 ASSERT(sched_state >= SCHED_INTERRUPTING);
307 }
308 #endif
309
310 //
311 // Get a thread to run
312 //
313 t = popRunQueue(cap);
314
315 // Sanity check the thread we're about to run. This can be
316 // expensive if there is lots of thread switching going on...
317 IF_DEBUG(sanity,checkTSO(t));
318
319 #if defined(THREADED_RTS)
320 // Check whether we can run this thread in the current task.
321 // If not, we have to pass our capability to the right task.
322 {
323 InCall *bound = t->bound;
324
325 if (bound) {
326 if (bound->task == task) {
327 // yes, the Haskell thread is bound to the current native thread
328 } else {
329 debugTrace(DEBUG_sched,
330 "thread %lu bound to another OS thread",
331 (unsigned long)t->id);
332 // no, bound to a different Haskell thread: pass to that thread
333 pushOnRunQueue(cap,t);
334 continue;
335 }
336 } else {
337 // The thread we want to run is unbound.
338 if (task->incall->tso) {
339 debugTrace(DEBUG_sched,
340 "this OS thread cannot run thread %lu",
341 (unsigned long)t->id);
342 // no, the current native thread is bound to a different
343 // Haskell thread, so pass it to any worker thread
344 pushOnRunQueue(cap,t);
345 continue;
346 }
347 }
348 }
349 #endif
350
351 // If we're shutting down, and this thread has not yet been
352 // killed, kill it now. This sometimes happens when a finalizer
353 // thread is created by the final GC, or a thread previously
354 // in a foreign call returns.
355 if (sched_state >= SCHED_INTERRUPTING &&
356 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
357 deleteThread(cap,t);
358 }
359
360 // If this capability is disabled, migrate the thread away rather
361 // than running it. NB. but not if the thread is bound: it is
362 // really hard for a bound thread to migrate itself. Believe me,
363 // I tried several ways and couldn't find a way to do it.
364 // Instead, when everything is stopped for GC, we migrate all the
365 // threads on the run queue then (see scheduleDoGC()).
366 //
367 // ToDo: what about TSO_LOCKED? Currently we're migrating those
368 // when the number of capabilities drops, but we never migrate
369 // them back if it rises again. Presumably we should, but after
370 // the thread has been migrated we no longer know what capability
371 // it was originally on.
372 #ifdef THREADED_RTS
373 if (cap->disabled && !t->bound) {
374 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
375 migrateThread(cap, t, dest_cap);
376 continue;
377 }
378 #endif
379
380 /* context switches are initiated by the timer signal, unless
381 * the user specified "context switch as often as possible", with
382 * +RTS -C0
383 */
384 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
385 && !emptyThreadQueues(cap)) {
386 cap->context_switch = 1;
387 }
388
389 run_thread:
390
391 // CurrentTSO is the thread to run. It might be different if we
392 // loop back to run_thread, so make sure to set CurrentTSO after
393 // that.
394 cap->r.rCurrentTSO = t;
395
396 startHeapProfTimer();
397
398 // ----------------------------------------------------------------------
399 // Run the current thread
400
401 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
402 ASSERT(t->cap == cap);
403 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
404
405 prev_what_next = t->what_next;
406
407 errno = t->saved_errno;
408 #if mingw32_HOST_OS
409 SetLastError(t->saved_winerror);
410 #endif
411
412 // reset the interrupt flag before running Haskell code
413 cap->interrupt = 0;
414
415 cap->in_haskell = rtsTrue;
416 cap->idle = 0;
417
418 dirty_TSO(cap,t);
419 dirty_STACK(cap,t->stackobj);
420
421 switch (recent_activity)
422 {
423 case ACTIVITY_DONE_GC: {
424 // ACTIVITY_DONE_GC means we turned off the timer signal to
425 // conserve power (see #1623). Re-enable it here.
426 nat prev;
427 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
428 if (prev == ACTIVITY_DONE_GC) {
429 #ifndef PROFILING
430 startTimer();
431 #endif
432 }
433 break;
434 }
435 case ACTIVITY_INACTIVE:
436 // If we reached ACTIVITY_INACTIVE, then don't reset it until
437 // we've done the GC. The thread running here might just be
438 // the IO manager thread that handle_tick() woke up via
439 // wakeUpRts().
440 break;
441 default:
442 recent_activity = ACTIVITY_YES;
443 }
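        // (Editorial note, a rough sketch pieced together from this file and
        // the handle_tick() reference above) recent_activity is stepped from
        // ACTIVITY_YES towards ACTIVITY_INACTIVE by the timer when nothing
        // runs; once it is ACTIVITY_INACTIVE, scheduleDetectDeadlock() forces a
        // major GC, which sets ACTIVITY_DONE_GC and stops the timer; running a
        // thread here sets it back to ACTIVITY_YES and restarts the timer.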
444
445 traceEventRunThread(cap, t);
446
447 switch (prev_what_next) {
448
449 case ThreadKilled:
450 case ThreadComplete:
451 /* Thread already finished, return to scheduler. */
452 ret = ThreadFinished;
453 break;
454
455 case ThreadRunGHC:
456 {
457 StgRegTable *r;
458 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
459 cap = regTableToCapability(r);
460 ret = r->rRet;
461 break;
462 }
463
464 case ThreadInterpret:
465 cap = interpretBCO(cap);
466 ret = cap->r.rRet;
467 break;
468
469 default:
470 barf("schedule: invalid what_next field");
471 }
472
473 cap->in_haskell = rtsFalse;
474
475 // The TSO might have moved, eg. if it re-entered the RTS and a GC
476 // happened. So find the new location:
477 t = cap->r.rCurrentTSO;
478
479 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
480 // don't want it set when not running a Haskell thread.
481 cap->r.rCurrentTSO = NULL;
482
483 // And save the current errno in this thread.
484 // XXX: possibly bogus for SMP because this thread might already
485 // be running again, see code below.
486 t->saved_errno = errno;
487 #if mingw32_HOST_OS
488 // Similarly for Windows error code
489 t->saved_winerror = GetLastError();
490 #endif
491
492 if (ret == ThreadBlocked) {
493 if (t->why_blocked == BlockedOnBlackHole) {
494 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
495 traceEventStopThread(cap, t, t->why_blocked + 6,
496 owner != NULL ? owner->id : 0);
497 } else {
498 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
499 }
500 } else {
501 traceEventStopThread(cap, t, ret, 0);
502 }
503
504 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
505 ASSERT(t->cap == cap);
506
507 // ----------------------------------------------------------------------
508
509 // Costs for the scheduler are assigned to CCS_SYSTEM
510 stopHeapProfTimer();
511 #if defined(PROFILING)
512 cap->r.rCCCS = CCS_SYSTEM;
513 #endif
514
515 schedulePostRunThread(cap,t);
516
517 ready_to_gc = rtsFalse;
518
519 switch (ret) {
520 case HeapOverflow:
521 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
522 break;
523
524 case StackOverflow:
525 // just adjust the stack for this thread, then pop it back
526 // on the run queue.
527 threadStackOverflow(cap, t);
528 pushOnRunQueue(cap,t);
529 break;
530
531 case ThreadYielding:
532 if (scheduleHandleYield(cap, t, prev_what_next)) {
533 // shortcut for switching between compiler/interpreter:
534 goto run_thread;
535 }
536 break;
537
538 case ThreadBlocked:
539 scheduleHandleThreadBlocked(t);
540 break;
541
542 case ThreadFinished:
543 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
544 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
545 break;
546
547 default:
548 barf("schedule: invalid thread return code %d", (int)ret);
549 }
550
551 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
552 scheduleDoGC(&cap,task,rtsFalse);
553 }
554 } /* end of while() */
555 }
556
557 /* -----------------------------------------------------------------------------
558 * Run queue operations
559 * -------------------------------------------------------------------------- */
560
561 static void
562 removeFromRunQueue (Capability *cap, StgTSO *tso)
563 {
564 if (tso->block_info.prev == END_TSO_QUEUE) {
565 ASSERT(cap->run_queue_hd == tso);
566 cap->run_queue_hd = tso->_link;
567 } else {
568 setTSOLink(cap, tso->block_info.prev, tso->_link);
569 }
570 if (tso->_link == END_TSO_QUEUE) {
571 ASSERT(cap->run_queue_tl == tso);
572 cap->run_queue_tl = tso->block_info.prev;
573 } else {
574 setTSOPrev(cap, tso->_link, tso->block_info.prev);
575 }
576 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
577
578 IF_DEBUG(sanity, checkRunQueue(cap));
579 }
580
581 void
582 promoteInRunQueue (Capability *cap, StgTSO *tso)
583 {
584 removeFromRunQueue(cap, tso);
585 pushOnRunQueue(cap, tso);
586 }
587
588 /* ----------------------------------------------------------------------------
589 * Setting up the scheduler loop
590 * ------------------------------------------------------------------------- */
591
592 static void
593 schedulePreLoop(void)
594 {
595 // initialisation for scheduler - what cannot go into initScheduler()
596
597 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
598 win32AllocStack();
599 #endif
600 }
601
602 /* -----------------------------------------------------------------------------
603 * scheduleFindWork()
604 *
605 * Search for work to do, and handle messages from elsewhere.
606 * -------------------------------------------------------------------------- */
607
608 static void
609 scheduleFindWork (Capability **pcap)
610 {
611 scheduleStartSignalHandlers(*pcap);
612
613 scheduleProcessInbox(pcap);
614
615 scheduleCheckBlockedThreads(*pcap);
616
617 #if defined(THREADED_RTS)
618 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
619 #endif
620 }
621
622 #if defined(THREADED_RTS)
623 STATIC_INLINE rtsBool
624 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
625 {
626 // we need to yield this capability to someone else if..
627 // - another thread is initiating a GC, and we didn't just do a GC
628 // (see Note [GC livelock])
629 // - another Task is returning from a foreign call
630 // - the thread at the head of the run queue cannot be run
631 // by this Task (it is bound to another Task, or it is unbound
632     //     and this task is bound).
633 //
634 // Note [GC livelock]
635 //
636 // If we are interrupted to do a GC, then we do not immediately do
637 // another one. This avoids a starvation situation where one
638 // Capability keeps forcing a GC and the other Capabilities make no
639 // progress at all.
640
641 return ((pending_sync && !didGcLast) ||
642 cap->returning_tasks_hd != NULL ||
643 (!emptyRunQueue(cap) && (task->incall->tso == NULL
644 ? peekRunQueue(cap)->bound != NULL
645 : peekRunQueue(cap)->bound != task->incall)));
646 }
647
648 // This is the single place where a Task goes to sleep. There are
649 // two reasons it might need to sleep:
650 // - there are no threads to run
651 // - we need to yield this Capability to someone else
652 // (see shouldYieldCapability())
653 //
654 // Careful: the scheduler loop is quite delicate. Make sure you run
655 // the tests in testsuite/concurrent (all ways) after modifying this,
656 // and also check the benchmarks in nofib/parallel for regressions.
657
658 static void
659 scheduleYield (Capability **pcap, Task *task)
660 {
661 Capability *cap = *pcap;
662 int didGcLast = rtsFalse;
663
664 // if we have work, and we don't need to give up the Capability, continue.
665 //
666 if (!shouldYieldCapability(cap,task,rtsFalse) &&
667 (!emptyRunQueue(cap) ||
668 !emptyInbox(cap) ||
669 sched_state >= SCHED_INTERRUPTING)) {
670 return;
671 }
672
673 // otherwise yield (sleep), and keep yielding if necessary.
674 do {
675 didGcLast = yieldCapability(&cap,task, !didGcLast);
676 }
677 while (shouldYieldCapability(cap,task,didGcLast));
678
679 // note there may still be no threads on the run queue at this
680 // point, the caller has to check.
681
682 *pcap = cap;
683 return;
684 }
685 #endif
686
687 /* -----------------------------------------------------------------------------
688 * schedulePushWork()
689 *
690 * Push work to other Capabilities if we have some.
691 * -------------------------------------------------------------------------- */
692
693 static void
694 schedulePushWork(Capability *cap USED_IF_THREADS,
695 Task *task USED_IF_THREADS)
696 {
697   /* The following code is not for PARALLEL_HASKELL. I kept the call general;
698      future GUM versions might use pushing in a distributed setup. */
699 #if defined(THREADED_RTS)
700
701 Capability *free_caps[n_capabilities], *cap0;
702 nat i, n_free_caps;
703
704 // migration can be turned off with +RTS -qm
705 if (!RtsFlags.ParFlags.migrate) return;
706
707 // Check whether we have more threads on our run queue, or sparks
708 // in our pool, that we could hand to another Capability.
709 if (emptyRunQueue(cap)) {
710 if (sparkPoolSizeCap(cap) < 2) return;
711 } else {
712 if (singletonRunQueue(cap) &&
713 sparkPoolSizeCap(cap) < 1) return;
714 }
715
716 // First grab as many free Capabilities as we can.
717 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
718 cap0 = capabilities[i];
719 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
720 if (!emptyRunQueue(cap0)
721 || cap0->returning_tasks_hd != NULL
722 || cap0->inbox != (Message*)END_TSO_QUEUE) {
723 // it already has some work, we just grabbed it at
724 // the wrong moment. Or maybe it's deadlocked!
725 releaseCapability(cap0);
726 } else {
727 free_caps[n_free_caps++] = cap0;
728 }
729 }
730 }
731
732 // we now have n_free_caps free capabilities stashed in
733 // free_caps[]. Share our run queue equally with them. This is
734 // probably the simplest thing we could do; improvements we might
735 // want to do include:
736 //
737 // - giving high priority to moving relatively new threads, on
740     //     the grounds that they haven't had time to build up a
739 // working set in the cache on this CPU/Capability.
740 //
741 // - giving low priority to moving long-lived threads
742
743 if (n_free_caps > 0) {
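    // (Editorial illustration of the sharing loop below) With, say,
    // n_free_caps == 2 and a run queue h -> t1 -> t2 -> t3 -> t4 -> t5 of
    // unbound, unlocked threads: the head h always stays here; t1 and t2 go to
    // free_caps[0] and free_caps[1]; t3 is kept (i has reached n_free_caps and
    // is reset to 0); t4 and t5 are pushed out again. Roughly
    // 1/(n_free_caps+1) of the queue stays local.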
744 StgTSO *prev, *t, *next;
745 #ifdef SPARK_PUSHING
746 rtsBool pushed_to_all;
747 #endif
748
749 debugTrace(DEBUG_sched,
750 "cap %d: %s and %d free capabilities, sharing...",
751 cap->no,
752 (!emptyRunQueue(cap) && !singletonRunQueue(cap))?
753 "excess threads on run queue":"sparks to share (>=2)",
754 n_free_caps);
755
756 i = 0;
757 #ifdef SPARK_PUSHING
758 pushed_to_all = rtsFalse;
759 #endif
760
761 if (cap->run_queue_hd != END_TSO_QUEUE) {
762 prev = cap->run_queue_hd;
763 t = prev->_link;
764 prev->_link = END_TSO_QUEUE;
765 for (; t != END_TSO_QUEUE; t = next) {
766 next = t->_link;
767 t->_link = END_TSO_QUEUE;
768 if (t->bound == task->incall // don't move my bound thread
769 || tsoLocked(t)) { // don't move a locked thread
770 setTSOLink(cap, prev, t);
771 setTSOPrev(cap, t, prev);
772 prev = t;
773 } else if (i == n_free_caps) {
774 #ifdef SPARK_PUSHING
775 pushed_to_all = rtsTrue;
776 #endif
777 i = 0;
778 // keep one for us
779 setTSOLink(cap, prev, t);
780 setTSOPrev(cap, t, prev);
781 prev = t;
782 } else {
783 appendToRunQueue(free_caps[i],t);
784
785 traceEventMigrateThread (cap, t, free_caps[i]->no);
786
787 if (t->bound) { t->bound->task->cap = free_caps[i]; }
788 t->cap = free_caps[i];
789 i++;
790 }
791 }
792 cap->run_queue_tl = prev;
793
794 IF_DEBUG(sanity, checkRunQueue(cap));
795 }
796
797 #ifdef SPARK_PUSHING
798 /* JB I left this code in place, it would work but is not necessary */
799
800 // If there are some free capabilities that we didn't push any
801 // threads to, then try to push a spark to each one.
802 if (!pushed_to_all) {
803 StgClosure *spark;
804 // i is the next free capability to push to
805 for (; i < n_free_caps; i++) {
806 if (emptySparkPoolCap(free_caps[i])) {
807 spark = tryStealSpark(cap->sparks);
808 if (spark != NULL) {
809 /* TODO: if anyone wants to re-enable this code then
810 * they must consider the fizzledSpark(spark) case
811 * and update the per-cap spark statistics.
812 */
813 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
814
815 traceEventStealSpark(free_caps[i], t, cap->no);
816
817 newSpark(&(free_caps[i]->r), spark);
818 }
819 }
820 }
821 }
822 #endif /* SPARK_PUSHING */
823
824 // release the capabilities
825 for (i = 0; i < n_free_caps; i++) {
826 task->cap = free_caps[i];
827 // The idea behind waking up the capability unconditionally is that
828 // it might be able to steal sparks. Perhaps we should only do this
829 // if there were sparks to steal?
830 releaseAndWakeupCapability(free_caps[i]);
831 }
832 }
833 task->cap = cap; // reset to point to our Capability.
834
835 #endif /* THREADED_RTS */
836
837 }
838
839 /* ----------------------------------------------------------------------------
840 * Start any pending signal handlers
841 * ------------------------------------------------------------------------- */
842
843 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
844 static void
845 scheduleStartSignalHandlers(Capability *cap)
846 {
847 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
848 // safe outside the lock
849 startSignalHandlers(cap);
850 }
851 }
852 #else
853 static void
854 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
855 {
856 }
857 #endif
858
859 /* ----------------------------------------------------------------------------
860 * Check for blocked threads that can be woken up.
861 * ------------------------------------------------------------------------- */
862
863 static void
864 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
865 {
866 #if !defined(THREADED_RTS)
867 //
868 // Check whether any waiting threads need to be woken up. If the
869 // run queue is empty, and there are no other tasks running, we
870 // can wait indefinitely for something to happen.
871 //
872 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
873 {
874 awaitEvent (emptyRunQueue(cap));
875 }
876 #endif
877 }
878
879 /* ----------------------------------------------------------------------------
880 * Detect deadlock conditions and attempt to resolve them.
881 * ------------------------------------------------------------------------- */
882
883 static void
884 scheduleDetectDeadlock (Capability **pcap, Task *task)
885 {
886 Capability *cap = *pcap;
887 /*
888 * Detect deadlock: when we have no threads to run, there are no
889 * threads blocked, waiting for I/O, or sleeping, and all the
890 * other tasks are waiting for work, we must have a deadlock of
891 * some description.
892 */
893 if ( emptyThreadQueues(cap) )
894 {
895 #if defined(THREADED_RTS)
896 /*
897 * In the threaded RTS, we only check for deadlock if there
898 * has been no activity in a complete timeslice. This means
899 * we won't eagerly start a full GC just because we don't have
900 * any threads to run currently.
901 */
902 if (recent_activity != ACTIVITY_INACTIVE) return;
903 #endif
904
905 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
906
907 // Garbage collection can release some new threads due to
908 // either (a) finalizers or (b) threads resurrected because
909 // they are unreachable and will therefore be sent an
910 // exception. Any threads thus released will be immediately
911 // runnable.
912 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
913 cap = *pcap;
914         // when force_major == rtsTrue, scheduleDoGC sets
915 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
916 // signal.
917
918 if ( !emptyRunQueue(cap) ) return;
919
920 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
921 /* If we have user-installed signal handlers, then wait
922          * for signals to arrive rather than bombing out with a
923 * deadlock.
924 */
925 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
926 debugTrace(DEBUG_sched,
927 "still deadlocked, waiting for signals...");
928
929 awaitUserSignals();
930
931 if (signals_pending()) {
932 startSignalHandlers(cap);
933 }
934
935 // either we have threads to run, or we were interrupted:
936 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
937
938 return;
939 }
940 #endif
941
942 #if !defined(THREADED_RTS)
943 /* Probably a real deadlock. Send the current main thread the
944 * Deadlock exception.
945 */
946 if (task->incall->tso) {
947 switch (task->incall->tso->why_blocked) {
948 case BlockedOnSTM:
949 case BlockedOnBlackHole:
950 case BlockedOnMsgThrowTo:
951 case BlockedOnMVar:
952 case BlockedOnMVarRead:
953 throwToSingleThreaded(cap, task->incall->tso,
954 (StgClosure *)nonTermination_closure);
955 return;
956 default:
957 barf("deadlock: main thread blocked in a strange way");
958 }
959 }
960 return;
961 #endif
962 }
963 }
964
965
966 /* ----------------------------------------------------------------------------
967 * Process message in the current Capability's inbox
968 * ------------------------------------------------------------------------- */
969
970 static void
971 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
972 {
973 #if defined(THREADED_RTS)
974 Message *m, *next;
975 int r;
976 Capability *cap = *pcap;
977
978 while (!emptyInbox(cap)) {
979 if (cap->r.rCurrentNursery->link == NULL ||
980 g0->n_new_large_words >= large_alloc_lim) {
981 scheduleDoGC(pcap, cap->running_task, rtsFalse);
982 cap = *pcap;
983 }
984
985 // don't use a blocking acquire; if the lock is held by
986 // another thread then just carry on. This seems to avoid
987 // getting stuck in a message ping-pong situation with other
988 // processors. We'll check the inbox again later anyway.
989 //
990 // We should really use a more efficient queue data structure
991 // here. The trickiness is that we must ensure a Capability
992 // never goes idle if the inbox is non-empty, which is why we
993 // use cap->lock (cap->lock is released as the last thing
994 // before going idle; see Capability.c:releaseCapability()).
995 r = TRY_ACQUIRE_LOCK(&cap->lock);
996 if (r != 0) return;
997
998 m = cap->inbox;
999 cap->inbox = (Message*)END_TSO_QUEUE;
1000
1001 RELEASE_LOCK(&cap->lock);
1002
1003 while (m != (Message*)END_TSO_QUEUE) {
1004 next = m->link;
1005 executeMessage(cap, m);
1006 m = next;
1007 }
1008 }
1009 #endif
1010 }
1011
1012 /* ----------------------------------------------------------------------------
1013 * Activate spark threads (THREADED_RTS)
1014 * ------------------------------------------------------------------------- */
1015
1016 #if defined(THREADED_RTS)
1017 static void
1018 scheduleActivateSpark(Capability *cap)
1019 {
1020 if (anySparks() && !cap->disabled)
1021 {
1022 createSparkThread(cap);
1023 debugTrace(DEBUG_sched, "creating a spark thread");
1024 }
1025 }
1026 #endif // THREADED_RTS
1027
1028 /* ----------------------------------------------------------------------------
1029 * After running a thread...
1030 * ------------------------------------------------------------------------- */
1031
1032 static void
1033 schedulePostRunThread (Capability *cap, StgTSO *t)
1034 {
1035 // We have to be able to catch transactions that are in an
1036 // infinite loop as a result of seeing an inconsistent view of
1037 // memory, e.g.
1038 //
1039 // atomically $ do
1040 // [a,b] <- mapM readTVar [ta,tb]
1041 // when (a == b) loop
1042 //
1043 // and a is never equal to b given a consistent view of memory.
1044 //
1045 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1046 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1047 debugTrace(DEBUG_sched | DEBUG_stm,
1048 "trec %p found wasting its time", t);
1049
1050 // strip the stack back to the
1051 // ATOMICALLY_FRAME, aborting the (nested)
1052 // transaction, and saving the stack of any
1053 // partially-evaluated thunks on the heap.
1054 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1055
1056 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1057 }
1058 }
1059
1060 //
1061 // If the current thread's allocation limit has run out, send it
1062 // the AllocationLimitExceeded exception.
1063
1064 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1065 // Use a throwToSelf rather than a throwToSingleThreaded, because
1066 // it correctly handles the case where the thread is currently
1067 // inside mask. Also the thread might be blocked (e.g. on an
1068 // MVar), and throwToSingleThreaded doesn't unblock it
1069 // correctly in that case.
1070 throwToSelf(cap, t, allocationLimitExceeded_closure);
1071 ASSIGN_Int64((W_*)&(t->alloc_limit),
1072 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1073 }
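    // (Editorial illustration, with made-up numbers) If allocLimitGrace were,
    // say, 25 blocks of 4096 bytes, the counter would be refilled to
    // 25 * 4096 = 102400 bytes here, giving the AllocationLimitExceeded
    // handler some headroom to run before the limit can trip again.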
1074
1075 /* some statistics gathering in the parallel case */
1076 }
1077
1078 /* -----------------------------------------------------------------------------
1079 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1080 * -------------------------------------------------------------------------- */
1081
1082 static rtsBool
1083 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1084 {
1085 if (cap->r.rHpLim == NULL || cap->context_switch) {
1086 // Sometimes we miss a context switch, e.g. when calling
1087 // primitives in a tight loop, MAYBE_GC() doesn't check the
1088 // context switch flag, and we end up waiting for a GC.
1089 // See #1984, and concurrent/should_run/1984
1090 cap->context_switch = 0;
1091 appendToRunQueue(cap,t);
1092 } else {
1093 pushOnRunQueue(cap,t);
1094 }
1095
1096 // did the task ask for a large block?
1097 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1098 // if so, get one and push it on the front of the nursery.
1099 bdescr *bd;
1100 W_ blocks;
1101
1102 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1103
1104 if (blocks > BLOCKS_PER_MBLOCK) {
1105 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1106 }
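        // (Editorial illustration, assuming the usual 4096-byte BLOCK_SIZE)
        // A request of rHpAlloc = 10000 bytes rounds up to
        // BLOCK_ROUND_UP(10000) = 12288, i.e. blocks = 3; the new group is
        // linked in after CurrentNursery below.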
1107
1108 debugTrace(DEBUG_sched,
1109 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1110 (long)t->id, what_next_strs[t->what_next], blocks);
1111
1112 // don't do this if the nursery is (nearly) full, we'll GC first.
1113 if (cap->r.rCurrentNursery->link != NULL ||
1114 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1115 // infinite loop if the
1116 // nursery has only one
1117 // block.
1118
1119 bd = allocGroup_lock(blocks);
1120 cap->r.rNursery->n_blocks += blocks;
1121
1122 // link the new group after CurrentNursery
1123 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1124
1125 // initialise it as a nursery block. We initialise the
1126 // step, gen_no, and flags field of *every* sub-block in
1127 // this large block, because this is easier than making
1128 // sure that we always find the block head of a large
1129 // block whenever we call Bdescr() (eg. evacuate() and
1130 // isAlive() in the GC would both have to do this, at
1131 // least).
1132 {
1133 bdescr *x;
1134 for (x = bd; x < bd + blocks; x++) {
1135 initBdescr(x,g0,g0);
1136 x->free = x->start;
1137 x->flags = 0;
1138 }
1139 }
1140
1141 // This assert can be a killer if the app is doing lots
1142 // of large block allocations.
1143 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1144
1145 // now update the nursery to point to the new block
1146 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1147 cap->r.rCurrentNursery = bd;
1148
1149 // we might be unlucky and have another thread get on the
1150 // run queue before us and steal the large block, but in that
1151 // case the thread will just end up requesting another large
1152 // block.
1153 return rtsFalse; /* not actually GC'ing */
1154 }
1155 }
1156
1157 // if we got here because we exceeded large_alloc_lim, then
1158 // proceed straight to GC.
1159 if (g0->n_new_large_words >= large_alloc_lim) {
1160 return rtsTrue;
1161 }
1162
1163 // Otherwise, we just ran out of space in the current nursery.
1164 // Grab another nursery if we can.
1165 if (getNewNursery(cap)) {
1166 debugTrace(DEBUG_sched, "thread %ld got a new nursery", t->id);
1167 return rtsFalse;
1168 }
1169
1170 return rtsTrue;
1171 /* actual GC is done at the end of the while loop in schedule() */
1172 }
1173
1174 /* -----------------------------------------------------------------------------
1175 * Handle a thread that returned to the scheduler with ThreadYielding
1176 * -------------------------------------------------------------------------- */
1177
1178 static rtsBool
1179 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1180 {
1181 /* put the thread back on the run queue. Then, if we're ready to
1182 * GC, check whether this is the last task to stop. If so, wake
1183 * up the GC thread. getThread will block during a GC until the
1184 * GC is finished.
1185 */
1186
1187 ASSERT(t->_link == END_TSO_QUEUE);
1188
1189 // Shortcut if we're just switching evaluators: don't bother
1190 // doing stack squeezing (which can be expensive), just run the
1191 // thread.
1192 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1193 debugTrace(DEBUG_sched,
1194 "--<< thread %ld (%s) stopped to switch evaluators",
1195 (long)t->id, what_next_strs[t->what_next]);
1196 return rtsTrue;
1197 }
1198
1199 // Reset the context switch flag. We don't do this just before
1200 // running the thread, because that would mean we would lose ticks
1201 // during GC, which can lead to unfair scheduling (a thread hogs
1202 // the CPU because the tick always arrives during GC). This way
1203 // penalises threads that do a lot of allocation, but that seems
1204 // better than the alternative.
1205 if (cap->context_switch != 0) {
1206 cap->context_switch = 0;
1207 appendToRunQueue(cap,t);
1208 } else {
1209 pushOnRunQueue(cap,t);
1210 }
1211
1212 IF_DEBUG(sanity,
1213 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1214 checkTSO(t));
1215
1216 return rtsFalse;
1217 }
1218
1219 /* -----------------------------------------------------------------------------
1220 * Handle a thread that returned to the scheduler with ThreadBlocked
1221 * -------------------------------------------------------------------------- */
1222
1223 static void
1224 scheduleHandleThreadBlocked( StgTSO *t
1225 #if !defined(DEBUG)
1226 STG_UNUSED
1227 #endif
1228 )
1229 {
1230
1231 // We don't need to do anything. The thread is blocked, and it
1232 // has tidied up its stack and placed itself on whatever queue
1233 // it needs to be on.
1234
1235 // ASSERT(t->why_blocked != NotBlocked);
1236 // Not true: for example,
1237 // - the thread may have woken itself up already, because
1238 // threadPaused() might have raised a blocked throwTo
1239 // exception, see maybePerformBlockedException().
1240
1241 #ifdef DEBUG
1242 traceThreadStatus(DEBUG_sched, t);
1243 #endif
1244 }
1245
1246 /* -----------------------------------------------------------------------------
1247 * Handle a thread that returned to the scheduler with ThreadFinished
1248 * -------------------------------------------------------------------------- */
1249
1250 static rtsBool
1251 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1252 {
1253 /* Need to check whether this was a main thread, and if so,
1254 * return with the return value.
1255 *
1256 * We also end up here if the thread kills itself with an
1257 * uncaught exception, see Exception.cmm.
1258 */
1259
1260 // blocked exceptions can now complete, even if the thread was in
1261 // blocked mode (see #2910).
1262 awakenBlockedExceptionQueue (cap, t);
1263
1264 //
1265 // Check whether the thread that just completed was a bound
1266 // thread, and if so return with the result.
1267 //
1268 // There is an assumption here that all thread completion goes
1269 // through this point; we need to make sure that if a thread
1270 // ends up in the ThreadKilled state, that it stays on the run
1271 // queue so it can be dealt with here.
1272 //
1273
1274 if (t->bound) {
1275
1276 if (t->bound != task->incall) {
1277 #if !defined(THREADED_RTS)
1278 // Must be a bound thread that is not the topmost one. Leave
1279 // it on the run queue until the stack has unwound to the
1280 // point where we can deal with this. Leaving it on the run
1281 // queue also ensures that the garbage collector knows about
1282 // this thread and its return value (it gets dropped from the
1283 // step->threads list so there's no other way to find it).
1284 appendToRunQueue(cap,t);
1285 return rtsFalse;
1286 #else
1287 // this cannot happen in the threaded RTS, because a
1288 // bound thread can only be run by the appropriate Task.
1289 barf("finished bound thread that isn't mine");
1290 #endif
1291 }
1292
1293 ASSERT(task->incall->tso == t);
1294
1295 if (t->what_next == ThreadComplete) {
1296 if (task->incall->ret) {
1297 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1298 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1299 }
1300 task->incall->rstat = Success;
1301 } else {
1302 if (task->incall->ret) {
1303 *(task->incall->ret) = NULL;
1304 }
1305 if (sched_state >= SCHED_INTERRUPTING) {
1306 if (heap_overflow) {
1307 task->incall->rstat = HeapExhausted;
1308 } else {
1309 task->incall->rstat = Interrupted;
1310 }
1311 } else {
1312 task->incall->rstat = Killed;
1313 }
1314 }
1315 #ifdef DEBUG
1316 removeThreadLabel((StgWord)task->incall->tso->id);
1317 #endif
1318
1319 // We no longer consider this thread and task to be bound to
1320 // each other. The TSO lives on until it is GC'd, but the
1321 // task is about to be released by the caller, and we don't
1322 // want anyone following the pointer from the TSO to the
1323 // defunct task (which might have already been
1324 // re-used). This was a real bug: the GC updated
1325         // tso->bound->tso, which led to a deadlock.
1326 t->bound = NULL;
1327 task->incall->tso = NULL;
1328
1329 return rtsTrue; // tells schedule() to return
1330 }
1331
1332 return rtsFalse;
1333 }
1334
1335 /* -----------------------------------------------------------------------------
1336 * Perform a heap census
1337 * -------------------------------------------------------------------------- */
1338
1339 static rtsBool
1340 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1341 {
1342 // When we have +RTS -i0 and we're heap profiling, do a census at
1343 // every GC. This lets us get repeatable runs for debugging.
1344 if (performHeapProfile ||
1345 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1346 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1347 return rtsTrue;
1348 } else {
1349 return rtsFalse;
1350 }
1351 }
1352
1353 /* -----------------------------------------------------------------------------
1354 * stopAllCapabilities()
1355 *
1356 * Stop all Haskell execution. This is used when we need to make some global
1357 * change to the system, such as altering the number of capabilities, or
1358 * forking.
1359 *
1360 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1361 * -------------------------------------------------------------------------- */
1362
1363 #if defined(THREADED_RTS)
1364 static void stopAllCapabilities (Capability **pCap, Task *task)
1365 {
1366 rtsBool was_syncing;
1367 SyncType prev_sync_type;
1368
1369 PendingSync sync = {
1370 .type = SYNC_OTHER,
1371 .idle = NULL,
1372 .task = task
1373 };
1374
1375 do {
1376 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1377 } while (was_syncing);
1378
1379 acquireAllCapabilities(*pCap,task);
1380
1381 pending_sync = 0;
1382 }
1383 #endif
1384
1385 /* -----------------------------------------------------------------------------
1386 * requestSync()
1387 *
1388 * Commence a synchronisation between all capabilities. Normally not called
1389 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1390 * has some special synchronisation requirements.
1391 *
1392 * Returns:
1393 * rtsFalse if we successfully got a sync
1394 * rtsTrue if there was another sync request in progress,
1395  *    and we yielded to it.  The type of the other sync
1396  *    request is returned via *prev_sync_type.
1397 * -------------------------------------------------------------------------- */
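/* (Editorial sketch, not compiled) The calling pattern, as used by
 * stopAllCapabilities() above and scheduleDoGC() below, is roughly:
 */
#if 0
    PendingSync sync = { .type = SYNC_OTHER, .idle = NULL, .task = task };
    SyncType prev_sync_type;
    rtsBool was_syncing;
    do {
        // keep retrying until our sync request wins the race
        was_syncing = requestSync(&cap, task, &sync, &prev_sync_type);
    } while (was_syncing);
    acquireAllCapabilities(cap, task);
    pending_sync = 0;
    // ... perform the global operation ...
    releaseAllCapabilities(n_capabilities, cap, task);
#endif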
1398
1399 #if defined(THREADED_RTS)
1400 static rtsBool requestSync (
1401 Capability **pcap, Task *task, PendingSync *sync,
1402 SyncType *prev_sync_type)
1403 {
1404 PendingSync *prev_sync;
1405
1406 prev_sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1407 (StgWord)NULL,
1408 (StgWord)sync);
1409
1410 if (prev_sync)
1411 {
1412 do {
1413 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1414 prev_sync->type);
1415 ASSERT(*pcap);
1416 yieldCapability(pcap,task,rtsTrue);
1417 } while (pending_sync);
1418 // NOTE: task->cap might have changed now
1419 *prev_sync_type = prev_sync->type;
1420 return rtsTrue;
1421 }
1422 else
1423 {
1424 return rtsFalse;
1425 }
1426 }
1427 #endif
1428
1429 /* -----------------------------------------------------------------------------
1430 * acquireAllCapabilities()
1431 *
1432 * Grab all the capabilities except the one we already hold. Used
1433  * when synchronising before a single-threaded GC (SYNC_GC_SEQ), and
1434 * before a fork (SYNC_OTHER).
1435 *
1436 * Only call this after requestSync(), otherwise a deadlock might
1437 * ensue if another thread is trying to synchronise.
1438 * -------------------------------------------------------------------------- */
1439
1440 #ifdef THREADED_RTS
1441 static void acquireAllCapabilities(Capability *cap, Task *task)
1442 {
1443 Capability *tmpcap;
1444 nat i;
1445
1446 ASSERT(pending_sync != NULL);
1447 for (i=0; i < n_capabilities; i++) {
1448         debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1449 i, n_capabilities);
1450 tmpcap = capabilities[i];
1451 if (tmpcap != cap) {
1452 // we better hope this task doesn't get migrated to
1453 // another Capability while we're waiting for this one.
1454 // It won't, because load balancing happens while we have
1455 // all the Capabilities, but even so it's a slightly
1456 // unsavoury invariant.
1457 task->cap = tmpcap;
1458 waitForCapability(&tmpcap, task);
1459 if (tmpcap->no != i) {
1460 barf("acquireAllCapabilities: got the wrong capability");
1461 }
1462 }
1463 }
1464 task->cap = cap;
1465 }
1466 #endif
1467
1468 /* -----------------------------------------------------------------------------
1469  * releaseAllCapabilities()
1470 *
1471 * Assuming this thread holds all the capabilities, release them all except for
1472 * the one passed in as cap.
1473 * -------------------------------------------------------------------------- */
1474
1475 #ifdef THREADED_RTS
1476 static void releaseAllCapabilities(nat n, Capability *cap, Task *task)
1477 {
1478 nat i;
1479
1480 for (i = 0; i < n; i++) {
1481 if (cap->no != i) {
1482 task->cap = capabilities[i];
1483 releaseCapability(capabilities[i]);
1484 }
1485 }
1486 task->cap = cap;
1487 }
1488 #endif
1489
1490 /* -----------------------------------------------------------------------------
1491 * Perform a garbage collection if necessary
1492 * -------------------------------------------------------------------------- */
1493
1494 static void
1495 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1496 rtsBool force_major)
1497 {
1498 Capability *cap = *pcap;
1499 rtsBool heap_census;
1500 nat collect_gen;
1501 rtsBool major_gc;
1502 #ifdef THREADED_RTS
1503 nat gc_type;
1504 nat i;
1505 nat need_idle;
1506 nat n_idle_caps = 0, n_failed_trygrab_idles = 0;
1507 StgTSO *tso;
1508 rtsBool *idle_cap;
1509 #endif
1510
1511 if (sched_state == SCHED_SHUTTING_DOWN) {
1512 // The final GC has already been done, and the system is
1513 // shutting down. We'll probably deadlock if we try to GC
1514 // now.
1515 return;
1516 }
1517
1518 heap_census = scheduleNeedHeapProfile(rtsTrue);
1519
1520 // Figure out which generation we are collecting, so that we can
1521 // decide whether this is a parallel GC or not.
1522 collect_gen = calcNeeded(force_major || heap_census, NULL);
1523 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1524
1525 #ifdef THREADED_RTS
1526 if (sched_state < SCHED_INTERRUPTING
1527 && RtsFlags.ParFlags.parGcEnabled
1528 && collect_gen >= RtsFlags.ParFlags.parGcGen
1529 && ! oldest_gen->mark)
1530 {
1531 gc_type = SYNC_GC_PAR;
1532 } else {
1533 gc_type = SYNC_GC_SEQ;
1534 }
1535
1536 if (gc_type == SYNC_GC_PAR && RtsFlags.ParFlags.parGcThreads > 0) {
1537 need_idle = stg_max(0, enabled_capabilities -
1538 RtsFlags.ParFlags.parGcThreads);
1539 } else {
1540 need_idle = 0;
1541 }
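    // (Editorial illustration of the +RTS -qn limit) With +RTS -N8 -qn2 we
    // have enabled_capabilities == 8 and RtsFlags.ParFlags.parGcThreads == 2,
    // so need_idle = stg_max(0, 8 - 2) = 6: six capabilities are asked to sit
    // out a parallel GC and only two take part. When no limit is set
    // (parGcThreads == 0), or for a sequential GC, need_idle stays 0 and every
    // capability may participate.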
1542
1543 // In order to GC, there must be no threads running Haskell code.
1544 // Therefore, the GC thread needs to hold *all* the capabilities,
1545 // and release them after the GC has completed.
1546 //
1547 // This seems to be the simplest way: previous attempts involved
1548 // making all the threads with capabilities give up their
1549 // capabilities and sleep except for the *last* one, which
1550 // actually did the GC. But it's quite hard to arrange for all
1551 // the other tasks to sleep and stay asleep.
1552 //
1553
1554 /* Other capabilities are prevented from running yet more Haskell
1555 threads if pending_sync is set. Tested inside
1556 yieldCapability() and releaseCapability() in Capability.c */
1557
1558 PendingSync sync = {
1559 .type = gc_type,
1560 .idle = NULL,
1561 .task = task
1562 };
1563
1564 {
1565 SyncType prev_sync = 0;
1566 rtsBool was_syncing;
1567 do {
1568 // We need an array of size n_capabilities, but since this may
1569 // change each time around the loop we must allocate it afresh.
1570 idle_cap = (rtsBool *)stgMallocBytes(n_capabilities *
1571 sizeof(rtsBool),
1572 "scheduleDoGC");
1573 sync.idle = idle_cap;
1574
1575 // When using +RTS -qn, we need some capabilities to be idle during
1576 // GC. The best bet is to choose some inactive ones, so we look for
1577 // those first:
1578 nat n_idle = need_idle;
1579 for (i=0; i < n_capabilities; i++) {
1580 if (capabilities[i]->disabled) {
1581 idle_cap[i] = rtsTrue;
1582 } else if (n_idle > 0 &&
1583 capabilities[i]->running_task == NULL) {
1584 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1585 n_idle--;
1586 idle_cap[i] = rtsTrue;
1587 } else {
1588 idle_cap[i] = rtsFalse;
1589 }
1590 }
1591 // If we didn't find enough inactive capabilities, just pick some
1592 // more to be idle.
1593 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1594 if (!idle_cap[i] && i != cap->no) {
1595 idle_cap[i] = rtsTrue;
1596 n_idle--;
1597 }
1598 }
1599 ASSERT(n_idle == 0);
1600
1601 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1602 cap = *pcap;
1603 if (was_syncing) {
1604 stgFree(idle_cap);
1605 }
1606 if (was_syncing && (prev_sync == SYNC_GC_SEQ ||
1607 prev_sync == SYNC_GC_PAR)) {
1608 // someone else had a pending sync request for a GC, so
1609 // let's assume GC has been done and we don't need to GC
1610 // again.
1611 return;
1612 }
1613 if (sched_state == SCHED_SHUTTING_DOWN) {
1614 // The scheduler might now be shutting down. We tested
1615 // this above, but it might have become true since then as
1616 // we yielded the capability in requestSync().
1617 return;
1618 }
1619 } while (was_syncing);
1620 }
1621
1622 #ifdef DEBUG
1623 unsigned int old_n_capabilities = n_capabilities;
1624 #endif
1625
1626 interruptAllCapabilities();
1627
1628 // The final shutdown GC is always single-threaded, because it's
1629 // possible that some of the Capabilities have no worker threads.
1630
1631 if (gc_type == SYNC_GC_SEQ) {
1632 traceEventRequestSeqGc(cap);
1633 } else {
1634 traceEventRequestParGc(cap);
1635 }
1636
1637 if (gc_type == SYNC_GC_SEQ) {
1638 // single-threaded GC: grab all the capabilities
1639 acquireAllCapabilities(cap,task);
1640 }
1641 else
1642 {
1643 // If we are load-balancing collections in this
1644 // generation, then we require all GC threads to participate
1645 // in the collection. Otherwise, we only require active
1646 // threads to participate, and we set gc_threads[i]->idle for
1647 // any idle capabilities. The rationale here is that waking
1648 // up an idle Capability takes much longer than just doing any
1649 // GC work on its behalf.
1650
1651 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1652 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1653 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1654 {
1655 for (i=0; i < n_capabilities; i++) {
1656 if (capabilities[i]->disabled) {
1657 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1658 if (idle_cap[i]) {
1659 n_idle_caps++;
1660 }
1661 } else {
1662 if (i != cap->no && idle_cap[i]) {
1663 Capability *tmpcap = capabilities[i];
1664 task->cap = tmpcap;
1665 waitForCapability(&tmpcap, task);
1666 n_idle_caps++;
1667 }
1668 }
1669 }
1670 }
1671 else
1672 {
1673 for (i=0; i < n_capabilities; i++) {
1674 if (capabilities[i]->disabled) {
1675 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1676 if (idle_cap[i]) {
1677 n_idle_caps++;
1678 }
1679 } else if (i != cap->no &&
1680 capabilities[i]->idle >=
1681 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1682 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1683 if (idle_cap[i]) {
1684 n_idle_caps++;
1685 } else {
1686 n_failed_trygrab_idles++;
1687 }
1688 }
1689 }
1690 }
1691 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1692
1693 // We set the gc_thread[i]->idle flag if that
1694 // capability/thread is not participating in this collection.
1695 // We also keep a local record of which capabilities are idle
1696 // in idle_cap[], because scheduleDoGC() is re-entrant:
1697 // another thread might start a GC as soon as we've finished
1698 // this one, and thus the gc_thread[]->idle flags are invalid
1699 // as soon as we release any threads after GC. Getting this
1700 // wrong leads to a rare and hard to debug deadlock!
1701
1702 for (i=0; i < n_capabilities; i++) {
1703 gc_threads[i]->idle = idle_cap[i];
1704 capabilities[i]->idle++;
1705 }
1706
1707 // For all capabilities participating in this GC, wait until
1708 // they have stopped mutating and are standing by for GC.
1709 waitForGcThreads(cap);
1710
1711 #if defined(THREADED_RTS)
1712 // Stable point where we can do a global check on our spark counters
1713 ASSERT(checkSparkCountInvariant());
1714 #endif
1715 }
1716
1717 #endif
1718
1719 IF_DEBUG(scheduler, printAllThreads());
1720
1721 delete_threads_and_gc:
1722 /*
1723 * We now have all the capabilities; if we're in an interrupting
1724 * state, then we should take the opportunity to delete all the
1725 * threads in the system.
1726 * Checking for major_gc ensures that the last GC is major.
1727 */
1728 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1729 deleteAllThreads(cap);
1730 #if defined(THREADED_RTS)
1731 // Discard all the sparks from every Capability. Why?
1732 // They'll probably be GC'd anyway since we've killed all the
1733 // threads. It just avoids the GC having to do any work to
1734 // figure out that any remaining sparks are garbage.
1735 for (i = 0; i < n_capabilities; i++) {
1736 capabilities[i]->spark_stats.gcd +=
1737 sparkPoolSize(capabilities[i]->sparks);
1738 // No race here since all Caps are stopped.
1739 discardSparksCap(capabilities[i]);
1740 }
1741 #endif
1742 sched_state = SCHED_SHUTTING_DOWN;
1743 }
1744
1745 /*
1746 * When there are disabled capabilities, we want to migrate any
1747 * threads away from them. Normally this happens in the
1748 * scheduler's loop, but only for unbound threads - it's really
1749 * hard for a bound thread to migrate itself. So we have another
1750 * go here.
1751 */
1752 #if defined(THREADED_RTS)
1753 for (i = enabled_capabilities; i < n_capabilities; i++) {
1754 Capability *tmp_cap, *dest_cap;
1755 tmp_cap = capabilities[i];
1756 ASSERT(tmp_cap->disabled);
1757 if (i != cap->no) {
1758 dest_cap = capabilities[i % enabled_capabilities];
1759 while (!emptyRunQueue(tmp_cap)) {
1760 tso = popRunQueue(tmp_cap);
1761 migrateThread(tmp_cap, tso, dest_cap);
1762 if (tso->bound) {
1763 traceTaskMigrate(tso->bound->task,
1764 tso->bound->task->cap,
1765 dest_cap);
1766 tso->bound->task->cap = dest_cap;
1767 }
1768 }
1769 }
1770 }
1771 #endif
1772
1773 #if defined(THREADED_RTS)
1774 // reset pending_sync *before* GC, so that when the GC threads
1775 // emerge they don't immediately re-enter the GC.
1776 pending_sync = 0;
1777 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1778 #else
1779 GarbageCollect(collect_gen, heap_census, 0, cap);
1780 #endif
1781
1782 traceSparkCounters(cap);
1783
1784 switch (recent_activity) {
1785 case ACTIVITY_INACTIVE:
1786 if (force_major) {
1787 // We are doing a GC because the system has been idle for a
1788 // timeslice and we need to check for deadlock. Record the
1789 // fact that we've done a GC and turn off the timer signal;
1790 // it will get re-enabled if we run any threads after the GC.
1791 recent_activity = ACTIVITY_DONE_GC;
1792 #ifndef PROFILING
1793 stopTimer();
1794 #endif
1795 break;
1796 }
1797 // fall through...
1798
1799 case ACTIVITY_MAYBE_NO:
1800 // the GC might have taken long enough for the timer to set
1801 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1802 // but we aren't necessarily deadlocked:
1803 recent_activity = ACTIVITY_YES;
1804 break;
1805
1806 case ACTIVITY_DONE_GC:
1807 // If we are actually active, the scheduler will reset the
1808 // recent_activity flag and re-enable the timer.
1809 break;
1810 }
1811
1812 #if defined(THREADED_RTS)
1813 // Stable point where we can do a global check on our spark counters
1814 ASSERT(checkSparkCountInvariant());
1815 #endif
1816
1817 // The heap census itself is done during GarbageCollect().
1818 if (heap_census) {
1819 performHeapProfile = rtsFalse;
1820 }
1821
1822 #if defined(THREADED_RTS)
1823
1824 // If n_capabilities has changed during GC, we're in trouble.
1825 ASSERT(n_capabilities == old_n_capabilities);
1826
1827 if (gc_type == SYNC_GC_PAR)
1828 {
1829 releaseGCThreads(cap);
1830 for (i = 0; i < n_capabilities; i++) {
1831 if (i != cap->no) {
1832 if (idle_cap[i]) {
1833 ASSERT(capabilities[i]->running_task == task);
1834 task->cap = capabilities[i];
1835 releaseCapability(capabilities[i]);
1836 } else {
1837 ASSERT(capabilities[i]->running_task != task);
1838 }
1839 }
1840 }
1841 task->cap = cap;
1842 }
1843
1844 stgFree(idle_cap);
1845 #endif
1846
1847 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1848 // GC set the heap_overflow flag, so we should proceed with
1849 // an orderly shutdown now. Ultimately we want the main
1850 // thread to return to its caller with HeapExhausted, at which
1851 // point the caller should call hs_exit(). The first step is
1852 // to delete all the threads.
1853 //
1854 // Another way to do this would be to raise an exception in
1855 // the main thread, which we really should do because it gives
1856 // the program a chance to clean up. But how do we find the
1857 // main thread? It should presumably be the same one that
1858 // gets ^C exceptions, but that's all done on the Haskell side
1859 // (GHC.TopHandler).
1860 sched_state = SCHED_INTERRUPTING;
1861 goto delete_threads_and_gc;
1862 }
1863
1864 #ifdef SPARKBALANCE
1865 /* JB
1866 Once we are all together... this would be the place to balance all
1867 spark pools. No concurrent stealing or adding of new sparks can
1868 occur. Should be defined in Sparks.c. */
1869 balanceSparkPoolsCaps(n_capabilities, capabilities);
1870 #endif
1871
1872 #if defined(THREADED_RTS)
1873 if (gc_type == SYNC_GC_SEQ) {
1874 // release our stash of capabilities.
1875 releaseAllCapabilities(n_capabilities, cap, task);
1876 }
1877 #endif
1878
1879 return;
1880 }
1881
1882 /* ---------------------------------------------------------------------------
1883 * Singleton fork(). Do not copy any running threads.
1884 * ------------------------------------------------------------------------- */
1885
1886 pid_t
1887 forkProcess(HsStablePtr *entry
1888 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1889 STG_UNUSED
1890 #endif
1891 )
1892 {
1893 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1894 pid_t pid;
1895 StgTSO* t,*next;
1896 Capability *cap;
1897 nat g;
1898 Task *task = NULL;
1899 nat i;
1900
1901 debugTrace(DEBUG_sched, "forking!");
1902
1903 task = newBoundTask();
1904
1905 cap = NULL;
1906 waitForCapability(&cap, task);
1907
1908 #ifdef THREADED_RTS
1909 stopAllCapabilities(&cap, task);
1910 #endif
1911
1912 // no funny business: hold locks while we fork, otherwise if some
1913 // other thread is holding a lock when the fork happens, the data
1914 // structure protected by the lock will forever be in an
1915 // inconsistent state in the child. See also #1391.
1916 ACQUIRE_LOCK(&sched_mutex);
1917 ACQUIRE_LOCK(&sm_mutex);
1918 ACQUIRE_LOCK(&stable_mutex);
1919 ACQUIRE_LOCK(&task->lock);
1920
1921 for (i=0; i < n_capabilities; i++) {
1922 ACQUIRE_LOCK(&capabilities[i]->lock);
1923 }
1924
1925 #ifdef THREADED_RTS
1926 ACQUIRE_LOCK(&all_tasks_mutex);
1927 #endif
1928
1929 stopTimer(); // See #4074
1930
1931 #if defined(TRACING)
1932 flushEventLog(); // so that child won't inherit dirty file buffers
1933 #endif
1934
1935 pid = fork();
1936
1937 if (pid) { // parent
1938
1939 startTimer(); // #4074
1940
1941 RELEASE_LOCK(&sched_mutex);
1942 RELEASE_LOCK(&sm_mutex);
1943 RELEASE_LOCK(&stable_mutex);
1944 RELEASE_LOCK(&task->lock);
1945
1946 for (i=0; i < n_capabilities; i++) {
1947 releaseCapability_(capabilities[i],rtsFalse);
1948 RELEASE_LOCK(&capabilities[i]->lock);
1949 }
1950
1951 #ifdef THREADED_RTS
1952 RELEASE_LOCK(&all_tasks_mutex);
1953 #endif
1954
1955 boundTaskExiting(task);
1956
1957 // just return the pid
1958 return pid;
1959
1960 } else { // child
1961
1962 #if defined(THREADED_RTS)
1963 initMutex(&sched_mutex);
1964 initMutex(&sm_mutex);
1965 initMutex(&stable_mutex);
1966 initMutex(&task->lock);
1967
1968 for (i=0; i < n_capabilities; i++) {
1969 initMutex(&capabilities[i]->lock);
1970 }
1971
1972 initMutex(&all_tasks_mutex);
1973 #endif
1974
1975 #ifdef TRACING
1976 resetTracing();
1977 #endif
1978
1979 // Now, all OS threads except the thread that forked are
1980 // stopped. We need to stop all Haskell threads, including
1981 // those involved in foreign calls. Also we need to delete
1982 // all Tasks, because they correspond to OS threads that are
1983 // now gone.
1984
1985 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1986 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1987 next = t->global_link;
1988 // don't allow threads to catch the ThreadKilled
1989 // exception, but we do want to raiseAsync() because these
1990 // threads may be evaluating thunks that we need later.
1991 deleteThread_(t->cap,t);
1992
1993 // stop the GC from updating the InCall to point to
1994 // the TSO. This is only necessary because the
1995 // OSThread bound to the TSO has been killed, and
1996 // won't get a chance to exit in the usual way (see
1997 // also scheduleHandleThreadFinished).
1998 t->bound = NULL;
1999 }
2000 }
2001
2002 discardTasksExcept(task);
2003
2004 for (i=0; i < n_capabilities; i++) {
2005 cap = capabilities[i];
2006
2007 // Empty the run queue. It seems tempting to let all the
2008 // killed threads stay on the run queue as zombies to be
2009 // cleaned up later, but some of them may correspond to
2010 // bound threads for which the corresponding Task does not
2011 // exist.
2012 truncateRunQueue(cap);
2013
2014         // Any suspended C-calling Tasks are gone; their OS threads
2015         // no longer exist:
2016 cap->suspended_ccalls = NULL;
2017
2018 #if defined(THREADED_RTS)
2019 // Wipe our spare workers list, they no longer exist. New
2020 // workers will be created if necessary.
2021 cap->spare_workers = NULL;
2022 cap->n_spare_workers = 0;
2023 cap->returning_tasks_hd = NULL;
2024 cap->returning_tasks_tl = NULL;
2025 #endif
2026
2027 // Release all caps except 0, we'll use that for starting
2028 // the IO manager and running the client action below.
2029 if (cap->no != 0) {
2030 task->cap = cap;
2031 releaseCapability(cap);
2032 }
2033 }
2034 cap = capabilities[0];
2035 task->cap = cap;
2036
2037 // Empty the threads lists. Otherwise, the garbage
2038 // collector may attempt to resurrect some of these threads.
2039 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2040 generations[g].threads = END_TSO_QUEUE;
2041 }
2042
2043 // On Unix, all timers are reset in the child, so we need to start
2044 // the timer again.
2045 initTimer();
2046 startTimer();
2047
2048 // TODO: need to trace various other things in the child
2049 // like startup event, capabilities, process info etc
2050 traceTaskCreate(task, cap);
2051
2052 #if defined(THREADED_RTS)
2053 ioManagerStartCap(&cap);
2054 #endif
2055
2056 rts_evalStableIO(&cap, entry, NULL); // run the action
2057 rts_checkSchedStatus("forkProcess",cap);
2058
2059 rts_unlock(cap);
2060 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2061 }
2062 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2063 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2064 #endif
2065 }
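
/* The comments inside forkProcess() above describe the locking discipline
 * around fork(): take every RTS lock before forking so that no other
 * thread can be holding one at the moment of the fork, release the locks
 * again in the parent, and re-initialise them (rather than unlock them)
 * in the child, where the owning threads no longer exist. A minimal,
 * generic sketch of that discipline using plain POSIX primitives; the
 * mutex and child_work() below are hypothetical and this is not part of
 * the RTS build:
 */
#if 0
#include <pthread.h>
#include <unistd.h>

static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

static pid_t fork_with_lock_discipline(void (*child_work)(void))
{
    pid_t pid;

    pthread_mutex_lock(&m);             // quiesce: no other thread owns m now
    pid = fork();
    if (pid != 0) {
        pthread_mutex_unlock(&m);       // parent: simply release the lock
    } else {
        pthread_mutex_init(&m, NULL);   // child: re-initialise, mirroring initMutex()
        child_work();
    }
    return pid;
}
#endif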
2066
2067 /* ---------------------------------------------------------------------------
2068 * Changing the number of Capabilities
2069 *
2070 * Changing the number of Capabilities is very tricky! We can only do
2071 * it with the system fully stopped, so we do a full sync with
2072 * requestSync(SYNC_OTHER) and grab all the capabilities.
2073 *
2074 * Then we resize the appropriate data structures, and update all
2075 * references to the old data structures which have now moved.
2076 * Finally we release the Capabilities we are holding, and start
2077 * worker Tasks on the new Capabilities we created.
2078 *
2079 * ------------------------------------------------------------------------- */
2080
2081 void
2082 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
2083 {
2084 #if !defined(THREADED_RTS)
2085 if (new_n_capabilities != 1) {
2086 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2087 }
2088 return;
2089 #elif defined(NOSMP)
2090 if (new_n_capabilities != 1) {
2091 errorBelch("setNumCapabilities: not supported on this platform");
2092 }
2093 return;
2094 #else
2095 Task *task;
2096 Capability *cap;
2097 nat n;
2098 Capability *old_capabilities = NULL;
2099 nat old_n_capabilities = n_capabilities;
2100
2101 if (new_n_capabilities == enabled_capabilities) return;
2102
2103 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2104 enabled_capabilities, new_n_capabilities);
2105
2106 cap = rts_lock();
2107 task = cap->running_task;
2108
2109 stopAllCapabilities(&cap, task);
2110
2111 if (new_n_capabilities < enabled_capabilities)
2112 {
2113 // Reducing the number of capabilities: we do not actually
2114 // remove the extra capabilities, we just mark them as
2115 // "disabled". This has the following effects:
2116 //
2117 // - threads on a disabled capability are migrated away by the
2118 // scheduler loop
2119 //
2120 // - disabled capabilities do not participate in GC
2121 // (see scheduleDoGC())
2122 //
2123 // - No spark threads are created on this capability
2124 // (see scheduleActivateSpark())
2125 //
2126 // - We do not attempt to migrate threads *to* a disabled
2127 // capability (see schedulePushWork()).
2128 //
2129 // but in other respects, a disabled capability remains
2130 // alive. Threads may be woken up on a disabled capability,
2131 // but they will be immediately migrated away.
2132 //
2133 // This approach is much easier than trying to actually remove
2134 // the capability; we don't have to worry about GC data
2135 // structures, the nursery, etc.
2136 //
2137 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2138 capabilities[n]->disabled = rtsTrue;
2139 traceCapDisable(capabilities[n]);
2140 }
2141 enabled_capabilities = new_n_capabilities;
2142 }
2143 else
2144 {
2145 // Increasing the number of enabled capabilities.
2146 //
2147 // enable any disabled capabilities, up to the required number
2148 for (n = enabled_capabilities;
2149 n < new_n_capabilities && n < n_capabilities; n++) {
2150 capabilities[n]->disabled = rtsFalse;
2151 traceCapEnable(capabilities[n]);
2152 }
2153 enabled_capabilities = n;
2154
2155 if (new_n_capabilities > n_capabilities) {
2156 #if defined(TRACING)
2157 // Allocate eventlog buffers for the new capabilities. Note this
2158 // must be done before calling moreCapabilities(), because that
2159 // will emit events about creating the new capabilities and adding
2160 // them to existing capsets.
2161 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2162 #endif
2163
2164 // Resize the capabilities array
2165 // NB. after this, capabilities points somewhere new. Any pointers
2166 // of type (Capability *) are now invalid.
2167 moreCapabilities(n_capabilities, new_n_capabilities);
2168
2169 // Resize and update storage manager data structures
2170 storageAddCapabilities(n_capabilities, new_n_capabilities);
2171 }
2172 }
2173
2174 // update n_capabilities before things start running
2175 if (new_n_capabilities > n_capabilities) {
2176 n_capabilities = enabled_capabilities = new_n_capabilities;
2177 }
2178
2179 // Start worker tasks on the new Capabilities
2180 startWorkerTasks(old_n_capabilities, new_n_capabilities);
2181
2182 // We're done: release the original Capabilities
2183 releaseAllCapabilities(old_n_capabilities, cap,task);
2184
2185 // We can't free the old array until now, because we access it
2186 // while updating pointers in updateCapabilityRefs().
2187 if (old_capabilities) {
2188 stgFree(old_capabilities);
2189 }
2190
2191 // Notify IO manager that the number of capabilities has changed.
2192 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2193
2194 rts_unlock(cap);
2195
2196 #endif // THREADED_RTS
2197 }
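
/* setNumCapabilities() above is the C entry point behind the Haskell-level
 * Control.Concurrent.setNumCapabilities and the +RTS -N flag. A hedged
 * sketch of driving it directly from a program that embeds the RTS; it
 * assumes the prototype is visible via Rts.h and that hs_init() has been
 * called first (illustration only, not part of the build):
 */
#if 0
#include "Rts.h"

static void resize_capabilities_example(int argc, char *argv[])
{
    hs_init(&argc, &argv);      // the RTS must be up before resizing
    setNumCapabilities(4);      // grow: allocates new Capabilities if needed
    setNumCapabilities(2);      // shrink: the extra ones are only disabled
    hs_exit();
}
#endif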
2198
2199
2200
2201 /* ---------------------------------------------------------------------------
2202 * Delete all the threads in the system
2203 * ------------------------------------------------------------------------- */
2204
2205 static void
2206 deleteAllThreads ( Capability *cap )
2207 {
2208 // NOTE: only safe to call if we own all capabilities.
2209
2210 StgTSO* t, *next;
2211 nat g;
2212
2213 debugTrace(DEBUG_sched,"deleting all threads");
2214 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2215 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2216 next = t->global_link;
2217 deleteThread(cap,t);
2218 }
2219 }
2220
2221 // The run queue now contains a bunch of ThreadKilled threads. We
2222 // must not throw these away: the main thread(s) will be in there
2223 // somewhere, and the main scheduler loop has to deal with it.
2224 // Also, the run queue is the only thing keeping these threads from
2225 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2226
2227 #if !defined(THREADED_RTS)
2228 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2229 ASSERT(sleeping_queue == END_TSO_QUEUE);
2230 #endif
2231 }
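
/* deleteAllThreads() above, like the child branch of forkProcess(), walks
 * the per-generation thread lists with the "save the next link before
 * touching the element" idiom, because deleteThread() may change where the
 * element's global_link points. A generic sketch of the idiom on a
 * hypothetical singly linked node type (illustration only):
 */
#if 0
struct node { struct node *next; int payload; };

static void delete_all(struct node *head, void (*del)(struct node *))
{
    struct node *t, *next;
    for (t = head; t != NULL; t = next) {
        next = t->next;   // grab the link first; del() may clobber it
        del(t);
    }
}
#endif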
2232
2233 /* -----------------------------------------------------------------------------
2234 Managing the suspended_ccalls list.
2235 Locks required: sched_mutex
2236 -------------------------------------------------------------------------- */
2237
2238 STATIC_INLINE void
2239 suspendTask (Capability *cap, Task *task)
2240 {
2241 InCall *incall;
2242
2243 incall = task->incall;
2244 ASSERT(incall->next == NULL && incall->prev == NULL);
2245 incall->next = cap->suspended_ccalls;
2246 incall->prev = NULL;
2247 if (cap->suspended_ccalls) {
2248 cap->suspended_ccalls->prev = incall;
2249 }
2250 cap->suspended_ccalls = incall;
2251 }
2252
2253 STATIC_INLINE void
2254 recoverSuspendedTask (Capability *cap, Task *task)
2255 {
2256 InCall *incall;
2257
2258 incall = task->incall;
2259 if (incall->prev) {
2260 incall->prev->next = incall->next;
2261 } else {
2262 ASSERT(cap->suspended_ccalls == incall);
2263 cap->suspended_ccalls = incall->next;
2264 }
2265 if (incall->next) {
2266 incall->next->prev = incall->prev;
2267 }
2268 incall->next = incall->prev = NULL;
2269 }
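
/* suspendTask() and recoverSuspendedTask() above keep cap->suspended_ccalls
 * as an intrusive doubly linked list: push at the head on suspend, unlink
 * from an arbitrary position on resume, since the InCall need not still be
 * at the head by then. A generic sketch of that pair of operations on a
 * hypothetical node type (illustration only):
 */
#if 0
struct dnode { struct dnode *prev, *next; };

static void push_head(struct dnode **head, struct dnode *n)
{
    n->prev = NULL;
    n->next = *head;
    if (*head) (*head)->prev = n;
    *head = n;
}

static void unlink_node(struct dnode **head, struct dnode *n)
{
    if (n->prev) n->prev->next = n->next; else *head = n->next;
    if (n->next) n->next->prev = n->prev;
    n->prev = n->next = NULL;
}
#endif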
2270
2271 /* ---------------------------------------------------------------------------
2272 * Suspending & resuming Haskell threads.
2273 *
2274 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2275 * its capability before calling the C function. This allows another
2276 * task to pick up the capability and carry on running Haskell
2277 * threads. It also means that if the C call blocks, it won't lock
2278 * the whole system.
2279 *
2280 * The Haskell thread making the C call is put to sleep for the
2281  * duration of the call, on the Capability's suspended_ccalls list. We
2282 * give out a token to the task, which it can use to resume the thread
2283 * on return from the C function.
2284 *
2285 * If this is an interruptible C call, this means that the FFI call may be
2286 * unceremoniously terminated and should be scheduled on an
2287 * unbound worker thread.
2288 * ------------------------------------------------------------------------- */
2289
2290 void *
2291 suspendThread (StgRegTable *reg, rtsBool interruptible)
2292 {
2293 Capability *cap;
2294 int saved_errno;
2295 StgTSO *tso;
2296 Task *task;
2297 #if mingw32_HOST_OS
2298 StgWord32 saved_winerror;
2299 #endif
2300
2301 saved_errno = errno;
2302 #if mingw32_HOST_OS
2303 saved_winerror = GetLastError();
2304 #endif
2305
2306 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2307 */
2308 cap = regTableToCapability(reg);
2309
2310 task = cap->running_task;
2311 tso = cap->r.rCurrentTSO;
2312
2313 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2314
2315 // XXX this might not be necessary --SDM
2316 tso->what_next = ThreadRunGHC;
2317
2318 threadPaused(cap,tso);
2319
2320 if (interruptible) {
2321 tso->why_blocked = BlockedOnCCall_Interruptible;
2322 } else {
2323 tso->why_blocked = BlockedOnCCall;
2324 }
2325
2326 // Hand back capability
2327 task->incall->suspended_tso = tso;
2328 task->incall->suspended_cap = cap;
2329
2330 // Otherwise allocate() will write to invalid memory.
2331 cap->r.rCurrentTSO = NULL;
2332
2333 ACQUIRE_LOCK(&cap->lock);
2334
2335 suspendTask(cap,task);
2336 cap->in_haskell = rtsFalse;
2337 releaseCapability_(cap,rtsFalse);
2338
2339 RELEASE_LOCK(&cap->lock);
2340
2341 errno = saved_errno;
2342 #if mingw32_HOST_OS
2343 SetLastError(saved_winerror);
2344 #endif
2345 return task;
2346 }
2347
2348 StgRegTable *
2349 resumeThread (void *task_)
2350 {
2351 StgTSO *tso;
2352 InCall *incall;
2353 Capability *cap;
2354 Task *task = task_;
2355 int saved_errno;
2356 #if mingw32_HOST_OS
2357 StgWord32 saved_winerror;
2358 #endif
2359
2360 saved_errno = errno;
2361 #if mingw32_HOST_OS
2362 saved_winerror = GetLastError();
2363 #endif
2364
2365 incall = task->incall;
2366 cap = incall->suspended_cap;
2367 task->cap = cap;
2368
2369 // Wait for permission to re-enter the RTS with the result.
2370 waitForCapability(&cap,task);
2371 // we might be on a different capability now... but if so, our
2372 // entry on the suspended_ccalls list will also have been
2373 // migrated.
2374
2375 // Remove the thread from the suspended list
2376 recoverSuspendedTask(cap,task);
2377
2378 tso = incall->suspended_tso;
2379 incall->suspended_tso = NULL;
2380 incall->suspended_cap = NULL;
2381 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2382
2383 traceEventRunThread(cap, tso);
2384
2385 /* Reset blocking status */
2386 tso->why_blocked = NotBlocked;
2387
2388 if ((tso->flags & TSO_BLOCKEX) == 0) {
2389 // avoid locking the TSO if we don't have to
2390 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2391 maybePerformBlockedException(cap,tso);
2392 }
2393 }
2394
2395 cap->r.rCurrentTSO = tso;
2396 cap->in_haskell = rtsTrue;
2397 errno = saved_errno;
2398 #if mingw32_HOST_OS
2399 SetLastError(saved_winerror);
2400 #endif
2401
2402 /* We might have GC'd, mark the TSO dirty again */
2403 dirty_TSO(cap,tso);
2404 dirty_STACK(cap,tso->stackobj);
2405
2406 IF_DEBUG(sanity, checkTSO(tso));
2407
2408 return &cap->r;
2409 }
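
/* suspendThread()/resumeThread() above are what a compiler-generated stub
 * for a "safe" foreign import wraps around the real C call: give up the
 * Capability, run the C function with no RTS state held, then re-acquire a
 * Capability using the returned token. A schematic sketch; some_c_call()
 * is hypothetical and this is not the literal code GHC emits:
 */
#if 0
extern int some_c_call(int arg);        // hypothetical foreign function

static int safe_call_stub(StgRegTable *reg, int arg)
{
    void *token;
    int r;

    token = suspendThread(reg, rtsFalse /* not interruptible */);
    r = some_c_call(arg);               // may block; other Haskell threads keep running
    reg = resumeThread(token);          // may come back on a different Capability;
                                        // compiled code carries on with this reg
    return r;
}
#endif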
2410
2411 /* ---------------------------------------------------------------------------
2412 * scheduleThread()
2413 *
2414 * scheduleThread puts a thread on the end of the runnable queue.
2415 * This will usually be done immediately after a thread is created.
2416 * The caller of scheduleThread must create the thread using e.g.
2417 * createThread and push an appropriate closure
2418 * on this thread's stack before the scheduler is invoked.
2419 * ------------------------------------------------------------------------ */
2420
2421 void
2422 scheduleThread(Capability *cap, StgTSO *tso)
2423 {
2424 // The thread goes at the *end* of the run-queue, to avoid possible
2425 // starvation of any threads already on the queue.
2426 appendToRunQueue(cap,tso);
2427 }
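
/* As the comment above says, callers create the thread and push a closure
 * on its stack before handing it to the scheduler. A hedged sketch of the
 * usual shape, modelled on rts_evalIO(): createIOThread() (RtsAPI.c) is
 * assumed here to build a TSO whose stack applies the given IO closure;
 * treat the exact names and the stack-size argument as assumptions:
 */
#if 0
static void run_io_closure_example(Capability *cap, StgClosure *io_action)
{
    StgTSO *tso;

    tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, io_action);
    scheduleThread(cap, tso);   // just queue it; the scheduler runs it later
}
#endif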
2428
2429 void
2430 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2431 {
2432 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2433 // move this thread from now on.
2434 #if defined(THREADED_RTS)
2435 cpu %= enabled_capabilities;
2436 if (cpu == cap->no) {
2437 appendToRunQueue(cap,tso);
2438 } else {
2439 migrateThread(cap, tso, capabilities[cpu]);
2440 }
2441 #else
2442 appendToRunQueue(cap,tso);
2443 #endif
2444 }
2445
2446 void
2447 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2448 {
2449 Task *task;
2450 DEBUG_ONLY( StgThreadID id );
2451 Capability *cap;
2452
2453 cap = *pcap;
2454
2455 // We already created/initialised the Task
2456 task = cap->running_task;
2457
2458 // This TSO is now a bound thread; make the Task and TSO
2459 // point to each other.
2460 tso->bound = task->incall;
2461 tso->cap = cap;
2462
2463 task->incall->tso = tso;
2464 task->incall->ret = ret;
2465 task->incall->rstat = NoStatus;
2466
2467 appendToRunQueue(cap,tso);
2468
2469 DEBUG_ONLY( id = tso->id );
2470 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2471
2472 cap = schedule(cap,task);
2473
2474 ASSERT(task->incall->rstat != NoStatus);
2475 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2476
2477 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2478 *pcap = cap;
2479 }
2480
2481 /* ----------------------------------------------------------------------------
2482 * Starting Tasks
2483 * ------------------------------------------------------------------------- */
2484
2485 #if defined(THREADED_RTS)
2486 void scheduleWorker (Capability *cap, Task *task)
2487 {
2488 // schedule() runs without a lock.
2489 cap = schedule(cap,task);
2490
2491 // On exit from schedule(), we have a Capability, but possibly not
2492 // the same one we started with.
2493
2494 // During shutdown, the requirement is that after all the
2495 // Capabilities are shut down, all workers that are shutting down
2496 // have finished workerTaskStop(). This is why we hold on to
2497 // cap->lock until we've finished workerTaskStop() below.
2498 //
2499 // There may be workers still involved in foreign calls; those
2500 // will just block in waitForCapability() because the
2501 // Capability has been shut down.
2502 //
2503 ACQUIRE_LOCK(&cap->lock);
2504 releaseCapability_(cap,rtsFalse);
2505 workerTaskStop(task);
2506 RELEASE_LOCK(&cap->lock);
2507 }
2508 #endif
2509
2510 /* ---------------------------------------------------------------------------
2511 * Start new worker tasks on Capabilities from--to
2512 * -------------------------------------------------------------------------- */
2513
2514 static void
2515 startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
2516 {
2517 #if defined(THREADED_RTS)
2518 nat i;
2519 Capability *cap;
2520
2521 for (i = from; i < to; i++) {
2522 cap = capabilities[i];
2523 ACQUIRE_LOCK(&cap->lock);
2524 startWorkerTask(cap);
2525 RELEASE_LOCK(&cap->lock);
2526 }
2527 #endif
2528 }
2529
2530 /* ---------------------------------------------------------------------------
2531 * initScheduler()
2532 *
2533 * Initialise the scheduler. This resets all the queues - if the
2534 * queues contained any threads, they'll be garbage collected at the
2535 * next pass.
2536 *
2537 * ------------------------------------------------------------------------ */
2538
2539 void
2540 initScheduler(void)
2541 {
2542 #if !defined(THREADED_RTS)
2543 blocked_queue_hd = END_TSO_QUEUE;
2544 blocked_queue_tl = END_TSO_QUEUE;
2545 sleeping_queue = END_TSO_QUEUE;
2546 #endif
2547
2548 sched_state = SCHED_RUNNING;
2549 recent_activity = ACTIVITY_YES;
2550
2551 #if defined(THREADED_RTS)
2552 /* Initialise the mutex and condition variables used by
2553 * the scheduler. */
2554 initMutex(&sched_mutex);
2555 #endif
2556
2557 ACQUIRE_LOCK(&sched_mutex);
2558
2559 /* A capability holds the state a native thread needs in
2560 * order to execute STG code. At least one capability is
2561 * floating around (only THREADED_RTS builds have more than one).
2562 */
2563 initCapabilities();
2564
2565 initTaskManager();
2566
2567 /*
2568 * Eagerly start one worker to run each Capability, except for
2569 * Capability 0. The idea is that we're probably going to start a
2570 * bound thread on Capability 0 pretty soon, so we don't want a
2571 * worker task hogging it.
2572 */
2573 startWorkerTasks(1, n_capabilities);
2574
2575 RELEASE_LOCK(&sched_mutex);
2576
2577 }
2578
2579 void
2580 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2581 /* see Capability.c, shutdownCapability() */
2582 {
2583 Task *task = NULL;
2584
2585 task = newBoundTask();
2586
2587 // If we haven't killed all the threads yet, do it now.
2588 if (sched_state < SCHED_SHUTTING_DOWN) {
2589 sched_state = SCHED_INTERRUPTING;
2590 Capability *cap = task->cap;
2591 waitForCapability(&cap,task);
2592 scheduleDoGC(&cap,task,rtsTrue);
2593 ASSERT(task->incall->tso == NULL);
2594 releaseCapability(cap);
2595 }
2596 sched_state = SCHED_SHUTTING_DOWN;
2597
2598 shutdownCapabilities(task, wait_foreign);
2599
2600 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2601 // n_failed_trygrab_idles, n_idle_caps);
2602
2603 boundTaskExiting(task);
2604 }
2605
2606 void
2607 freeScheduler( void )
2608 {
2609 nat still_running;
2610
2611 ACQUIRE_LOCK(&sched_mutex);
2612 still_running = freeTaskManager();
2613 // We can only free the Capabilities if there are no Tasks still
2614 // running. We might have a Task about to return from a foreign
2615 // call into waitForCapability(), for example (actually,
2616 // this should be the *only* thing that a still-running Task can
2617 // do at this point, and it will block waiting for the
2618 // Capability).
2619 if (still_running == 0) {
2620 freeCapabilities();
2621 }
2622 RELEASE_LOCK(&sched_mutex);
2623 #if defined(THREADED_RTS)
2624 closeMutex(&sched_mutex);
2625 #endif
2626 }
2627
2628 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2629 void *user USED_IF_NOT_THREADS)
2630 {
2631 #if !defined(THREADED_RTS)
2632 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2633 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2634 evac(user, (StgClosure **)(void *)&sleeping_queue);
2635 #endif
2636 }
2637
2638 /* -----------------------------------------------------------------------------
2639 performGC
2640
2641 This is the interface to the garbage collector from Haskell land.
2642 We provide this so that external C code can allocate and garbage
2643 collect when called from Haskell via _ccall_GC.
2644 -------------------------------------------------------------------------- */
2645
2646 static void
2647 performGC_(rtsBool force_major)
2648 {
2649 Task *task;
2650 Capability *cap = NULL;
2651
2652 // We must grab a new Task here, because the existing Task may be
2653 // associated with a particular Capability, and chained onto the
2654 // suspended_ccalls queue.
2655 task = newBoundTask();
2656
2657 // TODO: do we need to traceTask*() here?
2658
2659 waitForCapability(&cap,task);
2660 scheduleDoGC(&cap,task,force_major);
2661 releaseCapability(cap);
2662 boundTaskExiting(task);
2663 }
2664
2665 void
2666 performGC(void)
2667 {
2668 performGC_(rtsFalse);
2669 }
2670
2671 void
2672 performMajorGC(void)
2673 {
2674 performGC_(rtsTrue);
2675 }
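
/* performGC()/performMajorGC() above are the C-side entry points for
 * forcing a collection, as the header comment explains; the portable FFI
 * spelling hs_perform_gc() normally ends up here as well. A small sketch
 * of external C code using them (illustration only, not part of the
 * build):
 */
#if 0
#include "Rts.h"

static void flush_heap_example(void)
{
    // Called from C that was itself entered from Haskell via a safe call,
    // or from a program embedding the RTS after hs_init().
    performMajorGC();
}
#endif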
2676
2677 /* ---------------------------------------------------------------------------
2678 Interrupt execution.
2679 Might be called inside a signal handler so it mustn't do anything fancy.
2680 ------------------------------------------------------------------------ */
2681
2682 void
2683 interruptStgRts(void)
2684 {
2685 sched_state = SCHED_INTERRUPTING;
2686 interruptAllCapabilities();
2687 #if defined(THREADED_RTS)
2688 wakeUpRts();
2689 #endif
2690 }
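
/* interruptStgRts() is written so that it can be called from a signal
 * handler, as the comment above notes. A hedged sketch of wiring it to
 * SIGINT by hand; the real handlers live in the RtsSignals machinery, so
 * this is only an illustration:
 */
#if 0
#include <signal.h>

static void example_sigint_handler(int sig STG_UNUSED)
{
    interruptStgRts();      // only sets flags and pokes the Capabilities
}

static void install_example_handler(void)
{
    struct sigaction sa;

    sa.sa_handler = example_sigint_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGINT, &sa, NULL);
}
#endif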
2691
2692 /* -----------------------------------------------------------------------------
2693 Wake up the RTS
2694
2695 This function causes at least one OS thread to wake up and run the
2696 scheduler loop. It is invoked when the RTS might be deadlocked, or
2697 an external event has arrived that may need servicing (eg. a
2698 keyboard interrupt).
2699
2700 In the single-threaded RTS we don't do anything here; we only have
2701 one thread anyway, and the event that caused us to want to wake up
2702 will have interrupted any blocking system call in progress anyway.
2703 -------------------------------------------------------------------------- */
2704
2705 #if defined(THREADED_RTS)
2706 void wakeUpRts(void)
2707 {
2708 // This forces the IO Manager thread to wakeup, which will
2709 // in turn ensure that some OS thread wakes up and runs the
2710 // scheduler loop, which will cause a GC and deadlock check.
2711 ioManagerWakeup();
2712 }
2713 #endif
2714
2715 /* -----------------------------------------------------------------------------
2716 Deleting threads
2717
2718 This is used for interruption (^C) and forking, and corresponds to
2719 raising an exception but without letting the thread catch the
2720 exception.
2721 -------------------------------------------------------------------------- */
2722
2723 static void
2724 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2725 {
2726 // NOTE: must only be called on a TSO that we have exclusive
2727 // access to, because we will call throwToSingleThreaded() below.
2728 // The TSO must be on the run queue of the Capability we own, or
2729 // we must own all Capabilities.
2730
2731 if (tso->why_blocked != BlockedOnCCall &&
2732 tso->why_blocked != BlockedOnCCall_Interruptible) {
2733 throwToSingleThreaded(tso->cap,tso,NULL);
2734 }
2735 }
2736
2737 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2738 static void
2739 deleteThread_(Capability *cap, StgTSO *tso)
2740 { // for forkProcess only:
2741 // like deleteThread(), but we delete threads in foreign calls, too.
2742
2743 if (tso->why_blocked == BlockedOnCCall ||
2744 tso->why_blocked == BlockedOnCCall_Interruptible) {
2745 tso->what_next = ThreadKilled;
2746 appendToRunQueue(tso->cap, tso);
2747 } else {
2748 deleteThread(cap,tso);
2749 }
2750 }
2751 #endif
2752
2753 /* -----------------------------------------------------------------------------
2754 raiseExceptionHelper
2755
2756    This function is called by the raise# primitive, just so that we can
2757    move some of the tricky bits of raising an exception from C-- into
2758    C. Who knows, it might be a useful reusable thing here too.
2759 -------------------------------------------------------------------------- */
2760
2761 StgWord
2762 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2763 {
2764 Capability *cap = regTableToCapability(reg);
2765 StgThunk *raise_closure = NULL;
2766 StgPtr p, next;
2767 StgRetInfoTable *info;
2768 //
2769 // This closure represents the expression 'raise# E' where E
2770     // is the exception raised. It is used to overwrite all the
2771     // thunks which are currently under evaluation.
2772 //
2773
2774 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2775 // LDV profiling: stg_raise_info has THUNK as its closure
2776 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2777     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2778 // 1 does not cause any problem unless profiling is performed.
2779 // However, when LDV profiling goes on, we need to linearly scan
2780 // small object pool, where raise_closure is stored, so we should
2781 // use MIN_UPD_SIZE.
2782 //
2783 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2784 // sizeofW(StgClosure)+1);
2785 //
2786
2787 //
2788 // Walk up the stack, looking for the catch frame. On the way,
2789 // we update any closures pointed to from update frames with the
2790 // raise closure that we just built.
2791 //
2792 p = tso->stackobj->sp;
2793 while(1) {
2794 info = get_ret_itbl((StgClosure *)p);
2795 next = p + stack_frame_sizeW((StgClosure *)p);
2796 switch (info->i.type) {
2797
2798 case UPDATE_FRAME:
2799 // Only create raise_closure if we need to.
2800 if (raise_closure == NULL) {
2801 raise_closure =
2802 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2803 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2804 raise_closure->payload[0] = exception;
2805 }
2806 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2807 (StgClosure *)raise_closure);
2808 p = next;
2809 continue;
2810
2811 case ATOMICALLY_FRAME:
2812 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2813 tso->stackobj->sp = p;
2814 return ATOMICALLY_FRAME;
2815
2816 case CATCH_FRAME:
2817 tso->stackobj->sp = p;
2818 return CATCH_FRAME;
2819
2820 case CATCH_STM_FRAME:
2821 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2822 tso->stackobj->sp = p;
2823 return CATCH_STM_FRAME;
2824
2825 case UNDERFLOW_FRAME:
2826 tso->stackobj->sp = p;
2827 threadStackUnderflow(cap,tso);
2828 p = tso->stackobj->sp;
2829 continue;
2830
2831 case STOP_FRAME:
2832 tso->stackobj->sp = p;
2833 return STOP_FRAME;
2834
2835 case CATCH_RETRY_FRAME: {
2836 StgTRecHeader *trec = tso -> trec;
2837 StgTRecHeader *outer = trec -> enclosing_trec;
2838 debugTrace(DEBUG_stm,
2839 "found CATCH_RETRY_FRAME at %p during raise", p);
2840 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2841 stmAbortTransaction(cap, trec);
2842 stmFreeAbortedTRec(cap, trec);
2843 tso -> trec = outer;
2844 p = next;
2845 continue;
2846 }
2847
2848 default:
2849 p = next;
2850 continue;
2851 }
2852 }
2853 }
2854
2855
2856 /* -----------------------------------------------------------------------------
2857 findRetryFrameHelper
2858
2859 This function is called by the retry# primitive. It traverses the stack
2860 leaving tso->sp referring to the frame which should handle the retry.
2861
2862 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2863    or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2864
2865 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2866 create) because retries are not considered to be exceptions, despite the
2867 similar implementation.
2868
2869 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2870 not be created within memory transactions.
2871 -------------------------------------------------------------------------- */
2872
2873 StgWord
2874 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2875 {
2876 StgPtr p, next;
2877 StgRetInfoTable *info;
2878
2879 p = tso->stackobj->sp;
2880 while (1) {
2881 info = get_ret_itbl((StgClosure *)p);
2882 next = p + stack_frame_sizeW((StgClosure *)p);
2883 switch (info->i.type) {
2884
2885 case ATOMICALLY_FRAME:
2886 debugTrace(DEBUG_stm,
2887 "found ATOMICALLY_FRAME at %p during retry", p);
2888 tso->stackobj->sp = p;
2889 return ATOMICALLY_FRAME;
2890
2891 case CATCH_RETRY_FRAME:
2892 debugTrace(DEBUG_stm,
2893 "found CATCH_RETRY_FRAME at %p during retry", p);
2894 tso->stackobj->sp = p;
2895 return CATCH_RETRY_FRAME;
2896
2897 case CATCH_STM_FRAME: {
2898 StgTRecHeader *trec = tso -> trec;
2899 StgTRecHeader *outer = trec -> enclosing_trec;
2900 debugTrace(DEBUG_stm,
2901 "found CATCH_STM_FRAME at %p during retry", p);
2902 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2903 stmAbortTransaction(cap, trec);
2904 stmFreeAbortedTRec(cap, trec);
2905 tso -> trec = outer;
2906 p = next;
2907 continue;
2908 }
2909
2910 case UNDERFLOW_FRAME:
2911 tso->stackobj->sp = p;
2912 threadStackUnderflow(cap,tso);
2913 p = tso->stackobj->sp;
2914 continue;
2915
2916 default:
2917 ASSERT(info->i.type != CATCH_FRAME);
2918 ASSERT(info->i.type != STOP_FRAME);
2919 p = next;
2920 continue;
2921 }
2922 }
2923 }
2924
2925 /* -----------------------------------------------------------------------------
2926 resurrectThreads is called after garbage collection on the list of
2927 threads found to be garbage. Each of these threads will be woken
2928 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2929 on an MVar, or NonTermination if the thread was blocked on a Black
2930 Hole.
2931
2932 Locks: assumes we hold *all* the capabilities.
2933 -------------------------------------------------------------------------- */
2934
2935 void
2936 resurrectThreads (StgTSO *threads)
2937 {
2938 StgTSO *tso, *next;
2939 Capability *cap;
2940 generation *gen;
2941
2942 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2943 next = tso->global_link;
2944
2945 gen = Bdescr((P_)tso)->gen;
2946 tso->global_link = gen->threads;
2947 gen->threads = tso;
2948
2949 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2950
2951 // Wake up the thread on the Capability it was last on
2952 cap = tso->cap;
2953
2954 switch (tso->why_blocked) {
2955 case BlockedOnMVar:
2956 case BlockedOnMVarRead:
2957 /* Called by GC - sched_mutex lock is currently held. */
2958 throwToSingleThreaded(cap, tso,
2959 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2960 break;
2961 case BlockedOnBlackHole:
2962 throwToSingleThreaded(cap, tso,
2963 (StgClosure *)nonTermination_closure);
2964 break;
2965 case BlockedOnSTM:
2966 throwToSingleThreaded(cap, tso,
2967 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2968 break;
2969 case NotBlocked:
2970 /* This might happen if the thread was blocked on a black hole
2971 * belonging to a thread that we've just woken up (raiseAsync
2972 * can wake up threads, remember...).
2973 */
2974 continue;
2975 case BlockedOnMsgThrowTo:
2976 // This can happen if the target is masking, blocks on a
2977 // black hole, and then is found to be unreachable. In
2978 // this case, we want to let the target wake up and carry
2979 // on, and do nothing to this thread.
2980 continue;
2981 default:
2982 barf("resurrectThreads: thread blocked in a strange way: %d",
2983 tso->why_blocked);
2984 }
2985 }
2986 }