Make it possible to use +RTS -qn without -N
[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /*
95 * This mutex protects most of the global scheduler data in
96 * the THREADED_RTS runtime.
97 */
98 #if defined(THREADED_RTS)
99 Mutex sched_mutex;
100 #endif
101
102 #if !defined(mingw32_HOST_OS)
103 #define FORKPROCESS_PRIMOP_SUPPORTED
104 #endif
105
106 /* -----------------------------------------------------------------------------
107 * static function prototypes
108 * -------------------------------------------------------------------------- */
109
110 static Capability *schedule (Capability *initialCapability, Task *task);
111
112 //
113 // These functions all encapsulate parts of the scheduler loop, and are
114 // abstracted only to make the structure and control flow of the
115 // scheduler clearer.
116 //
117 static void schedulePreLoop (void);
118 static void scheduleFindWork (Capability **pcap);
119 #if defined(THREADED_RTS)
120 static void scheduleYield (Capability **pcap, Task *task);
121 #endif
122 #if defined(THREADED_RTS)
123 static rtsBool requestSync (Capability **pcap, Task *task,
124 PendingSync *sync_type, SyncType *prev_sync_type);
125 static void acquireAllCapabilities(Capability *cap, Task *task);
126 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
127 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
128 uint32_t to USED_IF_THREADS);
129 #endif
130 static void scheduleStartSignalHandlers (Capability *cap);
131 static void scheduleCheckBlockedThreads (Capability *cap);
132 static void scheduleProcessInbox(Capability **cap);
133 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
134 static void schedulePushWork(Capability *cap, Task *task);
135 #if defined(THREADED_RTS)
136 static void scheduleActivateSpark(Capability *cap);
137 #endif
138 static void schedulePostRunThread(Capability *cap, StgTSO *t);
139 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
140 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
141 uint32_t prev_what_next );
142 static void scheduleHandleThreadBlocked( StgTSO *t );
143 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
144 StgTSO *t );
145 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
146 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
147
148 static void deleteThread (Capability *cap, StgTSO *tso);
149 static void deleteAllThreads (Capability *cap);
150
151 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
152 static void deleteThread_(Capability *cap, StgTSO *tso);
153 #endif
154
155 /* ---------------------------------------------------------------------------
156 Main scheduling loop.
157
158 We use round-robin scheduling, each thread returning to the
159 scheduler loop when one of these conditions is detected:
160
161 * out of heap space
162 * timer expires (thread yields)
163 * thread blocks
164 * thread ends
165 * stack overflow
166
167 ------------------------------------------------------------------------ */
168
169 static Capability *
170 schedule (Capability *initialCapability, Task *task)
171 {
172 StgTSO *t;
173 Capability *cap;
174 StgThreadReturnCode ret;
175 uint32_t prev_what_next;
176 rtsBool ready_to_gc;
177 #if defined(THREADED_RTS)
178 rtsBool first = rtsTrue;
179 #endif
180
181 cap = initialCapability;
182
183 // Pre-condition: this task owns initialCapability.
184 // The sched_mutex is *NOT* held
185 // NB. on return, we still hold a capability.
186
187 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
188
189 schedulePreLoop();
190
191 // -----------------------------------------------------------
192 // Scheduler loop starts here:
193
194 while (1) {
195
196 // Check whether we have re-entered the RTS from Haskell without
197 // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
198 // call).
199 if (cap->in_haskell) {
200 errorBelch("schedule: re-entered unsafely.\n"
201 " Perhaps a 'foreign import unsafe' should be 'safe'?");
202 stg_exit(EXIT_FAILURE);
203 }
204
205 // Note [shutdown]: The interruption / shutdown sequence.
206 //
207 // In order to cleanly shut down the runtime, we want to:
208 // * make sure that all main threads return to their callers
209 // with the state 'Interrupted'.
210 // * clean up all OS threads associated with the runtime
211 // * free all memory etc.
212 //
213 // So the sequence goes like this:
214 //
215 // * The shutdown sequence is initiated by calling hs_exit(),
216 // interruptStgRts(), or running out of memory in the GC.
217 //
218 // * Set sched_state = SCHED_INTERRUPTING
219 //
220 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
221 // scheduleDoGC(), which halts the whole runtime by acquiring all the
222 // capabilities, does a GC and then calls deleteAllThreads() to kill all
223 // the remaining threads. The zombies are left on the run queue for
224 // cleaning up. We can't kill threads involved in foreign calls.
225 //
226 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
227 //
228 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
229 // enforced by the scheduler, which won't run any Haskell code when
230 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
231 // other capabilities by doing the GC earlier.
232 //
233 // * all workers exit when the run queue on their capability
234 // drains. All main threads will also exit when their TSO
235 // reaches the head of the run queue and they can return.
236 //
237 // * eventually all Capabilities will shut down, and the RTS can
238 // exit.
239 //
240 // * We might be left with threads blocked in foreign calls,
241 // we should really attempt to kill these somehow (TODO).
242
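        // Illustrative sketch only (an assumption about client code, not part
        // of this file): from the point of view of a program embedding the
        // RTS through the public C API, the sequence described in
        // Note [shutdown] is what runs underneath an ordinary
        // hs_init()/hs_exit() pair:
        //
        //     #include "HsFFI.h"
        //
        //     int main (int argc, char *argv[])
        //     {
        //         hs_init(&argc, &argv);
        //         /* ... call foreign-exported Haskell functions ... */
        //         hs_exit();   // initiates SCHED_INTERRUPTING as above
        //         return 0;
        //     }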
243 switch (sched_state) {
244 case SCHED_RUNNING:
245 break;
246 case SCHED_INTERRUPTING:
247 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
248 /* scheduleDoGC() deletes all the threads */
249 scheduleDoGC(&cap,task,rtsTrue);
250
251 // after scheduleDoGC(), we must be shutting down. Either some
252 // other Capability did the final GC, or we did it above,
253 // either way we can fall through to the SCHED_SHUTTING_DOWN
254 // case now.
255 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
256 // fall through
257
258 case SCHED_SHUTTING_DOWN:
259 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
260 // If we are a worker, just exit. If we're a bound thread
261 // then we will exit below when we've removed our TSO from
262 // the run queue.
263 if (!isBoundTask(task) && emptyRunQueue(cap)) {
264 return cap;
265 }
266 break;
267 default:
268 barf("sched_state: %d", sched_state);
269 }
270
271 scheduleFindWork(&cap);
272
273 /* work pushing, currently relevant only for THREADED_RTS:
274 (pushes threads, wakes up idle capabilities for stealing) */
275 schedulePushWork(cap,task);
276
277 scheduleDetectDeadlock(&cap,task);
278
279 // Normally, the only way we can get here with no threads to
280 // run is if a keyboard interrupt was received during
281 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
282 // Additionally, it is not fatal for the
283 // threaded RTS to reach here with no threads to run.
284 //
285 // win32: might be here due to awaitEvent() being abandoned
286 // as a result of a console event having been delivered.
287
288 #if defined(THREADED_RTS)
289 if (first)
290 {
291 // XXX: ToDo
292 // // don't yield the first time, we want a chance to run this
293 // // thread for a bit, even if there are others banging at the
294 // // door.
295 // first = rtsFalse;
296 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
297 }
298
299 scheduleYield(&cap,task);
300
301 if (emptyRunQueue(cap)) continue; // look for work again
302 #endif
303
304 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
305 if ( emptyRunQueue(cap) ) {
306 ASSERT(sched_state >= SCHED_INTERRUPTING);
307 }
308 #endif
309
310 //
311 // Get a thread to run
312 //
313 t = popRunQueue(cap);
314
315 // Sanity check the thread we're about to run. This can be
316 // expensive if there is lots of thread switching going on...
317 IF_DEBUG(sanity,checkTSO(t));
318
319 #if defined(THREADED_RTS)
320 // Check whether we can run this thread in the current task.
321 // If not, we have to pass our capability to the right task.
322 {
323 InCall *bound = t->bound;
324
325 if (bound) {
326 if (bound->task == task) {
327 // yes, the Haskell thread is bound to the current native thread
328 } else {
329 debugTrace(DEBUG_sched,
330 "thread %lu bound to another OS thread",
331 (unsigned long)t->id);
332 // no, bound to a different Haskell thread: pass to that thread
333 pushOnRunQueue(cap,t);
334 continue;
335 }
336 } else {
337 // The thread we want to run is unbound.
338 if (task->incall->tso) {
339 debugTrace(DEBUG_sched,
340 "this OS thread cannot run thread %lu",
341 (unsigned long)t->id);
342 // no, the current native thread is bound to a different
343 // Haskell thread, so pass it to any worker thread
344 pushOnRunQueue(cap,t);
345 continue;
346 }
347 }
348 }
349 #endif
350
351 // If we're shutting down, and this thread has not yet been
352 // killed, kill it now. This sometimes happens when a finalizer
353 // thread is created by the final GC, or a thread previously
354 // in a foreign call returns.
355 if (sched_state >= SCHED_INTERRUPTING &&
356 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
357 deleteThread(cap,t);
358 }
359
360 // If this capability is disabled, migrate the thread away rather
361 // than running it. NB. but not if the thread is bound: it is
362 // really hard for a bound thread to migrate itself. Believe me,
363 // I tried several ways and couldn't find a way to do it.
364 // Instead, when everything is stopped for GC, we migrate all the
365 // threads on the run queue then (see scheduleDoGC()).
366 //
367 // ToDo: what about TSO_LOCKED? Currently we're migrating those
368 // when the number of capabilities drops, but we never migrate
369 // them back if it rises again. Presumably we should, but after
370 // the thread has been migrated we no longer know what capability
371 // it was originally on.
372 #ifdef THREADED_RTS
373 if (cap->disabled && !t->bound) {
374 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
375 migrateThread(cap, t, dest_cap);
376 continue;
377 }
378 #endif
379
380 /* context switches are initiated by the timer signal, unless
381 * the user specified "context switch as often as possible", with
382 * +RTS -C0
383 */
384 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
385 && !emptyThreadQueues(cap)) {
386 cap->context_switch = 1;
387 }
388
389 run_thread:
390
391 // CurrentTSO is the thread to run. It might be different if we
392 // loop back to run_thread, so make sure to set CurrentTSO after
393 // that.
394 cap->r.rCurrentTSO = t;
395
396 startHeapProfTimer();
397
398 // ----------------------------------------------------------------------
399 // Run the current thread
400
401 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
402 ASSERT(t->cap == cap);
403 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
404
405 prev_what_next = t->what_next;
406
407 errno = t->saved_errno;
408 #if mingw32_HOST_OS
409 SetLastError(t->saved_winerror);
410 #endif
411
412 // reset the interrupt flag before running Haskell code
413 cap->interrupt = 0;
414
415 cap->in_haskell = rtsTrue;
416 cap->idle = 0;
417
418 dirty_TSO(cap,t);
419 dirty_STACK(cap,t->stackobj);
420
421 switch (recent_activity)
422 {
423 case ACTIVITY_DONE_GC: {
424 // ACTIVITY_DONE_GC means we turned off the timer signal to
425 // conserve power (see #1623). Re-enable it here.
426 uint32_t prev;
427 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
428 if (prev == ACTIVITY_DONE_GC) {
429 #ifndef PROFILING
430 startTimer();
431 #endif
432 }
433 break;
434 }
435 case ACTIVITY_INACTIVE:
436 // If we reached ACTIVITY_INACTIVE, then don't reset it until
437 // we've done the GC. The thread running here might just be
438 // the IO manager thread that handle_tick() woke up via
439 // wakeUpRts().
440 break;
441 default:
442 recent_activity = ACTIVITY_YES;
443 }
444
445 traceEventRunThread(cap, t);
446
447 switch (prev_what_next) {
448
449 case ThreadKilled:
450 case ThreadComplete:
451 /* Thread already finished, return to scheduler. */
452 ret = ThreadFinished;
453 break;
454
455 case ThreadRunGHC:
456 {
457 StgRegTable *r;
458 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
459 cap = regTableToCapability(r);
460 ret = r->rRet;
461 break;
462 }
463
464 case ThreadInterpret:
465 cap = interpretBCO(cap);
466 ret = cap->r.rRet;
467 break;
468
469 default:
470 barf("schedule: invalid what_next field");
471 }
472
473 cap->in_haskell = rtsFalse;
474
475 // The TSO might have moved, eg. if it re-entered the RTS and a GC
476 // happened. So find the new location:
477 t = cap->r.rCurrentTSO;
478
479 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
480 // don't want it set when not running a Haskell thread.
481 cap->r.rCurrentTSO = NULL;
482
483 // And save the current errno in this thread.
484 // XXX: possibly bogus for SMP because this thread might already
485 // be running again, see code below.
486 t->saved_errno = errno;
487 #if mingw32_HOST_OS
488 // Similarly for Windows error code
489 t->saved_winerror = GetLastError();
490 #endif
491
492 if (ret == ThreadBlocked) {
493 if (t->why_blocked == BlockedOnBlackHole) {
494 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
495 traceEventStopThread(cap, t, t->why_blocked + 6,
496 owner != NULL ? owner->id : 0);
497 } else {
498 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
499 }
500 } else {
501 traceEventStopThread(cap, t, ret, 0);
502 }
503
504 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
505 ASSERT(t->cap == cap);
506
507 // ----------------------------------------------------------------------
508
509 // Costs for the scheduler are assigned to CCS_SYSTEM
510 stopHeapProfTimer();
511 #if defined(PROFILING)
512 cap->r.rCCCS = CCS_SYSTEM;
513 #endif
514
515 schedulePostRunThread(cap,t);
516
517 ready_to_gc = rtsFalse;
518
519 switch (ret) {
520 case HeapOverflow:
521 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
522 break;
523
524 case StackOverflow:
525 // just adjust the stack for this thread, then pop it back
526 // on the run queue.
527 threadStackOverflow(cap, t);
528 pushOnRunQueue(cap,t);
529 break;
530
531 case ThreadYielding:
532 if (scheduleHandleYield(cap, t, prev_what_next)) {
533 // shortcut for switching between compiler/interpreter:
534 goto run_thread;
535 }
536 break;
537
538 case ThreadBlocked:
539 scheduleHandleThreadBlocked(t);
540 break;
541
542 case ThreadFinished:
543 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
544 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
545 break;
546
547 default:
548 barf("schedule: invalid thread return code %d", (int)ret);
549 }
550
551 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
552 scheduleDoGC(&cap,task,rtsFalse);
553 }
554 } /* end of while() */
555 }
556
557 /* -----------------------------------------------------------------------------
558 * Run queue operations
559 * -------------------------------------------------------------------------- */
560
561 static void
562 removeFromRunQueue (Capability *cap, StgTSO *tso)
563 {
564 if (tso->block_info.prev == END_TSO_QUEUE) {
565 ASSERT(cap->run_queue_hd == tso);
566 cap->run_queue_hd = tso->_link;
567 } else {
568 setTSOLink(cap, tso->block_info.prev, tso->_link);
569 }
570 if (tso->_link == END_TSO_QUEUE) {
571 ASSERT(cap->run_queue_tl == tso);
572 cap->run_queue_tl = tso->block_info.prev;
573 } else {
574 setTSOPrev(cap, tso->_link, tso->block_info.prev);
575 }
576 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
577 cap->n_run_queue--;
578
579 IF_DEBUG(sanity, checkRunQueue(cap));
580 }
581
582 void
583 promoteInRunQueue (Capability *cap, StgTSO *tso)
584 {
585 removeFromRunQueue(cap, tso);
586 pushOnRunQueue(cap, tso);
587 }
588
589 /* ----------------------------------------------------------------------------
590 * Setting up the scheduler loop
591 * ------------------------------------------------------------------------- */
592
593 static void
594 schedulePreLoop(void)
595 {
596 // initialisation for scheduler - what cannot go into initScheduler()
597
598 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
599 win32AllocStack();
600 #endif
601 }
602
603 /* -----------------------------------------------------------------------------
604 * scheduleFindWork()
605 *
606 * Search for work to do, and handle messages from elsewhere.
607 * -------------------------------------------------------------------------- */
608
609 static void
610 scheduleFindWork (Capability **pcap)
611 {
612 scheduleStartSignalHandlers(*pcap);
613
614 scheduleProcessInbox(pcap);
615
616 scheduleCheckBlockedThreads(*pcap);
617
618 #if defined(THREADED_RTS)
619 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
620 #endif
621 }
622
623 #if defined(THREADED_RTS)
624 STATIC_INLINE rtsBool
625 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
626 {
627 // we need to yield this capability to someone else if..
628 // - another thread is initiating a GC, and we didn't just do a GC
629 // (see Note [GC livelock])
630 // - another Task is returning from a foreign call
631 // - the thread at the head of the run queue cannot be run
632 // by this Task (it is bound to another Task, or it is unbound
633 // and this task is bound).
634 //
635 // Note [GC livelock]
636 //
637 // If we are interrupted to do a GC, then we do not immediately do
638 // another one. This avoids a starvation situation where one
639 // Capability keeps forcing a GC and the other Capabilities make no
640 // progress at all.
641
642 return ((pending_sync && !didGcLast) ||
643 cap->n_returning_tasks != 0 ||
644 (!emptyRunQueue(cap) && (task->incall->tso == NULL
645 ? peekRunQueue(cap)->bound != NULL
646 : peekRunQueue(cap)->bound != task->incall)));
647 }
648
649 // This is the single place where a Task goes to sleep. There are
650 // two reasons it might need to sleep:
651 // - there are no threads to run
652 // - we need to yield this Capability to someone else
653 // (see shouldYieldCapability())
654 //
655 // Careful: the scheduler loop is quite delicate. Make sure you run
656 // the tests in testsuite/concurrent (all ways) after modifying this,
657 // and also check the benchmarks in nofib/parallel for regressions.
658
659 static void
660 scheduleYield (Capability **pcap, Task *task)
661 {
662 Capability *cap = *pcap;
663 int didGcLast = rtsFalse;
664
665 // if we have work, and we don't need to give up the Capability, continue.
666 //
667 if (!shouldYieldCapability(cap,task,rtsFalse) &&
668 (!emptyRunQueue(cap) ||
669 !emptyInbox(cap) ||
670 sched_state >= SCHED_INTERRUPTING)) {
671 return;
672 }
673
674 // otherwise yield (sleep), and keep yielding if necessary.
675 do {
676 didGcLast = yieldCapability(&cap,task, !didGcLast);
677 }
678 while (shouldYieldCapability(cap,task,didGcLast));
679
680 // note there may still be no threads on the run queue at this
681 // point, the caller has to check.
682
683 *pcap = cap;
684 return;
685 }
686 #endif
687
688 /* -----------------------------------------------------------------------------
689 * schedulePushWork()
690 *
691 * Push work to other Capabilities if we have some.
692 * -------------------------------------------------------------------------- */
693
694 static void
695 schedulePushWork(Capability *cap USED_IF_THREADS,
696 Task *task USED_IF_THREADS)
697 {
698 /* following code not for PARALLEL_HASKELL. I kept the call general,
699 future GUM versions might use pushing in a distributed setup */
700 #if defined(THREADED_RTS)
701
702 Capability *free_caps[n_capabilities], *cap0;
703 uint32_t i, n_wanted_caps, n_free_caps;
704
705 uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
706
707 // migration can be turned off with +RTS -qm
708 if (!RtsFlags.ParFlags.migrate) {
709 spare_threads = 0;
710 }
711
712 // Figure out how many capabilities we want to wake up. We need at least
713 // sparkPoolSizeCap(cap) plus the number of spare threads we have.
714 n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
715 if (n_wanted_caps == 0) return;
716
717 // First grab as many free Capabilities as we can. ToDo: we should use
718 // capabilities on the same NUMA node preferably, but not exclusively.
719 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
720 n_free_caps < n_wanted_caps && i != cap->no;
721 i = (i + 1) % n_capabilities) {
722 cap0 = capabilities[i];
723 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
724 if (!emptyRunQueue(cap0)
725 || cap0->n_returning_tasks != 0
726 || !emptyInbox(cap0)) {
727 // it already has some work, we just grabbed it at
728 // the wrong moment. Or maybe it's deadlocked!
729 releaseCapability(cap0);
730 } else {
731 free_caps[n_free_caps++] = cap0;
732 }
733 }
734 }
735
736 // We now have n_free_caps free capabilities stashed in
737 // free_caps[]. Attempt to share our run queue equally with them.
738 // This is complicated slightly by the fact that we can't move
739 // some threads:
740 //
741 // - threads that have TSO_LOCKED cannot migrate
742 // - a thread that is bound to the current Task cannot be migrated
743 //
744 // This is about the simplest thing we could do; improvements we
745 // might want to do include:
746 //
747 // - giving high priority to moving relatively new threads, on
748 // the grounds that they haven't had time to build up a
749 // working set in the cache on this CPU/Capability.
750 //
751 // - giving low priority to moving long-lived threads
752
753 if (n_free_caps > 0) {
754 StgTSO *prev, *t, *next;
755
756 debugTrace(DEBUG_sched,
757 "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
758 cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
759 n_free_caps);
760
761 // There are n_free_caps+1 caps in total. We will share the threads
762 // evenly between them, *except* that if the run queue does not divide
763 // evenly by n_free_caps+1 then we bias towards the current capability.
764 // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
765 uint32_t keep_threads =
766 (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
767
768 // This also ensures that we don't give away all our threads, since
769 // (x + y) / (y + 1) >= 1 when x >= 1.
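        // A few more worked cases of the formula above (plain arithmetic):
        //   n_run_queue=1, n_free_caps=3  ->  keep (1+3)/4 = 1, give away 0
        //   n_run_queue=5, n_free_caps=2  ->  keep (5+2)/3 = 2, give away 3
        //   n_run_queue=7, n_free_caps=3  ->  keep (7+3)/4 = 2, give away 5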
770
771 // The number of threads we have left.
772 uint32_t n = cap->n_run_queue;
773
774 // prev = the previous thread on this cap's run queue
775 prev = END_TSO_QUEUE;
776
777 // We're going to walk through the run queue, migrating threads to other
778 // capabilities until we have only keep_threads left. We might
779 // encounter a thread that cannot be migrated, in which case we add it
780 // to the current run queue and decrement keep_threads.
781 for (t = cap->run_queue_hd, i = 0;
782 t != END_TSO_QUEUE && n > keep_threads;
783 t = next)
784 {
785 next = t->_link;
786 t->_link = END_TSO_QUEUE;
787
788 // Should we keep this thread?
789 if (t->bound == task->incall // don't move my bound thread
790 || tsoLocked(t) // don't move a locked thread
791 ) {
792 if (prev == END_TSO_QUEUE) {
793 cap->run_queue_hd = t;
794 } else {
795 setTSOLink(cap, prev, t);
796 }
797 setTSOPrev(cap, t, prev);
798 prev = t;
799 if (keep_threads > 0) keep_threads--;
800 }
801
802 // Or migrate it?
803 else {
804 appendToRunQueue(free_caps[i],t);
805 traceEventMigrateThread (cap, t, free_caps[i]->no);
806
807 if (t->bound) { t->bound->task->cap = free_caps[i]; }
808 t->cap = free_caps[i];
809 n--; // we have one fewer threads now
810 i++; // move on to the next free_cap
811 if (i == n_free_caps) i = 0;
812 }
813 }
814
815 // Join up the beginning of the queue (prev)
816 // with the rest of the queue (t)
817 if (t == END_TSO_QUEUE) {
818 cap->run_queue_tl = prev;
819 } else {
820 setTSOPrev(cap, t, prev);
821 }
822 if (prev == END_TSO_QUEUE) {
823 cap->run_queue_hd = t;
824 } else {
825 setTSOLink(cap, prev, t);
826 }
827 cap->n_run_queue = n;
828
829 IF_DEBUG(sanity, checkRunQueue(cap));
830
831 // release the capabilities
832 for (i = 0; i < n_free_caps; i++) {
833 task->cap = free_caps[i];
834 if (sparkPoolSizeCap(cap) > 0) {
835 // If we have sparks to steal, wake up a worker on the
836 // capability, even if it has no threads to run.
837 releaseAndWakeupCapability(free_caps[i]);
838 } else {
839 releaseCapability(free_caps[i]);
840 }
841 }
842 }
843 task->cap = cap; // reset to point to our Capability.
844
845 #endif /* THREADED_RTS */
846
847 }
848
849 /* ----------------------------------------------------------------------------
850 * Start any pending signal handlers
851 * ------------------------------------------------------------------------- */
852
853 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
854 static void
855 scheduleStartSignalHandlers(Capability *cap)
856 {
857 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
858 // safe outside the lock
859 startSignalHandlers(cap);
860 }
861 }
862 #else
863 static void
864 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
865 {
866 }
867 #endif
868
869 /* ----------------------------------------------------------------------------
870 * Check for blocked threads that can be woken up.
871 * ------------------------------------------------------------------------- */
872
873 static void
874 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
875 {
876 #if !defined(THREADED_RTS)
877 //
878 // Check whether any waiting threads need to be woken up. If the
879 // run queue is empty, and there are no other tasks running, we
880 // can wait indefinitely for something to happen.
881 //
882 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
883 {
884 awaitEvent (emptyRunQueue(cap));
885 }
886 #endif
887 }
888
889 /* ----------------------------------------------------------------------------
890 * Detect deadlock conditions and attempt to resolve them.
891 * ------------------------------------------------------------------------- */
892
893 static void
894 scheduleDetectDeadlock (Capability **pcap, Task *task)
895 {
896 Capability *cap = *pcap;
897 /*
898 * Detect deadlock: when we have no threads to run, there are no
899 * threads blocked, waiting for I/O, or sleeping, and all the
900 * other tasks are waiting for work, we must have a deadlock of
901 * some description.
902 */
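    /*
     * An illustrative (not exhaustive) example: a single-threaded program
     * whose main thread does a takeMVar on an MVar that no other thread can
     * ever fill ends up here with nothing runnable, nothing blocked on I/O,
     * and nothing sleeping. The forced major GC below may wake something up
     * (finalizers, or threads resurrected because they are unreachable); if
     * nothing becomes runnable, the main thread itself is sent an exception
     * (see the non-threaded case at the end of this function).
     */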
903 if ( emptyThreadQueues(cap) )
904 {
905 #if defined(THREADED_RTS)
906 /*
907 * In the threaded RTS, we only check for deadlock if there
908 * has been no activity in a complete timeslice. This means
909 * we won't eagerly start a full GC just because we don't have
910 * any threads to run currently.
911 */
912 if (recent_activity != ACTIVITY_INACTIVE) return;
913 #endif
914
915 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
916
917 // Garbage collection can release some new threads due to
918 // either (a) finalizers or (b) threads resurrected because
919 // they are unreachable and will therefore be sent an
920 // exception. Any threads thus released will be immediately
921 // runnable.
922 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
923 cap = *pcap;
924 // when force_major == rtsTrue, scheduleDoGC sets
925 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
926 // signal.
927
928 if ( !emptyRunQueue(cap) ) return;
929
930 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
931 /* If we have user-installed signal handlers, then wait
932 * for signals to arrive rather than bombing out with a
933 * deadlock.
934 */
935 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
936 debugTrace(DEBUG_sched,
937 "still deadlocked, waiting for signals...");
938
939 awaitUserSignals();
940
941 if (signals_pending()) {
942 startSignalHandlers(cap);
943 }
944
945 // either we have threads to run, or we were interrupted:
946 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
947
948 return;
949 }
950 #endif
951
952 #if !defined(THREADED_RTS)
953 /* Probably a real deadlock. Send the current main thread the
954 * Deadlock exception.
955 */
956 if (task->incall->tso) {
957 switch (task->incall->tso->why_blocked) {
958 case BlockedOnSTM:
959 case BlockedOnBlackHole:
960 case BlockedOnMsgThrowTo:
961 case BlockedOnMVar:
962 case BlockedOnMVarRead:
963 throwToSingleThreaded(cap, task->incall->tso,
964 (StgClosure *)nonTermination_closure);
965 return;
966 default:
967 barf("deadlock: main thread blocked in a strange way");
968 }
969 }
970 return;
971 #endif
972 }
973 }
974
975
976 /* ----------------------------------------------------------------------------
977 * Process messages in the current Capability's inbox
978 * ------------------------------------------------------------------------- */
979
980 static void
981 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
982 {
983 #if defined(THREADED_RTS)
984 Message *m, *next;
985 PutMVar *p, *pnext;
986 int r;
987 Capability *cap = *pcap;
988
989 while (!emptyInbox(cap)) {
990 if (cap->r.rCurrentNursery->link == NULL ||
991 g0->n_new_large_words >= large_alloc_lim) {
992 scheduleDoGC(pcap, cap->running_task, rtsFalse);
993 cap = *pcap;
994 }
995
996 // don't use a blocking acquire; if the lock is held by
997 // another thread then just carry on. This seems to avoid
998 // getting stuck in a message ping-pong situation with other
999 // processors. We'll check the inbox again later anyway.
1000 //
1001 // We should really use a more efficient queue data structure
1002 // here. The trickiness is that we must ensure a Capability
1003 // never goes idle if the inbox is non-empty, which is why we
1004 // use cap->lock (cap->lock is released as the last thing
1005 // before going idle; see Capability.c:releaseCapability()).
1006 r = TRY_ACQUIRE_LOCK(&cap->lock);
1007 if (r != 0) return;
1008
1009 m = cap->inbox;
1010 p = cap->putMVars;
1011 cap->inbox = (Message*)END_TSO_QUEUE;
1012 cap->putMVars = NULL;
1013
1014 RELEASE_LOCK(&cap->lock);
1015
1016 while (m != (Message*)END_TSO_QUEUE) {
1017 next = m->link;
1018 executeMessage(cap, m);
1019 m = next;
1020 }
1021
1022 while (p != NULL) {
1023 pnext = p->link;
1024 performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
1025 Unit_closure);
1026 freeStablePtr(p->mvar);
1027 stgFree(p);
1028 p = pnext;
1029 }
1030 }
1031 #endif
1032 }
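/* A rough, hedged sketch of where cap->putMVars entries come from
 * (assumptions: the hs_try_putmvar() RTS entry point with an
 * (int capability, HsStablePtr) signature, as described in the GHC users
 * guide; the exact header that declares it is not shown here):
 *
 *     // C callback run when some asynchronous operation completes.
 *     // 'mvar_sp' is a StablePtr to an MVar created on the Haskell side.
 *     static void operation_finished (HsStablePtr mvar_sp)
 *     {
 *         // Queues a tryPutMVar () for the target capability; the RTS
 *         // frees the StablePtr after performing the put (see the loop
 *         // over cap->putMVars above).
 *         hs_try_putmvar(-1, mvar_sp);   // -1 = any capability
 *     }
 */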
1033
1034
1035 /* ----------------------------------------------------------------------------
1036 * Activate spark threads (THREADED_RTS)
1037 * ------------------------------------------------------------------------- */
1038
1039 #if defined(THREADED_RTS)
1040 static void
1041 scheduleActivateSpark(Capability *cap)
1042 {
1043 if (anySparks() && !cap->disabled)
1044 {
1045 createSparkThread(cap);
1046 debugTrace(DEBUG_sched, "creating a spark thread");
1047 }
1048 }
1049 #endif // THREADED_RTS
1050
1051 /* ----------------------------------------------------------------------------
1052 * After running a thread...
1053 * ------------------------------------------------------------------------- */
1054
1055 static void
1056 schedulePostRunThread (Capability *cap, StgTSO *t)
1057 {
1058 // We have to be able to catch transactions that are in an
1059 // infinite loop as a result of seeing an inconsistent view of
1060 // memory, e.g.
1061 //
1062 // atomically $ do
1063 // [a,b] <- mapM readTVar [ta,tb]
1064 // when (a == b) loop
1065 //
1066 // and a is never equal to b given a consistent view of memory.
1067 //
1068 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1069 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1070 debugTrace(DEBUG_sched | DEBUG_stm,
1071 "trec %p found wasting its time", t);
1072
1073 // strip the stack back to the
1074 // ATOMICALLY_FRAME, aborting the (nested)
1075 // transaction, and saving the stack of any
1076 // partially-evaluated thunks on the heap.
1077 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1078
1079 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1080 }
1081 }
1082
1083 //
1084 // If the current thread's allocation limit has run out, send it
1085 // the AllocationLimitExceeded exception.
1086
1087 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1088 // Use a throwToSelf rather than a throwToSingleThreaded, because
1089 // it correctly handles the case where the thread is currently
1090 // inside mask. Also the thread might be blocked (e.g. on an
1091 // MVar), and throwToSingleThreaded doesn't unblock it
1092 // correctly in that case.
1093 throwToSelf(cap, t, allocationLimitExceeded_closure);
1094 ASSIGN_Int64((W_*)&(t->alloc_limit),
1095 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1096 }
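    // (An assumption about the library side rather than anything defined in
    // this file: the counter checked here is the one Haskell code arms via
    // GHC.Conc.setAllocationCounter and GHC.Conc.enableAllocationLimit, the
    // latter enabling the TSO_ALLOC_LIMIT check.)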
1097
1098 /* some statistics gathering in the parallel case */
1099 }
1100
1101 /* -----------------------------------------------------------------------------
1102 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1103 * -------------------------------------------------------------------------- */
1104
1105 static rtsBool
1106 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1107 {
1108 if (cap->r.rHpLim == NULL || cap->context_switch) {
1109 // Sometimes we miss a context switch, e.g. when calling
1110 // primitives in a tight loop, MAYBE_GC() doesn't check the
1111 // context switch flag, and we end up waiting for a GC.
1112 // See #1984, and concurrent/should_run/1984
1113 cap->context_switch = 0;
1114 appendToRunQueue(cap,t);
1115 } else {
1116 pushOnRunQueue(cap,t);
1117 }
1118
1119 // did the task ask for a large block?
1120 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1121 // if so, get one and push it on the front of the nursery.
1122 bdescr *bd;
1123 W_ blocks;
1124
1125 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1126
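        // Worked example (assuming the usual 4Kbyte BLOCK_SIZE): a request
        // of rHpAlloc = 10000 bytes rounds up to 3 blocks (12288 bytes).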
1127 if (blocks > BLOCKS_PER_MBLOCK) {
1128 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1129 }
1130
1131 debugTrace(DEBUG_sched,
1132 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1133 (long)t->id, what_next_strs[t->what_next], blocks);
1134
1135 // don't do this if the nursery is (nearly) full, we'll GC first.
1136 if (cap->r.rCurrentNursery->link != NULL ||
1137 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1138 // infinite loop if the
1139 // nursery has only one
1140 // block.
1141
1142 bd = allocGroupOnNode_lock(cap->node,blocks);
1143 cap->r.rNursery->n_blocks += blocks;
1144
1145 // link the new group after CurrentNursery
1146 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1147
1148 // initialise it as a nursery block. We initialise the
1149 // step, gen_no, and flags field of *every* sub-block in
1150 // this large block, because this is easier than making
1151 // sure that we always find the block head of a large
1152 // block whenever we call Bdescr() (eg. evacuate() and
1153 // isAlive() in the GC would both have to do this, at
1154 // least).
1155 {
1156 bdescr *x;
1157 for (x = bd; x < bd + blocks; x++) {
1158 initBdescr(x,g0,g0);
1159 x->free = x->start;
1160 x->flags = 0;
1161 }
1162 }
1163
1164 // This assert can be a killer if the app is doing lots
1165 // of large block allocations.
1166 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1167
1168 // now update the nursery to point to the new block
1169 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1170 cap->r.rCurrentNursery = bd;
1171
1172 // we might be unlucky and have another thread get on the
1173 // run queue before us and steal the large block, but in that
1174 // case the thread will just end up requesting another large
1175 // block.
1176 return rtsFalse; /* not actually GC'ing */
1177 }
1178 }
1179
1180 // if we got here because we exceeded large_alloc_lim, then
1181 // proceed straight to GC.
1182 if (g0->n_new_large_words >= large_alloc_lim) {
1183 return rtsTrue;
1184 }
1185
1186 // Otherwise, we just ran out of space in the current nursery.
1187 // Grab another nursery if we can.
1188 if (getNewNursery(cap)) {
1189 debugTrace(DEBUG_sched, "thread %ld got a new nursery", t->id);
1190 return rtsFalse;
1191 }
1192
1193 return rtsTrue;
1194 /* actual GC is done at the end of the while loop in schedule() */
1195 }
1196
1197 /* -----------------------------------------------------------------------------
1198 * Handle a thread that returned to the scheduler with ThreadYielding
1199 * -------------------------------------------------------------------------- */
1200
1201 static rtsBool
1202 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1203 {
1204 /* put the thread back on the run queue. Then, if we're ready to
1205 * GC, check whether this is the last task to stop. If so, wake
1206 * up the GC thread. getThread will block during a GC until the
1207 * GC is finished.
1208 */
1209
1210 ASSERT(t->_link == END_TSO_QUEUE);
1211
1212 // Shortcut if we're just switching evaluators: don't bother
1213 // doing stack squeezing (which can be expensive), just run the
1214 // thread.
1215 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1216 debugTrace(DEBUG_sched,
1217 "--<< thread %ld (%s) stopped to switch evaluators",
1218 (long)t->id, what_next_strs[t->what_next]);
1219 return rtsTrue;
1220 }
1221
1222 // Reset the context switch flag. We don't do this just before
1223 // running the thread, because that would mean we would lose ticks
1224 // during GC, which can lead to unfair scheduling (a thread hogs
1225 // the CPU because the tick always arrives during GC). This way
1226 // penalises threads that do a lot of allocation, but that seems
1227 // better than the alternative.
1228 if (cap->context_switch != 0) {
1229 cap->context_switch = 0;
1230 appendToRunQueue(cap,t);
1231 } else {
1232 pushOnRunQueue(cap,t);
1233 }
1234
1235 IF_DEBUG(sanity,
1236 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1237 checkTSO(t));
1238
1239 return rtsFalse;
1240 }
1241
1242 /* -----------------------------------------------------------------------------
1243 * Handle a thread that returned to the scheduler with ThreadBlocked
1244 * -------------------------------------------------------------------------- */
1245
1246 static void
1247 scheduleHandleThreadBlocked( StgTSO *t
1248 #if !defined(DEBUG)
1249 STG_UNUSED
1250 #endif
1251 )
1252 {
1253
1254 // We don't need to do anything. The thread is blocked, and it
1255 // has tidied up its stack and placed itself on whatever queue
1256 // it needs to be on.
1257
1258 // ASSERT(t->why_blocked != NotBlocked);
1259 // Not true: for example,
1260 // - the thread may have woken itself up already, because
1261 // threadPaused() might have raised a blocked throwTo
1262 // exception, see maybePerformBlockedException().
1263
1264 #ifdef DEBUG
1265 traceThreadStatus(DEBUG_sched, t);
1266 #endif
1267 }
1268
1269 /* -----------------------------------------------------------------------------
1270 * Handle a thread that returned to the scheduler with ThreadFinished
1271 * -------------------------------------------------------------------------- */
1272
1273 static rtsBool
1274 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1275 {
1276 /* Need to check whether this was a main thread, and if so,
1277 * return with the return value.
1278 *
1279 * We also end up here if the thread kills itself with an
1280 * uncaught exception, see Exception.cmm.
1281 */
1282
1283 // blocked exceptions can now complete, even if the thread was in
1284 // blocked mode (see #2910).
1285 awakenBlockedExceptionQueue (cap, t);
1286
1287 //
1288 // Check whether the thread that just completed was a bound
1289 // thread, and if so return with the result.
1290 //
1291 // There is an assumption here that all thread completion goes
1292 // through this point; we need to make sure that if a thread
1293 // ends up in the ThreadKilled state, that it stays on the run
1294 // queue so it can be dealt with here.
1295 //
1296
1297 if (t->bound) {
1298
1299 if (t->bound != task->incall) {
1300 #if !defined(THREADED_RTS)
1301 // Must be a bound thread that is not the topmost one. Leave
1302 // it on the run queue until the stack has unwound to the
1303 // point where we can deal with this. Leaving it on the run
1304 // queue also ensures that the garbage collector knows about
1305 // this thread and its return value (it gets dropped from the
1306 // step->threads list so there's no other way to find it).
1307 appendToRunQueue(cap,t);
1308 return rtsFalse;
1309 #else
1310 // this cannot happen in the threaded RTS, because a
1311 // bound thread can only be run by the appropriate Task.
1312 barf("finished bound thread that isn't mine");
1313 #endif
1314 }
1315
1316 ASSERT(task->incall->tso == t);
1317
1318 if (t->what_next == ThreadComplete) {
1319 if (task->incall->ret) {
1320 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1321 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1322 }
1323 task->incall->rstat = Success;
1324 } else {
1325 if (task->incall->ret) {
1326 *(task->incall->ret) = NULL;
1327 }
1328 if (sched_state >= SCHED_INTERRUPTING) {
1329 if (heap_overflow) {
1330 task->incall->rstat = HeapExhausted;
1331 } else {
1332 task->incall->rstat = Interrupted;
1333 }
1334 } else {
1335 task->incall->rstat = Killed;
1336 }
1337 }
1338 #ifdef DEBUG
1339 removeThreadLabel((StgWord)task->incall->tso->id);
1340 #endif
1341
1342 // We no longer consider this thread and task to be bound to
1343 // each other. The TSO lives on until it is GC'd, but the
1344 // task is about to be released by the caller, and we don't
1345 // want anyone following the pointer from the TSO to the
1346 // defunct task (which might have already been
1347 // re-used). This was a real bug: the GC updated
1348 // tso->bound->tso which led to a deadlock.
1349 t->bound = NULL;
1350 task->incall->tso = NULL;
1351
1352 return rtsTrue; // tells schedule() to return
1353 }
1354
1355 return rtsFalse;
1356 }
1357
1358 /* -----------------------------------------------------------------------------
1359 * Perform a heap census
1360 * -------------------------------------------------------------------------- */
1361
1362 static rtsBool
1363 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1364 {
1365 // When we have +RTS -i0 and we're heap profiling, do a census at
1366 // every GC. This lets us get repeatable runs for debugging.
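    // (For example -- assuming a profiling build -- running with
    // "+RTS -hc -i0" produces a cost-centre census at every GC.)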
1367 if (performHeapProfile ||
1368 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1369 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1370 return rtsTrue;
1371 } else {
1372 return rtsFalse;
1373 }
1374 }
1375
1376 /* -----------------------------------------------------------------------------
1377 * stopAllCapabilities()
1378 *
1379 * Stop all Haskell execution. This is used when we need to make some global
1380 * change to the system, such as altering the number of capabilities, or
1381 * forking.
1382 *
1383 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1384 * -------------------------------------------------------------------------- */
1385
1386 #if defined(THREADED_RTS)
1387 static void stopAllCapabilities (Capability **pCap, Task *task)
1388 {
1389 rtsBool was_syncing;
1390 SyncType prev_sync_type;
1391
1392 PendingSync sync = {
1393 .type = SYNC_OTHER,
1394 .idle = NULL,
1395 .task = task
1396 };
1397
1398 do {
1399 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1400 } while (was_syncing);
1401
1402 acquireAllCapabilities(*pCap,task);
1403
1404 pending_sync = 0;
1405 }
1406 #endif
1407
1408 /* -----------------------------------------------------------------------------
1409 * requestSync()
1410 *
1411 * Commence a synchronisation between all capabilities. Normally not called
1412 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1413 * has some special synchronisation requirements.
1414 *
1415 * Returns:
1416 * rtsFalse if we successfully got a sync
1417 * rtsTrue if there was another sync request in progress,
1418 * and we yielded to it. The value returned is the
1419 * type of the other sync request.
1420 * -------------------------------------------------------------------------- */
1421
1422 #if defined(THREADED_RTS)
1423 static rtsBool requestSync (
1424 Capability **pcap, Task *task, PendingSync *new_sync,
1425 SyncType *prev_sync_type)
1426 {
1427 PendingSync *sync;
1428
1429 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1430 (StgWord)NULL,
1431 (StgWord)new_sync);
1432
1433 if (sync != NULL)
1434 {
1435 // sync is valid until we have called yieldCapability().
1436 // After the sync is completed, we cannot read that struct any
1437 // more because it has been freed.
1438 *prev_sync_type = sync->type;
1439 do {
1440 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1441 sync->type);
1442 ASSERT(*pcap);
1443 yieldCapability(pcap,task,rtsTrue);
1444 sync = pending_sync;
1445 } while (sync != NULL);
1446
1447 // NOTE: task->cap might have changed now
1448 return rtsTrue;
1449 }
1450 else
1451 {
1452 return rtsFalse;
1453 }
1454 }
1455 #endif
1456
1457 /* -----------------------------------------------------------------------------
1458 * acquireAllCapabilities()
1459 *
1460 * Grab all the capabilities except the one we already hold. Used
1461 * when synchronising before a single-threaded GC (SYNC_GC_SEQ), and
1462 * before a fork (SYNC_OTHER).
1463 *
1464 * Only call this after requestSync(), otherwise a deadlock might
1465 * ensue if another thread is trying to synchronise.
1466 * -------------------------------------------------------------------------- */
1467
1468 #ifdef THREADED_RTS
1469 static void acquireAllCapabilities(Capability *cap, Task *task)
1470 {
1471 Capability *tmpcap;
1472 uint32_t i;
1473
1474 ASSERT(pending_sync != NULL);
1475 for (i=0; i < n_capabilities; i++) {
1476 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1477 i, n_capabilities);
1478 tmpcap = capabilities[i];
1479 if (tmpcap != cap) {
1480 // we better hope this task doesn't get migrated to
1481 // another Capability while we're waiting for this one.
1482 // It won't, because load balancing happens while we have
1483 // all the Capabilities, but even so it's a slightly
1484 // unsavoury invariant.
1485 task->cap = tmpcap;
1486 waitForCapability(&tmpcap, task);
1487 if (tmpcap->no != i) {
1488 barf("acquireAllCapabilities: got the wrong capability");
1489 }
1490 }
1491 }
1492 task->cap = cap;
1493 }
1494 #endif
1495
1496 /* -----------------------------------------------------------------------------
1497 * releaseAllCapabilities()
1498 *
1499 * Assuming this thread holds all the capabilities, release them all except for
1500 * the one passed in as cap.
1501 * -------------------------------------------------------------------------- */
1502
1503 #ifdef THREADED_RTS
1504 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task)
1505 {
1506 uint32_t i;
1507
1508 for (i = 0; i < n; i++) {
1509 if (cap->no != i) {
1510 task->cap = capabilities[i];
1511 releaseCapability(capabilities[i]);
1512 }
1513 }
1514 task->cap = cap;
1515 }
1516 #endif
1517
1518 /* -----------------------------------------------------------------------------
1519 * Perform a garbage collection if necessary
1520 * -------------------------------------------------------------------------- */
1521
1522 static void
1523 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1524 rtsBool force_major)
1525 {
1526 Capability *cap = *pcap;
1527 rtsBool heap_census;
1528 uint32_t collect_gen;
1529 rtsBool major_gc;
1530 #ifdef THREADED_RTS
1531 uint32_t gc_type;
1532 uint32_t i;
1533 uint32_t need_idle;
1534 uint32_t n_gc_threads;
1535 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1536 StgTSO *tso;
1537 rtsBool *idle_cap;
1538 #endif
1539
1540 if (sched_state == SCHED_SHUTTING_DOWN) {
1541 // The final GC has already been done, and the system is
1542 // shutting down. We'll probably deadlock if we try to GC
1543 // now.
1544 return;
1545 }
1546
1547 heap_census = scheduleNeedHeapProfile(rtsTrue);
1548
1549 // Figure out which generation we are collecting, so that we can
1550 // decide whether this is a parallel GC or not.
1551 collect_gen = calcNeeded(force_major || heap_census, NULL);
1552 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1553
1554 #ifdef THREADED_RTS
1555 if (sched_state < SCHED_INTERRUPTING
1556 && RtsFlags.ParFlags.parGcEnabled
1557 && collect_gen >= RtsFlags.ParFlags.parGcGen
1558 && ! oldest_gen->mark)
1559 {
1560 gc_type = SYNC_GC_PAR;
1561 } else {
1562 gc_type = SYNC_GC_SEQ;
1563 }
1564
1565 // In order to GC, there must be no threads running Haskell code.
1566 // Therefore, for single-threaded GC, the GC thread needs to hold *all* the
1567 // capabilities, and release them after the GC has completed. For parallel
1568 // GC, we synchronise all the running threads using requestSync().
1569 //
1570 // Other capabilities are prevented from running yet more Haskell threads if
1571 // pending_sync is set. Tested inside yieldCapability() and
1572 // releaseCapability() in Capability.c
1573
1574 PendingSync sync = {
1575 .type = gc_type,
1576 .idle = NULL,
1577 .task = task
1578 };
1579
1580 {
1581 SyncType prev_sync = 0;
1582 rtsBool was_syncing;
1583 do {
1584 // If -qn is not set and we have more capabilities than cores, set
1585 // the number of GC threads to #cores. We do this here rather than
1586 // in normaliseRtsOpts() because here it will work if the program
1587 // calls setNumCapabilities.
1588 //
1589 n_gc_threads = RtsFlags.ParFlags.parGcThreads;
1590 if (n_gc_threads == 0 &&
1591 enabled_capabilities > getNumberOfProcessors()) {
1592 n_gc_threads = getNumberOfProcessors();
1593 }
1594
1595 // This calculation must be inside the loop because
1596 // enabled_capabilities may change if requestSync() below fails and
1597 // we retry.
1598 if (gc_type == SYNC_GC_PAR && n_gc_threads > 0) {
1599 if (n_gc_threads >= enabled_capabilities) {
1600 need_idle = 0;
1601 } else {
1602 need_idle = enabled_capabilities - n_gc_threads;
1603 }
1604 } else {
1605 need_idle = 0;
1606 }
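            // Worked examples of the calculation above, assuming a parallel
            // GC (SYNC_GC_PAR):
            //   +RTS -N8 -qn2            : n_gc_threads = 2, need_idle = 8-2 = 6
            //   +RTS -N8, no -qn, 4 cores: n_gc_threads clamped to 4,
            //                              need_idle = 8-4 = 4
            //   +RTS -N4, no -qn, 8 cores: n_gc_threads stays 0, need_idle = 0,
            //                              so every capability takes part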
1607
1608 // We need an array of size n_capabilities, but since this may
1609 // change each time around the loop we must allocate it afresh.
1610 idle_cap = (rtsBool *)stgMallocBytes(n_capabilities *
1611 sizeof(rtsBool),
1612 "scheduleDoGC");
1613 sync.idle = idle_cap;
1614
1615 // When using +RTS -qn, we need some capabilities to be idle during
1616 // GC. The best bet is to choose some inactive ones, so we look for
1617 // those first:
1618 uint32_t n_idle = need_idle;
1619 for (i=0; i < n_capabilities; i++) {
1620 if (capabilities[i]->disabled) {
1621 idle_cap[i] = rtsTrue;
1622 } else if (n_idle > 0 &&
1623 capabilities[i]->running_task == NULL) {
1624 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1625 n_idle--;
1626 idle_cap[i] = rtsTrue;
1627 } else {
1628 idle_cap[i] = rtsFalse;
1629 }
1630 }
1631 // If we didn't find enough inactive capabilities, just pick some
1632 // more to be idle.
1633 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1634 if (!idle_cap[i] && i != cap->no) {
1635 idle_cap[i] = rtsTrue;
1636 n_idle--;
1637 }
1638 }
1639 ASSERT(n_idle == 0);
1640
1641 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1642 cap = *pcap;
1643 if (was_syncing) {
1644 stgFree(idle_cap);
1645 }
1646 if (was_syncing &&
1647 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1648 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1649 // someone else had a pending sync request for a GC, so
1650 // let's assume GC has been done and we don't need to GC
1651 // again.
1652 // Exception to this: if SCHED_INTERRUPTING, then we still
1653 // need to do the final GC.
1654 return;
1655 }
1656 if (sched_state == SCHED_SHUTTING_DOWN) {
1657 // The scheduler might now be shutting down. We tested
1658 // this above, but it might have become true since then as
1659 // we yielded the capability in requestSync().
1660 return;
1661 }
1662 } while (was_syncing);
1663 }
1664
1665 stat_startGCSync(gc_threads[cap->no]);
1666
1667 #ifdef DEBUG
1668 unsigned int old_n_capabilities = n_capabilities;
1669 #endif
1670
1671 interruptAllCapabilities();
1672
1673 // The final shutdown GC is always single-threaded, because it's
1674 // possible that some of the Capabilities have no worker threads.
1675
1676 if (gc_type == SYNC_GC_SEQ) {
1677 traceEventRequestSeqGc(cap);
1678 } else {
1679 traceEventRequestParGc(cap);
1680 }
1681
1682 if (gc_type == SYNC_GC_SEQ) {
1683 // single-threaded GC: grab all the capabilities
1684 acquireAllCapabilities(cap,task);
1685 }
1686 else
1687 {
1688 // If we are load-balancing collections in this
1689 // generation, then we require all GC threads to participate
1690 // in the collection. Otherwise, we only require active
1691 // threads to participate, and we set gc_threads[i]->idle for
1692 // any idle capabilities. The rationale here is that waking
1693 // up an idle Capability takes much longer than just doing any
1694 // GC work on its behalf.
1695
1696 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1697 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1698 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1699 {
1700 for (i=0; i < n_capabilities; i++) {
1701 if (capabilities[i]->disabled) {
1702 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1703 if (idle_cap[i]) {
1704 n_idle_caps++;
1705 }
1706 } else {
1707 if (i != cap->no && idle_cap[i]) {
1708 Capability *tmpcap = capabilities[i];
1709 task->cap = tmpcap;
1710 waitForCapability(&tmpcap, task);
1711 n_idle_caps++;
1712 }
1713 }
1714 }
1715 }
1716 else
1717 {
1718 for (i=0; i < n_capabilities; i++) {
1719 if (capabilities[i]->disabled) {
1720 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1721 if (idle_cap[i]) {
1722 n_idle_caps++;
1723 }
1724 } else if (i != cap->no &&
1725 capabilities[i]->idle >=
1726 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1727 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1728 if (idle_cap[i]) {
1729 n_idle_caps++;
1730 } else {
1731 n_failed_trygrab_idles++;
1732 }
1733 }
1734 }
1735 }
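        // Worked example for the branch above (hypothetical flag value,
        // assuming +RTS -qi3 sets parGcNoSyncWithIdle to 3): a capability
        // other than ours whose 'idle' counter has reached 3 -- roughly, it
        // has sat out the last three GCs -- is grabbed with
        // tryGrabCapability() and left idle for this collection instead of
        // being woken up to help.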
1736 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1737
1738 // We set the gc_thread[i]->idle flag if that
1739 // capability/thread is not participating in this collection.
1740 // We also keep a local record of which capabilities are idle
1741 // in idle_cap[], because scheduleDoGC() is re-entrant:
1742 // another thread might start a GC as soon as we've finished
1743 // this one, and thus the gc_thread[]->idle flags are invalid
1744 // as soon as we release any threads after GC. Getting this
1745 // wrong leads to a rare and hard to debug deadlock!
1746
1747 for (i=0; i < n_capabilities; i++) {
1748 gc_threads[i]->idle = idle_cap[i];
1749 capabilities[i]->idle++;
1750 }
1751
1752 // For all capabilities participating in this GC, wait until
1753 // they have stopped mutating and are standing by for GC.
1754 waitForGcThreads(cap);
1755
1756 #if defined(THREADED_RTS)
1757 // Stable point where we can do a global check on our spark counters
1758 ASSERT(checkSparkCountInvariant());
1759 #endif
1760 }
1761
1762 #endif
1763
1764 IF_DEBUG(scheduler, printAllThreads());
1765
1766 delete_threads_and_gc:
1767 /*
1768 * We now have all the capabilities; if we're in an interrupting
1769 * state, then we should take the opportunity to delete all the
1770 * threads in the system.
1771 * Checking for major_gc ensures that the last GC is major.
1772 */
1773 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1774 deleteAllThreads(cap);
1775 #if defined(THREADED_RTS)
1776 // Discard all the sparks from every Capability. Why?
1777 // They'll probably be GC'd anyway since we've killed all the
1778 // threads. It just avoids the GC having to do any work to
1779 // figure out that any remaining sparks are garbage.
1780 for (i = 0; i < n_capabilities; i++) {
1781 capabilities[i]->spark_stats.gcd +=
1782 sparkPoolSize(capabilities[i]->sparks);
1783 // No race here since all Caps are stopped.
1784 discardSparksCap(capabilities[i]);
1785 }
1786 #endif
1787 sched_state = SCHED_SHUTTING_DOWN;
1788 }
1789
1790 /*
1791 * When there are disabled capabilities, we want to migrate any
1792 * threads away from them. Normally this happens in the
1793 * scheduler's loop, but only for unbound threads - it's really
1794 * hard for a bound thread to migrate itself. So we have another
1795 * go here.
1796 */
1797 #if defined(THREADED_RTS)
1798 for (i = enabled_capabilities; i < n_capabilities; i++) {
1799 Capability *tmp_cap, *dest_cap;
1800 tmp_cap = capabilities[i];
1801 ASSERT(tmp_cap->disabled);
1802 if (i != cap->no) {
1803 dest_cap = capabilities[i % enabled_capabilities];
1804 while (!emptyRunQueue(tmp_cap)) {
1805 tso = popRunQueue(tmp_cap);
1806 migrateThread(tmp_cap, tso, dest_cap);
1807 if (tso->bound) {
1808 traceTaskMigrate(tso->bound->task,
1809 tso->bound->task->cap,
1810 dest_cap);
1811 tso->bound->task->cap = dest_cap;
1812 }
1813 }
1814 }
1815 }
1816 #endif
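    // Worked example for the migration above: with n_capabilities == 4 and
    // enabled_capabilities == 2, the disabled capabilities are 2 and 3;
    // threads found on cap 2 move to cap 0 (2 % 2) and threads on cap 3 move
    // to cap 1 (3 % 2), and any bound Tasks follow their TSOs to the new
    // capability.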
1817
1818 #if defined(THREADED_RTS)
1819 // reset pending_sync *before* GC, so that when the GC threads
1820 // emerge they don't immediately re-enter the GC.
1821 pending_sync = 0;
1822 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1823 #else
1824 GarbageCollect(collect_gen, heap_census, 0, cap);
1825 #endif
1826
1827 traceSparkCounters(cap);
1828
1829 switch (recent_activity) {
1830 case ACTIVITY_INACTIVE:
1831 if (force_major) {
1832 // We are doing a GC because the system has been idle for a
1833 // timeslice and we need to check for deadlock. Record the
1834 // fact that we've done a GC and turn off the timer signal;
1835 // it will get re-enabled if we run any threads after the GC.
1836 recent_activity = ACTIVITY_DONE_GC;
1837 #ifndef PROFILING
1838 stopTimer();
1839 #endif
1840 break;
1841 }
1842 // fall through...
1843
1844 case ACTIVITY_MAYBE_NO:
1845 // the GC might have taken long enough for the timer to set
1846 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1847 // but we aren't necessarily deadlocked:
1848 recent_activity = ACTIVITY_YES;
1849 break;
1850
1851 case ACTIVITY_DONE_GC:
1852 // If we are actually active, the scheduler will reset the
1853 // recent_activity flag and re-enable the timer.
1854 break;
1855 }
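    // (Aside: recent_activity normally cycles ACTIVITY_YES ->
    // ACTIVITY_MAYBE_NO -> ACTIVITY_INACTIVE under the timer when nothing
    // runs -- those transitions happen in the timer handler, not here -- and
    // the idle GC above parks it at ACTIVITY_DONE_GC until the scheduler
    // sees real work again.)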
1856
1857 #if defined(THREADED_RTS)
1858 // Stable point where we can do a global check on our spark counters
1859 ASSERT(checkSparkCountInvariant());
1860 #endif
1861
1862 // The heap census itself is done during GarbageCollect().
1863 if (heap_census) {
1864 performHeapProfile = rtsFalse;
1865 }
1866
1867 #if defined(THREADED_RTS)
1868
1869 // If n_capabilities has changed during GC, we're in trouble.
1870 ASSERT(n_capabilities == old_n_capabilities);
1871
1872 if (gc_type == SYNC_GC_PAR)
1873 {
1874 releaseGCThreads(cap);
1875 for (i = 0; i < n_capabilities; i++) {
1876 if (i != cap->no) {
1877 if (idle_cap[i]) {
1878 ASSERT(capabilities[i]->running_task == task);
1879 task->cap = capabilities[i];
1880 releaseCapability(capabilities[i]);
1881 } else {
1882 ASSERT(capabilities[i]->running_task != task);
1883 }
1884 }
1885 }
1886 task->cap = cap;
1887 }
1888 #endif
1889
1890 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1891 // GC set the heap_overflow flag, so we should proceed with
1892 // an orderly shutdown now. Ultimately we want the main
1893 // thread to return to its caller with HeapExhausted, at which
1894 // point the caller should call hs_exit(). The first step is
1895 // to delete all the threads.
1896 //
1897 // Another way to do this would be to raise an exception in
1898 // the main thread, which we really should do because it gives
1899 // the program a chance to clean up. But how do we find the
1900 // main thread? It should presumably be the same one that
1901 // gets ^C exceptions, but that's all done on the Haskell side
1902 // (GHC.TopHandler).
1903 sched_state = SCHED_INTERRUPTING;
1904 goto delete_threads_and_gc;
1905 }
1906
1907 #ifdef SPARKBALANCE
1908 /* JB
1909 Once we are all together... this would be the place to balance all
1910 spark pools. No concurrent stealing or adding of new sparks can
1911 occur. Should be defined in Sparks.c. */
1912 balanceSparkPoolsCaps(n_capabilities, capabilities);
1913 #endif
1914
1915 #if defined(THREADED_RTS)
1916 stgFree(idle_cap);
1917
1918 if (gc_type == SYNC_GC_SEQ) {
1919 // release our stash of capabilities.
1920 releaseAllCapabilities(n_capabilities, cap, task);
1921 }
1922 #endif
1923
1924 return;
1925 }
1926
1927 /* ---------------------------------------------------------------------------
1928 * Singleton fork(). Do not copy any running threads.
1929 * ------------------------------------------------------------------------- */
1930
1931 pid_t
1932 forkProcess(HsStablePtr *entry
1933 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1934 STG_UNUSED
1935 #endif
1936 )
1937 {
1938 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1939 pid_t pid;
1940 StgTSO* t,*next;
1941 Capability *cap;
1942 uint32_t g;
1943 Task *task = NULL;
1944 uint32_t i;
1945
1946 debugTrace(DEBUG_sched, "forking!");
1947
1948 task = newBoundTask();
1949
1950 cap = NULL;
1951 waitForCapability(&cap, task);
1952
1953 #ifdef THREADED_RTS
1954 stopAllCapabilities(&cap, task);
1955 #endif
1956
1957 // no funny business: hold locks while we fork, otherwise if some
1958 // other thread is holding a lock when the fork happens, the data
1959 // structure protected by the lock will forever be in an
1960 // inconsistent state in the child. See also #1391.
1961 ACQUIRE_LOCK(&sched_mutex);
1962 ACQUIRE_LOCK(&sm_mutex);
1963 ACQUIRE_LOCK(&stable_mutex);
1964 ACQUIRE_LOCK(&task->lock);
1965
1966 for (i=0; i < n_capabilities; i++) {
1967 ACQUIRE_LOCK(&capabilities[i]->lock);
1968 }
1969
1970 #ifdef THREADED_RTS
1971 ACQUIRE_LOCK(&all_tasks_mutex);
1972 #endif
1973
1974 stopTimer(); // See #4074
1975
1976 #if defined(TRACING)
1977 flushEventLog(); // so that child won't inherit dirty file buffers
1978 #endif
1979
1980 pid = fork();
1981
1982 if (pid) { // parent
1983
1984 startTimer(); // #4074
1985
1986 RELEASE_LOCK(&sched_mutex);
1987 RELEASE_LOCK(&sm_mutex);
1988 RELEASE_LOCK(&stable_mutex);
1989 RELEASE_LOCK(&task->lock);
1990
1991 for (i=0; i < n_capabilities; i++) {
1992 releaseCapability_(capabilities[i],rtsFalse);
1993 RELEASE_LOCK(&capabilities[i]->lock);
1994 }
1995
1996 #ifdef THREADED_RTS
1997 RELEASE_LOCK(&all_tasks_mutex);
1998 #endif
1999
2000 boundTaskExiting(task);
2001
2002 // just return the pid
2003 return pid;
2004
2005 } else { // child
2006
2007 #if defined(THREADED_RTS)
2008 initMutex(&sched_mutex);
2009 initMutex(&sm_mutex);
2010 initMutex(&stable_mutex);
2011 initMutex(&task->lock);
2012
2013 for (i=0; i < n_capabilities; i++) {
2014 initMutex(&capabilities[i]->lock);
2015 }
2016
2017 initMutex(&all_tasks_mutex);
2018 #endif
2019
2020 #ifdef TRACING
2021 resetTracing();
2022 #endif
2023
2024 // Now, all OS threads except the thread that forked are
2025 // stopped. We need to stop all Haskell threads, including
2026 // those involved in foreign calls. Also we need to delete
2027 // all Tasks, because they correspond to OS threads that are
2028 // now gone.
2029
2030 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2031 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2032 next = t->global_link;
2033 // don't allow threads to catch the ThreadKilled
2034 // exception, but we do want to raiseAsync() because these
2035 // threads may be evaluating thunks that we need later.
2036 deleteThread_(t->cap,t);
2037
2038 // stop the GC from updating the InCall to point to
2039 // the TSO. This is only necessary because the
2040 // OSThread bound to the TSO has been killed, and
2041 // won't get a chance to exit in the usual way (see
2042 // also scheduleHandleThreadFinished).
2043 t->bound = NULL;
2044 }
2045 }
2046
2047 discardTasksExcept(task);
2048
2049 for (i=0; i < n_capabilities; i++) {
2050 cap = capabilities[i];
2051
2052 // Empty the run queue. It seems tempting to let all the
2053 // killed threads stay on the run queue as zombies to be
2054 // cleaned up later, but some of them may correspond to
2055 // bound threads for which the corresponding Task does not
2056 // exist.
2057 truncateRunQueue(cap);
2058 cap->n_run_queue = 0;
2059
2060         // Any suspended C-calling Tasks are no more; their OS threads
2061         // don't exist now:
2062 cap->suspended_ccalls = NULL;
2063 cap->n_suspended_ccalls = 0;
2064
2065 #if defined(THREADED_RTS)
2066         // Wipe our spare workers list; they no longer exist. New
2067         // workers will be created if necessary.
2068 cap->spare_workers = NULL;
2069 cap->n_spare_workers = 0;
2070 cap->returning_tasks_hd = NULL;
2071 cap->returning_tasks_tl = NULL;
2072 cap->n_returning_tasks = 0;
2073 #endif
2074
2075 // Release all caps except 0, we'll use that for starting
2076 // the IO manager and running the client action below.
2077 if (cap->no != 0) {
2078 task->cap = cap;
2079 releaseCapability(cap);
2080 }
2081 }
2082 cap = capabilities[0];
2083 task->cap = cap;
2084
2085 // Empty the threads lists. Otherwise, the garbage
2086 // collector may attempt to resurrect some of these threads.
2087 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2088 generations[g].threads = END_TSO_QUEUE;
2089 }
2090
2091 // On Unix, all timers are reset in the child, so we need to start
2092 // the timer again.
2093 initTimer();
2094 startTimer();
2095
2096 // TODO: need to trace various other things in the child
2097 // like startup event, capabilities, process info etc
2098 traceTaskCreate(task, cap);
2099
2100 #if defined(THREADED_RTS)
2101 ioManagerStartCap(&cap);
2102 #endif
2103
2104 rts_evalStableIO(&cap, entry, NULL); // run the action
2105 rts_checkSchedStatus("forkProcess",cap);
2106
2107 rts_unlock(cap);
2108 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2109 }
2110 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2111 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2112 #endif
2113 }
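/* Note on forkProcess(): as the code above shows, the child keeps only
 * Capability 0, discards all other Tasks and threads, restarts the timer
 * and (in THREADED_RTS) the IO manager, runs the action referred to by
 * 'entry' via rts_evalStableIO(), and finally exits through
 * shutdownHaskellAndExit().  The 'entry' StablePtr is presumably created by
 * the Haskell-side forkProcess wrapper; this file only consumes it.
 */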
2114
2115 /* ---------------------------------------------------------------------------
2116 * Changing the number of Capabilities
2117 *
2118 * Changing the number of Capabilities is very tricky! We can only do
2119 * it with the system fully stopped, so we do a full sync with
2120 * requestSync(SYNC_OTHER) and grab all the capabilities.
2121 *
2122 * Then we resize the appropriate data structures, and update all
2123 * references to the old data structures which have now moved.
2124 * Finally we release the Capabilities we are holding, and start
2125 * worker Tasks on the new Capabilities we created.
2126 *
2127 * ------------------------------------------------------------------------- */
2128
2129 void
2130 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2131 {
2132 #if !defined(THREADED_RTS)
2133 if (new_n_capabilities != 1) {
2134 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2135 }
2136 return;
2137 #elif defined(NOSMP)
2138 if (new_n_capabilities != 1) {
2139 errorBelch("setNumCapabilities: not supported on this platform");
2140 }
2141 return;
2142 #else
2143 Task *task;
2144 Capability *cap;
2145 uint32_t n;
2146 Capability *old_capabilities = NULL;
2147 uint32_t old_n_capabilities = n_capabilities;
2148
2149 if (new_n_capabilities == enabled_capabilities) return;
2150
2151 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2152 enabled_capabilities, new_n_capabilities);
2153
2154 cap = rts_lock();
2155 task = cap->running_task;
2156
2157 stopAllCapabilities(&cap, task);
2158
2159 if (new_n_capabilities < enabled_capabilities)
2160 {
2161 // Reducing the number of capabilities: we do not actually
2162 // remove the extra capabilities, we just mark them as
2163 // "disabled". This has the following effects:
2164 //
2165 // - threads on a disabled capability are migrated away by the
2166 // scheduler loop
2167 //
2168 // - disabled capabilities do not participate in GC
2169 // (see scheduleDoGC())
2170 //
2171 // - No spark threads are created on this capability
2172 // (see scheduleActivateSpark())
2173 //
2174 // - We do not attempt to migrate threads *to* a disabled
2175 // capability (see schedulePushWork()).
2176 //
2177 // but in other respects, a disabled capability remains
2178 // alive. Threads may be woken up on a disabled capability,
2179 // but they will be immediately migrated away.
2180 //
2181 // This approach is much easier than trying to actually remove
2182 // the capability; we don't have to worry about GC data
2183 // structures, the nursery, etc.
2184 //
2185 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2186 capabilities[n]->disabled = rtsTrue;
2187 traceCapDisable(capabilities[n]);
2188 }
2189 enabled_capabilities = new_n_capabilities;
2190 }
2191 else
2192 {
2193 // Increasing the number of enabled capabilities.
2194 //
2195 // enable any disabled capabilities, up to the required number
2196 for (n = enabled_capabilities;
2197 n < new_n_capabilities && n < n_capabilities; n++) {
2198 capabilities[n]->disabled = rtsFalse;
2199 traceCapEnable(capabilities[n]);
2200 }
2201 enabled_capabilities = n;
2202
2203 if (new_n_capabilities > n_capabilities) {
2204 #if defined(TRACING)
2205 // Allocate eventlog buffers for the new capabilities. Note this
2206 // must be done before calling moreCapabilities(), because that
2207 // will emit events about creating the new capabilities and adding
2208 // them to existing capsets.
2209 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2210 #endif
2211
2212 // Resize the capabilities array
2213 // NB. after this, capabilities points somewhere new. Any pointers
2214 // of type (Capability *) are now invalid.
2215 moreCapabilities(n_capabilities, new_n_capabilities);
2216
2217 // Resize and update storage manager data structures
2218 storageAddCapabilities(n_capabilities, new_n_capabilities);
2219 }
2220 }
2221
2222 // update n_capabilities before things start running
2223 if (new_n_capabilities > n_capabilities) {
2224 n_capabilities = enabled_capabilities = new_n_capabilities;
2225 }
2226
2227 // We're done: release the original Capabilities
2228 releaseAllCapabilities(old_n_capabilities, cap,task);
2229
2230 // We can't free the old array until now, because we access it
2231 // while updating pointers in updateCapabilityRefs().
2232 if (old_capabilities) {
2233 stgFree(old_capabilities);
2234 }
2235
2236 // Notify IO manager that the number of capabilities has changed.
2237 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2238
2239 rts_unlock(cap);
2240
2241 #endif // THREADED_RTS
2242 }
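/* Illustrative sketch only: setNumCapabilities() is also reachable from
 * client C code linked against the RTS, and is assumed to be what the
 * Haskell-level setNumCapabilities calls via a foreign import.  A
 * hypothetical C caller:
 *
 *     setNumCapabilities(2);  // caps 2..n-1 become disabled; their threads
 *                             // are migrated away (see scheduleDoGC above)
 *     setNumCapabilities(8);  // re-enable, allocating new caps if needed
 */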
2243
2244
2245
2246 /* ---------------------------------------------------------------------------
2247 * Delete all the threads in the system
2248 * ------------------------------------------------------------------------- */
2249
2250 static void
2251 deleteAllThreads ( Capability *cap )
2252 {
2253 // NOTE: only safe to call if we own all capabilities.
2254
2255 StgTSO* t, *next;
2256 uint32_t g;
2257
2258 debugTrace(DEBUG_sched,"deleting all threads");
2259 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2260 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2261 next = t->global_link;
2262 deleteThread(cap,t);
2263 }
2264 }
2265
2266 // The run queue now contains a bunch of ThreadKilled threads. We
2267 // must not throw these away: the main thread(s) will be in there
2268 // somewhere, and the main scheduler loop has to deal with it.
2269 // Also, the run queue is the only thing keeping these threads from
2270 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2271
2272 #if !defined(THREADED_RTS)
2273 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2274 ASSERT(sleeping_queue == END_TSO_QUEUE);
2275 #endif
2276 }
2277
2278 /* -----------------------------------------------------------------------------
2279 Managing the suspended_ccalls list.
2280 Locks required: sched_mutex
2281 -------------------------------------------------------------------------- */
2282
2283 STATIC_INLINE void
2284 suspendTask (Capability *cap, Task *task)
2285 {
2286 InCall *incall;
2287
2288 incall = task->incall;
2289 ASSERT(incall->next == NULL && incall->prev == NULL);
2290 incall->next = cap->suspended_ccalls;
2291 incall->prev = NULL;
2292 if (cap->suspended_ccalls) {
2293 cap->suspended_ccalls->prev = incall;
2294 }
2295 cap->suspended_ccalls = incall;
2296 cap->n_suspended_ccalls++;
2297 }
2298
2299 STATIC_INLINE void
2300 recoverSuspendedTask (Capability *cap, Task *task)
2301 {
2302 InCall *incall;
2303
2304 incall = task->incall;
2305 if (incall->prev) {
2306 incall->prev->next = incall->next;
2307 } else {
2308 ASSERT(cap->suspended_ccalls == incall);
2309 cap->suspended_ccalls = incall->next;
2310 }
2311 if (incall->next) {
2312 incall->next->prev = incall->prev;
2313 }
2314 incall->next = incall->prev = NULL;
2315 cap->n_suspended_ccalls--;
2316 }
2317
2318 /* ---------------------------------------------------------------------------
2319 * Suspending & resuming Haskell threads.
2320 *
2321 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2322 * its capability before calling the C function. This allows another
2323 * task to pick up the capability and carry on running Haskell
2324 * threads. It also means that if the C call blocks, it won't lock
2325 * the whole system.
2326 *
2327 * The Haskell thread making the C call is put to sleep for the
2328 * duration of the call, on the suspended_ccalling_threads queue. We
2329 * give out a token to the task, which it can use to resume the thread
2330 * on return from the C function.
2331 *
2332 * If this is an interruptible C call, this means that the FFI call may be
2333 * unceremoniously terminated and should be scheduled on an
2334 * unbound worker thread.
2335 * ------------------------------------------------------------------------- */
2336
2337 void *
2338 suspendThread (StgRegTable *reg, rtsBool interruptible)
2339 {
2340 Capability *cap;
2341 int saved_errno;
2342 StgTSO *tso;
2343 Task *task;
2344 #if mingw32_HOST_OS
2345 StgWord32 saved_winerror;
2346 #endif
2347
2348 saved_errno = errno;
2349 #if mingw32_HOST_OS
2350 saved_winerror = GetLastError();
2351 #endif
2352
2353 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2354 */
2355 cap = regTableToCapability(reg);
2356
2357 task = cap->running_task;
2358 tso = cap->r.rCurrentTSO;
2359
2360 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2361
2362 // XXX this might not be necessary --SDM
2363 tso->what_next = ThreadRunGHC;
2364
2365 threadPaused(cap,tso);
2366
2367 if (interruptible) {
2368 tso->why_blocked = BlockedOnCCall_Interruptible;
2369 } else {
2370 tso->why_blocked = BlockedOnCCall;
2371 }
2372
2373 // Hand back capability
2374 task->incall->suspended_tso = tso;
2375 task->incall->suspended_cap = cap;
2376
2377 // Otherwise allocate() will write to invalid memory.
2378 cap->r.rCurrentTSO = NULL;
2379
2380 ACQUIRE_LOCK(&cap->lock);
2381
2382 suspendTask(cap,task);
2383 cap->in_haskell = rtsFalse;
2384 releaseCapability_(cap,rtsFalse);
2385
2386 RELEASE_LOCK(&cap->lock);
2387
2388 errno = saved_errno;
2389 #if mingw32_HOST_OS
2390 SetLastError(saved_winerror);
2391 #endif
2392 return task;
2393 }
2394
2395 StgRegTable *
2396 resumeThread (void *task_)
2397 {
2398 StgTSO *tso;
2399 InCall *incall;
2400 Capability *cap;
2401 Task *task = task_;
2402 int saved_errno;
2403 #if mingw32_HOST_OS
2404 StgWord32 saved_winerror;
2405 #endif
2406
2407 saved_errno = errno;
2408 #if mingw32_HOST_OS
2409 saved_winerror = GetLastError();
2410 #endif
2411
2412 incall = task->incall;
2413 cap = incall->suspended_cap;
2414 task->cap = cap;
2415
2416 // Wait for permission to re-enter the RTS with the result.
2417 waitForCapability(&cap,task);
2418 // we might be on a different capability now... but if so, our
2419 // entry on the suspended_ccalls list will also have been
2420 // migrated.
2421
2422 // Remove the thread from the suspended list
2423 recoverSuspendedTask(cap,task);
2424
2425 tso = incall->suspended_tso;
2426 incall->suspended_tso = NULL;
2427 incall->suspended_cap = NULL;
2428 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2429
2430 traceEventRunThread(cap, tso);
2431
2432 /* Reset blocking status */
2433 tso->why_blocked = NotBlocked;
2434
2435 if ((tso->flags & TSO_BLOCKEX) == 0) {
2436 // avoid locking the TSO if we don't have to
2437 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2438 maybePerformBlockedException(cap,tso);
2439 }
2440 }
2441
2442 cap->r.rCurrentTSO = tso;
2443 cap->in_haskell = rtsTrue;
2444 errno = saved_errno;
2445 #if mingw32_HOST_OS
2446 SetLastError(saved_winerror);
2447 #endif
2448
2449 /* We might have GC'd, mark the TSO dirty again */
2450 dirty_TSO(cap,tso);
2451 dirty_STACK(cap,tso->stackobj);
2452
2453 IF_DEBUG(sanity, checkTSO(tso));
2454
2455 return &cap->r;
2456 }
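/* Illustrative sketch of the suspend/resume protocol above.  The real call
 * sites are emitted around "safe" foreign calls; the blocking C function
 * below is hypothetical:
 *
 *     StgRegTable *reg = ...;                    // current register table
 *     void *tok = suspendThread(reg, rtsFalse);  // give up the Capability
 *     int r = some_blocking_c_function();        // may block freely
 *     reg = resumeThread(tok);                   // re-acquire a Capability
 *                                                // (possibly a different one)
 */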
2457
2458 /* ---------------------------------------------------------------------------
2459 * scheduleThread()
2460 *
2461 * scheduleThread puts a thread on the end of the runnable queue.
2462 * This will usually be done immediately after a thread is created.
2463 * The caller of scheduleThread must create the thread using e.g.
2464 * createThread and push an appropriate closure
2465 * on this thread's stack before the scheduler is invoked.
2466 * ------------------------------------------------------------------------ */
2467
2468 void
2469 scheduleThread(Capability *cap, StgTSO *tso)
2470 {
2471 // The thread goes at the *end* of the run-queue, to avoid possible
2472 // starvation of any threads already on the queue.
2473 appendToRunQueue(cap,tso);
2474 }
2475
2476 void
2477 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2478 {
2479 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2480 // move this thread from now on.
2481 #if defined(THREADED_RTS)
2482 cpu %= enabled_capabilities;
2483 if (cpu == cap->no) {
2484 appendToRunQueue(cap,tso);
2485 } else {
2486 migrateThread(cap, tso, capabilities[cpu]);
2487 }
2488 #else
2489 appendToRunQueue(cap,tso);
2490 #endif
2491 }
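// Worked example for scheduleThreadOn(): the requested cpu is taken modulo
// enabled_capabilities, so with 4 enabled capabilities a request for cpu 6
// lands on capability 2; if that is not the current capability the thread is
// migrated there, and TSO_LOCKED keeps it pinned from then on.  This is
// presumably the path used by the Haskell-level forkOn.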
2492
2493 void
2494 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2495 {
2496 Task *task;
2497 DEBUG_ONLY( StgThreadID id );
2498 Capability *cap;
2499
2500 cap = *pcap;
2501
2502 // We already created/initialised the Task
2503 task = cap->running_task;
2504
2505 // This TSO is now a bound thread; make the Task and TSO
2506 // point to each other.
2507 tso->bound = task->incall;
2508 tso->cap = cap;
2509
2510 task->incall->tso = tso;
2511 task->incall->ret = ret;
2512 task->incall->rstat = NoStatus;
2513
2514 appendToRunQueue(cap,tso);
2515
2516 DEBUG_ONLY( id = tso->id );
2517 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2518
2519 cap = schedule(cap,task);
2520
2521 ASSERT(task->incall->rstat != NoStatus);
2522 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2523
2524 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2525 *pcap = cap;
2526 }
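/* Illustrative sketch only: scheduleWaitThread() is the blocking half of the
 * in-call entry points (an assumption about callers in RtsAPI.c, not shown
 * in this file).  A hypothetical embedder would reach it roughly like this:
 *
 *     Capability *cap = rts_lock();
 *     HaskellObj ret;
 *     rts_evalIO(&cap, some_io_closure, &ret);  // creates a bound TSO and
 *                                               // waits in scheduleWaitThread()
 *                                               // until it finishes
 *     rts_unlock(cap);
 */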
2527
2528 /* ----------------------------------------------------------------------------
2529 * Starting Tasks
2530 * ------------------------------------------------------------------------- */
2531
2532 #if defined(THREADED_RTS)
2533 void scheduleWorker (Capability *cap, Task *task)
2534 {
2535 // schedule() runs without a lock.
2536 cap = schedule(cap,task);
2537
2538 // On exit from schedule(), we have a Capability, but possibly not
2539 // the same one we started with.
2540
2541 // During shutdown, the requirement is that after all the
2542 // Capabilities are shut down, all workers that are shutting down
2543 // have finished workerTaskStop(). This is why we hold on to
2544 // cap->lock until we've finished workerTaskStop() below.
2545 //
2546 // There may be workers still involved in foreign calls; those
2547 // will just block in waitForCapability() because the
2548 // Capability has been shut down.
2549 //
2550 ACQUIRE_LOCK(&cap->lock);
2551 releaseCapability_(cap,rtsFalse);
2552 workerTaskStop(task);
2553 RELEASE_LOCK(&cap->lock);
2554 }
2555 #endif
2556
2557 /* ---------------------------------------------------------------------------
2558 * Start new worker tasks on Capabilities from--to
2559 * -------------------------------------------------------------------------- */
2560
2561 static void
2562 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2563 {
2564 #if defined(THREADED_RTS)
2565 uint32_t i;
2566 Capability *cap;
2567
2568 for (i = from; i < to; i++) {
2569 cap = capabilities[i];
2570 ACQUIRE_LOCK(&cap->lock);
2571 startWorkerTask(cap);
2572 RELEASE_LOCK(&cap->lock);
2573 }
2574 #endif
2575 }
2576
2577 /* ---------------------------------------------------------------------------
2578 * initScheduler()
2579 *
2580 * Initialise the scheduler. This resets all the queues - if the
2581 * queues contained any threads, they'll be garbage collected at the
2582 * next pass.
2583 *
2584 * ------------------------------------------------------------------------ */
2585
2586 void
2587 initScheduler(void)
2588 {
2589 #if !defined(THREADED_RTS)
2590 blocked_queue_hd = END_TSO_QUEUE;
2591 blocked_queue_tl = END_TSO_QUEUE;
2592 sleeping_queue = END_TSO_QUEUE;
2593 #endif
2594
2595 sched_state = SCHED_RUNNING;
2596 recent_activity = ACTIVITY_YES;
2597
2598 #if defined(THREADED_RTS)
2599 /* Initialise the mutex and condition variables used by
2600 * the scheduler. */
2601 initMutex(&sched_mutex);
2602 #endif
2603
2604 ACQUIRE_LOCK(&sched_mutex);
2605
2606 /* A capability holds the state a native thread needs in
2607 * order to execute STG code. At least one capability is
2608 * floating around (only THREADED_RTS builds have more than one).
2609 */
2610 initCapabilities();
2611
2612 initTaskManager();
2613
2614 /*
2615 * Eagerly start one worker to run each Capability, except for
2616 * Capability 0. The idea is that we're probably going to start a
2617 * bound thread on Capability 0 pretty soon, so we don't want a
2618 * worker task hogging it.
2619 */
2620 startWorkerTasks(1, n_capabilities);
2621
2622 RELEASE_LOCK(&sched_mutex);
2623
2624 }
2625
2626 void
2627 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2628 /* see Capability.c, shutdownCapability() */
2629 {
2630 Task *task = NULL;
2631
2632 task = newBoundTask();
2633
2634 // If we haven't killed all the threads yet, do it now.
2635 if (sched_state < SCHED_SHUTTING_DOWN) {
2636 sched_state = SCHED_INTERRUPTING;
2637 Capability *cap = task->cap;
2638 waitForCapability(&cap,task);
2639 scheduleDoGC(&cap,task,rtsTrue);
2640 ASSERT(task->incall->tso == NULL);
2641 releaseCapability(cap);
2642 }
2643 sched_state = SCHED_SHUTTING_DOWN;
2644
2645 shutdownCapabilities(task, wait_foreign);
2646
2647 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2648 // n_failed_trygrab_idles, n_idle_caps);
2649
2650 boundTaskExiting(task);
2651 }
2652
2653 void
2654 freeScheduler( void )
2655 {
2656 uint32_t still_running;
2657
2658 ACQUIRE_LOCK(&sched_mutex);
2659 still_running = freeTaskManager();
2660 // We can only free the Capabilities if there are no Tasks still
2661 // running. We might have a Task about to return from a foreign
2662 // call into waitForCapability(), for example (actually,
2663 // this should be the *only* thing that a still-running Task can
2664 // do at this point, and it will block waiting for the
2665 // Capability).
2666 if (still_running == 0) {
2667 freeCapabilities();
2668 }
2669 RELEASE_LOCK(&sched_mutex);
2670 #if defined(THREADED_RTS)
2671 closeMutex(&sched_mutex);
2672 #endif
2673 }
2674
2675 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2676 void *user USED_IF_NOT_THREADS)
2677 {
2678 #if !defined(THREADED_RTS)
2679 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2680 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2681 evac(user, (StgClosure **)(void *)&sleeping_queue);
2682 #endif
2683 }
2684
2685 /* -----------------------------------------------------------------------------
2686 performGC
2687
2688 This is the interface to the garbage collector from Haskell land.
2689 We provide this so that external C code can allocate and garbage
2690 collect when called from Haskell via _ccall_GC.
2691 -------------------------------------------------------------------------- */
2692
2693 static void
2694 performGC_(rtsBool force_major)
2695 {
2696 Task *task;
2697 Capability *cap = NULL;
2698
2699 // We must grab a new Task here, because the existing Task may be
2700 // associated with a particular Capability, and chained onto the
2701 // suspended_ccalls queue.
2702 task = newBoundTask();
2703
2704 // TODO: do we need to traceTask*() here?
2705
2706 waitForCapability(&cap,task);
2707 scheduleDoGC(&cap,task,force_major);
2708 releaseCapability(cap);
2709 boundTaskExiting(task);
2710 }
2711
2712 void
2713 performGC(void)
2714 {
2715 performGC_(rtsFalse);
2716 }
2717
2718 void
2719 performMajorGC(void)
2720 {
2721 performGC_(rtsTrue);
2722 }
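/* Illustrative sketch only: these entry points can be used by C code that
 * was entered from Haskell via a safe foreign call (the helper names below
 * are hypothetical):
 *
 *     void release_buffers_and_gc(void)
 *     {
 *         free_native_caches();   // hypothetical C-side cleanup
 *         performMajorGC();       // force a major collection (defined above)
 *     }
 */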
2723
2724 /* ---------------------------------------------------------------------------
2725 Interrupt execution.
2726 Might be called inside a signal handler so it mustn't do anything fancy.
2727 ------------------------------------------------------------------------ */
2728
2729 void
2730 interruptStgRts(void)
2731 {
2732 sched_state = SCHED_INTERRUPTING;
2733 interruptAllCapabilities();
2734 #if defined(THREADED_RTS)
2735 wakeUpRts();
2736 #endif
2737 }
2738
2739 /* -----------------------------------------------------------------------------
2740 Wake up the RTS
2741
2742 This function causes at least one OS thread to wake up and run the
2743 scheduler loop. It is invoked when the RTS might be deadlocked, or
2744 an external event has arrived that may need servicing (eg. a
2745    an external event has arrived that may need servicing (e.g. a
2746
2747 In the single-threaded RTS we don't do anything here; we only have
2748 one thread anyway, and the event that caused us to want to wake up
2749 will have interrupted any blocking system call in progress anyway.
2750 -------------------------------------------------------------------------- */
2751
2752 #if defined(THREADED_RTS)
2753 void wakeUpRts(void)
2754 {
2755 // This forces the IO Manager thread to wakeup, which will
2756 // in turn ensure that some OS thread wakes up and runs the
2757 // scheduler loop, which will cause a GC and deadlock check.
2758 ioManagerWakeup();
2759 }
2760 #endif
2761
2762 /* -----------------------------------------------------------------------------
2763 Deleting threads
2764
2765 This is used for interruption (^C) and forking, and corresponds to
2766 raising an exception but without letting the thread catch the
2767 exception.
2768 -------------------------------------------------------------------------- */
2769
2770 static void
2771 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2772 {
2773 // NOTE: must only be called on a TSO that we have exclusive
2774 // access to, because we will call throwToSingleThreaded() below.
2775 // The TSO must be on the run queue of the Capability we own, or
2776 // we must own all Capabilities.
2777
2778 if (tso->why_blocked != BlockedOnCCall &&
2779 tso->why_blocked != BlockedOnCCall_Interruptible) {
2780 throwToSingleThreaded(tso->cap,tso,NULL);
2781 }
2782 }
2783
2784 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2785 static void
2786 deleteThread_(Capability *cap, StgTSO *tso)
2787 { // for forkProcess only:
2788 // like deleteThread(), but we delete threads in foreign calls, too.
2789
2790 if (tso->why_blocked == BlockedOnCCall ||
2791 tso->why_blocked == BlockedOnCCall_Interruptible) {
2792 tso->what_next = ThreadKilled;
2793 appendToRunQueue(tso->cap, tso);
2794 } else {
2795 deleteThread(cap,tso);
2796 }
2797 }
2798 #endif
2799
2800 /* -----------------------------------------------------------------------------
2801 raiseExceptionHelper
2802
2803    This function is called by the raise# primitive, just so that we can
2804 move some of the tricky bits of raising an exception from C-- into
2805    C. Who knows, it might be a useful re-usable thing here too.
2806 -------------------------------------------------------------------------- */
2807
2808 StgWord
2809 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2810 {
2811 Capability *cap = regTableToCapability(reg);
2812 StgThunk *raise_closure = NULL;
2813 StgPtr p, next;
2814 const StgRetInfoTable *info;
2815 //
2816 // This closure represents the expression 'raise# E' where E
2817 // is the exception raise. It is used to overwrite all the
2818     // is the exception to raise. It is used to overwrite all the
2819     // thunks which are currently under evaluation.
2820
2821 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2822 // LDV profiling: stg_raise_info has THUNK as its closure
2823 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2824     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2825 // 1 does not cause any problem unless profiling is performed.
2826 // However, when LDV profiling goes on, we need to linearly scan
2827 // small object pool, where raise_closure is stored, so we should
2828 // use MIN_UPD_SIZE.
2829 //
2830 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2831 // sizeofW(StgClosure)+1);
2832 //
2833
2834 //
2835 // Walk up the stack, looking for the catch frame. On the way,
2836 // we update any closures pointed to from update frames with the
2837 // raise closure that we just built.
2838 //
2839 p = tso->stackobj->sp;
2840 while(1) {
2841 info = get_ret_itbl((StgClosure *)p);
2842 next = p + stack_frame_sizeW((StgClosure *)p);
2843 switch (info->i.type) {
2844
2845 case UPDATE_FRAME:
2846 // Only create raise_closure if we need to.
2847 if (raise_closure == NULL) {
2848 raise_closure =
2849 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2850 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2851 raise_closure->payload[0] = exception;
2852 }
2853 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2854 (StgClosure *)raise_closure);
2855 p = next;
2856 continue;
2857
2858 case ATOMICALLY_FRAME:
2859 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2860 tso->stackobj->sp = p;
2861 return ATOMICALLY_FRAME;
2862
2863 case CATCH_FRAME:
2864 tso->stackobj->sp = p;
2865 return CATCH_FRAME;
2866
2867 case CATCH_STM_FRAME:
2868 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2869 tso->stackobj->sp = p;
2870 return CATCH_STM_FRAME;
2871
2872 case UNDERFLOW_FRAME:
2873 tso->stackobj->sp = p;
2874 threadStackUnderflow(cap,tso);
2875 p = tso->stackobj->sp;
2876 continue;
2877
2878 case STOP_FRAME:
2879 tso->stackobj->sp = p;
2880 return STOP_FRAME;
2881
2882 case CATCH_RETRY_FRAME: {
2883 StgTRecHeader *trec = tso -> trec;
2884 StgTRecHeader *outer = trec -> enclosing_trec;
2885 debugTrace(DEBUG_stm,
2886 "found CATCH_RETRY_FRAME at %p during raise", p);
2887 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2888 stmAbortTransaction(cap, trec);
2889 stmFreeAbortedTRec(cap, trec);
2890 tso -> trec = outer;
2891 p = next;
2892 continue;
2893 }
2894
2895 default:
2896 p = next;
2897 continue;
2898 }
2899 }
2900 }
2901
2902
2903 /* -----------------------------------------------------------------------------
2904 findRetryFrameHelper
2905
2906 This function is called by the retry# primitive. It traverses the stack
2907 leaving tso->sp referring to the frame which should handle the retry.
2908
2909 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2910    or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2911
2912 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2913 create) because retries are not considered to be exceptions, despite the
2914 similar implementation.
2915
2916 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2917 not be created within memory transactions.
2918 -------------------------------------------------------------------------- */
2919
2920 StgWord
2921 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2922 {
2923 const StgRetInfoTable *info;
2924 StgPtr p, next;
2925
2926 p = tso->stackobj->sp;
2927 while (1) {
2928 info = get_ret_itbl((const StgClosure *)p);
2929 next = p + stack_frame_sizeW((StgClosure *)p);
2930 switch (info->i.type) {
2931
2932 case ATOMICALLY_FRAME:
2933 debugTrace(DEBUG_stm,
2934 "found ATOMICALLY_FRAME at %p during retry", p);
2935 tso->stackobj->sp = p;
2936 return ATOMICALLY_FRAME;
2937
2938 case CATCH_RETRY_FRAME:
2939 debugTrace(DEBUG_stm,
2940 "found CATCH_RETRY_FRAME at %p during retry", p);
2941 tso->stackobj->sp = p;
2942 return CATCH_RETRY_FRAME;
2943
2944 case CATCH_STM_FRAME: {
2945 StgTRecHeader *trec = tso -> trec;
2946 StgTRecHeader *outer = trec -> enclosing_trec;
2947 debugTrace(DEBUG_stm,
2948 "found CATCH_STM_FRAME at %p during retry", p);
2949 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2950 stmAbortTransaction(cap, trec);
2951 stmFreeAbortedTRec(cap, trec);
2952 tso -> trec = outer;
2953 p = next;
2954 continue;
2955 }
2956
2957 case UNDERFLOW_FRAME:
2958 tso->stackobj->sp = p;
2959 threadStackUnderflow(cap,tso);
2960 p = tso->stackobj->sp;
2961 continue;
2962
2963 default:
2964 ASSERT(info->i.type != CATCH_FRAME);
2965 ASSERT(info->i.type != STOP_FRAME);
2966 p = next;
2967 continue;
2968 }
2969 }
2970 }
2971
2972 /* -----------------------------------------------------------------------------
2973 resurrectThreads is called after garbage collection on the list of
2974    threads found to be garbage. Each of these threads will be woken up
2975    and sent an exception: BlockedIndefinitelyOnMVar if it was blocked on
2976    an MVar, BlockedIndefinitelyOnSTM if it was blocked on an STM
2977    transaction, or NonTermination if it was blocked on a Black Hole.
2978
2979 Locks: assumes we hold *all* the capabilities.
2980 -------------------------------------------------------------------------- */
2981
2982 void
2983 resurrectThreads (StgTSO *threads)
2984 {
2985 StgTSO *tso, *next;
2986 Capability *cap;
2987 generation *gen;
2988
2989 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2990 next = tso->global_link;
2991
2992 gen = Bdescr((P_)tso)->gen;
2993 tso->global_link = gen->threads;
2994 gen->threads = tso;
2995
2996 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2997
2998 // Wake up the thread on the Capability it was last on
2999 cap = tso->cap;
3000
3001 switch (tso->why_blocked) {
3002 case BlockedOnMVar:
3003 case BlockedOnMVarRead:
3004 /* Called by GC - sched_mutex lock is currently held. */
3005 throwToSingleThreaded(cap, tso,
3006 (StgClosure *)blockedIndefinitelyOnMVar_closure);
3007 break;
3008 case BlockedOnBlackHole:
3009 throwToSingleThreaded(cap, tso,
3010 (StgClosure *)nonTermination_closure);
3011 break;
3012 case BlockedOnSTM:
3013 throwToSingleThreaded(cap, tso,
3014 (StgClosure *)blockedIndefinitelyOnSTM_closure);
3015 break;
3016 case NotBlocked:
3017 /* This might happen if the thread was blocked on a black hole
3018 * belonging to a thread that we've just woken up (raiseAsync
3019 * can wake up threads, remember...).
3020 */
3021 continue;
3022 case BlockedOnMsgThrowTo:
3023 // This can happen if the target is masking, blocks on a
3024 // black hole, and then is found to be unreachable. In
3025 // this case, we want to let the target wake up and carry
3026 // on, and do nothing to this thread.
3027 continue;
3028 default:
3029 barf("resurrectThreads: thread blocked in a strange way: %d",
3030 tso->why_blocked);
3031 }
3032 }
3033 }