Disable aging when doing deadlock detection GC
[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "StablePtr.h"
45 #include "StableName.h"
46 #include "TopHandler.h"
47 #include "sm/NonMoving.h"
48 #include "sm/NonMovingMark.h"
49
50 #if defined(HAVE_SYS_TYPES_H)
51 #include <sys/types.h>
52 #endif
53 #if defined(HAVE_UNISTD_H)
54 #include <unistd.h>
55 #endif
56
57 #include <string.h>
58 #include <stdlib.h>
59 #include <stdarg.h>
60
61 #if defined(HAVE_ERRNO_H)
62 #include <errno.h>
63 #endif
64
65 #if defined(TRACING)
66 #include "eventlog/EventLog.h"
67 #endif
68 /* -----------------------------------------------------------------------------
69 * Global variables
70 * -------------------------------------------------------------------------- */
71
72 #if !defined(THREADED_RTS)
73 // Blocked/sleeping threads
74 StgTSO *blocked_queue_hd = NULL;
75 StgTSO *blocked_queue_tl = NULL;
76 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
77 #endif
78
79 // Bytes allocated since the last time a HeapOverflow exception was thrown by
80 // the RTS
81 uint64_t allocated_bytes_at_heapoverflow = 0;
82
83 /* Set to true when the latest garbage collection failed to reclaim enough
84 * space, and the runtime should proceed to shut itself down in an orderly
85 * fashion (emitting profiling info etc.), OR throw an exception to the main
86 * thread, if it is still alive.
87 */
88 bool heap_overflow = false;
89
90 /* flag that tracks whether we have done any execution in this time slice.
91 * LOCK: currently none, perhaps we should lock (but needs to be
92 * updated in the fast path of the scheduler).
93 *
94 * NB. must be StgWord, we do xchg() on it.
95 */
96 volatile StgWord recent_activity = ACTIVITY_YES;
97
98 /* if this flag is set as well, give up execution
99 * LOCK: none (changes monotonically)
100 */
101 volatile StgWord sched_state = SCHED_RUNNING;
102
103 /*
104 * This mutex protects most of the global scheduler data in
105 * the THREADED_RTS runtime.
106 */
107 #if defined(THREADED_RTS)
108 Mutex sched_mutex;
109 #endif
110
111 #if !defined(mingw32_HOST_OS)
112 #define FORKPROCESS_PRIMOP_SUPPORTED
113 #endif
114
115 /*
116 * sync_finished_cond allows threads which do not own any capability (e.g. the
117 * concurrent mark thread) to participate in the sync protocol. In particular,
118 * if such a thread requests a sync while sync is already in progress it will
119 * block on sync_finished_cond, which will be signalled when the sync is
120 * finished (by releaseAllCapabilities).
121 */
122 #if defined(THREADED_RTS)
123 static Condition sync_finished_cond;
124 static Mutex sync_finished_mutex;
125 #endif
126
127
128 /* -----------------------------------------------------------------------------
129 * static function prototypes
130 * -------------------------------------------------------------------------- */
131
132 static Capability *schedule (Capability *initialCapability, Task *task);
133
134 //
135 // These functions all encapsulate parts of the scheduler loop, and are
136 // abstracted only to make the structure and control flow of the
137 // scheduler clearer.
138 //
139 static void schedulePreLoop (void);
140 static void scheduleFindWork (Capability **pcap);
141 #if defined(THREADED_RTS)
142 static void scheduleYield (Capability **pcap, Task *task);
143 #endif
144 #if defined(THREADED_RTS)
145 static bool requestSync (Capability **pcap, Task *task,
146 PendingSync *sync_type, SyncType *prev_sync_type);
147 static void acquireAllCapabilities(Capability *cap, Task *task);
148 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
149 uint32_t to USED_IF_THREADS);
150 #endif
151 static void scheduleStartSignalHandlers (Capability *cap);
152 static void scheduleCheckBlockedThreads (Capability *cap);
153 static void scheduleProcessInbox(Capability **cap);
154 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
155 static void schedulePushWork(Capability *cap, Task *task);
156 #if defined(THREADED_RTS)
157 static void scheduleActivateSpark(Capability *cap);
158 #endif
159 static void schedulePostRunThread(Capability *cap, StgTSO *t);
160 static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
161 static bool scheduleHandleYield( Capability *cap, StgTSO *t,
162 uint32_t prev_what_next );
163 static void scheduleHandleThreadBlocked( StgTSO *t );
164 static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
165 StgTSO *t );
166 static bool scheduleNeedHeapProfile(bool ready_to_gc);
167 static void scheduleDoGC( Capability **pcap, Task *task,
168 bool force_major, bool deadlock_detect );
169
170 static void deleteThread (StgTSO *tso);
171 static void deleteAllThreads (void);
172
173 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
174 static void deleteThread_(StgTSO *tso);
175 #endif
176
177 /* ---------------------------------------------------------------------------
178 Main scheduling loop.
179
180 We use round-robin scheduling, each thread returning to the
181 scheduler loop when one of these conditions is detected:
182
183 * out of heap space
184 * timer expires (thread yields)
185 * thread blocks
186 * thread ends
187 * stack overflow
188
189 ------------------------------------------------------------------------ */
190
191 static Capability *
192 schedule (Capability *initialCapability, Task *task)
193 {
194 StgTSO *t;
195 Capability *cap;
196 StgThreadReturnCode ret;
197 uint32_t prev_what_next;
198 bool ready_to_gc;
199
200 cap = initialCapability;
201
202 // Pre-condition: this task owns initialCapability.
203 // The sched_mutex is *NOT* held
204 // NB. on return, we still hold a capability.
205
206 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
207
208 schedulePreLoop();
209
210 // -----------------------------------------------------------
211 // Scheduler loop starts here:
212
213 while (1) {
214
215 // Check whether we have re-entered the RTS from Haskell without
216 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
217 // call).
218 if (cap->in_haskell) {
219 errorBelch("schedule: re-entered unsafely.\n"
220 " Perhaps a 'foreign import unsafe' should be 'safe'?");
221 stg_exit(EXIT_FAILURE);
222 }
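// For illustration (a hedged sketch, not part of this file): the situation
// above arises when C code reached via a 'foreign import ... unsafe' calls
// back into Haskell, so the RTS is re-entered without going through
// suspendThread()/resumeThread(). 'ext_fn' below is a placeholder C
// function; marking the import "safe" is the usual fix.
//
//   {-# LANGUAGE ForeignFunctionInterface #-}
//   -- If ext_fn may call back into Haskell, it must be imported "safe";
//   -- importing it "unsafe" triggers the error above.
//   foreign import ccall safe "ext_fn"
//     extFn :: IO ()
//
//   main :: IO ()
//   main = extFn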
223
224 // Note [shutdown]: The interruption / shutdown sequence.
225 //
226 // In order to cleanly shut down the runtime, we want to:
227 // * make sure that all main threads return to their callers
228 // with the state 'Interrupted'.
229 // * clean up all OS threads associated with the runtime
230 // * free all memory etc.
231 //
232 // So the sequence goes like this:
233 //
234 // * The shutdown sequence is initiated by calling hs_exit(),
235 // interruptStgRts(), or running out of memory in the GC.
236 //
237 // * Set sched_state = SCHED_INTERRUPTING
238 //
239 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
240 // scheduleDoGC(), which halts the whole runtime by acquiring all the
241 // capabilities, does a GC and then calls deleteAllThreads() to kill all
242 // the remaining threads. The zombies are left on the run queue for
243 // cleaning up. We can't kill threads involved in foreign calls.
244 //
245 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
246 //
247 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
248 // enforced by the scheduler, which won't run any Haskell code when
249 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
250 // other capabilities by doing the GC earlier.
251 //
252 // * all workers exit when the run queue on their capability
253 // drains. All main threads will also exit when their TSO
254 // reaches the head of the run queue and they can return.
255 //
256 // * eventually all Capabilities will shut down, and the RTS can
257 // exit.
258 //
259 // * We might be left with threads blocked in foreign calls,
260 // we should really attempt to kill these somehow (TODO).
261
262 switch (sched_state) {
263 case SCHED_RUNNING:
264 break;
265 case SCHED_INTERRUPTING:
266 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
267 /* scheduleDoGC() deletes all the threads */
268 scheduleDoGC(&cap,task,true,false);
269
270 // after scheduleDoGC(), we must be shutting down. Either some
271 // other Capability did the final GC, or we did it above,
272 // either way we can fall through to the SCHED_SHUTTING_DOWN
273 // case now.
274 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
275 // fall through
276
277 case SCHED_SHUTTING_DOWN:
278 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
279 // If we are a worker, just exit. If we're a bound thread
280 // then we will exit below when we've removed our TSO from
281 // the run queue.
282 if (!isBoundTask(task) && emptyRunQueue(cap)) {
283 return cap;
284 }
285 break;
286 default:
287 barf("sched_state: %" FMT_Word, sched_state);
288 }
289
290 scheduleFindWork(&cap);
291
292 /* work pushing, currently relevant only for THREADED_RTS:
293 (pushes threads, wakes up idle capabilities for stealing) */
294 schedulePushWork(cap,task);
295
296 scheduleDetectDeadlock(&cap,task);
297
298 // Normally, the only way we can get here with no threads to
299 // run is if a keyboard interrupt was received during
300 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
301 // Additionally, it is not fatal for the
302 // threaded RTS to reach here with no threads to run.
303 //
304 // win32: might be here due to awaitEvent() being abandoned
305 // as a result of a console event having been delivered.
306
307 #if defined(THREADED_RTS)
308 scheduleYield(&cap,task);
309
310 if (emptyRunQueue(cap)) continue; // look for work again
311 #endif
312
313 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
314 if ( emptyRunQueue(cap) ) {
315 ASSERT(sched_state >= SCHED_INTERRUPTING);
316 }
317 #endif
318
319 //
320 // Get a thread to run
321 //
322 t = popRunQueue(cap);
323
324 // Sanity check the thread we're about to run. This can be
325 // expensive if there is lots of thread switching going on...
326 IF_DEBUG(sanity,checkTSO(t));
327
328 #if defined(THREADED_RTS)
329 // Check whether we can run this thread in the current task.
330 // If not, we have to pass our capability to the right task.
331 {
332 InCall *bound = t->bound;
333
334 if (bound) {
335 if (bound->task == task) {
336 // yes, the Haskell thread is bound to the current native thread
337 } else {
338 debugTrace(DEBUG_sched,
339 "thread %lu bound to another OS thread",
340 (unsigned long)t->id);
341 // no, bound to a different Haskell thread: pass to that thread
342 pushOnRunQueue(cap,t);
343 continue;
344 }
345 } else {
346 // The thread we want to run is unbound.
347 if (task->incall->tso) {
348 debugTrace(DEBUG_sched,
349 "this OS thread cannot run thread %lu",
350 (unsigned long)t->id);
351 // no, the current native thread is bound to a different
352 // Haskell thread, so pass it to any worker thread
353 pushOnRunQueue(cap,t);
354 continue;
355 }
356 }
357 }
358 #endif
359
360 // If we're shutting down, and this thread has not yet been
361 // killed, kill it now. This sometimes happens when a finalizer
362 // thread is created by the final GC, or a thread previously
363 // in a foreign call returns.
364 if (sched_state >= SCHED_INTERRUPTING &&
365 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
366 deleteThread(t);
367 }
368
369 // If this capability is disabled, migrate the thread away rather
370 // than running it. NB. but not if the thread is bound: it is
371 // really hard for a bound thread to migrate itself. Believe me,
372 // I tried several ways and couldn't find a way to do it.
373 // Instead, when everything is stopped for GC, we migrate all the
374 // threads on the run queue then (see scheduleDoGC()).
375 //
376 // ToDo: what about TSO_LOCKED? Currently we're migrating those
377 // when the number of capabilities drops, but we never migrate
378 // them back if it rises again. Presumably we should, but after
379 // the thread has been migrated we no longer know what capability
380 // it was originally on.
381 #if defined(THREADED_RTS)
382 if (cap->disabled && !t->bound) {
383 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
384 migrateThread(cap, t, dest_cap);
385 continue;
386 }
387 #endif
388
389 /* context switches are initiated by the timer signal, unless
390 * the user specified "context switch as often as possible", with
391 * +RTS -C0
392 */
393 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
394 && !emptyThreadQueues(cap)) {
395 cap->context_switch = 1;
396 }
397
398 run_thread:
399
400 // CurrentTSO is the thread to run. It might be different if we
401 // loop back to run_thread, so make sure to set CurrentTSO after
402 // that.
403 cap->r.rCurrentTSO = t;
404
405 startHeapProfTimer();
406
407 // ----------------------------------------------------------------------
408 // Run the current thread
409
410 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
411 ASSERT(t->cap == cap);
412 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
413
414 prev_what_next = t->what_next;
415
416 errno = t->saved_errno;
417 #if defined(mingw32_HOST_OS)
418 SetLastError(t->saved_winerror);
419 #endif
420
421 // reset the interrupt flag before running Haskell code
422 cap->interrupt = 0;
423
424 cap->in_haskell = true;
425 cap->idle = 0;
426
427 dirty_TSO(cap,t);
428 dirty_STACK(cap,t->stackobj);
429
430 switch (recent_activity)
431 {
432 case ACTIVITY_DONE_GC: {
433 // ACTIVITY_DONE_GC means we turned off the timer signal to
434 // conserve power (see #1623). Re-enable it here.
435 uint32_t prev;
436 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
437 if (prev == ACTIVITY_DONE_GC) {
438 #if !defined(PROFILING)
439 startTimer();
440 #endif
441 }
442 break;
443 }
444 case ACTIVITY_INACTIVE:
445 // If we reached ACTIVITY_INACTIVE, then don't reset it until
446 // we've done the GC. The thread running here might just be
447 // the IO manager thread that handle_tick() woke up via
448 // wakeUpRts().
449 break;
450 default:
451 recent_activity = ACTIVITY_YES;
452 }
453
454 traceEventRunThread(cap, t);
455
456 switch (prev_what_next) {
457
458 case ThreadKilled:
459 case ThreadComplete:
460 /* Thread already finished, return to scheduler. */
461 ret = ThreadFinished;
462 break;
463
464 case ThreadRunGHC:
465 {
466 StgRegTable *r;
467 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
468 cap = regTableToCapability(r);
469 ret = r->rRet;
470 break;
471 }
472
473 case ThreadInterpret:
474 cap = interpretBCO(cap);
475 ret = cap->r.rRet;
476 break;
477
478 default:
479 barf("schedule: invalid prev_what_next=%u field", prev_what_next);
480 }
481
482 cap->in_haskell = false;
483
484 // The TSO might have moved, eg. if it re-entered the RTS and a GC
485 // happened. So find the new location:
486 t = cap->r.rCurrentTSO;
487
488 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
489 // don't want it set when not running a Haskell thread.
490 cap->r.rCurrentTSO = NULL;
491
492 // And save the current errno in this thread.
493 // XXX: possibly bogus for SMP because this thread might already
494 // be running again, see code below.
495 t->saved_errno = errno;
496 #if defined(mingw32_HOST_OS)
497 // Similarly for Windows error code
498 t->saved_winerror = GetLastError();
499 #endif
500
501 if (ret == ThreadBlocked) {
502 if (t->why_blocked == BlockedOnBlackHole) {
503 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
504 traceEventStopThread(cap, t, t->why_blocked + 6,
505 owner != NULL ? owner->id : 0);
506 } else {
507 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
508 }
509 } else {
510 if (ret == StackOverflow) {
511 traceEventStopThread(cap, t, ret, t->tot_stack_size);
512 } else {
513 traceEventStopThread(cap, t, ret, 0);
514 }
515 }
516
517 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
518 ASSERT(t->cap == cap);
519
520 // ----------------------------------------------------------------------
521
522 // Costs for the scheduler are assigned to CCS_SYSTEM
523 stopHeapProfTimer();
524 #if defined(PROFILING)
525 cap->r.rCCCS = CCS_SYSTEM;
526 #endif
527
528 schedulePostRunThread(cap,t);
529
530 ready_to_gc = false;
531
532 switch (ret) {
533 case HeapOverflow:
534 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
535 break;
536
537 case StackOverflow:
538 // just adjust the stack for this thread, then pop it back
539 // on the run queue.
540 threadStackOverflow(cap, t);
541 pushOnRunQueue(cap,t);
542 break;
543
544 case ThreadYielding:
545 if (scheduleHandleYield(cap, t, prev_what_next)) {
546 // shortcut for switching between compiler/interpreter:
547 goto run_thread;
548 }
549 break;
550
551 case ThreadBlocked:
552 scheduleHandleThreadBlocked(t);
553 break;
554
555 case ThreadFinished:
556 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
557 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
558 break;
559
560 default:
561 barf("schedule: invalid thread return code %d", (int)ret);
562 }
563
564 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
565 scheduleDoGC(&cap,task,false,false);
566 }
567 } /* end of while() */
568 }
569
570 /* -----------------------------------------------------------------------------
571 * Run queue operations
572 * -------------------------------------------------------------------------- */
573
574 static void
575 removeFromRunQueue (Capability *cap, StgTSO *tso)
576 {
577 if (tso->block_info.prev == END_TSO_QUEUE) {
578 ASSERT(cap->run_queue_hd == tso);
579 cap->run_queue_hd = tso->_link;
580 } else {
581 setTSOLink(cap, tso->block_info.prev, tso->_link);
582 }
583 if (tso->_link == END_TSO_QUEUE) {
584 ASSERT(cap->run_queue_tl == tso);
585 cap->run_queue_tl = tso->block_info.prev;
586 } else {
587 setTSOPrev(cap, tso->_link, tso->block_info.prev);
588 }
589 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
590 cap->n_run_queue--;
591
592 IF_DEBUG(sanity, checkRunQueue(cap));
593 }
594
595 void
596 promoteInRunQueue (Capability *cap, StgTSO *tso)
597 {
598 removeFromRunQueue(cap, tso);
599 pushOnRunQueue(cap, tso);
600 }
601
602 /* ----------------------------------------------------------------------------
603 * Setting up the scheduler loop
604 * ------------------------------------------------------------------------- */
605
606 static void
607 schedulePreLoop(void)
608 {
609 // initialisation for scheduler - what cannot go into initScheduler()
610
611 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
612 win32AllocStack();
613 #endif
614 }
615
616 /* -----------------------------------------------------------------------------
617 * scheduleFindWork()
618 *
619 * Search for work to do, and handle messages from elsewhere.
620 * -------------------------------------------------------------------------- */
621
622 static void
623 scheduleFindWork (Capability **pcap)
624 {
625 scheduleStartSignalHandlers(*pcap);
626
627 scheduleProcessInbox(pcap);
628
629 scheduleCheckBlockedThreads(*pcap);
630
631 #if defined(THREADED_RTS)
632 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
633 #endif
634 }
635
636 #if defined(THREADED_RTS)
637 STATIC_INLINE bool
638 shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
639 {
640 // we need to yield this capability to someone else if..
641 // - another thread is initiating a GC, and we didn't just do a GC
642 // (see Note [GC livelock])
643 // - another Task is returning from a foreign call
644 // - the thread at the head of the run queue cannot be run
645 // by this Task (it is bound to another Task, or it is unbound
646 // and this task is bound).
647 //
648 // Note [GC livelock]
649 //
650 // If we are interrupted to do a GC, then we do not immediately do
651 // another one. This avoids a starvation situation where one
652 // Capability keeps forcing a GC and the other Capabilities make no
653 // progress at all.
654
655 return ((pending_sync && !didGcLast) ||
656 cap->n_returning_tasks != 0 ||
657 (!emptyRunQueue(cap) && (task->incall->tso == NULL
658 ? peekRunQueue(cap)->bound != NULL
659 : peekRunQueue(cap)->bound != task->incall)));
660 }
661
662 // This is the single place where a Task goes to sleep. There are
663 // two reasons it might need to sleep:
664 // - there are no threads to run
665 // - we need to yield this Capability to someone else
666 // (see shouldYieldCapability())
667 //
668 // Careful: the scheduler loop is quite delicate. Make sure you run
669 // the tests in testsuite/concurrent (all ways) after modifying this,
670 // and also check the benchmarks in nofib/parallel for regressions.
671
672 static void
673 scheduleYield (Capability **pcap, Task *task)
674 {
675 Capability *cap = *pcap;
676 bool didGcLast = false;
677
678 // if we have work, and we don't need to give up the Capability, continue.
679 //
680 if (!shouldYieldCapability(cap,task,false) &&
681 (!emptyRunQueue(cap) ||
682 !emptyInbox(cap) ||
683 sched_state >= SCHED_INTERRUPTING)) {
684 return;
685 }
686
687 // otherwise yield (sleep), and keep yielding if necessary.
688 do {
689 if (doIdleGCWork(cap, false)) {
690 // there's more idle GC work to do
691 didGcLast = false;
692 } else {
693 // no more idle GC work to do
694 didGcLast = yieldCapability(&cap,task, !didGcLast);
695 }
696 }
697 while (shouldYieldCapability(cap,task,didGcLast));
698
699 // note there may still be no threads on the run queue at this
700 // point, the caller has to check.
701
702 *pcap = cap;
703 return;
704 }
705 #endif
706
707 /* -----------------------------------------------------------------------------
708 * schedulePushWork()
709 *
710 * Push work to other Capabilities if we have some.
711 * -------------------------------------------------------------------------- */
712
713 static void
714 schedulePushWork(Capability *cap USED_IF_THREADS,
715 Task *task USED_IF_THREADS)
716 {
717 #if defined(THREADED_RTS)
718
719 Capability *free_caps[n_capabilities], *cap0;
720 uint32_t i, n_wanted_caps, n_free_caps;
721
722 uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
723
724 // migration can be turned off with +RTS -qm
725 if (!RtsFlags.ParFlags.migrate) {
726 spare_threads = 0;
727 }
728
729 // Figure out how many capabilities we want to wake up. We need at least
730 // sparkPoolSize(cap) plus the number of spare threads we have.
731 n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
732 if (n_wanted_caps == 0) return;
733
734 // First grab as many free Capabilities as we can. ToDo: we should use
735 // capabilities on the same NUMA node preferably, but not exclusively.
736 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
737 n_free_caps < n_wanted_caps && i != cap->no;
738 i = (i + 1) % n_capabilities) {
739 cap0 = capabilities[i];
740 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
741 if (!emptyRunQueue(cap0)
742 || cap0->n_returning_tasks != 0
743 || !emptyInbox(cap0)) {
744 // it already has some work, we just grabbed it at
745 // the wrong moment. Or maybe it's deadlocked!
746 releaseCapability(cap0);
747 } else {
748 free_caps[n_free_caps++] = cap0;
749 }
750 }
751 }
752
753 // We now have n_free_caps free capabilities stashed in
754 // free_caps[]. Attempt to share our run queue equally with them.
755 // This is complicated slightly by the fact that we can't move
756 // some threads:
757 //
758 // - threads that have TSO_LOCKED cannot migrate
759 // - a thread that is bound to the current Task cannot be migrated
760 //
761 // This is about the simplest thing we could do; improvements we
762 // might want to do include:
763 //
764 // - giving high priority to moving relatively new threads, on
765 // the grounds that they haven't had time to build up a
766 // working set in the cache on this CPU/Capability.
767 //
768 // - giving low priority to moving long-lived threads
769
770 if (n_free_caps > 0) {
771 StgTSO *prev, *t, *next;
772
773 debugTrace(DEBUG_sched,
774 "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
775 cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
776 n_free_caps);
777
778 // There are n_free_caps+1 caps in total. We will share the threads
779 // evenly between them, *except* that if the run queue does not divide
780 // evenly by n_free_caps+1 then we bias towards the current capability.
781 // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
782 uint32_t keep_threads =
783 (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
784
785 // This also ensures that we don't give away all our threads, since
786 // (x + y) / (y + 1) >= 1 when x >= 1.
787
788 // The number of threads we have left.
789 uint32_t n = cap->n_run_queue;
790
791 // prev = the previous thread on this cap's run queue
792 prev = END_TSO_QUEUE;
793
794 // We're going to walk through the run queue, migrating threads to other
795 // capabilities until we have only keep_threads left. We might
796 // encounter a thread that cannot be migrated, in which case we add it
797 // to the current run queue and decrement keep_threads.
798 for (t = cap->run_queue_hd, i = 0;
799 t != END_TSO_QUEUE && n > keep_threads;
800 t = next)
801 {
802 next = t->_link;
803 t->_link = END_TSO_QUEUE;
804
805 // Should we keep this thread?
806 if (t->bound == task->incall // don't move my bound thread
807 || tsoLocked(t) // don't move a locked thread
808 ) {
809 if (prev == END_TSO_QUEUE) {
810 cap->run_queue_hd = t;
811 } else {
812 setTSOLink(cap, prev, t);
813 }
814 setTSOPrev(cap, t, prev);
815 prev = t;
816 if (keep_threads > 0) keep_threads--;
817 }
818
819 // Or migrate it?
820 else {
821 appendToRunQueue(free_caps[i],t);
822 traceEventMigrateThread (cap, t, free_caps[i]->no);
823
824 if (t->bound) { t->bound->task->cap = free_caps[i]; }
825 t->cap = free_caps[i];
826 n--; // we have one fewer thread now
827 i++; // move on to the next free_cap
828 if (i == n_free_caps) i = 0;
829 }
830 }
831
832 // Join up the beginning of the queue (prev)
833 // with the rest of the queue (t)
834 if (t == END_TSO_QUEUE) {
835 cap->run_queue_tl = prev;
836 } else {
837 setTSOPrev(cap, t, prev);
838 }
839 if (prev == END_TSO_QUEUE) {
840 cap->run_queue_hd = t;
841 } else {
842 setTSOLink(cap, prev, t);
843 }
844 cap->n_run_queue = n;
845
846 IF_DEBUG(sanity, checkRunQueue(cap));
847
848 // release the capabilities
849 for (i = 0; i < n_free_caps; i++) {
850 task->cap = free_caps[i];
851 if (sparkPoolSizeCap(cap) > 0) {
852 // If we have sparks to steal, wake up a worker on the
853 // capability, even if it has no threads to run.
854 releaseAndWakeupCapability(free_caps[i]);
855 } else {
856 releaseCapability(free_caps[i]);
857 }
858 }
859 }
860 task->cap = cap; // reset to point to our Capability.
861
862 #endif /* THREADED_RTS */
863
864 }
865
866 /* ----------------------------------------------------------------------------
867 * Start any pending signal handlers
868 * ------------------------------------------------------------------------- */
869
870 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
871 static void
872 scheduleStartSignalHandlers(Capability *cap)
873 {
874 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
875 // safe outside the lock
876 startSignalHandlers(cap);
877 }
878 }
879 #else
880 static void
881 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
882 {
883 }
884 #endif
885
886 /* ----------------------------------------------------------------------------
887 * Check for blocked threads that can be woken up.
888 * ------------------------------------------------------------------------- */
889
890 static void
891 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
892 {
893 #if !defined(THREADED_RTS)
894 //
895 // Check whether any waiting threads need to be woken up. If the
896 // run queue is empty, and there are no other tasks running, we
897 // can wait indefinitely for something to happen.
898 //
899 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
900 {
901 awaitEvent (emptyRunQueue(cap));
902 }
903 #endif
904 }
905
906 /* ----------------------------------------------------------------------------
907 * Detect deadlock conditions and attempt to resolve them.
908 * ------------------------------------------------------------------------- */
909
910 static void
911 scheduleDetectDeadlock (Capability **pcap, Task *task)
912 {
913 Capability *cap = *pcap;
914 /*
915 * Detect deadlock: when we have no threads to run, there are no
916 * threads blocked, waiting for I/O, or sleeping, and all the
917 * other tasks are waiting for work, we must have a deadlock of
918 * some description.
919 */
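// For example (a hedged sketch, not part of this file), this program blocks
// its main thread on an MVar that no other thread can ever fill, so the
// scheduler eventually lands here:
//
//   import Control.Concurrent.MVar (newEmptyMVar, takeMVar)
//
//   main :: IO ()
//   main = do
//     m <- newEmptyMVar
//     takeMVar m   -- nothing will ever put into 'm'
//
// The forced major GC below will normally resurrect such a thread with a
// BlockedIndefinitely* exception; in the non-threaded RTS, if it is still
// blocked afterwards, the code further down throws NonTermination instead.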
920 if ( emptyThreadQueues(cap) )
921 {
922 #if defined(THREADED_RTS)
923 /*
924 * In the threaded RTS, we only check for deadlock if there
925 * has been no activity in a complete timeslice. This means
926 * we won't eagerly start a full GC just because we don't have
927 * any threads to run currently.
928 */
929 if (recent_activity != ACTIVITY_INACTIVE) return;
930 #endif
931
932 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
933
934 // Garbage collection can release some new threads due to
935 // either (a) finalizers or (b) threads resurrected because
936 // they are unreachable and will therefore be sent an
937 // exception. Any threads thus released will be immediately
938 // runnable.
939 scheduleDoGC (pcap, task, true/*force major GC*/, true/*deadlock detection*/);
940 cap = *pcap;
941 // when force_major == true, scheduleDoGC sets
942 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
943 // signal.
944
945 if ( !emptyRunQueue(cap) ) return;
946
947 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
948 /* If we have user-installed signal handlers, then wait
949 * for signals to arrive rather than bombing out with a
950 * deadlock.
951 */
952 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
953 debugTrace(DEBUG_sched,
954 "still deadlocked, waiting for signals...");
955
956 awaitUserSignals();
957
958 if (signals_pending()) {
959 startSignalHandlers(cap);
960 }
961
962 // either we have threads to run, or we were interrupted:
963 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
964
965 return;
966 }
967 #endif
968
969 #if !defined(THREADED_RTS)
970 /* Probably a real deadlock. Send the current main thread the
971 * Deadlock exception.
972 */
973 if (task->incall->tso) {
974 switch (task->incall->tso->why_blocked) {
975 case BlockedOnSTM:
976 case BlockedOnBlackHole:
977 case BlockedOnMsgThrowTo:
978 case BlockedOnMVar:
979 case BlockedOnMVarRead:
980 throwToSingleThreaded(cap, task->incall->tso,
981 (StgClosure *)nonTermination_closure);
982 return;
983 default:
984 barf("deadlock: main thread blocked in a strange way");
985 }
986 }
987 return;
988 #endif
989 }
990 }
991
992
993 /* ----------------------------------------------------------------------------
994 * Process message in the current Capability's inbox
995 * ------------------------------------------------------------------------- */
996
997 static void
998 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
999 {
1000 #if defined(THREADED_RTS)
1001 Message *m, *next;
1002 PutMVar *p, *pnext;
1003 int r;
1004 Capability *cap = *pcap;
1005
1006 while (!emptyInbox(cap)) {
1007 // Executing messages might use heap, so we should check for GC.
1008 if (doYouWantToGC(cap)) {
1009 scheduleDoGC(pcap, cap->running_task, false, false);
1010 cap = *pcap;
1011 }
1012
1013 // don't use a blocking acquire; if the lock is held by
1014 // another thread then just carry on. This seems to avoid
1015 // getting stuck in a message ping-pong situation with other
1016 // processors. We'll check the inbox again later anyway.
1017 //
1018 // We should really use a more efficient queue data structure
1019 // here. The trickiness is that we must ensure a Capability
1020 // never goes idle if the inbox is non-empty, which is why we
1021 // use cap->lock (cap->lock is released as the last thing
1022 // before going idle; see Capability.c:releaseCapability()).
1023 r = TRY_ACQUIRE_LOCK(&cap->lock);
1024 if (r != 0) return;
1025
1026 m = cap->inbox;
1027 p = cap->putMVars;
1028 cap->inbox = (Message*)END_TSO_QUEUE;
1029 cap->putMVars = NULL;
1030
1031 RELEASE_LOCK(&cap->lock);
1032
1033 while (m != (Message*)END_TSO_QUEUE) {
1034 next = m->link;
1035 executeMessage(cap, m);
1036 m = next;
1037 }
1038
1039 while (p != NULL) {
1040 pnext = p->link;
1041 performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
1042 Unit_closure);
1043 freeStablePtr(p->mvar);
1044 stgFree(p);
1045 p = pnext;
1046 }
1047 }
1048 #endif
1049 }
1050
1051
1052 /* ----------------------------------------------------------------------------
1053 * Activate spark threads (THREADED_RTS)
1054 * ------------------------------------------------------------------------- */
1055
1056 #if defined(THREADED_RTS)
1057 static void
1058 scheduleActivateSpark(Capability *cap)
1059 {
1060 if (anySparks() && !cap->disabled)
1061 {
1062 createSparkThread(cap);
1063 debugTrace(DEBUG_sched, "creating a spark thread");
1064 }
1065 }
1066 #endif // THREADED_RTS
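// Sparks are created by user code such as 'par' (a hedged sketch; par and
// pseq are exported by GHC.Conc in 'base', and sparks only run with the
// threaded RTS and more than one capability):
//
//   import GHC.Conc (par, pseq)
//
//   main :: IO ()
//   main = do
//     let x = sum [1 .. 1000000 :: Int]
//         y = product [1 .. 20 :: Integer]
//     print (x `par` (y `pseq` (x, y)))
//
// When a capability has no runnable threads but the spark pool is non-empty,
// scheduleActivateSpark() above creates a spark thread to evaluate them.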
1067
1068 /* ----------------------------------------------------------------------------
1069 * After running a thread...
1070 * ------------------------------------------------------------------------- */
1071
1072 static void
1073 schedulePostRunThread (Capability *cap, StgTSO *t)
1074 {
1075 // We have to be able to catch transactions that are in an
1076 // infinite loop as a result of seeing an inconsistent view of
1077 // memory, e.g.
1078 //
1079 // atomically $ do
1080 // [a,b] <- mapM readTVar [ta,tb]
1081 // when (a == b) loop
1082 //
1083 // and a is never equal to b given a consistent view of memory.
1084 //
1085 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1086 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1087 debugTrace(DEBUG_sched | DEBUG_stm,
1088 "trec %p found wasting its time", t);
1089
1090 // strip the stack back to the
1091 // ATOMICALLY_FRAME, aborting the (nested)
1092 // transaction, and saving the stack of any
1093 // partially-evaluated thunks on the heap.
1094 throwToSingleThreaded_(cap, t, NULL, true);
1095
1096 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1097 }
1098 }
1099
1100 //
1101 // If the current thread's allocation limit has run out, send it
1102 // the AllocationLimitExceeded exception.
1103
1104 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1105 // Use a throwToSelf rather than a throwToSingleThreaded, because
1106 // it correctly handles the case where the thread is currently
1107 // inside mask. Also the thread might be blocked (e.g. on an
1108 // MVar), and throwToSingleThreaded doesn't unblock it
1109 // correctly in that case.
1110 throwToSelf(cap, t, allocationLimitExceeded_closure);
1111 ASSIGN_Int64((W_*)&(t->alloc_limit),
1112 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1113 }
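// A hedged sketch of code that trips this limit (setAllocationCounter and
// enableAllocationLimit live in GHC.Conc in recent 'base' versions; the
// 10MB figure is arbitrary):
//
//   import GHC.Conc (setAllocationCounter, enableAllocationLimit)
//
//   main :: IO ()
//   main = do
//     setAllocationCounter (10 * 1024 * 1024)
//     enableAllocationLimit
//     print (length [1 .. 10000000 :: Integer])  -- likely exceeds the limit
//
// Once the counter goes negative, the code above delivers
// AllocationLimitExceeded to the offending thread and resets the counter to
// the grace allowance.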
1114
1115 /* some statistics gathering in the parallel case */
1116 }
1117
1118 /* -----------------------------------------------------------------------------
1119 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1120 * -------------------------------------------------------------------------- */
1121
1122 static bool
1123 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1124 {
1125 if (cap->r.rHpLim == NULL || cap->context_switch) {
1126 // Sometimes we miss a context switch, e.g. when calling
1127 // primitives in a tight loop, MAYBE_GC() doesn't check the
1128 // context switch flag, and we end up waiting for a GC.
1129 // See #1984, and concurrent/should_run/1984
1130 cap->context_switch = 0;
1131 appendToRunQueue(cap,t);
1132 } else {
1133 pushOnRunQueue(cap,t);
1134 }
1135
1136 // did the task ask for a large block?
1137 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1138 // if so, get one and push it on the front of the nursery.
1139 bdescr *bd;
1140 W_ blocks;
1141
1142 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1143
1144 if (blocks > BLOCKS_PER_MBLOCK) {
1145 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1146 }
1147
1148 debugTrace(DEBUG_sched,
1149 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1150 (long)t->id, what_next_strs[t->what_next], blocks);
1151
1152 // don't do this if the nursery is (nearly) full, we'll GC first.
1153 if (cap->r.rCurrentNursery->link != NULL ||
1154 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1155 // infinite loop if the
1156 // nursery has only one
1157 // block.
1158
1159 bd = allocGroupOnNode_lock(cap->node,blocks);
1160 cap->r.rNursery->n_blocks += blocks;
1161
1162 // link the new group after CurrentNursery
1163 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1164
1165 // initialise it as a nursery block. We initialise the
1166 // step, gen_no, and flags field of *every* sub-block in
1167 // this large block, because this is easier than making
1168 // sure that we always find the block head of a large
1169 // block whenever we call Bdescr() (eg. evacuate() and
1170 // isAlive() in the GC would both have to do this, at
1171 // least).
1172 {
1173 bdescr *x;
1174 for (x = bd; x < bd + blocks; x++) {
1175 initBdescr(x,g0,g0);
1176 x->free = x->start;
1177 x->flags = 0;
1178 }
1179 }
1180
1181 // This assert can be a killer if the app is doing lots
1182 // of large block allocations.
1183 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1184
1185 // now update the nursery to point to the new block
1186 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1187 cap->r.rCurrentNursery = bd;
1188
1189 // we might be unlucky and have another thread get on the
1190 // run queue before us and steal the large block, but in that
1191 // case the thread will just end up requesting another large
1192 // block.
1193 return false; /* not actually GC'ing */
1194 }
1195 }
1196
1197 return doYouWantToGC(cap);
1198 /* actual GC is done at the end of the while loop in schedule() */
1199 }
1200
1201 /* -----------------------------------------------------------------------------
1202 * Handle a thread that returned to the scheduler with ThreadYielding
1203 * -------------------------------------------------------------------------- */
1204
1205 static bool
1206 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1207 {
1208 /* put the thread back on the run queue. Then, if we're ready to
1209 * GC, check whether this is the last task to stop. If so, wake
1210 * up the GC thread. getThread will block during a GC until the
1211 * GC is finished.
1212 */
1213
1214 ASSERT(t->_link == END_TSO_QUEUE);
1215
1216 // Shortcut if we're just switching evaluators: just run the thread. See
1217 // Note [avoiding threadPaused] in Interpreter.c.
1218 if (t->what_next != prev_what_next) {
1219 debugTrace(DEBUG_sched,
1220 "--<< thread %ld (%s) stopped to switch evaluators",
1221 (long)t->id, what_next_strs[t->what_next]);
1222 return true;
1223 }
1224
1225 // Reset the context switch flag. We don't do this just before
1226 // running the thread, because that would mean we would lose ticks
1227 // during GC, which can lead to unfair scheduling (a thread hogs
1228 // the CPU because the tick always arrives during GC). This way
1229 // penalises threads that do a lot of allocation, but that seems
1230 // better than the alternative.
1231 if (cap->context_switch != 0) {
1232 cap->context_switch = 0;
1233 appendToRunQueue(cap,t);
1234 } else {
1235 pushOnRunQueue(cap,t);
1236 }
1237
1238 IF_DEBUG(sanity,
1239 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1240 checkTSO(t));
1241
1242 return false;
1243 }
1244
1245 /* -----------------------------------------------------------------------------
1246 * Handle a thread that returned to the scheduler with ThreadBlocked
1247 * -------------------------------------------------------------------------- */
1248
1249 static void
1250 scheduleHandleThreadBlocked( StgTSO *t
1251 #if !defined(DEBUG)
1252 STG_UNUSED
1253 #endif
1254 )
1255 {
1256
1257 // We don't need to do anything. The thread is blocked, and it
1258 // has tidied up its stack and placed itself on whatever queue
1259 // it needs to be on.
1260
1261 // ASSERT(t->why_blocked != NotBlocked);
1262 // Not true: for example,
1263 // - the thread may have woken itself up already, because
1264 // threadPaused() might have raised a blocked throwTo
1265 // exception, see maybePerformBlockedException().
1266
1267 #if defined(DEBUG)
1268 traceThreadStatus(DEBUG_sched, t);
1269 #endif
1270 }
1271
1272 /* -----------------------------------------------------------------------------
1273 * Handle a thread that returned to the scheduler with ThreadFinished
1274 * -------------------------------------------------------------------------- */
1275
1276 static bool
1277 scheduleHandleThreadFinished (Capability *cap, Task *task, StgTSO *t)
1278 {
1279 /* Need to check whether this was a main thread, and if so,
1280 * return with the return value.
1281 *
1282 * We also end up here if the thread kills itself with an
1283 * uncaught exception, see Exception.cmm.
1284 */
1285
1286 // blocked exceptions can now complete, even if the thread was in
1287 // blocked mode (see #2910).
1288 awakenBlockedExceptionQueue (cap, t);
1289
1290 //
1291 // Check whether the thread that just completed was a bound
1292 // thread, and if so return with the result.
1293 //
1294 // There is an assumption here that all thread completion goes
1295 // through this point; we need to make sure that if a thread
1296 // ends up in the ThreadKilled state, that it stays on the run
1297 // queue so it can be dealt with here.
1298 //
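// For example, when C calls into Haskell through a 'foreign export', the
// bound thread finishes here and its result is handed back through
// task->incall->ret (a hedged sketch; 'hsAdd' is a made-up name):
//
//   {-# LANGUAGE ForeignFunctionInterface #-}
//   module Adder where
//
//   hsAdd :: Int -> Int -> IO Int
//   hsAdd x y = return (x + y)
//
//   foreign export ccall hsAdd :: Int -> Int -> IO Int
//
// The generated C stub regains control only after this point, once the
// InCall's result slot (stack->sp[1], see below) has been written.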
1299
1300 if (t->bound) {
1301
1302 if (t->bound != task->incall) {
1303 #if !defined(THREADED_RTS)
1304 // Must be a bound thread that is not the topmost one. Leave
1305 // it on the run queue until the stack has unwound to the
1306 // point where we can deal with this. Leaving it on the run
1307 // queue also ensures that the garbage collector knows about
1308 // this thread and its return value (it gets dropped from the
1309 // step->threads list so there's no other way to find it).
1310 appendToRunQueue(cap,t);
1311 return false;
1312 #else
1313 // this cannot happen in the threaded RTS, because a
1314 // bound thread can only be run by the appropriate Task.
1315 barf("finished bound thread that isn't mine");
1316 #endif
1317 }
1318
1319 ASSERT(task->incall->tso == t);
1320
1321 if (t->what_next == ThreadComplete) {
1322 if (task->incall->ret) {
1323 // NOTE: return val is stack->sp[1] (see StgStartup.cmm)
1324 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1325 }
1326 task->incall->rstat = Success;
1327 } else {
1328 if (task->incall->ret) {
1329 *(task->incall->ret) = NULL;
1330 }
1331 if (sched_state >= SCHED_INTERRUPTING) {
1332 if (heap_overflow) {
1333 task->incall->rstat = HeapExhausted;
1334 } else {
1335 task->incall->rstat = Interrupted;
1336 }
1337 } else {
1338 task->incall->rstat = Killed;
1339 }
1340 }
1341 #if defined(DEBUG)
1342 removeThreadLabel((StgWord)task->incall->tso->id);
1343 #endif
1344
1345 // We no longer consider this thread and task to be bound to
1346 // each other. The TSO lives on until it is GC'd, but the
1347 // task is about to be released by the caller, and we don't
1348 // want anyone following the pointer from the TSO to the
1349 // defunct task (which might have already been
1350 // re-used). This was a real bug: the GC updated
1351 // tso->bound->tso which led to a deadlock.
1352 t->bound = NULL;
1353 task->incall->tso = NULL;
1354
1355 return true; // tells schedule() to return
1356 }
1357
1358 return false;
1359 }
1360
1361 /* -----------------------------------------------------------------------------
1362 * Perform a heap census
1363 * -------------------------------------------------------------------------- */
1364
1365 static bool
1366 scheduleNeedHeapProfile( bool ready_to_gc )
1367 {
1368 // When we have +RTS -i0 and we're heap profiling, do a census at
1369 // every GC. This lets us get repeatable runs for debugging.
1370 if (performHeapProfile ||
1371 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1372 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1373 return true;
1374 } else {
1375 return false;
1376 }
1377 }
1378
1379 /* -----------------------------------------------------------------------------
1380 * stopAllCapabilities()
1381 *
1382 * Stop all Haskell execution. This is used when we need to make some global
1383 * change to the system, such as altering the number of capabilities, or
1384 * forking.
1385 *
1386 * pCap may be NULL in the event that the caller doesn't yet own a capability.
1387 *
1388 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1389 * -------------------------------------------------------------------------- */
1390
1391 #if defined(THREADED_RTS)
1392 void stopAllCapabilities (Capability **pCap, Task *task)
1393 {
1394 stopAllCapabilitiesWith(pCap, task, SYNC_OTHER);
1395 }
1396
1397 void stopAllCapabilitiesWith (Capability **pCap, Task *task, SyncType sync_type)
1398 {
1399 bool was_syncing;
1400 SyncType prev_sync_type;
1401
1402 PendingSync sync = {
1403 .type = sync_type,
1404 .idle = NULL,
1405 .task = task
1406 };
1407
1408 do {
1409 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1410 } while (was_syncing);
1411
1412 acquireAllCapabilities(pCap ? *pCap : NULL, task);
1413
1414 pending_sync = 0;
1415 signalCondition(&sync_finished_cond);
1416 }
1417 #endif
1418
1419 /* -----------------------------------------------------------------------------
1420 * requestSync()
1421 *
1422 * Commence a synchronisation between all capabilities. Normally not called
1423 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1424 * has some special synchronisation requirements.
1425 *
1426 * Note that this can be called in two ways:
1427 *
1428 * - where *pcap points to a capability owned by the caller: in this case
1429 * *prev_sync_type will reflect the in-progress sync type on return, if one
1430 * *was found
1431 *
1432 * - where pcap == NULL: in this case the caller doesn't hold a capability.
1433 * we only return whether or not a pending sync was found and prev_sync_type
1434 * is unchanged.
1435 *
1436 * Returns:
1437 * false if we successfully got a sync
1438 * true if there was another sync request in progress,
1439 * and we yielded to it; the type of the other sync
1440 * request is returned in *prev_sync_type.
1441 * -------------------------------------------------------------------------- */
1442
1443 #if defined(THREADED_RTS)
1444 static bool requestSync (
1445 Capability **pcap, Task *task, PendingSync *new_sync,
1446 SyncType *prev_sync_type)
1447 {
1448 PendingSync *sync;
1449
1450 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1451 (StgWord)NULL,
1452 (StgWord)new_sync);
1453
1454 if (sync != NULL)
1455 {
1456 // sync is valid until we have called yieldCapability().
1457 // After the sync is completed, we cannot read that struct any
1458 // more because it has been freed.
1459 *prev_sync_type = sync->type;
1460 if (pcap == NULL) {
1461 // The caller does not hold a capability (e.g. may be a concurrent
1462 // mark thread). Consequently we must wait until the pending sync is
1463 // finished before proceeding to ensure we don't loop.
1464 // TODO: Don't busy-wait
1465 ACQUIRE_LOCK(&sync_finished_mutex);
1466 while (pending_sync) {
1467 waitCondition(&sync_finished_cond, &sync_finished_mutex);
1468 }
1469 RELEASE_LOCK(&sync_finished_mutex);
1470 } else {
1471 do {
1472 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1473 sync->type);
1474 ASSERT(*pcap);
1475 yieldCapability(pcap,task,true);
1476 sync = pending_sync;
1477 } while (sync != NULL);
1478 }
1479
1480 // NOTE: task->cap might have changed now
1481 return true;
1482 }
1483 else
1484 {
1485 return false;
1486 }
1487 }
1488 #endif
1489
1490 /* -----------------------------------------------------------------------------
1491 * acquireAllCapabilities()
1492 *
1493 * Grab all the capabilities except the one we already hold (cap may be NULL if
1494 * the caller does not currently hold a capability). Used when synchronising
1495 * before a single-threaded GC (SYNC_SEQ_GC), and before a fork (SYNC_OTHER).
1496 *
1497 * Only call this after requestSync(), otherwise a deadlock might
1498 * ensue if another thread is trying to synchronise.
1499 * -------------------------------------------------------------------------- */
1500
1501 #if defined(THREADED_RTS)
1502 static void acquireAllCapabilities(Capability *cap, Task *task)
1503 {
1504 Capability *tmpcap;
1505 uint32_t i;
1506
1507 ASSERT(pending_sync != NULL);
1508 for (i=0; i < n_capabilities; i++) {
1509 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1510 i, n_capabilities);
1511 tmpcap = capabilities[i];
1512 if (tmpcap != cap) {
1513 // we better hope this task doesn't get migrated to
1514 // another Capability while we're waiting for this one.
1515 // It won't, because load balancing happens while we have
1516 // all the Capabilities, but even so it's a slightly
1517 // unsavoury invariant.
1518 task->cap = tmpcap;
1519 waitForCapability(&tmpcap, task);
1520 if (tmpcap->no != i) {
1521 barf("acquireAllCapabilities: got the wrong capability");
1522 }
1523 }
1524 }
1525 task->cap = cap == NULL ? tmpcap : cap;
1526 }
1527 #endif
1528
1529 /* -----------------------------------------------------------------------------
1530 * releaseAllCapabilities()
1531 *
1532 * Assuming this thread holds all the capabilities, release them all (except for
1533 * the one passed in as keep_cap, if non-NULL).
1534 * -------------------------------------------------------------------------- */
1535
1536 #if defined(THREADED_RTS)
1537 void releaseAllCapabilities(uint32_t n, Capability *keep_cap, Task *task)
1538 {
1539 uint32_t i;
1540
1541 for (i = 0; i < n; i++) {
1542 Capability *tmpcap = capabilities[i];
1543 if (keep_cap != tmpcap) {
1544 task->cap = tmpcap;
1545 releaseCapability(tmpcap);
1546 }
1547 }
1548 task->cap = keep_cap;
1549 }
1550 #endif
1551
1552 /* -----------------------------------------------------------------------------
1553 * Perform a garbage collection if necessary
1554 * -------------------------------------------------------------------------- */
1555
1556 // N.B. See Note [Deadlock detection under nonmoving collector] for rationale
1557 // behind deadlock_detect argument.
1558 static void
1559 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1560 bool force_major, bool deadlock_detect)
1561 {
1562 Capability *cap = *pcap;
1563 bool heap_census;
1564 uint32_t collect_gen;
1565 bool major_gc;
1566 #if defined(THREADED_RTS)
1567 uint32_t gc_type;
1568 uint32_t i;
1569 uint32_t need_idle;
1570 uint32_t n_gc_threads;
1571 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1572 StgTSO *tso;
1573 bool *idle_cap;
1574 // idle_cap is an array (allocated later) of size n_capabilities, where
1575 // idle_cap[i] is true if capability i will be idle during this GC
1576 // cycle.
1577 #endif
1578
1579 if (sched_state == SCHED_SHUTTING_DOWN) {
1580 // The final GC has already been done, and the system is
1581 // shutting down. We'll probably deadlock if we try to GC
1582 // now.
1583 return;
1584 }
1585
1586 heap_census = scheduleNeedHeapProfile(true);
1587
1588 // Figure out which generation we are collecting, so that we can
1589 // decide whether this is a parallel GC or not.
1590 collect_gen = calcNeeded(force_major || heap_census, NULL);
1591 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1592
1593 #if defined(THREADED_RTS)
1594 if (sched_state < SCHED_INTERRUPTING
1595 && RtsFlags.ParFlags.parGcEnabled
1596 && collect_gen >= RtsFlags.ParFlags.parGcGen
1597 && ! oldest_gen->mark)
1598 {
1599 gc_type = SYNC_GC_PAR;
1600 } else {
1601 gc_type = SYNC_GC_SEQ;
1602 }
1603
1604 // In order to GC, there must be no threads running Haskell code.
1605 // Therefore, for single-threaded GC, the GC thread needs to hold *all* the
1606 // capabilities, and release them after the GC has completed. For parallel
1607 // GC, we synchronise all the running threads using requestSync().
1608 //
1609 // Other capabilities are prevented from running yet more Haskell threads if
1610 // pending_sync is set. Tested inside yieldCapability() and
1611 // releaseCapability() in Capability.c
1612
1613 PendingSync sync = {
1614 .type = gc_type,
1615 .idle = NULL,
1616 .task = task
1617 };
1618
1619 {
1620 SyncType prev_sync = 0;
1621 bool was_syncing;
1622 do {
1623 // If -qn is not set and we have more capabilities than cores, set
1624 // the number of GC threads to #cores. We do this here rather than
1625 // in normaliseRtsOpts() because here it will work if the program
1626 // calls setNumCapabilities.
1627 //
1628 n_gc_threads = RtsFlags.ParFlags.parGcThreads;
1629 if (n_gc_threads == 0 &&
1630 enabled_capabilities > getNumberOfProcessors()) {
1631 n_gc_threads = getNumberOfProcessors();
1632 }
1633
1634 // This calculation must be inside the loop because
1635 // enabled_capabilities may change if requestSync() below fails and
1636 // we retry.
1637 if (gc_type == SYNC_GC_PAR && n_gc_threads > 0) {
1638 if (n_gc_threads >= enabled_capabilities) {
1639 need_idle = 0;
1640 } else {
1641 need_idle = enabled_capabilities - n_gc_threads;
1642 }
1643 } else {
1644 need_idle = 0;
1645 }
1646
1647 // We need an array of size n_capabilities, but since this may
1648 // change each time around the loop we must allocate it afresh.
1649 idle_cap = (bool *)stgMallocBytes(n_capabilities *
1650 sizeof(bool),
1651 "scheduleDoGC");
1652 sync.idle = idle_cap;
1653
1654 // When using +RTS -qn, we need some capabilities to be idle during
1655 // GC. The best bet is to choose some inactive ones, so we look for
1656 // those first:
1657 uint32_t n_idle = need_idle;
1658 for (i=0; i < n_capabilities; i++) {
1659 if (capabilities[i]->disabled) {
1660 idle_cap[i] = true;
1661 } else if (n_idle > 0 &&
1662 capabilities[i]->running_task == NULL) {
1663 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1664 n_idle--;
1665 idle_cap[i] = true;
1666 } else {
1667 idle_cap[i] = false;
1668 }
1669 }
1670 // If we didn't find enough inactive capabilities, just pick some
1671 // more to be idle.
1672 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1673 if (!idle_cap[i] && i != cap->no) {
1674 idle_cap[i] = true;
1675 n_idle--;
1676 }
1677 }
1678 ASSERT(n_idle == 0);
1679
1680 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1681 cap = *pcap;
1682 if (was_syncing) {
1683 stgFree(idle_cap);
1684 }
1685 if (was_syncing &&
1686 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1687 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1688 // someone else had a pending sync request for a GC, so
1689 // let's assume GC has been done and we don't need to GC
1690 // again.
1691 // Exception to this: if SCHED_INTERRUPTING, then we still
1692 // need to do the final GC.
1693 return;
1694 }
1695 if (sched_state == SCHED_SHUTTING_DOWN) {
1696 // The scheduler might now be shutting down. We tested
1697 // this above, but it might have become true since then as
1698 // we yielded the capability in requestSync().
1699 return;
1700 }
1701 } while (was_syncing);
1702 }
1703
1704 stat_startGCSync(gc_threads[cap->no]);
1705
1706 #if defined(DEBUG)
1707 unsigned int old_n_capabilities = n_capabilities;
1708 #endif
1709
1710 interruptAllCapabilities();
1711
1712 // The final shutdown GC is always single-threaded, because it's
1713 // possible that some of the Capabilities have no worker threads.
1714
1715 if (gc_type == SYNC_GC_SEQ) {
1716 traceEventRequestSeqGc(cap);
1717 } else {
1718 traceEventRequestParGc(cap);
1719 }
1720
1721 if (gc_type == SYNC_GC_SEQ) {
1722 // single-threaded GC: grab all the capabilities
1723 acquireAllCapabilities(cap,task);
1724 }
1725 else
1726 {
1727 // If we are load-balancing collections in this
1728 // generation, then we require all GC threads to participate
1729 // in the collection. Otherwise, we only require active
1730 // threads to participate, and we set gc_threads[i]->idle for
1731 // any idle capabilities. The rationale here is that waking
1732 // up an idle Capability takes much longer than just doing any
1733 // GC work on its behalf.
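        // (parGcLoadBalancingEnabled/parGcLoadBalancingGen and
        // parGcNoSyncWithIdle are set from the +RTS -qb and -qi flags
        // respectively; see the GHC users guide for their exact meaning.)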
1734
1735 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1736 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1737 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1738 {
1739 for (i=0; i < n_capabilities; i++) {
1740 if (capabilities[i]->disabled) {
1741 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1742 if (idle_cap[i]) {
1743 n_idle_caps++;
1744 }
1745 } else {
1746 if (i != cap->no && idle_cap[i]) {
1747 Capability *tmpcap = capabilities[i];
1748 task->cap = tmpcap;
1749 waitForCapability(&tmpcap, task);
1750 n_idle_caps++;
1751 }
1752 }
1753 }
1754 }
1755 else
1756 {
1757 for (i=0; i < n_capabilities; i++) {
1758 if (capabilities[i]->disabled) {
1759 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1760 if (idle_cap[i]) {
1761 n_idle_caps++;
1762 }
1763 } else if (i != cap->no &&
1764 capabilities[i]->idle >=
1765 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1766 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1767 if (idle_cap[i]) {
1768 n_idle_caps++;
1769 } else {
1770 n_failed_trygrab_idles++;
1771 }
1772 }
1773 }
1774 }
1775 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1776
1777 for (i=0; i < n_capabilities; i++) {
1778 capabilities[i]->idle++;
1779 }
1780
1781 // For all capabilities participating in this GC, wait until
1782 // they have stopped mutating and are standing by for GC.
1783 waitForGcThreads(cap, idle_cap);
1784
1785 // Stable point where we can do a global check on our spark counters
1786 ASSERT(checkSparkCountInvariant());
1787 }
1788
1789 #endif
1790
1791 IF_DEBUG(scheduler, printAllThreads());
1792
1793 delete_threads_and_gc:
1794 /*
1795 * We now have all the capabilities; if we're in an interrupting
1796 * state, then we should take the opportunity to delete all the
1797 * threads in the system.
1798 * Checking for major_gc ensures that the last GC is major.
1799 */
1800 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1801 deleteAllThreads();
1802 #if defined(THREADED_RTS)
1803 // Discard all the sparks from every Capability. Why?
1804 // They'll probably be GC'd anyway since we've killed all the
1805 // threads. It just avoids the GC having to do any work to
1806 // figure out that any remaining sparks are garbage.
1807 for (i = 0; i < n_capabilities; i++) {
1808 capabilities[i]->spark_stats.gcd +=
1809 sparkPoolSize(capabilities[i]->sparks);
1810 // No race here since all Caps are stopped.
1811 discardSparksCap(capabilities[i]);
1812 }
1813 #endif
1814 sched_state = SCHED_SHUTTING_DOWN;
1815 }
1816
1817 /*
1818 * When there are disabled capabilities, we want to migrate any
1819 * threads away from them. Normally this happens in the
1820 * scheduler's loop, but only for unbound threads - it's really
1821 * hard for a bound thread to migrate itself. So we have another
1822 * go here.
1823 */
1824 #if defined(THREADED_RTS)
1825 for (i = enabled_capabilities; i < n_capabilities; i++) {
1826 Capability *tmp_cap, *dest_cap;
1827 tmp_cap = capabilities[i];
1828 ASSERT(tmp_cap->disabled);
1829 if (i != cap->no) {
1830 dest_cap = capabilities[i % enabled_capabilities];
1831 while (!emptyRunQueue(tmp_cap)) {
1832 tso = popRunQueue(tmp_cap);
1833 migrateThread(tmp_cap, tso, dest_cap);
1834 if (tso->bound) {
1835 traceTaskMigrate(tso->bound->task,
1836 tso->bound->task->cap,
1837 dest_cap);
1838 tso->bound->task->cap = dest_cap;
1839 }
1840 }
1841 }
1842 }
1843 #endif
1844
1845 // Do any remaining idle GC work from the previous GC
1846 doIdleGCWork(cap, true /* all of it */);
1847
1848 #if defined(THREADED_RTS)
1849 // reset pending_sync *before* GC, so that when the GC threads
1850 // emerge they don't immediately re-enter the GC.
1851 pending_sync = 0;
1852 signalCondition(&sync_finished_cond);
1853 GarbageCollect(collect_gen, heap_census, deadlock_detect, gc_type, cap, idle_cap);
1854 #else
1855 GarbageCollect(collect_gen, heap_census, deadlock_detect, 0, cap, NULL);
1856 #endif
1857
1858 // If we're shutting down, don't leave any idle GC work to do.
1859 if (sched_state == SCHED_SHUTTING_DOWN) {
1860 doIdleGCWork(cap, true /* all of it */);
1861 }
1862
1863 traceSparkCounters(cap);
1864
1865 switch (recent_activity) {
1866 case ACTIVITY_INACTIVE:
1867 if (force_major) {
1868 // We are doing a GC because the system has been idle for a
1869 // timeslice and we need to check for deadlock. Record the
1870 // fact that we've done a GC and turn off the timer signal;
1871 // it will get re-enabled if we run any threads after the GC.
1872 recent_activity = ACTIVITY_DONE_GC;
1873 #if !defined(PROFILING)
1874 stopTimer();
1875 #endif
1876 break;
1877 }
1878 // fall through...
1879
1880 case ACTIVITY_MAYBE_NO:
1881 // the GC might have taken long enough for the timer to set
1882 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1883 // but we aren't necessarily deadlocked:
1884 recent_activity = ACTIVITY_YES;
1885 break;
1886
1887 case ACTIVITY_DONE_GC:
1888 // If we are actually active, the scheduler will reset the
1889 // recent_activity flag and re-enable the timer.
1890 break;
1891 }
1892
1893 #if defined(THREADED_RTS)
1894 // Stable point where we can do a global check on our spark counters
1895 ASSERT(checkSparkCountInvariant());
1896 #endif
1897
1898 // The heap census itself is done during GarbageCollect().
1899 if (heap_census) {
1900 performHeapProfile = false;
1901 }
1902
1903 #if defined(THREADED_RTS)
1904
1905 // If n_capabilities has changed during GC, we're in trouble.
1906 ASSERT(n_capabilities == old_n_capabilities);
1907
1908 if (gc_type == SYNC_GC_PAR)
1909 {
1910 for (i = 0; i < n_capabilities; i++) {
1911 if (i != cap->no) {
1912 if (idle_cap[i]) {
1913 ASSERT(capabilities[i]->running_task == task);
1914 task->cap = capabilities[i];
1915 releaseCapability(capabilities[i]);
1916 } else {
1917 ASSERT(capabilities[i]->running_task != task);
1918 }
1919 }
1920 }
1921 task->cap = cap;
1922
1923 // releaseGCThreads() happens *after* we have released idle
1924 // capabilities. Otherwise what can happen is one of the released
1925 // threads starts a new GC, and finds that it can't acquire some of
1926 // the disabled capabilities, because the previous GC still holds
1927 // them, so those disabled capabilities will not be idle during the
1928 // next GC round. However, if we release the capabilities first,
1929 // then they will be free (because they're disabled) when the next
1930 // GC cycle happens.
1931 releaseGCThreads(cap, idle_cap);
1932 }
1933 #endif
1934 if (heap_overflow && sched_state == SCHED_RUNNING) {
1935 // GC set the heap_overflow flag. We should throw an exception if we
1936 // can, or shut down otherwise.
1937
1938 // Get the thread to which Ctrl-C is thrown
1939 StgTSO *main_thread = getTopHandlerThread();
1940 if (main_thread == NULL) {
1941 // GC set the heap_overflow flag, and there is no main thread to
1942 // throw an exception to, so we should proceed with an orderly
1943 // shutdown now. Ultimately we want the main thread to return to
1944 // its caller with HeapExhausted, at which point the caller should
1945 // call hs_exit(). The first step is to delete all the threads.
1946 sched_state = SCHED_INTERRUPTING;
1947 goto delete_threads_and_gc;
1948 }
1949
1950 heap_overflow = false;
1951 const uint64_t allocation_count = getAllocations();
1952 if (RtsFlags.GcFlags.heapLimitGrace <
1953 allocation_count - allocated_bytes_at_heapoverflow ||
1954 allocated_bytes_at_heapoverflow == 0) {
1955 allocated_bytes_at_heapoverflow = allocation_count;
1956 // We used to simply exit, but throwing an exception gives the
1957 // program a chance to clean up. It also lets the exception be
1958 // caught.
1959
1960 // FIXME this is not a good way to tell a program to release
1961 // resources. It is neither reliable (the RTS crashes if it fails
1962 // to allocate memory from the OS) nor very usable (it is always
1963 // thrown to the main thread, which might not be able to do anything
1964 // useful with it). We really should have a more general way to
1965 // release resources in low-memory conditions. Nevertheless, this
1966 // is still a big improvement over just exiting.
1967
1968 // FIXME again: perhaps we should throw a synchronous exception
1969             // instead of an asynchronous one, or have a way for the program to
1970 // register a handler to be called when heap overflow happens.
1971 throwToSelf(cap, main_thread, heapOverflow_closure);
1972 }
1973 }
1974
1975 #if defined(THREADED_RTS)
1976 stgFree(idle_cap);
1977
1978 if (gc_type == SYNC_GC_SEQ) {
1979 // release our stash of capabilities.
1980 releaseAllCapabilities(n_capabilities, cap, task);
1981 }
1982 #endif
1983
1984 return;
1985 }
1986
1987 /* ---------------------------------------------------------------------------
1988 * Singleton fork(). Do not copy any running threads.
1989 * ------------------------------------------------------------------------- */
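/*
 * (Roughly: this is reached via a foreign import of "forkProcess" from
 * System.Posix.Process.forkProcess in the unix package; `entry` is a stable
 * pointer to the Haskell action that the child process should run.)
 */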
1990
1991 pid_t
1992 forkProcess(HsStablePtr *entry
1993 #if !defined(FORKPROCESS_PRIMOP_SUPPORTED)
1994 STG_UNUSED
1995 #endif
1996 )
1997 {
1998 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
1999 pid_t pid;
2000 StgTSO* t,*next;
2001 Capability *cap;
2002 uint32_t g;
2003 Task *task = NULL;
2004 uint32_t i;
2005
2006 debugTrace(DEBUG_sched, "forking!");
2007
2008 task = newBoundTask();
2009
2010 cap = NULL;
2011 waitForCapability(&cap, task);
2012
2013 #if defined(THREADED_RTS)
2014 stopAllCapabilities(&cap, task);
2015 #endif
2016
2017 // no funny business: hold locks while we fork, otherwise if some
2018 // other thread is holding a lock when the fork happens, the data
2019 // structure protected by the lock will forever be in an
2020 // inconsistent state in the child. See also #1391.
2021 ACQUIRE_LOCK(&sched_mutex);
2022 ACQUIRE_LOCK(&sm_mutex);
2023 ACQUIRE_LOCK(&stable_ptr_mutex);
2024 ACQUIRE_LOCK(&stable_name_mutex);
2025 ACQUIRE_LOCK(&task->lock);
2026
2027 for (i=0; i < n_capabilities; i++) {
2028 ACQUIRE_LOCK(&capabilities[i]->lock);
2029 }
2030
2031 #if defined(THREADED_RTS)
2032 ACQUIRE_LOCK(&all_tasks_mutex);
2033 #endif
2034
2035 stopTimer(); // See #4074
2036
2037 #if defined(TRACING)
2038 flushEventLog(); // so that child won't inherit dirty file buffers
2039 #endif
2040
2041 pid = fork();
2042
2043 if (pid) { // parent
2044
2045 startTimer(); // #4074
2046
2047 RELEASE_LOCK(&sched_mutex);
2048 RELEASE_LOCK(&sm_mutex);
2049 RELEASE_LOCK(&stable_ptr_mutex);
2050 RELEASE_LOCK(&stable_name_mutex);
2051 RELEASE_LOCK(&task->lock);
2052
2053 #if defined(THREADED_RTS)
2054 /* N.B. releaseCapability_ below may need to take all_tasks_mutex */
2055 RELEASE_LOCK(&all_tasks_mutex);
2056 #endif
2057
2058 for (i=0; i < n_capabilities; i++) {
2059 releaseCapability_(capabilities[i],false);
2060 RELEASE_LOCK(&capabilities[i]->lock);
2061 }
2062
2063 boundTaskExiting(task);
2064
2065 // just return the pid
2066 return pid;
2067
2068 } else { // child
2069
2070         // Process times are reset in the child process, so we should reset
2071         // the stats too. See #16102.
2072 resetChildProcessStats();
2073
2074 #if defined(THREADED_RTS)
2075 initMutex(&sched_mutex);
2076 initMutex(&sm_mutex);
2077 initMutex(&stable_ptr_mutex);
2078 initMutex(&stable_name_mutex);
2079 initMutex(&task->lock);
2080
2081 for (i=0; i < n_capabilities; i++) {
2082 initMutex(&capabilities[i]->lock);
2083 }
2084
2085 initMutex(&all_tasks_mutex);
2086 #endif
2087
2088 #if defined(TRACING)
2089 resetTracing();
2090 #endif
2091
2092 // Now, all OS threads except the thread that forked are
2093 // stopped. We need to stop all Haskell threads, including
2094 // those involved in foreign calls. Also we need to delete
2095 // all Tasks, because they correspond to OS threads that are
2096 // now gone.
2097
2098 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2099 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2100 next = t->global_link;
2101 // don't allow threads to catch the ThreadKilled
2102 // exception, but we do want to raiseAsync() because these
2103 // threads may be evaluating thunks that we need later.
2104 deleteThread_(t);
2105
2106 // stop the GC from updating the InCall to point to
2107 // the TSO. This is only necessary because the
2108 // OSThread bound to the TSO has been killed, and
2109 // won't get a chance to exit in the usual way (see
2110 // also scheduleHandleThreadFinished).
2111 t->bound = NULL;
2112 }
2113 }
2114
2115 discardTasksExcept(task);
2116
2117 for (i=0; i < n_capabilities; i++) {
2118 cap = capabilities[i];
2119
2120 // Empty the run queue. It seems tempting to let all the
2121 // killed threads stay on the run queue as zombies to be
2122 // cleaned up later, but some of them may correspond to
2123 // bound threads for which the corresponding Task does not
2124 // exist.
2125 truncateRunQueue(cap);
2126 cap->n_run_queue = 0;
2127
2128         // Any suspended C-calling Tasks are gone; their OS threads
2129         // no longer exist:
2130 cap->suspended_ccalls = NULL;
2131 cap->n_suspended_ccalls = 0;
2132
2133 #if defined(THREADED_RTS)
2134 // Wipe our spare workers list, they no longer exist. New
2135 // workers will be created if necessary.
2136 cap->spare_workers = NULL;
2137 cap->n_spare_workers = 0;
2138 cap->returning_tasks_hd = NULL;
2139 cap->returning_tasks_tl = NULL;
2140 cap->n_returning_tasks = 0;
2141 #endif
2142
2143         // Release all caps except 0; we'll use that one for starting
2144         // the IO manager and running the client action below.
2145 if (cap->no != 0) {
2146 task->cap = cap;
2147 releaseCapability(cap);
2148 }
2149 }
2150 cap = capabilities[0];
2151 task->cap = cap;
2152
2153 // Empty the threads lists. Otherwise, the garbage
2154 // collector may attempt to resurrect some of these threads.
2155 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2156 generations[g].threads = END_TSO_QUEUE;
2157 }
2158
2159 // On Unix, all timers are reset in the child, so we need to start
2160 // the timer again.
2161 initTimer();
2162 startTimer();
2163
2164 // TODO: need to trace various other things in the child
2165 // like startup event, capabilities, process info etc
2166 traceTaskCreate(task, cap);
2167
2168 #if defined(THREADED_RTS)
2169 ioManagerStartCap(&cap);
2170 #endif
2171
2172         // Install toplevel exception handlers, so that the interruption
2173         // signal will be delivered to the main thread.
2174         // See #12903
2175 rts_evalStableIOMain(&cap, entry, NULL); // run the action
2176 rts_checkSchedStatus("forkProcess",cap);
2177
2178 rts_unlock(cap);
2179 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2180 }
2181 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2182 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2183 #endif
2184 }
2185
2186 /* ---------------------------------------------------------------------------
2187 * Changing the number of Capabilities
2188 *
2189 * Changing the number of Capabilities is very tricky! We can only do
2190 * it with the system fully stopped, so we do a full sync with
2191 * requestSync(SYNC_OTHER) and grab all the capabilities.
2192 *
2193 * Then we resize the appropriate data structures, and update all
2194 * references to the old data structures which have now moved.
2195 * Finally we release the Capabilities we are holding, and start
2196 * worker Tasks on the new Capabilities we created.
2197 *
2198 * ------------------------------------------------------------------------- */
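/*
 * For orientation: the Haskell-level Control.Concurrent.setNumCapabilities
 * is a foreign import of this function. An embedding C program could also
 * call it directly, roughly like this (a sketch, assuming the threaded RTS
 * is linked and the RTS headers are included):
 *
 *     hs_init(&argc, &argv);
 *     setNumCapabilities(4);      // resize to 4 capabilities
 *     ... run Haskell code ...
 *     hs_exit();
 */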
2199
2200 void
2201 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2202 {
2203 #if !defined(THREADED_RTS)
2204 if (new_n_capabilities != 1) {
2205 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2206 }
2207 return;
2208 #elif defined(NOSMP)
2209 if (new_n_capabilities != 1) {
2210 errorBelch("setNumCapabilities: not supported on this platform");
2211 }
2212 return;
2213 #else
2214 Task *task;
2215 Capability *cap;
2216 uint32_t n;
2217 Capability *old_capabilities = NULL;
2218 uint32_t old_n_capabilities = n_capabilities;
2219
2220 if (new_n_capabilities == enabled_capabilities) {
2221 return;
2222 } else if (new_n_capabilities <= 0) {
2223 errorBelch("setNumCapabilities: Capability count must be positive");
2224 return;
2225 }
2226
2227
2228 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2229 enabled_capabilities, new_n_capabilities);
2230
2231 cap = rts_lock();
2232 task = cap->running_task;
2233
2234 stopAllCapabilities(&cap, task);
2235
2236 if (new_n_capabilities < enabled_capabilities)
2237 {
2238 // Reducing the number of capabilities: we do not actually
2239 // remove the extra capabilities, we just mark them as
2240 // "disabled". This has the following effects:
2241 //
2242 // - threads on a disabled capability are migrated away by the
2243 // scheduler loop
2244 //
2245 // - disabled capabilities do not participate in GC
2246 // (see scheduleDoGC())
2247 //
2248 // - No spark threads are created on this capability
2249 // (see scheduleActivateSpark())
2250 //
2251 // - We do not attempt to migrate threads *to* a disabled
2252 // capability (see schedulePushWork()).
2253 //
2254 // but in other respects, a disabled capability remains
2255 // alive. Threads may be woken up on a disabled capability,
2256 // but they will be immediately migrated away.
2257 //
2258 // This approach is much easier than trying to actually remove
2259 // the capability; we don't have to worry about GC data
2260 // structures, the nursery, etc.
2261 //
2262 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2263 capabilities[n]->disabled = true;
2264 traceCapDisable(capabilities[n]);
2265 }
2266 enabled_capabilities = new_n_capabilities;
2267 }
2268 else
2269 {
2270 // Increasing the number of enabled capabilities.
2271 //
2272 // enable any disabled capabilities, up to the required number
2273 for (n = enabled_capabilities;
2274 n < new_n_capabilities && n < n_capabilities; n++) {
2275 capabilities[n]->disabled = false;
2276 traceCapEnable(capabilities[n]);
2277 }
2278 enabled_capabilities = n;
2279
2280 if (new_n_capabilities > n_capabilities) {
2281 #if defined(TRACING)
2282 // Allocate eventlog buffers for the new capabilities. Note this
2283 // must be done before calling moreCapabilities(), because that
2284 // will emit events about creating the new capabilities and adding
2285 // them to existing capsets.
2286 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2287 #endif
2288
2289 // Resize the capabilities array
2290 // NB. after this, capabilities points somewhere new. Any pointers
2291 // of type (Capability *) are now invalid.
2292 moreCapabilities(n_capabilities, new_n_capabilities);
2293
2294 // Resize and update storage manager data structures
2295 storageAddCapabilities(n_capabilities, new_n_capabilities);
2296 }
2297 }
2298
2299 // update n_capabilities before things start running
2300 if (new_n_capabilities > n_capabilities) {
2301 n_capabilities = enabled_capabilities = new_n_capabilities;
2302 }
2303
2304 // We're done: release the original Capabilities
2305 releaseAllCapabilities(old_n_capabilities, cap,task);
2306
2307 // We can't free the old array until now, because we access it
2308 // while updating pointers in updateCapabilityRefs().
2309 if (old_capabilities) {
2310 stgFree(old_capabilities);
2311 }
2312
2313 // Notify IO manager that the number of capabilities has changed.
2314 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2315
2316 rts_unlock(cap);
2317
2318 #endif // THREADED_RTS
2319 }
2320
2321
2322
2323 /* ---------------------------------------------------------------------------
2324 * Delete all the threads in the system
2325 * ------------------------------------------------------------------------- */
2326
2327 static void
2328 deleteAllThreads ()
2329 {
2330 // NOTE: only safe to call if we own all capabilities.
2331
2332 StgTSO* t, *next;
2333 uint32_t g;
2334
2335 debugTrace(DEBUG_sched,"deleting all threads");
2336 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2337 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2338 next = t->global_link;
2339 deleteThread(t);
2340 }
2341 }
2342
2343 // The run queue now contains a bunch of ThreadKilled threads. We
2344 // must not throw these away: the main thread(s) will be in there
2345 // somewhere, and the main scheduler loop has to deal with it.
2346 // Also, the run queue is the only thing keeping these threads from
2347 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2348
2349 #if !defined(THREADED_RTS)
2350 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2351 ASSERT(sleeping_queue == END_TSO_QUEUE);
2352 #endif
2353 }
2354
2355 /* -----------------------------------------------------------------------------
2356 Managing the suspended_ccalls list.
2357 Locks required: sched_mutex
2358 -------------------------------------------------------------------------- */
2359
2360 STATIC_INLINE void
2361 suspendTask (Capability *cap, Task *task)
2362 {
2363 InCall *incall;
2364
2365 incall = task->incall;
2366 ASSERT(incall->next == NULL && incall->prev == NULL);
2367 incall->next = cap->suspended_ccalls;
2368 incall->prev = NULL;
2369 if (cap->suspended_ccalls) {
2370 cap->suspended_ccalls->prev = incall;
2371 }
2372 cap->suspended_ccalls = incall;
2373 cap->n_suspended_ccalls++;
2374 }
2375
2376 STATIC_INLINE void
2377 recoverSuspendedTask (Capability *cap, Task *task)
2378 {
2379 InCall *incall;
2380
2381 incall = task->incall;
2382 if (incall->prev) {
2383 incall->prev->next = incall->next;
2384 } else {
2385 ASSERT(cap->suspended_ccalls == incall);
2386 cap->suspended_ccalls = incall->next;
2387 }
2388 if (incall->next) {
2389 incall->next->prev = incall->prev;
2390 }
2391 incall->next = incall->prev = NULL;
2392 cap->n_suspended_ccalls--;
2393 }
2394
2395 /* ---------------------------------------------------------------------------
2396 * Suspending & resuming Haskell threads.
2397 *
2398 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2399 * its capability before calling the C function. This allows another
2400 * task to pick up the capability and carry on running Haskell
2401 * threads. It also means that if the C call blocks, it won't lock
2402 * the whole system.
2403 *
2404 * The Haskell thread making the C call is put to sleep for the
2405 * duration of the call, on the suspended_ccalling_threads queue. We
2406 * give out a token to the task, which it can use to resume the thread
2407 * on return from the C function.
2408 *
2409 * If this is an interruptible C call, this means that the FFI call may be
2410 * unceremoniously terminated and should be scheduled on an
2411 * unbound worker thread.
2412 * ------------------------------------------------------------------------- */
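/*
 * As a rough illustration of the token protocol described above, a "safe"
 * foreign call stub generated by the compiler behaves approximately like
 * this (a sketch, not the literal generated code; foreign_fn/args stand for
 * the actual call):
 *
 *     token   = suspendThread(BaseReg, interruptible);
 *     result  = foreign_fn(args);       // may block; other Haskell threads
 *                                       // keep running on this Capability
 *     BaseReg = resumeThread(token);    // re-acquire a Capability
 */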
2413
2414 void *
2415 suspendThread (StgRegTable *reg, bool interruptible)
2416 {
2417 Capability *cap;
2418 int saved_errno;
2419 StgTSO *tso;
2420 Task *task;
2421 #if defined(mingw32_HOST_OS)
2422 StgWord32 saved_winerror;
2423 #endif
2424
2425 saved_errno = errno;
2426 #if defined(mingw32_HOST_OS)
2427 saved_winerror = GetLastError();
2428 #endif
2429
2430     /* assume that reg points to the StgRegTable part of a Capability.
2431      */
2432 cap = regTableToCapability(reg);
2433
2434 task = cap->running_task;
2435 tso = cap->r.rCurrentTSO;
2436
2437 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2438
2439 // XXX this might not be necessary --SDM
2440 tso->what_next = ThreadRunGHC;
2441
2442 threadPaused(cap,tso);
2443
2444 if (interruptible) {
2445 tso->why_blocked = BlockedOnCCall_Interruptible;
2446 } else {
2447 tso->why_blocked = BlockedOnCCall;
2448 }
2449
2450 // Hand back capability
2451 task->incall->suspended_tso = tso;
2452 task->incall->suspended_cap = cap;
2453
2454 // Otherwise allocate() will write to invalid memory.
2455 cap->r.rCurrentTSO = NULL;
2456
2457 ACQUIRE_LOCK(&cap->lock);
2458
2459 suspendTask(cap,task);
2460 cap->in_haskell = false;
2461 releaseCapability_(cap,false);
2462
2463 RELEASE_LOCK(&cap->lock);
2464
2465 errno = saved_errno;
2466 #if defined(mingw32_HOST_OS)
2467 SetLastError(saved_winerror);
2468 #endif
2469 return task;
2470 }
2471
2472 StgRegTable *
2473 resumeThread (void *task_)
2474 {
2475 StgTSO *tso;
2476 InCall *incall;
2477 Capability *cap;
2478 Task *task = task_;
2479 int saved_errno;
2480 #if defined(mingw32_HOST_OS)
2481 StgWord32 saved_winerror;
2482 #endif
2483
2484 saved_errno = errno;
2485 #if defined(mingw32_HOST_OS)
2486 saved_winerror = GetLastError();
2487 #endif
2488
2489 incall = task->incall;
2490 cap = incall->suspended_cap;
2491 task->cap = cap;
2492
2493 // Wait for permission to re-enter the RTS with the result.
2494 waitForCapability(&cap,task);
2495 // we might be on a different capability now... but if so, our
2496 // entry on the suspended_ccalls list will also have been
2497 // migrated.
2498
2499 // Remove the thread from the suspended list
2500 recoverSuspendedTask(cap,task);
2501
2502 tso = incall->suspended_tso;
2503 incall->suspended_tso = NULL;
2504 incall->suspended_cap = NULL;
2505 // we will modify tso->_link
2506 if (RTS_UNLIKELY(nonmoving_write_barrier_enabled)) {
2507 updateRemembSetPushClosure(cap, (StgClosure *)tso->_link);
2508 }
2509 tso->_link = END_TSO_QUEUE;
2510
2511 traceEventRunThread(cap, tso);
2512
2513 /* Reset blocking status */
2514 tso->why_blocked = NotBlocked;
2515
2516 if ((tso->flags & TSO_BLOCKEX) == 0) {
2517 // avoid locking the TSO if we don't have to
2518 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2519 maybePerformBlockedException(cap,tso);
2520 }
2521 }
2522
2523 cap->r.rCurrentTSO = tso;
2524 cap->in_haskell = true;
2525 errno = saved_errno;
2526 #if defined(mingw32_HOST_OS)
2527 SetLastError(saved_winerror);
2528 #endif
2529
2530 /* We might have GC'd, mark the TSO dirty again */
2531 dirty_TSO(cap,tso);
2532 dirty_STACK(cap,tso->stackobj);
2533
2534 IF_DEBUG(sanity, checkTSO(tso));
2535
2536 return &cap->r;
2537 }
2538
2539 /* ---------------------------------------------------------------------------
2540 * scheduleThread()
2541 *
2542 * scheduleThread puts a thread on the end of the runnable queue.
2543 * This will usually be done immediately after a thread is created.
2544 * The caller of scheduleThread must create the thread using e.g.
2545 * createThread and push an appropriate closure
2546 * on this thread's stack before the scheduler is invoked.
2547 * ------------------------------------------------------------------------ */
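/*
 * For example, a minimal sketch following the comment above, using existing
 * RTS helpers (this is essentially what rts_evalIO() in RtsAPI.c does,
 * except that it goes through scheduleWaitThread for a bound thread):
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  (StgClosure *)action);
 *     scheduleThread(cap, tso);
 *
 * where `action` is a closure of type IO ().
 */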
2548
2549 void
2550 scheduleThread(Capability *cap, StgTSO *tso)
2551 {
2552 // The thread goes at the *end* of the run-queue, to avoid possible
2553 // starvation of any threads already on the queue.
2554 appendToRunQueue(cap,tso);
2555 }
2556
2557 void
2558 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2559 {
2560 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2561 // move this thread from now on.
2562 #if defined(THREADED_RTS)
2563 cpu %= enabled_capabilities;
2564 if (cpu == cap->no) {
2565 appendToRunQueue(cap,tso);
2566 } else {
2567 migrateThread(cap, tso, capabilities[cpu]);
2568 }
2569 #else
2570 appendToRunQueue(cap,tso);
2571 #endif
2572 }
2573
2574 void
2575 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2576 {
2577 Task *task;
2578 DEBUG_ONLY( StgThreadID id );
2579 Capability *cap;
2580
2581 cap = *pcap;
2582
2583 // We already created/initialised the Task
2584 task = cap->running_task;
2585
2586 // This TSO is now a bound thread; make the Task and TSO
2587 // point to each other.
2588 tso->bound = task->incall;
2589 tso->cap = cap;
2590
2591 task->incall->tso = tso;
2592 task->incall->ret = ret;
2593 task->incall->rstat = NoStatus;
2594
2595 appendToRunQueue(cap,tso);
2596
2597 DEBUG_ONLY( id = tso->id );
2598 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2599
2600 cap = schedule(cap,task);
2601
2602 ASSERT(task->incall->rstat != NoStatus);
2603 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2604
2605 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2606 *pcap = cap;
2607 }
2608
2609 /* ----------------------------------------------------------------------------
2610 * Starting Tasks
2611 * ------------------------------------------------------------------------- */
2612
2613 #if defined(THREADED_RTS)
2614 void scheduleWorker (Capability *cap, Task *task)
2615 {
2616 // schedule() runs without a lock.
2617 cap = schedule(cap,task);
2618
2619 // On exit from schedule(), we have a Capability, but possibly not
2620 // the same one we started with.
2621
2622 // During shutdown, the requirement is that after all the
2623 // Capabilities are shut down, all workers that are shutting down
2624 // have finished workerTaskStop(). This is why we hold on to
2625 // cap->lock until we've finished workerTaskStop() below.
2626 //
2627 // There may be workers still involved in foreign calls; those
2628 // will just block in waitForCapability() because the
2629 // Capability has been shut down.
2630 //
2631 ACQUIRE_LOCK(&cap->lock);
2632 releaseCapability_(cap,false);
2633 workerTaskStop(task);
2634 RELEASE_LOCK(&cap->lock);
2635 }
2636 #endif
2637
2638 /* ---------------------------------------------------------------------------
2639 * Start new worker tasks on Capabilities from--to
2640 * -------------------------------------------------------------------------- */
2641
2642 static void
2643 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2644 {
2645 #if defined(THREADED_RTS)
2646 uint32_t i;
2647 Capability *cap;
2648
2649 for (i = from; i < to; i++) {
2650 cap = capabilities[i];
2651 ACQUIRE_LOCK(&cap->lock);
2652 startWorkerTask(cap);
2653 RELEASE_LOCK(&cap->lock);
2654 }
2655 #endif
2656 }
2657
2658 /* ---------------------------------------------------------------------------
2659 * initScheduler()
2660 *
2661 * Initialise the scheduler. This resets all the queues - if the
2662 * queues contained any threads, they'll be garbage collected at the
2663 * next pass.
2664 *
2665 * ------------------------------------------------------------------------ */
2666
2667 void
2668 initScheduler(void)
2669 {
2670 #if !defined(THREADED_RTS)
2671 blocked_queue_hd = END_TSO_QUEUE;
2672 blocked_queue_tl = END_TSO_QUEUE;
2673 sleeping_queue = END_TSO_QUEUE;
2674 #endif
2675
2676 sched_state = SCHED_RUNNING;
2677 recent_activity = ACTIVITY_YES;
2678
2679 #if defined(THREADED_RTS)
2680 /* Initialise the mutex and condition variables used by
2681 * the scheduler. */
2682 initMutex(&sched_mutex);
2683 initMutex(&sync_finished_mutex);
2684 initCondition(&sync_finished_cond);
2685 #endif
2686
2687 ACQUIRE_LOCK(&sched_mutex);
2688
2689 allocated_bytes_at_heapoverflow = 0;
2690
2691 /* A capability holds the state a native thread needs in
2692 * order to execute STG code. At least one capability is
2693 * floating around (only THREADED_RTS builds have more than one).
2694 */
2695 initCapabilities();
2696
2697 initTaskManager();
2698
2699 /*
2700 * Eagerly start one worker to run each Capability, except for
2701 * Capability 0. The idea is that we're probably going to start a
2702 * bound thread on Capability 0 pretty soon, so we don't want a
2703 * worker task hogging it.
2704 */
2705 startWorkerTasks(1, n_capabilities);
2706
2707 RELEASE_LOCK(&sched_mutex);
2708
2709 }
2710
2711 void
2712 exitScheduler (bool wait_foreign USED_IF_THREADS)
2713 /* see Capability.c, shutdownCapability() */
2714 {
2715 Task *task = newBoundTask();
2716
2717 // If we haven't killed all the threads yet, do it now.
2718 if (sched_state < SCHED_SHUTTING_DOWN) {
2719 sched_state = SCHED_INTERRUPTING;
2720 nonmovingExit();
2721 Capability *cap = task->cap;
2722 waitForCapability(&cap,task);
2723 scheduleDoGC(&cap,task,true,false);
2724 ASSERT(task->incall->tso == NULL);
2725 releaseCapability(cap);
2726 }
2727 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
2728
2729 shutdownCapabilities(task, wait_foreign);
2730
2731 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2732 // n_failed_trygrab_idles, n_idle_caps);
2733
2734 boundTaskExiting(task);
2735 }
2736
2737 void
2738 freeScheduler( void )
2739 {
2740 uint32_t still_running;
2741
2742 ACQUIRE_LOCK(&sched_mutex);
2743 still_running = freeTaskManager();
2744 // We can only free the Capabilities if there are no Tasks still
2745 // running. We might have a Task about to return from a foreign
2746 // call into waitForCapability(), for example (actually,
2747 // this should be the *only* thing that a still-running Task can
2748 // do at this point, and it will block waiting for the
2749 // Capability).
2750 if (still_running == 0) {
2751 freeCapabilities();
2752 }
2753 RELEASE_LOCK(&sched_mutex);
2754 #if defined(THREADED_RTS)
2755 closeMutex(&sched_mutex);
2756 #endif
2757 }
2758
2759 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2760 void *user USED_IF_NOT_THREADS)
2761 {
2762 #if !defined(THREADED_RTS)
2763 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2764 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2765 evac(user, (StgClosure **)(void *)&sleeping_queue);
2766 #endif
2767 }
2768
2769 /* -----------------------------------------------------------------------------
2770 performGC
2771
2772 This is the interface to the garbage collector from Haskell land.
2773 We provide this so that external C code can allocate and garbage
2774 collect when called from Haskell via _ccall_GC.
2775 -------------------------------------------------------------------------- */
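/*
 * The Haskell-level System.Mem.performGC / performMajorGC reach these
 * functions via foreign imports. An embedding C program can call them too,
 * roughly (a sketch, assuming the RTS headers are in scope):
 *
 *     hs_init(&argc, &argv);
 *     ... run some Haskell ...
 *     performMajorGC();    // force a full collection now
 */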
2776
2777 static void
2778 performGC_(bool force_major)
2779 {
2780 Task *task;
2781 Capability *cap = NULL;
2782
2783 // We must grab a new Task here, because the existing Task may be
2784 // associated with a particular Capability, and chained onto the
2785 // suspended_ccalls queue.
2786 task = newBoundTask();
2787
2788 // TODO: do we need to traceTask*() here?
2789
2790 waitForCapability(&cap,task);
2791 scheduleDoGC(&cap,task,force_major,false);
2792 releaseCapability(cap);
2793 boundTaskExiting(task);
2794 }
2795
2796 void
2797 performGC(void)
2798 {
2799 performGC_(false);
2800 }
2801
2802 void
2803 performMajorGC(void)
2804 {
2805 performGC_(true);
2806 }
2807
2808 /* ---------------------------------------------------------------------------
2809 Interrupt execution.
2810 Might be called inside a signal handler so it mustn't do anything fancy.
2811 ------------------------------------------------------------------------ */
2812
2813 void
2814 interruptStgRts(void)
2815 {
2816 ASSERT(sched_state != SCHED_SHUTTING_DOWN);
2817 sched_state = SCHED_INTERRUPTING;
2818 interruptAllCapabilities();
2819 #if defined(THREADED_RTS)
2820 wakeUpRts();
2821 #endif
2822 }
2823
2824 /* -----------------------------------------------------------------------------
2825 Wake up the RTS
2826
2827 This function causes at least one OS thread to wake up and run the
2828 scheduler loop. It is invoked when the RTS might be deadlocked, or
2829 an external event has arrived that may need servicing (eg. a
2830 keyboard interrupt).
2831
2832     In the single-threaded RTS we don't do anything here: there is only
2833     one thread, and the event that caused us to want to wake up will
2834     already have interrupted any blocking system call in progress.
2835 -------------------------------------------------------------------------- */
2836
2837 #if defined(THREADED_RTS)
2838 void wakeUpRts(void)
2839 {
2840 // This forces the IO Manager thread to wakeup, which will
2841 // in turn ensure that some OS thread wakes up and runs the
2842 // scheduler loop, which will cause a GC and deadlock check.
2843 ioManagerWakeup();
2844 }
2845 #endif
2846
2847 /* -----------------------------------------------------------------------------
2848 Deleting threads
2849
2850 This is used for interruption (^C) and forking, and corresponds to
2851 raising an exception but without letting the thread catch the
2852 exception.
2853 -------------------------------------------------------------------------- */
2854
2855 static void
2856 deleteThread (StgTSO *tso)
2857 {
2858 // NOTE: must only be called on a TSO that we have exclusive
2859 // access to, because we will call throwToSingleThreaded() below.
2860 // The TSO must be on the run queue of the Capability we own, or
2861 // we must own all Capabilities.
2862
2863 if (tso->why_blocked != BlockedOnCCall &&
2864 tso->why_blocked != BlockedOnCCall_Interruptible) {
2865 throwToSingleThreaded(tso->cap,tso,NULL);
2866 }
2867 }
2868
2869 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
2870 static void
2871 deleteThread_(StgTSO *tso)
2872 { // for forkProcess only:
2873 // like deleteThread(), but we delete threads in foreign calls, too.
2874
2875 if (tso->why_blocked == BlockedOnCCall ||
2876 tso->why_blocked == BlockedOnCCall_Interruptible) {
2877 tso->what_next = ThreadKilled;
2878 appendToRunQueue(tso->cap, tso);
2879 } else {
2880 deleteThread(tso);
2881 }
2882 }
2883 #endif
2884
2885 /* -----------------------------------------------------------------------------
2886 raiseExceptionHelper
2887
2888 This function is called by the raise# primitive, just so that we can
2889 move some of the tricky bits of raising an exception from C-- into
2890     C. Who knows, it might be a useful re-usable thing here too.
2891 -------------------------------------------------------------------------- */
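/*
 * The caller is the raise# implementation in PrimOps.cmm: it branches on the
 * frame type we return (CATCH_FRAME, ATOMICALLY_FRAME, CATCH_STM_FRAME or
 * STOP_FRAME) to decide how to continue the unwind. Roughly (a sketch, not
 * the literal Cmm):
 *
 *     frame_type = raiseExceptionHelper(BaseReg, CurrentTSO, exception);
 *     switch (frame_type) { ... enter the handler, or kill the thread ... }
 */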
2892
2893 StgWord
2894 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2895 {
2896 Capability *cap = regTableToCapability(reg);
2897 StgThunk *raise_closure = NULL;
2898 StgPtr p, next;
2899 const StgRetInfoTable *info;
2900 //
2901 // This closure represents the expression 'raise# E' where E
2902     // is the exception being raised. It is used to overwrite all the
2903 // thunks which are currently under evaluation.
2904 //
2905
2906 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2907 // LDV profiling: stg_raise_info has THUNK as its closure
2908 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2909     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2910 // 1 does not cause any problem unless profiling is performed.
2911 // However, when LDV profiling goes on, we need to linearly scan
2912 // small object pool, where raise_closure is stored, so we should
2913 // use MIN_UPD_SIZE.
2914 //
2915 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2916 // sizeofW(StgClosure)+1);
2917 //
2918
2919 //
2920 // Walk up the stack, looking for the catch frame. On the way,
2921 // we update any closures pointed to from update frames with the
2922 // raise closure that we just built.
2923 //
2924 p = tso->stackobj->sp;
2925 while(1) {
2926 info = get_ret_itbl((StgClosure *)p);
2927 next = p + stack_frame_sizeW((StgClosure *)p);
2928 switch (info->i.type) {
2929
2930 case UPDATE_FRAME:
2931 // Only create raise_closure if we need to.
2932 if (raise_closure == NULL) {
2933 raise_closure =
2934 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2935 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2936 raise_closure->payload[0] = exception;
2937 }
2938 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2939 (StgClosure *)raise_closure);
2940 p = next;
2941 continue;
2942
2943 case ATOMICALLY_FRAME:
2944 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2945 tso->stackobj->sp = p;
2946 return ATOMICALLY_FRAME;
2947
2948 case CATCH_FRAME:
2949 tso->stackobj->sp = p;
2950 return CATCH_FRAME;
2951
2952 case CATCH_STM_FRAME:
2953 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2954 tso->stackobj->sp = p;
2955 return CATCH_STM_FRAME;
2956
2957 case UNDERFLOW_FRAME:
2958 tso->stackobj->sp = p;
2959 threadStackUnderflow(cap,tso);
2960 p = tso->stackobj->sp;
2961 continue;
2962
2963 case STOP_FRAME:
2964 tso->stackobj->sp = p;
2965 return STOP_FRAME;
2966
2967 case CATCH_RETRY_FRAME: {
2968 StgTRecHeader *trec = tso -> trec;
2969 StgTRecHeader *outer = trec -> enclosing_trec;
2970 debugTrace(DEBUG_stm,
2971 "found CATCH_RETRY_FRAME at %p during raise", p);
2972 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2973 stmAbortTransaction(cap, trec);
2974 stmFreeAbortedTRec(cap, trec);
2975 tso -> trec = outer;
2976 p = next;
2977 continue;
2978 }
2979
2980 default:
2981 p = next;
2982 continue;
2983 }
2984 }
2985 }
2986
2987
2988 /* -----------------------------------------------------------------------------
2989 findRetryFrameHelper
2990
2991 This function is called by the retry# primitive. It traverses the stack
2992 leaving tso->sp referring to the frame which should handle the retry.
2993
2994 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2995     or an ATOMICALLY_FRAME (if the retry# reaches the top level).
2996
2997 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2998 create) because retries are not considered to be exceptions, despite the
2999 similar implementation.
3000
3001 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3002 not be created within memory transactions.
3003 -------------------------------------------------------------------------- */
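/*
 * The retry# implementation in PrimOps.cmm drives this in a loop: it calls
 * findRetryFrameHelper and branches on the returned frame type.
 * ATOMICALLY_FRAME means block the transaction waiting for its TVars to
 * change; CATCH_RETRY_FRAME means run the orElse# alternative branch.
 * (A sketch of the caller's behaviour, not literal code.)
 */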
3004
3005 StgWord
3006 findRetryFrameHelper (Capability *cap, StgTSO *tso)
3007 {
3008 const StgRetInfoTable *info;
3009 StgPtr p, next;
3010
3011 p = tso->stackobj->sp;
3012 while (1) {
3013 info = get_ret_itbl((const StgClosure *)p);
3014 next = p + stack_frame_sizeW((StgClosure *)p);
3015 switch (info->i.type) {
3016
3017 case ATOMICALLY_FRAME:
3018 debugTrace(DEBUG_stm,
3019 "found ATOMICALLY_FRAME at %p during retry", p);
3020 tso->stackobj->sp = p;
3021 return ATOMICALLY_FRAME;
3022
3023 case CATCH_RETRY_FRAME:
3024 debugTrace(DEBUG_stm,
3025 "found CATCH_RETRY_FRAME at %p during retry", p);
3026 tso->stackobj->sp = p;
3027 return CATCH_RETRY_FRAME;
3028
3029 case CATCH_STM_FRAME: {
3030 StgTRecHeader *trec = tso -> trec;
3031 StgTRecHeader *outer = trec -> enclosing_trec;
3032 debugTrace(DEBUG_stm,
3033 "found CATCH_STM_FRAME at %p during retry", p);
3034 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3035 stmAbortTransaction(cap, trec);
3036 stmFreeAbortedTRec(cap, trec);
3037 tso -> trec = outer;
3038 p = next;
3039 continue;
3040 }
3041
3042 case UNDERFLOW_FRAME:
3043 tso->stackobj->sp = p;
3044 threadStackUnderflow(cap,tso);
3045 p = tso->stackobj->sp;
3046 continue;
3047
3048 default:
3049 ASSERT(info->i.type != CATCH_FRAME);
3050 ASSERT(info->i.type != STOP_FRAME);
3051 p = next;
3052 continue;
3053 }
3054 }
3055 }
3056
3057 /* -----------------------------------------------------------------------------
3058 resurrectThreads is called after garbage collection on the list of
3059 threads found to be garbage. Each of these threads will be woken
3060     up and sent an exception: BlockedIndefinitelyOnMVar if the thread was
3061     blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked on STM,
3062     or NonTermination if it was blocked on a black hole.
3063
3064 Locks: assumes we hold *all* the capabilities.
3065 -------------------------------------------------------------------------- */
3066
3067 void
3068 resurrectThreads (StgTSO *threads)
3069 {
3070 StgTSO *tso, *next;
3071 Capability *cap;
3072 generation *gen;
3073
3074 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3075 next = tso->global_link;
3076
3077 gen = Bdescr((P_)tso)->gen;
3078 tso->global_link = gen->threads;
3079 gen->threads = tso;
3080
3081 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3082
3083 // Wake up the thread on the Capability it was last on
3084 cap = tso->cap;
3085
3086 switch (tso->why_blocked) {
3087 case BlockedOnMVar:
3088 case BlockedOnMVarRead:
3089 /* Called by GC - sched_mutex lock is currently held. */
3090 throwToSingleThreaded(cap, tso,
3091 (StgClosure *)blockedIndefinitelyOnMVar_closure);
3092 break;
3093 case BlockedOnBlackHole:
3094 throwToSingleThreaded(cap, tso,
3095 (StgClosure *)nonTermination_closure);
3096 break;
3097 case BlockedOnSTM:
3098 throwToSingleThreaded(cap, tso,
3099 (StgClosure *)blockedIndefinitelyOnSTM_closure);
3100 break;
3101 case NotBlocked:
3102 /* This might happen if the thread was blocked on a black hole
3103 * belonging to a thread that we've just woken up (raiseAsync
3104 * can wake up threads, remember...).
3105 */
3106 continue;
3107 case BlockedOnMsgThrowTo:
3108 // This can happen if the target is masking, blocks on a
3109 // black hole, and then is found to be unreachable. In
3110 // this case, we want to let the target wake up and carry
3111 // on, and do nothing to this thread.
3112 continue;
3113 default:
3114 barf("resurrectThreads: thread blocked in a strange way: %d",
3115 tso->why_blocked);
3116 }
3117 }
3118 }