rts/Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "StablePtr.h"
45 #include "StableName.h"
46 #include "TopHandler.h"
47 #include "sm/NonMoving.h"
48 #include "sm/NonMovingMark.h"
49
50 #if defined(HAVE_SYS_TYPES_H)
51 #include <sys/types.h>
52 #endif
53 #if defined(HAVE_UNISTD_H)
54 #include <unistd.h>
55 #endif
56
57 #include <string.h>
58 #include <stdlib.h>
59 #include <stdarg.h>
60
61 #if defined(HAVE_ERRNO_H)
62 #include <errno.h>
63 #endif
64
65 #if defined(TRACING)
66 #include "eventlog/EventLog.h"
67 #endif
68 /* -----------------------------------------------------------------------------
69 * Global variables
70 * -------------------------------------------------------------------------- */
71
72 #if !defined(THREADED_RTS)
73 // Blocked/sleeping threads
74 StgTSO *blocked_queue_hd = NULL;
75 StgTSO *blocked_queue_tl = NULL;
76 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
77 #endif
78
79 // Bytes allocated since the last time a HeapOverflow exception was thrown by
80 // the RTS
81 uint64_t allocated_bytes_at_heapoverflow = 0;
82
83 /* Set to true when the latest garbage collection failed to reclaim enough
84 * space, and the runtime should proceed to shut itself down in an orderly
85 * fashion (emitting profiling info etc.), OR throw an exception to the main
86 * thread, if it is still alive.
87 */
88 bool heap_overflow = false;
89
90 /* flag that tracks whether we have done any execution in this time slice.
91 * LOCK: currently none, perhaps we should lock (but needs to be
92 * updated in the fast path of the scheduler).
93 *
94 * NB. must be StgWord, we do xchg() on it.
95 */
96 volatile StgWord recent_activity = ACTIVITY_YES;
97
98 /* if this flag is set as well, give up execution
99 * LOCK: none (changes monotonically)
100 */
101 volatile StgWord sched_state = SCHED_RUNNING;
102
103 /*
104 * This mutex protects most of the global scheduler data in
105 * the THREADED_RTS runtime.
106 */
107 #if defined(THREADED_RTS)
108 Mutex sched_mutex;
109 #endif
110
111 #if !defined(mingw32_HOST_OS)
112 #define FORKPROCESS_PRIMOP_SUPPORTED
113 #endif
114
115 /*
116 * sync_finished_cond allows threads which do not own any capability (e.g. the
117 * concurrent mark thread) to participate in the sync protocol. In particular,
118 * if such a thread requests a sync while sync is already in progress it will
119 * block on sync_finished_cond, which will be signalled when the sync is
120 * finished (by releaseAllCapabilities).
121 */
122 #if defined(THREADED_RTS)
123 static Condition sync_finished_cond;
124 static Mutex sync_finished_mutex;
125 #endif
126
127
128 /* -----------------------------------------------------------------------------
129 * static function prototypes
130 * -------------------------------------------------------------------------- */
131
132 static Capability *schedule (Capability *initialCapability, Task *task);
133
134 //
135 // These functions all encapsulate parts of the scheduler loop, and are
136 // abstracted only to make the structure and control flow of the
137 // scheduler clearer.
138 //
139 static void schedulePreLoop (void);
140 static void scheduleFindWork (Capability **pcap);
141 #if defined(THREADED_RTS)
142 static void scheduleYield (Capability **pcap, Task *task);
143 #endif
144 #if defined(THREADED_RTS)
145 static bool requestSync (Capability **pcap, Task *task,
146 PendingSync *sync_type, SyncType *prev_sync_type);
147 static void acquireAllCapabilities(Capability *cap, Task *task);
148 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
149 uint32_t to USED_IF_THREADS);
150 #endif
151 static void scheduleStartSignalHandlers (Capability *cap);
152 static void scheduleCheckBlockedThreads (Capability *cap);
153 static void scheduleProcessInbox(Capability **cap);
154 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
155 static void schedulePushWork(Capability *cap, Task *task);
156 #if defined(THREADED_RTS)
157 static void scheduleActivateSpark(Capability *cap);
158 #endif
159 static void schedulePostRunThread(Capability *cap, StgTSO *t);
160 static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
161 static bool scheduleHandleYield( Capability *cap, StgTSO *t,
162 uint32_t prev_what_next );
163 static void scheduleHandleThreadBlocked( StgTSO *t );
164 static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
165 StgTSO *t );
166 static bool scheduleNeedHeapProfile(bool ready_to_gc);
167 static void scheduleDoGC(Capability **pcap, Task *task, bool force_major);
168
169 static void deleteThread (StgTSO *tso);
170 static void deleteAllThreads (void);
171
172 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
173 static void deleteThread_(StgTSO *tso);
174 #endif
175
176 /* ---------------------------------------------------------------------------
177 Main scheduling loop.
178
179 We use round-robin scheduling, each thread returning to the
180 scheduler loop when one of these conditions is detected:
181
182 * out of heap space
183 * timer expires (thread yields)
184 * thread blocks
185 * thread ends
186 * stack overflow
187
188 ------------------------------------------------------------------------ */
189
190 static Capability *
191 schedule (Capability *initialCapability, Task *task)
192 {
193 StgTSO *t;
194 Capability *cap;
195 StgThreadReturnCode ret;
196 uint32_t prev_what_next;
197 bool ready_to_gc;
198
199 cap = initialCapability;
200
201 // Pre-condition: this task owns initialCapability.
202 // The sched_mutex is *NOT* held
203 // NB. on return, we still hold a capability.
204
205 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
206
207 schedulePreLoop();
208
209 // -----------------------------------------------------------
210 // Scheduler loop starts here:
211
212 while (1) {
213
214 // Check whether we have re-entered the RTS from Haskell without
215 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
216 // call).
217 if (cap->in_haskell) {
218 errorBelch("schedule: re-entered unsafely.\n"
219 " Perhaps a 'foreign import unsafe' should be 'safe'?");
220 stg_exit(EXIT_FAILURE);
221 }
222
223 // Note [shutdown]: The interruption / shutdown sequence.
224 //
225 // In order to cleanly shut down the runtime, we want to:
226 // * make sure that all main threads return to their callers
227 // with the state 'Interrupted'.
228 // * clean up all OS threads associated with the runtime
229 // * free all memory etc.
230 //
231 // So the sequence goes like this:
232 //
233 // * The shutdown sequence is initiated by calling hs_exit(),
234 // interruptStgRts(), or running out of memory in the GC.
235 //
236 // * Set sched_state = SCHED_INTERRUPTING
237 //
238 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
239 // scheduleDoGC(), which halts the whole runtime by acquiring all the
240 // capabilities, does a GC and then calls deleteAllThreads() to kill all
241 // the remaining threads. The zombies are left on the run queue for
242 // cleaning up. We can't kill threads involved in foreign calls.
243 //
244 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
245 //
246 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
247 // enforced by the scheduler, which won't run any Haskell code when
248 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
249 // other capabilities by doing the GC earlier.
250 //
251 // * all workers exit when the run queue on their capability
252 // drains. All main threads will also exit when their TSO
253 // reaches the head of the run queue and they can return.
254 //
255 // * eventually all Capabilities will shut down, and the RTS can
256 // exit.
257 //
258 // * We might be left with threads blocked in foreign calls,
259 // we should really attempt to kill these somehow (TODO).
260
261 switch (sched_state) {
262 case SCHED_RUNNING:
263 break;
264 case SCHED_INTERRUPTING:
265 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
266 /* scheduleDoGC() deletes all the threads */
267 scheduleDoGC(&cap,task,true);
268
269 // after scheduleDoGC(), we must be shutting down. Either some
270 // other Capability did the final GC, or we did it above,
271 // either way we can fall through to the SCHED_SHUTTING_DOWN
272 // case now.
273 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
274 // fall through
275
276 case SCHED_SHUTTING_DOWN:
277 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
278 // If we are a worker, just exit. If we're a bound thread
279 // then we will exit below when we've removed our TSO from
280 // the run queue.
281 if (!isBoundTask(task) && emptyRunQueue(cap)) {
282 return cap;
283 }
284 break;
285 default:
286 barf("sched_state: %" FMT_Word, sched_state);
287 }
288
289 scheduleFindWork(&cap);
290
291 /* work pushing, currently relevant only for THREADED_RTS:
292 (pushes threads, wakes up idle capabilities for stealing) */
293 schedulePushWork(cap,task);
294
295 scheduleDetectDeadlock(&cap,task);
296
297 // Normally, the only way we can get here with no threads to
298 // run is if a keyboard interrupt was received during
299 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
300 // Additionally, it is not fatal for the
301 // threaded RTS to reach here with no threads to run.
302 //
303 // win32: might be here due to awaitEvent() being abandoned
304 // as a result of a console event having been delivered.
305
306 #if defined(THREADED_RTS)
307 scheduleYield(&cap,task);
308
309 if (emptyRunQueue(cap)) continue; // look for work again
310 #endif
311
312 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
313 if ( emptyRunQueue(cap) ) {
314 ASSERT(sched_state >= SCHED_INTERRUPTING);
315 }
316 #endif
317
318 //
319 // Get a thread to run
320 //
321 t = popRunQueue(cap);
322
323 // Sanity check the thread we're about to run. This can be
324 // expensive if there is lots of thread switching going on...
325 IF_DEBUG(sanity,checkTSO(t));
326
327 #if defined(THREADED_RTS)
328 // Check whether we can run this thread in the current task.
329 // If not, we have to pass our capability to the right task.
330 {
331 InCall *bound = t->bound;
332
333 if (bound) {
334 if (bound->task == task) {
335 // yes, the Haskell thread is bound to the current native thread
336 } else {
337 debugTrace(DEBUG_sched,
338 "thread %lu bound to another OS thread",
339 (unsigned long)t->id);
340 // no, bound to a different Haskell thread: pass to that thread
341 pushOnRunQueue(cap,t);
342 continue;
343 }
344 } else {
345 // The thread we want to run is unbound.
346 if (task->incall->tso) {
347 debugTrace(DEBUG_sched,
348 "this OS thread cannot run thread %lu",
349 (unsigned long)t->id);
350 // no, the current native thread is bound to a different
351 // Haskell thread, so pass it to any worker thread
352 pushOnRunQueue(cap,t);
353 continue;
354 }
355 }
356 }
357 #endif
358
359 // If we're shutting down, and this thread has not yet been
360 // killed, kill it now. This sometimes happens when a finalizer
361 // thread is created by the final GC, or a thread previously
362 // in a foreign call returns.
363 if (sched_state >= SCHED_INTERRUPTING &&
364 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
365 deleteThread(t);
366 }
367
368 // If this capability is disabled, migrate the thread away rather
369 // than running it. NB. but not if the thread is bound: it is
370 // really hard for a bound thread to migrate itself. Believe me,
371 // I tried several ways and couldn't find a way to do it.
372 // Instead, when everything is stopped for GC, we migrate all the
373 // threads on the run queue then (see scheduleDoGC()).
374 //
375 // ToDo: what about TSO_LOCKED? Currently we're migrating those
376 // when the number of capabilities drops, but we never migrate
377 // them back if it rises again. Presumably we should, but after
378 // the thread has been migrated we no longer know what capability
379 // it was originally on.
380 #if defined(THREADED_RTS)
381 if (cap->disabled && !t->bound) {
382 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
383 migrateThread(cap, t, dest_cap);
384 continue;
385 }
386 #endif
387
388 /* context switches are initiated by the timer signal, unless
389 * the user specified "context switch as often as possible", with
390 * +RTS -C0
391 */
392 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
393 && !emptyThreadQueues(cap)) {
394 cap->context_switch = 1;
395 }
396
397 run_thread:
398
399 // CurrentTSO is the thread to run. It might be different if we
400 // loop back to run_thread, so make sure to set CurrentTSO after
401 // that.
402 cap->r.rCurrentTSO = t;
403
404 startHeapProfTimer();
405
406 // ----------------------------------------------------------------------
407 // Run the current thread
408
409 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
410 ASSERT(t->cap == cap);
411 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
412
413 prev_what_next = t->what_next;
414
415 errno = t->saved_errno;
416 #if defined(mingw32_HOST_OS)
417 SetLastError(t->saved_winerror);
418 #endif
419
420 // reset the interrupt flag before running Haskell code
421 cap->interrupt = 0;
422
423 cap->in_haskell = true;
424 cap->idle = 0;
425
426 dirty_TSO(cap,t);
427 dirty_STACK(cap,t->stackobj);
428
429 switch (recent_activity)
430 {
431 case ACTIVITY_DONE_GC: {
432 // ACTIVITY_DONE_GC means we turned off the timer signal to
433 // conserve power (see #1623). Re-enable it here.
434 uint32_t prev;
435 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
436 if (prev == ACTIVITY_DONE_GC) {
437 #if !defined(PROFILING)
438 startTimer();
439 #endif
440 }
441 break;
442 }
443 case ACTIVITY_INACTIVE:
444 // If we reached ACTIVITY_INACTIVE, then don't reset it until
445 // we've done the GC. The thread running here might just be
446 // the IO manager thread that handle_tick() woke up via
447 // wakeUpRts().
448 break;
449 default:
450 recent_activity = ACTIVITY_YES;
451 }
452
453 traceEventRunThread(cap, t);
454
455 switch (prev_what_next) {
456
457 case ThreadKilled:
458 case ThreadComplete:
459 /* Thread already finished, return to scheduler. */
460 ret = ThreadFinished;
461 break;
462
463 case ThreadRunGHC:
464 {
465 StgRegTable *r;
466 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
467 cap = regTableToCapability(r);
468 ret = r->rRet;
469 break;
470 }
471
472 case ThreadInterpret:
473 cap = interpretBCO(cap);
474 ret = cap->r.rRet;
475 break;
476
477 default:
478 barf("schedule: invalid prev_what_next=%u field", prev_what_next);
479 }
480
481 cap->in_haskell = false;
482
483 // The TSO might have moved, eg. if it re-entered the RTS and a GC
484 // happened. So find the new location:
485 t = cap->r.rCurrentTSO;
486
487 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
488 // don't want it set when not running a Haskell thread.
489 cap->r.rCurrentTSO = NULL;
490
491 // And save the current errno in this thread.
492 // XXX: possibly bogus for SMP because this thread might already
493 // be running again, see code below.
494 t->saved_errno = errno;
495 #if defined(mingw32_HOST_OS)
496 // Similarly for Windows error code
497 t->saved_winerror = GetLastError();
498 #endif
499
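// The +6 passed to traceEventStopThread below offsets the why_blocked
// code past the six basic thread-return statuses, so it lands on the
// eventlog's corresponding "blocked on ..." stop-status code (see the
// thread-stop status encoding used by the eventlog).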
500 if (ret == ThreadBlocked) {
501 if (t->why_blocked == BlockedOnBlackHole) {
502 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
503 traceEventStopThread(cap, t, t->why_blocked + 6,
504 owner != NULL ? owner->id : 0);
505 } else {
506 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
507 }
508 } else {
509 if (ret == StackOverflow) {
510 traceEventStopThread(cap, t, ret, t->tot_stack_size);
511 } else {
512 traceEventStopThread(cap, t, ret, 0);
513 }
514 }
515
516 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
517 ASSERT(t->cap == cap);
518
519 // ----------------------------------------------------------------------
520
521 // Costs for the scheduler are assigned to CCS_SYSTEM
522 stopHeapProfTimer();
523 #if defined(PROFILING)
524 cap->r.rCCCS = CCS_SYSTEM;
525 #endif
526
527 schedulePostRunThread(cap,t);
528
529 ready_to_gc = false;
530
531 switch (ret) {
532 case HeapOverflow:
533 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
534 break;
535
536 case StackOverflow:
537 // just adjust the stack for this thread, then pop it back
538 // on the run queue.
539 threadStackOverflow(cap, t);
540 pushOnRunQueue(cap,t);
541 break;
542
543 case ThreadYielding:
544 if (scheduleHandleYield(cap, t, prev_what_next)) {
545 // shortcut for switching between compiler/interpreter:
546 goto run_thread;
547 }
548 break;
549
550 case ThreadBlocked:
551 scheduleHandleThreadBlocked(t);
552 break;
553
554 case ThreadFinished:
555 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
556 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
557 break;
558
559 default:
560 barf("schedule: invalid thread return code %d", (int)ret);
561 }
562
563 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
564 scheduleDoGC(&cap,task,false);
565 }
566 } /* end of while() */
567 }
568
569 /* -----------------------------------------------------------------------------
570 * Run queue operations
571 * -------------------------------------------------------------------------- */
572
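// Remove a thread from anywhere in the run queue. The run queue is a
// doubly-linked list: _link is the forward pointer, and block_info.prev
// is reused as the backward pointer while a TSO is on the queue.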
573 static void
574 removeFromRunQueue (Capability *cap, StgTSO *tso)
575 {
576 if (tso->block_info.prev == END_TSO_QUEUE) {
577 ASSERT(cap->run_queue_hd == tso);
578 cap->run_queue_hd = tso->_link;
579 } else {
580 setTSOLink(cap, tso->block_info.prev, tso->_link);
581 }
582 if (tso->_link == END_TSO_QUEUE) {
583 ASSERT(cap->run_queue_tl == tso);
584 cap->run_queue_tl = tso->block_info.prev;
585 } else {
586 setTSOPrev(cap, tso->_link, tso->block_info.prev);
587 }
588 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
589 cap->n_run_queue--;
590
591 IF_DEBUG(sanity, checkRunQueue(cap));
592 }
593
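// Move a thread to the front of the run queue: unlink it from wherever
// it currently sits and push it back on at the head.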
594 void
595 promoteInRunQueue (Capability *cap, StgTSO *tso)
596 {
597 removeFromRunQueue(cap, tso);
598 pushOnRunQueue(cap, tso);
599 }
600
601 /* ----------------------------------------------------------------------------
602 * Setting up the scheduler loop
603 * ------------------------------------------------------------------------- */
604
605 static void
606 schedulePreLoop(void)
607 {
608 // initialisation for scheduler - what cannot go into initScheduler()
609
610 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
611 win32AllocStack();
612 #endif
613 }
614
615 /* -----------------------------------------------------------------------------
616 * scheduleFindWork()
617 *
618 * Search for work to do, and handle messages from elsewhere.
619 * -------------------------------------------------------------------------- */
620
621 static void
622 scheduleFindWork (Capability **pcap)
623 {
624 scheduleStartSignalHandlers(*pcap);
625
626 scheduleProcessInbox(pcap);
627
628 scheduleCheckBlockedThreads(*pcap);
629
630 #if defined(THREADED_RTS)
631 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
632 #endif
633 }
634
635 #if defined(THREADED_RTS)
636 STATIC_INLINE bool
637 shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
638 {
639 // we need to yield this capability to someone else if..
640 // - another thread is initiating a GC, and we didn't just do a GC
641 // (see Note [GC livelock])
642 // - another Task is returning from a foreign call
643 // - the thread at the head of the run queue cannot be run
644 // by this Task (it is bound to another Task, or it is unbound
645 // and this Task is bound).
646 //
647 // Note [GC livelock]
648 //
649 // If we are interrupted to do a GC, then we do not immediately do
650 // another one. This avoids a starvation situation where one
651 // Capability keeps forcing a GC and the other Capabilities make no
652 // progress at all.
653
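// In the third disjunct below: a worker Task (incall->tso == NULL) must
// not run a bound thread, and a bound Task may only run the thread of
// its own InCall.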
654 return ((pending_sync && !didGcLast) ||
655 cap->n_returning_tasks != 0 ||
656 (!emptyRunQueue(cap) && (task->incall->tso == NULL
657 ? peekRunQueue(cap)->bound != NULL
658 : peekRunQueue(cap)->bound != task->incall)));
659 }
660
661 // This is the single place where a Task goes to sleep. There are
662 // two reasons it might need to sleep:
663 // - there are no threads to run
664 // - we need to yield this Capability to someone else
665 // (see shouldYieldCapability())
666 //
667 // Careful: the scheduler loop is quite delicate. Make sure you run
668 // the tests in testsuite/concurrent (all ways) after modifying this,
669 // and also check the benchmarks in nofib/parallel for regressions.
670
671 static void
672 scheduleYield (Capability **pcap, Task *task)
673 {
674 Capability *cap = *pcap;
675 bool didGcLast = false;
676
677 // if we have work, and we don't need to give up the Capability, continue.
678 //
679 if (!shouldYieldCapability(cap,task,false) &&
680 (!emptyRunQueue(cap) ||
681 !emptyInbox(cap) ||
682 sched_state >= SCHED_INTERRUPTING)) {
683 return;
684 }
685
686 // otherwise yield (sleep), and keep yielding if necessary.
687 do {
688 if (doIdleGCWork(cap, false)) {
689 // there's more idle GC work to do
690 didGcLast = false;
691 } else {
692 // no more idle GC work to do
693 didGcLast = yieldCapability(&cap,task, !didGcLast);
694 }
695 }
696 while (shouldYieldCapability(cap,task,didGcLast));
697
698 // note there may still be no threads on the run queue at this
699 // point, the caller has to check.
700
701 *pcap = cap;
702 return;
703 }
704 #endif
705
706 /* -----------------------------------------------------------------------------
707 * schedulePushWork()
708 *
709 * Push work to other Capabilities if we have some.
710 * -------------------------------------------------------------------------- */
711
712 static void
713 schedulePushWork(Capability *cap USED_IF_THREADS,
714 Task *task USED_IF_THREADS)
715 {
716 #if defined(THREADED_RTS)
717
718 Capability *free_caps[n_capabilities], *cap0;
719 uint32_t i, n_wanted_caps, n_free_caps;
720
721 uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
722
723 // migration can be turned off with +RTS -qm
724 if (!RtsFlags.ParFlags.migrate) {
725 spare_threads = 0;
726 }
727
728 // Figure out how many capabilities we want to wake up. We need at least
729 // sparkPoolSize(cap) plus the number of spare threads we have.
730 n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
731 if (n_wanted_caps == 0) return;
732
733 // First grab as many free Capabilities as we can. ToDo: we should use
734 // capabilities on the same NUMA node preferably, but not exclusively.
735 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
736 n_free_caps < n_wanted_caps && i != cap->no;
737 i = (i + 1) % n_capabilities) {
738 cap0 = capabilities[i];
739 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
740 if (!emptyRunQueue(cap0)
741 || cap0->n_returning_tasks != 0
742 || !emptyInbox(cap0)) {
743 // it already has some work, we just grabbed it at
744 // the wrong moment. Or maybe it's deadlocked!
745 releaseCapability(cap0);
746 } else {
747 free_caps[n_free_caps++] = cap0;
748 }
749 }
750 }
751
752 // We now have n_free_caps free capabilities stashed in
753 // free_caps[]. Attempt to share our run queue equally with them.
754 // This is complicated slightly by the fact that we can't move
755 // some threads:
756 //
757 // - threads that have TSO_LOCKED cannot migrate
758 // - a thread that is bound to the current Task cannot be migrated
759 //
760 // This is about the simplest thing we could do; improvements we
761 // might want to do include:
762 //
763 // - giving high priority to moving relatively new threads, on
764 // the grounds that they haven't had time to build up a
765 // working set in the cache on this CPU/Capability.
766 //
767 // - giving low priority to moving long-lived threads
768
769 if (n_free_caps > 0) {
770 StgTSO *prev, *t, *next;
771
772 debugTrace(DEBUG_sched,
773 "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
774 cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
775 n_free_caps);
776
777 // There are n_free_caps+1 caps in total. We will share the threads
778 // evenly between them, *except* that if the run queue does not divide
779 // evenly by n_free_caps+1 then we bias towards the current capability.
780 // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
781 uint32_t keep_threads =
782 (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
783
784 // This also ensures that we don't give away all our threads, since
785 // (x + y) / (y + 1) >= 1 when x >= 1.
786
787 // The number of threads we have left.
788 uint32_t n = cap->n_run_queue;
789
790 // prev = the previous thread on this cap's run queue
791 prev = END_TSO_QUEUE;
792
793 // We're going to walk through the run queue, migrating threads to other
794 // capabilities until we have only keep_threads left. We might
795 // encounter a thread that cannot be migrated, in which case we add it
796 // to the current run queue and decrement keep_threads.
797 for (t = cap->run_queue_hd, i = 0;
798 t != END_TSO_QUEUE && n > keep_threads;
799 t = next)
800 {
801 next = t->_link;
802 t->_link = END_TSO_QUEUE;
803
804 // Should we keep this thread?
805 if (t->bound == task->incall // don't move my bound thread
806 || tsoLocked(t) // don't move a locked thread
807 ) {
808 if (prev == END_TSO_QUEUE) {
809 cap->run_queue_hd = t;
810 } else {
811 setTSOLink(cap, prev, t);
812 }
813 setTSOPrev(cap, t, prev);
814 prev = t;
815 if (keep_threads > 0) keep_threads--;
816 }
817
818 // Or migrate it?
819 else {
820 appendToRunQueue(free_caps[i],t);
821 traceEventMigrateThread (cap, t, free_caps[i]->no);
822
823 if (t->bound) { t->bound->task->cap = free_caps[i]; }
824 t->cap = free_caps[i];
825 n--; // we have one fewer thread now
826 i++; // move on to the next free_cap
827 if (i == n_free_caps) i = 0;
828 }
829 }
830
831 // Join up the beginning of the queue (prev)
832 // with the rest of the queue (t)
833 if (t == END_TSO_QUEUE) {
834 cap->run_queue_tl = prev;
835 } else {
836 setTSOPrev(cap, t, prev);
837 }
838 if (prev == END_TSO_QUEUE) {
839 cap->run_queue_hd = t;
840 } else {
841 setTSOLink(cap, prev, t);
842 }
843 cap->n_run_queue = n;
844
845 IF_DEBUG(sanity, checkRunQueue(cap));
846
847 // release the capabilities
848 for (i = 0; i < n_free_caps; i++) {
849 task->cap = free_caps[i];
850 if (sparkPoolSizeCap(cap) > 0) {
851 // If we have sparks to steal, wake up a worker on the
852 // capability, even if it has no threads to run.
853 releaseAndWakeupCapability(free_caps[i]);
854 } else {
855 releaseCapability(free_caps[i]);
856 }
857 }
858 }
859 task->cap = cap; // reset to point to our Capability.
860
861 #endif /* THREADED_RTS */
862
863 }
864
865 /* ----------------------------------------------------------------------------
866 * Start any pending signal handlers
867 * ------------------------------------------------------------------------- */
868
869 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
870 static void
871 scheduleStartSignalHandlers(Capability *cap)
872 {
873 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
874 // safe outside the lock
875 startSignalHandlers(cap);
876 }
877 }
878 #else
879 static void
880 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
881 {
882 }
883 #endif
884
885 /* ----------------------------------------------------------------------------
886 * Check for blocked threads that can be woken up.
887 * ------------------------------------------------------------------------- */
888
889 static void
890 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
891 {
892 #if !defined(THREADED_RTS)
893 //
894 // Check whether any waiting threads need to be woken up. If the
895 // run queue is empty, and there are no other tasks running, we
896 // can wait indefinitely for something to happen.
897 //
898 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
899 {
900 awaitEvent (emptyRunQueue(cap));
901 }
902 #endif
903 }
904
905 /* ----------------------------------------------------------------------------
906 * Detect deadlock conditions and attempt to resolve them.
907 * ------------------------------------------------------------------------- */
908
909 static void
910 scheduleDetectDeadlock (Capability **pcap, Task *task)
911 {
912 Capability *cap = *pcap;
913 /*
914 * Detect deadlock: when we have no threads to run, there are no
915 * threads blocked, waiting for I/O, or sleeping, and all the
916 * other tasks are waiting for work, we must have a deadlock of
917 * some description.
918 */
919 if ( emptyThreadQueues(cap) )
920 {
921 #if defined(THREADED_RTS)
922 /*
923 * In the threaded RTS, we only check for deadlock if there
924 * has been no activity in a complete timeslice. This means
925 * we won't eagerly start a full GC just because we don't have
926 * any threads to run currently.
927 */
928 if (recent_activity != ACTIVITY_INACTIVE) return;
929 #endif
930
931 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
932
933 // Garbage collection can release some new threads due to
934 // either (a) finalizers or (b) threads resurrected because
935 // they are unreachable and will therefore be sent an
936 // exception. Any threads thus released will be immediately
937 // runnable.
938 scheduleDoGC (pcap, task, true/*force major GC*/);
939 cap = *pcap;
940 // when force_major == true, scheduleDoGC sets
941 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
942 // signal.
943
944 if ( !emptyRunQueue(cap) ) return;
945
946 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
947 /* If we have user-installed signal handlers, then wait
948 * for signals to arrive rather than bombing out with a
949 * deadlock.
950 */
951 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
952 debugTrace(DEBUG_sched,
953 "still deadlocked, waiting for signals...");
954
955 awaitUserSignals();
956
957 if (signals_pending()) {
958 startSignalHandlers(cap);
959 }
960
961 // either we have threads to run, or we were interrupted:
962 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
963
964 return;
965 }
966 #endif
967
968 #if !defined(THREADED_RTS)
969 /* Probably a real deadlock. Send the current main thread the
970 * Deadlock exception.
971 */
972 if (task->incall->tso) {
973 switch (task->incall->tso->why_blocked) {
974 case BlockedOnSTM:
975 case BlockedOnBlackHole:
976 case BlockedOnMsgThrowTo:
977 case BlockedOnMVar:
978 case BlockedOnMVarRead:
979 throwToSingleThreaded(cap, task->incall->tso,
980 (StgClosure *)nonTermination_closure);
981 return;
982 default:
983 barf("deadlock: main thread blocked in a strange way");
984 }
985 }
986 return;
987 #endif
988 }
989 }
990
991
992 /* ----------------------------------------------------------------------------
993 * Process message in the current Capability's inbox
994 * ------------------------------------------------------------------------- */
995
996 static void
997 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
998 {
999 #if defined(THREADED_RTS)
1000 Message *m, *next;
1001 PutMVar *p, *pnext;
1002 int r;
1003 Capability *cap = *pcap;
1004
1005 while (!emptyInbox(cap)) {
1006 // Executing messages might use heap, so we should check for GC.
1007 if (doYouWantToGC(cap)) {
1008 scheduleDoGC(pcap, cap->running_task, false);
1009 cap = *pcap;
1010 }
1011
1012 // don't use a blocking acquire; if the lock is held by
1013 // another thread then just carry on. This seems to avoid
1014 // getting stuck in a message ping-pong situation with other
1015 // processors. We'll check the inbox again later anyway.
1016 //
1017 // We should really use a more efficient queue data structure
1018 // here. The trickiness is that we must ensure a Capability
1019 // never goes idle if the inbox is non-empty, which is why we
1020 // use cap->lock (cap->lock is released as the last thing
1021 // before going idle; see Capability.c:releaseCapability()).
1022 r = TRY_ACQUIRE_LOCK(&cap->lock);
1023 if (r != 0) return;
1024
1025 m = cap->inbox;
1026 p = cap->putMVars;
1027 cap->inbox = (Message*)END_TSO_QUEUE;
1028 cap->putMVars = NULL;
1029
1030 RELEASE_LOCK(&cap->lock);
1031
1032 while (m != (Message*)END_TSO_QUEUE) {
1033 next = m->link;
1034 executeMessage(cap, m);
1035 m = next;
1036 }
1037
1038 while (p != NULL) {
1039 pnext = p->link;
1040 performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
1041 Unit_closure);
1042 freeStablePtr(p->mvar);
1043 stgFree(p);
1044 p = pnext;
1045 }
1046 }
1047 #endif
1048 }
1049
1050
1051 /* ----------------------------------------------------------------------------
1052 * Activate spark threads (THREADED_RTS)
1053 * ------------------------------------------------------------------------- */
1054
1055 #if defined(THREADED_RTS)
1056 static void
1057 scheduleActivateSpark(Capability *cap)
1058 {
1059 if (anySparks() && !cap->disabled)
1060 {
1061 createSparkThread(cap);
1062 debugTrace(DEBUG_sched, "creating a spark thread");
1063 }
1064 }
1065 #endif // THREADED_RTS
1066
1067 /* ----------------------------------------------------------------------------
1068 * After running a thread...
1069 * ------------------------------------------------------------------------- */
1070
1071 static void
1072 schedulePostRunThread (Capability *cap, StgTSO *t)
1073 {
1074 // We have to be able to catch transactions that are in an
1075 // infinite loop as a result of seeing an inconsistent view of
1076 // memory, e.g.
1077 //
1078 // atomically $ do
1079 // [a,b] <- mapM readTVar [ta,tb]
1080 // when (a == b) loop
1081 //
1082 // and a is never equal to b given a consistent view of memory.
1083 //
1084 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1085 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1086 debugTrace(DEBUG_sched | DEBUG_stm,
1087 "trec %p found wasting its time", t);
1088
1089 // strip the stack back to the
1090 // ATOMICALLY_FRAME, aborting the (nested)
1091 // transaction, and saving the stack of any
1092 // partially-evaluated thunks on the heap.
1093 throwToSingleThreaded_(cap, t, NULL, true);
1094
1095 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1096 }
1097 }
1098
1099 //
1100 // If the current thread's allocation limit has run out, send it
1101 // the AllocationLimitExceeded exception.
1102
1103 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1104 // Use a throwToSelf rather than a throwToSingleThreaded, because
1105 // it correctly handles the case where the thread is currently
1106 // inside mask. Also the thread might be blocked (e.g. on an
1107 // MVar), and throwToSingleThreaded doesn't unblock it
1108 // correctly in that case.
1109 throwToSelf(cap, t, allocationLimitExceeded_closure);
1110 ASSIGN_Int64((W_*)&(t->alloc_limit),
1111 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1112 }
1113
1114 /* some statistics gathering in the parallel case */
1115 }
1116
1117 /* -----------------------------------------------------------------------------
1118 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1119 * -------------------------------------------------------------------------- */
1120
1121 static bool
1122 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1123 {
1124 if (cap->r.rHpLim == NULL || cap->context_switch) {
1125 // Sometimes we miss a context switch, e.g. when calling
1126 // primitives in a tight loop, MAYBE_GC() doesn't check the
1127 // context switch flag, and we end up waiting for a GC.
1128 // See #1984, and concurrent/should_run/1984
1129 cap->context_switch = 0;
1130 appendToRunQueue(cap,t);
1131 } else {
1132 pushOnRunQueue(cap,t);
1133 }
1134
1135 // did the task ask for a large block?
1136 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1137 // if so, get one and push it on the front of the nursery.
1138 bdescr *bd;
1139 W_ blocks;
1140
1141 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
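// For example, with the default 4Kbyte block size, an rHpAlloc of
// 10000 bytes rounds up to 3 blocks.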
1142
1143 if (blocks > BLOCKS_PER_MBLOCK) {
1144 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1145 }
1146
1147 debugTrace(DEBUG_sched,
1148 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1149 (long)t->id, what_next_strs[t->what_next], blocks);
1150
1151 // don't do this if the nursery is (nearly) full, we'll GC first.
1152 if (cap->r.rCurrentNursery->link != NULL ||
1153 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1154 // infinite loop if the
1155 // nursery has only one
1156 // block.
1157
1158 bd = allocGroupOnNode_lock(cap->node,blocks);
1159 cap->r.rNursery->n_blocks += blocks;
1160
1161 // link the new group after CurrentNursery
1162 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1163
1164 // initialise it as a nursery block. We initialise the
1165 // step, gen_no, and flags field of *every* sub-block in
1166 // this large block, because this is easier than making
1167 // sure that we always find the block head of a large
1168 // block whenever we call Bdescr() (eg. evacuate() and
1169 // isAlive() in the GC would both have to do this, at
1170 // least).
1171 {
1172 bdescr *x;
1173 for (x = bd; x < bd + blocks; x++) {
1174 initBdescr(x,g0,g0);
1175 x->free = x->start;
1176 x->flags = 0;
1177 }
1178 }
1179
1180 // This assert can be a killer if the app is doing lots
1181 // of large block allocations.
1182 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1183
1184 // now update the nursery to point to the new block
1185 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1186 cap->r.rCurrentNursery = bd;
1187
1188 // we might be unlucky and have another thread get on the
1189 // run queue before us and steal the large block, but in that
1190 // case the thread will just end up requesting another large
1191 // block.
1192 return false; /* not actually GC'ing */
1193 }
1194 }
1195
1196 return doYouWantToGC(cap);
1197 /* actual GC is done at the end of the while loop in schedule() */
1198 }
1199
1200 /* -----------------------------------------------------------------------------
1201 * Handle a thread that returned to the scheduler with ThreadYielding
1202 * -------------------------------------------------------------------------- */
1203
1204 static bool
1205 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1206 {
1207 /* put the thread back on the run queue. Then, if we're ready to
1208 * GC, check whether this is the last task to stop. If so, wake
1209 * up the GC thread. getThread will block during a GC until the
1210 * GC is finished.
1211 */
1212
1213 ASSERT(t->_link == END_TSO_QUEUE);
1214
1215 // Shortcut if we're just switching evaluators: just run the thread. See
1216 // Note [avoiding threadPaused] in Interpreter.c.
1217 if (t->what_next != prev_what_next) {
1218 debugTrace(DEBUG_sched,
1219 "--<< thread %ld (%s) stopped to switch evaluators",
1220 (long)t->id, what_next_strs[t->what_next]);
1221 return true;
1222 }
1223
1224 // Reset the context switch flag. We don't do this just before
1225 // running the thread, because that would mean we would lose ticks
1226 // during GC, which can lead to unfair scheduling (a thread hogs
1227 // the CPU because the tick always arrives during GC). This way
1228 // penalises threads that do a lot of allocation, but that seems
1229 // better than the alternative.
1230 if (cap->context_switch != 0) {
1231 cap->context_switch = 0;
1232 appendToRunQueue(cap,t);
1233 } else {
1234 pushOnRunQueue(cap,t);
1235 }
1236
1237 IF_DEBUG(sanity,
1238 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1239 checkTSO(t));
1240
1241 return false;
1242 }
1243
1244 /* -----------------------------------------------------------------------------
1245 * Handle a thread that returned to the scheduler with ThreadBlocked
1246 * -------------------------------------------------------------------------- */
1247
1248 static void
1249 scheduleHandleThreadBlocked( StgTSO *t
1250 #if !defined(DEBUG)
1251 STG_UNUSED
1252 #endif
1253 )
1254 {
1255
1256 // We don't need to do anything. The thread is blocked, and it
1257 // has tidied up its stack and placed itself on whatever queue
1258 // it needs to be on.
1259
1260 // ASSERT(t->why_blocked != NotBlocked);
1261 // Not true: for example,
1262 // - the thread may have woken itself up already, because
1263 // threadPaused() might have raised a blocked throwTo
1264 // exception, see maybePerformBlockedException().
1265
1266 #if defined(DEBUG)
1267 traceThreadStatus(DEBUG_sched, t);
1268 #endif
1269 }
1270
1271 /* -----------------------------------------------------------------------------
1272 * Handle a thread that returned to the scheduler with ThreadFinished
1273 * -------------------------------------------------------------------------- */
1274
1275 static bool
1276 scheduleHandleThreadFinished (Capability *cap, Task *task, StgTSO *t)
1277 {
1278 /* Need to check whether this was a main thread, and if so,
1279 * return with the return value.
1280 *
1281 * We also end up here if the thread kills itself with an
1282 * uncaught exception, see Exception.cmm.
1283 */
1284
1285 // blocked exceptions can now complete, even if the thread was in
1286 // blocked mode (see #2910).
1287 awakenBlockedExceptionQueue (cap, t);
1288
1289 //
1290 // Check whether the thread that just completed was a bound
1291 // thread, and if so return with the result.
1292 //
1293 // There is an assumption here that all thread completion goes
1294 // through this point; we need to make sure that if a thread
1295 // ends up in the ThreadKilled state, it stays on the run
1296 // queue so it can be dealt with here.
1297 //
1298
1299 if (t->bound) {
1300
1301 if (t->bound != task->incall) {
1302 #if !defined(THREADED_RTS)
1303 // Must be a bound thread that is not the topmost one. Leave
1304 // it on the run queue until the stack has unwound to the
1305 // point where we can deal with this. Leaving it on the run
1306 // queue also ensures that the garbage collector knows about
1307 // this thread and its return value (it gets dropped from the
1308 // step->threads list so there's no other way to find it).
1309 appendToRunQueue(cap,t);
1310 return false;
1311 #else
1312 // this cannot happen in the threaded RTS, because a
1313 // bound thread can only be run by the appropriate Task.
1314 barf("finished bound thread that isn't mine");
1315 #endif
1316 }
1317
1318 ASSERT(task->incall->tso == t);
1319
1320 if (t->what_next == ThreadComplete) {
1321 if (task->incall->ret) {
1322 // NOTE: return val is stack->sp[1] (see StgStartup.cmm)
1323 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1324 }
1325 task->incall->rstat = Success;
1326 } else {
1327 if (task->incall->ret) {
1328 *(task->incall->ret) = NULL;
1329 }
1330 if (sched_state >= SCHED_INTERRUPTING) {
1331 if (heap_overflow) {
1332 task->incall->rstat = HeapExhausted;
1333 } else {
1334 task->incall->rstat = Interrupted;
1335 }
1336 } else {
1337 task->incall->rstat = Killed;
1338 }
1339 }
1340 #if defined(DEBUG)
1341 removeThreadLabel((StgWord)task->incall->tso->id);
1342 #endif
1343
1344 // We no longer consider this thread and task to be bound to
1345 // each other. The TSO lives on until it is GC'd, but the
1346 // task is about to be released by the caller, and we don't
1347 // want anyone following the pointer from the TSO to the
1348 // defunct task (which might have already been
1349 // re-used). This was a real bug: the GC updated
1350 // tso->bound->tso which led to a deadlock.
1351 t->bound = NULL;
1352 task->incall->tso = NULL;
1353
1354 return true; // tells schedule() to return
1355 }
1356
1357 return false;
1358 }
1359
1360 /* -----------------------------------------------------------------------------
1361 * Perform a heap census
1362 * -------------------------------------------------------------------------- */
1363
1364 static bool
1365 scheduleNeedHeapProfile( bool ready_to_gc )
1366 {
1367 // When we have +RTS -i0 and we're heap profiling, do a census at
1368 // every GC. This lets us get repeatable runs for debugging.
1369 if (performHeapProfile ||
1370 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1371 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1372 return true;
1373 } else {
1374 return false;
1375 }
1376 }
1377
1378 /* -----------------------------------------------------------------------------
1379 * stopAllCapabilities()
1380 *
1381 * Stop all Haskell execution. This is used when we need to make some global
1382 * change to the system, such as altering the number of capabilities, or
1383 * forking.
1384 *
1385 * pCap may be NULL in the event that the caller doesn't yet own a capability.
1386 *
1387 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1388 * -------------------------------------------------------------------------- */
1389
1390 #if defined(THREADED_RTS)
1391 void stopAllCapabilities (Capability **pCap, Task *task)
1392 {
1393 stopAllCapabilitiesWith(pCap, task, SYNC_OTHER);
1394 }
1395
1396 void stopAllCapabilitiesWith (Capability **pCap, Task *task, SyncType sync_type)
1397 {
1398 bool was_syncing;
1399 SyncType prev_sync_type;
1400
1401 PendingSync sync = {
1402 .type = sync_type,
1403 .idle = NULL,
1404 .task = task
1405 };
1406
1407 do {
1408 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1409 } while (was_syncing);
1410
1411 acquireAllCapabilities(pCap ? *pCap : NULL, task);
1412
1413 pending_sync = 0;
1414 signalCondition(&sync_finished_cond);
1415 }
1416 #endif
1417
1418 /* -----------------------------------------------------------------------------
1419 * requestSync()
1420 *
1421 * Commence a synchronisation between all capabilities. Normally not called
1422 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1423 * has some special synchronisation requirements.
1424 *
1425 * Note that this can be called in two ways:
1426 *
1427 * - where *pcap points to a capability owned by the caller: in this case
1428 * *prev_sync_type will reflect the in-progress sync type on return, if one
1429 * was found.
1430 *
1431 * - where pcap == NULL: in this case the caller doesn't hold a capability.
1432 * we only return whether or not a pending sync was found and prev_sync_type
1433 * is unchanged.
1434 *
1435 * Returns:
1436 * false if we successfully got a sync
1437 * true if there was another sync request in progress,
1438 * and we yielded to it. The value returned is the
1439 * type of the other sync request.
1440 * -------------------------------------------------------------------------- */
1441
1442 #if defined(THREADED_RTS)
1443 static bool requestSync (
1444 Capability **pcap, Task *task, PendingSync *new_sync,
1445 SyncType *prev_sync_type)
1446 {
1447 PendingSync *sync;
1448
1449 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1450 (StgWord)NULL,
1451 (StgWord)new_sync);
1452
1453 if (sync != NULL)
1454 {
1455 // sync is valid until we have called yieldCapability().
1456 // After the sync is completed, we cannot read that struct any
1457 // more because it has been freed.
1458 *prev_sync_type = sync->type;
1459 if (pcap == NULL) {
1460 // The caller does not hold a capability (e.g. may be a concurrent
1461 // mark thread). Consequently we must wait until the pending sync is
1462 // finished before proceeding to ensure we don't loop.
1463 // TODO: Don't busy-wait
1464 ACQUIRE_LOCK(&sync_finished_mutex);
1465 while (pending_sync) {
1466 waitCondition(&sync_finished_cond, &sync_finished_mutex);
1467 }
1468 RELEASE_LOCK(&sync_finished_mutex);
1469 } else {
1470 do {
1471 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1472 sync->type);
1473 ASSERT(*pcap);
1474 yieldCapability(pcap,task,true);
1475 sync = pending_sync;
1476 } while (sync != NULL);
1477 }
1478
1479 // NOTE: task->cap might have changed now
1480 return true;
1481 }
1482 else
1483 {
1484 return false;
1485 }
1486 }
1487 #endif
1488
1489 /* -----------------------------------------------------------------------------
1490 * acquireAllCapabilities()
1491 *
1492 * Grab all the capabilities except the one we already hold (cap may be NULL if
1493 * the caller does not currently hold a capability). Used when synchronising
1494 * before a single-threaded GC (SYNC_SEQ_GC), and before a fork (SYNC_OTHER).
1495 *
1496 * Only call this after requestSync(), otherwise a deadlock might
1497 * ensue if another thread is trying to synchronise.
1498 * -------------------------------------------------------------------------- */
1499
1500 #if defined(THREADED_RTS)
1501 static void acquireAllCapabilities(Capability *cap, Task *task)
1502 {
1503 Capability *tmpcap;
1504 uint32_t i;
1505
1506 ASSERT(pending_sync != NULL);
1507 for (i=0; i < n_capabilities; i++) {
1508 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1509 i, n_capabilities);
1510 tmpcap = capabilities[i];
1511 if (tmpcap != cap) {
1512 // we better hope this task doesn't get migrated to
1513 // another Capability while we're waiting for this one.
1514 // It won't, because load balancing happens while we have
1515 // all the Capabilities, but even so it's a slightly
1516 // unsavoury invariant.
1517 task->cap = tmpcap;
1518 waitForCapability(&tmpcap, task);
1519 if (tmpcap->no != i) {
1520 barf("acquireAllCapabilities: got the wrong capability");
1521 }
1522 }
1523 }
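// If the caller came in without a capability (cap == NULL), leave
// task->cap pointing at the last capability we acquired; otherwise
// restore the caller's own capability.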
1524 task->cap = cap == NULL ? tmpcap : cap;
1525 }
1526 #endif
1527
1528 /* -----------------------------------------------------------------------------
1529 * releaseAllCapabilities()
1530 *
1531 * Assuming this thread holds all the capabilities, release them all (except for
1532 * the one passed in as keep_cap, if non-NULL).
1533 * -------------------------------------------------------------------------- */
1534
1535 #if defined(THREADED_RTS)
1536 void releaseAllCapabilities(uint32_t n, Capability *keep_cap, Task *task)
1537 {
1538 uint32_t i;
1539
1540 for (i = 0; i < n; i++) {
1541 Capability *tmpcap = capabilities[i];
1542 if (keep_cap != tmpcap) {
1543 task->cap = tmpcap;
1544 releaseCapability(tmpcap);
1545 }
1546 }
1547 task->cap = keep_cap;
1548 }
1549 #endif
1550
1551 /* -----------------------------------------------------------------------------
1552 * Perform a garbage collection if necessary
1553 * -------------------------------------------------------------------------- */
1554
1555 static void
1556 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1557 bool force_major)
1558 {
1559 Capability *cap = *pcap;
1560 bool heap_census;
1561 uint32_t collect_gen;
1562 bool major_gc;
1563 #if defined(THREADED_RTS)
1564 uint32_t gc_type;
1565 uint32_t i;
1566 uint32_t need_idle;
1567 uint32_t n_gc_threads;
1568 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1569 StgTSO *tso;
1570 bool *idle_cap;
1571 // idle_cap is an array (allocated later) of size n_capabilities, where
1572 // idle_cap[i] is true if capability i will be idle during this GC
1573 // cycle.
1574 #endif
1575
1576 if (sched_state == SCHED_SHUTTING_DOWN) {
1577 // The final GC has already been done, and the system is
1578 // shutting down. We'll probably deadlock if we try to GC
1579 // now.
1580 return;
1581 }
1582
1583 heap_census = scheduleNeedHeapProfile(true);
1584
1585 // Figure out which generation we are collecting, so that we can
1586 // decide whether this is a parallel GC or not.
1587 collect_gen = calcNeeded(force_major || heap_census, NULL);
1588 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1589
1590 #if defined(THREADED_RTS)
1591 if (sched_state < SCHED_INTERRUPTING
1592 && RtsFlags.ParFlags.parGcEnabled
1593 && collect_gen >= RtsFlags.ParFlags.parGcGen
1594 && ! oldest_gen->mark)
1595 {
1596 gc_type = SYNC_GC_PAR;
1597 } else {
1598 gc_type = SYNC_GC_SEQ;
1599 }
1600
1601 // In order to GC, there must be no threads running Haskell code.
1602 // Therefore, for single-threaded GC, the GC thread needs to hold *all* the
1603 // capabilities, and release them after the GC has completed. For parallel
1604 // GC, we synchronise all the running threads using requestSync().
1605 //
1606 // Other capabilities are prevented from running yet more Haskell threads if
1607 // pending_sync is set. Tested inside yieldCapability() and
1608 // releaseCapability() in Capability.c
1609
1610 PendingSync sync = {
1611 .type = gc_type,
1612 .idle = NULL,
1613 .task = task
1614 };
1615
1616 {
1617 SyncType prev_sync = 0;
1618 bool was_syncing;
1619 do {
1620 // If -qn is not set and we have more capabilities than cores, set
1621 // the number of GC threads to #cores. We do this here rather than
1622 // in normaliseRtsOpts() because here it will work if the program
1623 // calls setNumCapabilities.
1624 //
1625 n_gc_threads = RtsFlags.ParFlags.parGcThreads;
1626 if (n_gc_threads == 0 &&
1627 enabled_capabilities > getNumberOfProcessors()) {
1628 n_gc_threads = getNumberOfProcessors();
1629 }
1630
1631 // This calculation must be inside the loop because
1632 // enabled_capabilities may change if requestSync() below fails and
1633 // we retry.
1634 if (gc_type == SYNC_GC_PAR && n_gc_threads > 0) {
1635 if (n_gc_threads >= enabled_capabilities) {
1636 need_idle = 0;
1637 } else {
1638 need_idle = enabled_capabilities - n_gc_threads;
1639 }
1640 } else {
1641 need_idle = 0;
1642 }
1643
1644 // We need an array of size n_capabilities, but since this may
1645 // change each time around the loop we must allocate it afresh.
1646 idle_cap = (bool *)stgMallocBytes(n_capabilities *
1647 sizeof(bool),
1648 "scheduleDoGC");
1649 sync.idle = idle_cap;
1650
1651 // When using +RTS -qn, we need some capabilities to be idle during
1652 // GC. The best bet is to choose some inactive ones, so we look for
1653 // those first:
1654 uint32_t n_idle = need_idle;
1655 for (i=0; i < n_capabilities; i++) {
1656 if (capabilities[i]->disabled) {
1657 idle_cap[i] = true;
1658 } else if (n_idle > 0 &&
1659 capabilities[i]->running_task == NULL) {
1660 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1661 n_idle--;
1662 idle_cap[i] = true;
1663 } else {
1664 idle_cap[i] = false;
1665 }
1666 }
1667 // If we didn't find enough inactive capabilities, just pick some
1668 // more to be idle.
1669 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1670 if (!idle_cap[i] && i != cap->no) {
1671 idle_cap[i] = true;
1672 n_idle--;
1673 }
1674 }
1675 ASSERT(n_idle == 0);
1676
1677 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1678 cap = *pcap;
1679 if (was_syncing) {
1680 stgFree(idle_cap);
1681 }
1682 if (was_syncing &&
1683 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1684 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1685 // someone else had a pending sync request for a GC, so
1686 // let's assume GC has been done and we don't need to GC
1687 // again.
1688 // Exception to this: if SCHED_INTERRUPTING, then we still
1689 // need to do the final GC.
1690 return;
1691 }
1692 if (sched_state == SCHED_SHUTTING_DOWN) {
1693 // The scheduler might now be shutting down. We tested
1694 // this above, but it might have become true since then as
1695 // we yielded the capability in requestSync().
1696 return;
1697 }
1698 } while (was_syncing);
1699 }
1700
1701 stat_startGCSync(gc_threads[cap->no]);
1702
1703 #if defined(DEBUG)
1704 unsigned int old_n_capabilities = n_capabilities;
1705 #endif
1706
1707 interruptAllCapabilities();
1708
1709 // The final shutdown GC is always single-threaded, because it's
1710 // possible that some of the Capabilities have no worker threads.
1711
1712 if (gc_type == SYNC_GC_SEQ) {
1713 traceEventRequestSeqGc(cap);
1714 } else {
1715 traceEventRequestParGc(cap);
1716 }
1717
1718 if (gc_type == SYNC_GC_SEQ) {
1719 // single-threaded GC: grab all the capabilities
1720 acquireAllCapabilities(cap,task);
1721 }
1722 else
1723 {
1724 // If we are load-balancing collections in this
1725 // generation, then we require all GC threads to participate
1726 // in the collection. Otherwise, we only require active
1727 // threads to participate, and we set gc_threads[i]->idle for
1728 // any idle capabilities. The rationale here is that waking
1729 // up an idle Capability takes much longer than just doing any
1730 // GC work on its behalf.
1731
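        // N.B. parGcNoSyncWithIdle and the load-balancing settings tested
        // below are normally driven by the -qi<n> and -qb<gen> RTS options,
        // respectively.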
1732 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1733 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1734 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1735 {
1736 for (i=0; i < n_capabilities; i++) {
1737 if (capabilities[i]->disabled) {
1738 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1739 if (idle_cap[i]) {
1740 n_idle_caps++;
1741 }
1742 } else {
1743 if (i != cap->no && idle_cap[i]) {
1744 Capability *tmpcap = capabilities[i];
1745 task->cap = tmpcap;
1746 waitForCapability(&tmpcap, task);
1747 n_idle_caps++;
1748 }
1749 }
1750 }
1751 }
1752 else
1753 {
1754 for (i=0; i < n_capabilities; i++) {
1755 if (capabilities[i]->disabled) {
1756 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1757 if (idle_cap[i]) {
1758 n_idle_caps++;
1759 }
1760 } else if (i != cap->no &&
1761 capabilities[i]->idle >=
1762 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1763 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1764 if (idle_cap[i]) {
1765 n_idle_caps++;
1766 } else {
1767 n_failed_trygrab_idles++;
1768 }
1769 }
1770 }
1771 }
1772 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1773
1774 for (i=0; i < n_capabilities; i++) {
1775 capabilities[i]->idle++;
1776 }
1777
1778 // For all capabilities participating in this GC, wait until
1779 // they have stopped mutating and are standing by for GC.
1780 waitForGcThreads(cap, idle_cap);
1781
1782 // Stable point where we can do a global check on our spark counters
1783 ASSERT(checkSparkCountInvariant());
1784 }
1785
1786 #endif
1787
1788 IF_DEBUG(scheduler, printAllThreads());
1789
1790 delete_threads_and_gc:
1791 /*
1792 * We now have all the capabilities; if we're in an interrupting
1793 * state, then we should take the opportunity to delete all the
1794 * threads in the system.
1795 * Checking for major_gc ensures that the last GC is major.
1796 */
1797 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1798 deleteAllThreads();
1799 #if defined(THREADED_RTS)
1800 // Discard all the sparks from every Capability. Why?
1801 // They'll probably be GC'd anyway since we've killed all the
1802 // threads. It just avoids the GC having to do any work to
1803 // figure out that any remaining sparks are garbage.
1804 for (i = 0; i < n_capabilities; i++) {
1805 capabilities[i]->spark_stats.gcd +=
1806 sparkPoolSize(capabilities[i]->sparks);
1807 // No race here since all Caps are stopped.
1808 discardSparksCap(capabilities[i]);
1809 }
1810 #endif
1811 sched_state = SCHED_SHUTTING_DOWN;
1812 }
1813
1814 /*
1815 * When there are disabled capabilities, we want to migrate any
1816 * threads away from them. Normally this happens in the
1817 * scheduler's loop, but only for unbound threads - it's really
1818 * hard for a bound thread to migrate itself. So we have another
1819 * go here.
1820 */
1821 #if defined(THREADED_RTS)
1822 for (i = enabled_capabilities; i < n_capabilities; i++) {
1823 Capability *tmp_cap, *dest_cap;
1824 tmp_cap = capabilities[i];
1825 ASSERT(tmp_cap->disabled);
1826 if (i != cap->no) {
1827 dest_cap = capabilities[i % enabled_capabilities];
1828 while (!emptyRunQueue(tmp_cap)) {
1829 tso = popRunQueue(tmp_cap);
1830 migrateThread(tmp_cap, tso, dest_cap);
1831 if (tso->bound) {
1832 traceTaskMigrate(tso->bound->task,
1833 tso->bound->task->cap,
1834 dest_cap);
1835 tso->bound->task->cap = dest_cap;
1836 }
1837 }
1838 }
1839 }
1840 #endif
1841
1842 // Do any remaining idle GC work from the previous GC
1843 doIdleGCWork(cap, true /* all of it */);
1844
1845 #if defined(THREADED_RTS)
1846 // reset pending_sync *before* GC, so that when the GC threads
1847 // emerge they don't immediately re-enter the GC.
1848 pending_sync = 0;
1849 signalCondition(&sync_finished_cond);
1850 GarbageCollect(collect_gen, heap_census, gc_type, cap, idle_cap);
1851 #else
1852 GarbageCollect(collect_gen, heap_census, 0, cap, NULL);
1853 #endif
1854
1855 // If we're shutting down, don't leave any idle GC work to do.
1856 if (sched_state == SCHED_SHUTTING_DOWN) {
1857 doIdleGCWork(cap, true /* all of it */);
1858 }
1859
1860 traceSparkCounters(cap);
1861
1862 switch (recent_activity) {
1863 case ACTIVITY_INACTIVE:
1864 if (force_major) {
1865 // We are doing a GC because the system has been idle for a
1866 // timeslice and we need to check for deadlock. Record the
1867 // fact that we've done a GC and turn off the timer signal;
1868 // it will get re-enabled if we run any threads after the GC.
1869 recent_activity = ACTIVITY_DONE_GC;
1870 #if !defined(PROFILING)
1871 stopTimer();
1872 #endif
1873 break;
1874 }
1875 // fall through...
1876
1877 case ACTIVITY_MAYBE_NO:
1878 // the GC might have taken long enough for the timer to set
1879 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1880 // but we aren't necessarily deadlocked:
1881 recent_activity = ACTIVITY_YES;
1882 break;
1883
1884 case ACTIVITY_DONE_GC:
1885 // If we are actually active, the scheduler will reset the
1886 // recent_activity flag and re-enable the timer.
1887 break;
1888 }
1889
1890 #if defined(THREADED_RTS)
1891 // Stable point where we can do a global check on our spark counters
1892 ASSERT(checkSparkCountInvariant());
1893 #endif
1894
1895 // The heap census itself is done during GarbageCollect().
1896 if (heap_census) {
1897 performHeapProfile = false;
1898 }
1899
1900 #if defined(THREADED_RTS)
1901
1902 // If n_capabilities has changed during GC, we're in trouble.
1903 ASSERT(n_capabilities == old_n_capabilities);
1904
1905 if (gc_type == SYNC_GC_PAR)
1906 {
1907 for (i = 0; i < n_capabilities; i++) {
1908 if (i != cap->no) {
1909 if (idle_cap[i]) {
1910 ASSERT(capabilities[i]->running_task == task);
1911 task->cap = capabilities[i];
1912 releaseCapability(capabilities[i]);
1913 } else {
1914 ASSERT(capabilities[i]->running_task != task);
1915 }
1916 }
1917 }
1918 task->cap = cap;
1919
1920 // releaseGCThreads() happens *after* we have released idle
1921 // capabilities. Otherwise what can happen is one of the released
1922 // threads starts a new GC, and finds that it can't acquire some of
1923 // the disabled capabilities, because the previous GC still holds
1924 // them, so those disabled capabilities will not be idle during the
1925 // next GC round. However, if we release the capabilities first,
1926 // then they will be free (because they're disabled) when the next
1927 // GC cycle happens.
1928 releaseGCThreads(cap, idle_cap);
1929 }
1930 #endif
1931 if (heap_overflow && sched_state == SCHED_RUNNING) {
1932 // GC set the heap_overflow flag. We should throw an exception if we
1933 // can, or shut down otherwise.
1934
1935 // Get the thread to which Ctrl-C is thrown
1936 StgTSO *main_thread = getTopHandlerThread();
1937 if (main_thread == NULL) {
1938 // GC set the heap_overflow flag, and there is no main thread to
1939 // throw an exception to, so we should proceed with an orderly
1940 // shutdown now. Ultimately we want the main thread to return to
1941 // its caller with HeapExhausted, at which point the caller should
1942 // call hs_exit(). The first step is to delete all the threads.
1943 sched_state = SCHED_INTERRUPTING;
1944 goto delete_threads_and_gc;
1945 }
1946
1947 heap_overflow = false;
1948 const uint64_t allocation_count = getAllocations();
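            // Only re-throw the exception if at least heapLimitGrace bytes
            // have been allocated since the previous HeapOverflow (or if
            // this is the first one); otherwise we could deliver a stream
            // of exceptions while the program is still cleaning up.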
1949 if (RtsFlags.GcFlags.heapLimitGrace <
1950 allocation_count - allocated_bytes_at_heapoverflow ||
1951 allocated_bytes_at_heapoverflow == 0) {
1952 allocated_bytes_at_heapoverflow = allocation_count;
1953 // We used to simply exit, but throwing an exception gives the
1954 // program a chance to clean up. It also lets the exception be
1955 // caught.
1956
1957 // FIXME this is not a good way to tell a program to release
1958 // resources. It is neither reliable (the RTS crashes if it fails
1959 // to allocate memory from the OS) nor very usable (it is always
1960 // thrown to the main thread, which might not be able to do anything
1961 // useful with it). We really should have a more general way to
1962 // release resources in low-memory conditions. Nevertheless, this
1963 // is still a big improvement over just exiting.
1964
1965 // FIXME again: perhaps we should throw a synchronous exception
1966            // instead of an asynchronous one, or have a way for the program to
1967 // register a handler to be called when heap overflow happens.
1968 throwToSelf(cap, main_thread, heapOverflow_closure);
1969 }
1970 }
1971
1972 #if defined(THREADED_RTS)
1973 stgFree(idle_cap);
1974
1975 if (gc_type == SYNC_GC_SEQ) {
1976 // release our stash of capabilities.
1977 releaseAllCapabilities(n_capabilities, cap, task);
1978 }
1979 #endif
1980
1981 return;
1982 }
1983
1984 /* ---------------------------------------------------------------------------
1985 * Singleton fork(). Do not copy any running threads.
1986 * ------------------------------------------------------------------------- */
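
/*
 * This is reached from Haskell via, e.g., System.Posix.Process.forkProcess
 * in the unix package, which hands the RTS a stable pointer to the child's
 * IO action (the 'entry' argument); the child runs it with
 * rts_evalStableIOMain() after deleting every other thread and Task.
 */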
1987
1988 pid_t
1989 forkProcess(HsStablePtr *entry
1990 #if !defined(FORKPROCESS_PRIMOP_SUPPORTED)
1991 STG_UNUSED
1992 #endif
1993 )
1994 {
1995 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
1996 pid_t pid;
1997    StgTSO *t, *next;
1998 Capability *cap;
1999 uint32_t g;
2000 Task *task = NULL;
2001 uint32_t i;
2002
2003 debugTrace(DEBUG_sched, "forking!");
2004
2005 task = newBoundTask();
2006
2007 cap = NULL;
2008 waitForCapability(&cap, task);
2009
2010 #if defined(THREADED_RTS)
2011 stopAllCapabilities(&cap, task);
2012 #endif
2013
2014 // no funny business: hold locks while we fork, otherwise if some
2015 // other thread is holding a lock when the fork happens, the data
2016 // structure protected by the lock will forever be in an
2017 // inconsistent state in the child. See also #1391.
2018 ACQUIRE_LOCK(&sched_mutex);
2019 ACQUIRE_LOCK(&sm_mutex);
2020 ACQUIRE_LOCK(&stable_ptr_mutex);
2021 ACQUIRE_LOCK(&stable_name_mutex);
2022 ACQUIRE_LOCK(&task->lock);
2023
2024 for (i=0; i < n_capabilities; i++) {
2025 ACQUIRE_LOCK(&capabilities[i]->lock);
2026 }
2027
2028 #if defined(THREADED_RTS)
2029 ACQUIRE_LOCK(&all_tasks_mutex);
2030 #endif
2031
2032 stopTimer(); // See #4074
2033
2034 #if defined(TRACING)
2035 flushEventLog(); // so that child won't inherit dirty file buffers
2036 #endif
2037
2038 pid = fork();
2039
2040 if (pid) { // parent
2041
2042 startTimer(); // #4074
2043
2044 RELEASE_LOCK(&sched_mutex);
2045 RELEASE_LOCK(&sm_mutex);
2046 RELEASE_LOCK(&stable_ptr_mutex);
2047 RELEASE_LOCK(&stable_name_mutex);
2048 RELEASE_LOCK(&task->lock);
2049
2050 #if defined(THREADED_RTS)
2051 /* N.B. releaseCapability_ below may need to take all_tasks_mutex */
2052 RELEASE_LOCK(&all_tasks_mutex);
2053 #endif
2054
2055 for (i=0; i < n_capabilities; i++) {
2056 releaseCapability_(capabilities[i],false);
2057 RELEASE_LOCK(&capabilities[i]->lock);
2058 }
2059
2060 boundTaskExiting(task);
2061
2062 // just return the pid
2063 return pid;
2064
2065 } else { // child
2066
2067        // Current process times are reset in the child process, so we should reset
2068 // the stats too. See #16102.
2069 resetChildProcessStats();
2070
2071 #if defined(THREADED_RTS)
2072 initMutex(&sched_mutex);
2073 initMutex(&sm_mutex);
2074 initMutex(&stable_ptr_mutex);
2075 initMutex(&stable_name_mutex);
2076 initMutex(&task->lock);
2077
2078 for (i=0; i < n_capabilities; i++) {
2079 initMutex(&capabilities[i]->lock);
2080 }
2081
2082 initMutex(&all_tasks_mutex);
2083 #endif
2084
2085 #if defined(TRACING)
2086 resetTracing();
2087 #endif
2088
2089 // Now, all OS threads except the thread that forked are
2090 // stopped. We need to stop all Haskell threads, including
2091 // those involved in foreign calls. Also we need to delete
2092 // all Tasks, because they correspond to OS threads that are
2093 // now gone.
2094
2095 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2096 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2097 next = t->global_link;
2098 // don't allow threads to catch the ThreadKilled
2099 // exception, but we do want to raiseAsync() because these
2100 // threads may be evaluating thunks that we need later.
2101 deleteThread_(t);
2102
2103 // stop the GC from updating the InCall to point to
2104 // the TSO. This is only necessary because the
2105 // OSThread bound to the TSO has been killed, and
2106 // won't get a chance to exit in the usual way (see
2107 // also scheduleHandleThreadFinished).
2108 t->bound = NULL;
2109 }
2110 }
2111
2112 discardTasksExcept(task);
2113
2114 for (i=0; i < n_capabilities; i++) {
2115 cap = capabilities[i];
2116
2117 // Empty the run queue. It seems tempting to let all the
2118 // killed threads stay on the run queue as zombies to be
2119 // cleaned up later, but some of them may correspond to
2120 // bound threads for which the corresponding Task does not
2121 // exist.
2122 truncateRunQueue(cap);
2123 cap->n_run_queue = 0;
2124
2125            // Any suspended C-calling Tasks are no more; their OS threads
2126 // don't exist now:
2127 cap->suspended_ccalls = NULL;
2128 cap->n_suspended_ccalls = 0;
2129
2130 #if defined(THREADED_RTS)
2131 // Wipe our spare workers list, they no longer exist. New
2132 // workers will be created if necessary.
2133 cap->spare_workers = NULL;
2134 cap->n_spare_workers = 0;
2135 cap->returning_tasks_hd = NULL;
2136 cap->returning_tasks_tl = NULL;
2137 cap->n_returning_tasks = 0;
2138 #endif
2139
2140 // Release all caps except 0, we'll use that for starting
2141 // the IO manager and running the client action below.
2142 if (cap->no != 0) {
2143 task->cap = cap;
2144 releaseCapability(cap);
2145 }
2146 }
2147 cap = capabilities[0];
2148 task->cap = cap;
2149
2150 // Empty the threads lists. Otherwise, the garbage
2151 // collector may attempt to resurrect some of these threads.
2152 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2153 generations[g].threads = END_TSO_QUEUE;
2154 }
2155
2156 // On Unix, all timers are reset in the child, so we need to start
2157 // the timer again.
2158 initTimer();
2159 startTimer();
2160
2161 // TODO: need to trace various other things in the child
2162 // like startup event, capabilities, process info etc
2163 traceTaskCreate(task, cap);
2164
2165 #if defined(THREADED_RTS)
2166 ioManagerStartCap(&cap);
2167 #endif
2168
2169        // Install toplevel exception handlers, so that an interruption
2170 // signal will be sent to the main thread.
2171 // See #12903
2172 rts_evalStableIOMain(&cap, entry, NULL); // run the action
2173 rts_checkSchedStatus("forkProcess",cap);
2174
2175 rts_unlock(cap);
2176 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2177 }
2178 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2179 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2180 #endif
2181 }
2182
2183 /* ---------------------------------------------------------------------------
2184 * Changing the number of Capabilities
2185 *
2186 * Changing the number of Capabilities is very tricky! We can only do
2187 * it with the system fully stopped, so we do a full sync with
2188 * requestSync(SYNC_OTHER) and grab all the capabilities.
2189 *
2190 * Then we resize the appropriate data structures, and update all
2191 * references to the old data structures which have now moved.
2192 * Finally we release the Capabilities we are holding, and start
2193 * worker Tasks on the new Capabilities we created.
2194 *
2195 * ------------------------------------------------------------------------- */
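
/*
 * Illustrative sketch only: Haskell code normally gets here through
 * Control.Concurrent.setNumCapabilities (or starts with +RTS -N), but a
 * host program embedding the RTS could, in principle, call it directly:
 *
 *     hs_init(&argc, &argv);
 *     setNumCapabilities(4);   // grow (or shrink) to 4 capabilities
 */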
2196
2197 void
2198 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2199 {
2200 #if !defined(THREADED_RTS)
2201 if (new_n_capabilities != 1) {
2202 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2203 }
2204 return;
2205 #elif defined(NOSMP)
2206 if (new_n_capabilities != 1) {
2207 errorBelch("setNumCapabilities: not supported on this platform");
2208 }
2209 return;
2210 #else
2211 Task *task;
2212 Capability *cap;
2213 uint32_t n;
2214 Capability *old_capabilities = NULL;
2215 uint32_t old_n_capabilities = n_capabilities;
2216
2217 if (new_n_capabilities == enabled_capabilities) {
2218 return;
2219 } else if (new_n_capabilities <= 0) {
2220 errorBelch("setNumCapabilities: Capability count must be positive");
2221 return;
2222 }
2223
2224
2225 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2226 enabled_capabilities, new_n_capabilities);
2227
2228 cap = rts_lock();
2229 task = cap->running_task;
2230
2231 stopAllCapabilities(&cap, task);
2232
2233 if (new_n_capabilities < enabled_capabilities)
2234 {
2235 // Reducing the number of capabilities: we do not actually
2236 // remove the extra capabilities, we just mark them as
2237 // "disabled". This has the following effects:
2238 //
2239 // - threads on a disabled capability are migrated away by the
2240 // scheduler loop
2241 //
2242 // - disabled capabilities do not participate in GC
2243 // (see scheduleDoGC())
2244 //
2245 // - No spark threads are created on this capability
2246 // (see scheduleActivateSpark())
2247 //
2248 // - We do not attempt to migrate threads *to* a disabled
2249 // capability (see schedulePushWork()).
2250 //
2251 // but in other respects, a disabled capability remains
2252 // alive. Threads may be woken up on a disabled capability,
2253 // but they will be immediately migrated away.
2254 //
2255 // This approach is much easier than trying to actually remove
2256 // the capability; we don't have to worry about GC data
2257 // structures, the nursery, etc.
2258 //
2259 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2260 capabilities[n]->disabled = true;
2261 traceCapDisable(capabilities[n]);
2262 }
2263 enabled_capabilities = new_n_capabilities;
2264 }
2265 else
2266 {
2267 // Increasing the number of enabled capabilities.
2268 //
2269 // enable any disabled capabilities, up to the required number
2270 for (n = enabled_capabilities;
2271 n < new_n_capabilities && n < n_capabilities; n++) {
2272 capabilities[n]->disabled = false;
2273 traceCapEnable(capabilities[n]);
2274 }
2275 enabled_capabilities = n;
2276
2277 if (new_n_capabilities > n_capabilities) {
2278 #if defined(TRACING)
2279 // Allocate eventlog buffers for the new capabilities. Note this
2280 // must be done before calling moreCapabilities(), because that
2281 // will emit events about creating the new capabilities and adding
2282 // them to existing capsets.
2283 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2284 #endif
2285
2286 // Resize the capabilities array
2287 // NB. after this, capabilities points somewhere new. Any pointers
2288 // of type (Capability *) are now invalid.
2289 moreCapabilities(n_capabilities, new_n_capabilities);
2290
2291 // Resize and update storage manager data structures
2292 storageAddCapabilities(n_capabilities, new_n_capabilities);
2293 }
2294 }
2295
2296 // update n_capabilities before things start running
2297 if (new_n_capabilities > n_capabilities) {
2298 n_capabilities = enabled_capabilities = new_n_capabilities;
2299 }
2300
2301 // We're done: release the original Capabilities
2302 releaseAllCapabilities(old_n_capabilities, cap,task);
2303
2304 // We can't free the old array until now, because we access it
2305 // while updating pointers in updateCapabilityRefs().
2306 if (old_capabilities) {
2307 stgFree(old_capabilities);
2308 }
2309
2310 // Notify IO manager that the number of capabilities has changed.
2311 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2312
2313 rts_unlock(cap);
2314
2315 #endif // THREADED_RTS
2316 }
2317
2318
2319
2320 /* ---------------------------------------------------------------------------
2321 * Delete all the threads in the system
2322 * ------------------------------------------------------------------------- */
2323
2324 static void
2325 deleteAllThreads ()
2326 {
2327 // NOTE: only safe to call if we own all capabilities.
2328
2329 StgTSO* t, *next;
2330 uint32_t g;
2331
2332 debugTrace(DEBUG_sched,"deleting all threads");
2333 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2334 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2335 next = t->global_link;
2336 deleteThread(t);
2337 }
2338 }
2339
2340 // The run queue now contains a bunch of ThreadKilled threads. We
2341 // must not throw these away: the main thread(s) will be in there
2342 // somewhere, and the main scheduler loop has to deal with it.
2343 // Also, the run queue is the only thing keeping these threads from
2344 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2345
2346 #if !defined(THREADED_RTS)
2347 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2348 ASSERT(sleeping_queue == END_TSO_QUEUE);
2349 #endif
2350 }
2351
2352 /* -----------------------------------------------------------------------------
2353 Managing the suspended_ccalls list.
2354 Locks required: sched_mutex
2355 -------------------------------------------------------------------------- */
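
/*
 * Each Task's current InCall is linked into cap->suspended_ccalls through
 * its prev/next fields (a doubly-linked list); cap->n_suspended_ccalls
 * tracks the length of that list.
 */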
2356
2357 STATIC_INLINE void
2358 suspendTask (Capability *cap, Task *task)
2359 {
2360 InCall *incall;
2361
2362 incall = task->incall;
2363 ASSERT(incall->next == NULL && incall->prev == NULL);
2364 incall->next = cap->suspended_ccalls;
2365 incall->prev = NULL;
2366 if (cap->suspended_ccalls) {
2367 cap->suspended_ccalls->prev = incall;
2368 }
2369 cap->suspended_ccalls = incall;
2370 cap->n_suspended_ccalls++;
2371 }
2372
2373 STATIC_INLINE void
2374 recoverSuspendedTask (Capability *cap, Task *task)
2375 {
2376 InCall *incall;
2377
2378 incall = task->incall;
2379 if (incall->prev) {
2380 incall->prev->next = incall->next;
2381 } else {
2382 ASSERT(cap->suspended_ccalls == incall);
2383 cap->suspended_ccalls = incall->next;
2384 }
2385 if (incall->next) {
2386 incall->next->prev = incall->prev;
2387 }
2388 incall->next = incall->prev = NULL;
2389 cap->n_suspended_ccalls--;
2390 }
2391
2392 /* ---------------------------------------------------------------------------
2393 * Suspending & resuming Haskell threads.
2394 *
2395 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2396 * its capability before calling the C function. This allows another
2397 * task to pick up the capability and carry on running Haskell
2398 * threads. It also means that if the C call blocks, it won't lock
2399 * the whole system.
2400 *
2401 * The Haskell thread making the C call is put to sleep for the
2402  * duration of the call, on the capability's suspended_ccalls list. We
2403 * give out a token to the task, which it can use to resume the thread
2404 * on return from the C function.
2405 *
2406 * If this is an interruptible C call, this means that the FFI call may be
2407 * unceremoniously terminated and should be scheduled on an
2408 * unbound worker thread.
2409 * ------------------------------------------------------------------------- */
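
/*
 * A rough sketch (illustrative only; the real calling sequence, including
 * the register shuffling, is emitted by the code generator) of how this
 * pair brackets a "safe" foreign call ('foreign_fn' is a placeholder):
 *
 *     void *tok = suspendThread(&cap->r, interruptible);
 *     result    = foreign_fn(args);   // may block or re-enter the RTS
 *     reg       = resumeThread(tok);  // may return on a different Capability
 */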
2410
2411 void *
2412 suspendThread (StgRegTable *reg, bool interruptible)
2413 {
2414 Capability *cap;
2415 int saved_errno;
2416 StgTSO *tso;
2417 Task *task;
2418 #if defined(mingw32_HOST_OS)
2419 StgWord32 saved_winerror;
2420 #endif
2421
2422 saved_errno = errno;
2423 #if defined(mingw32_HOST_OS)
2424 saved_winerror = GetLastError();
2425 #endif
2426
2427 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2428 */
2429 cap = regTableToCapability(reg);
2430
2431 task = cap->running_task;
2432 tso = cap->r.rCurrentTSO;
2433
2434 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2435
2436 // XXX this might not be necessary --SDM
2437 tso->what_next = ThreadRunGHC;
2438
2439 threadPaused(cap,tso);
2440
2441 if (interruptible) {
2442 tso->why_blocked = BlockedOnCCall_Interruptible;
2443 } else {
2444 tso->why_blocked = BlockedOnCCall;
2445 }
2446
2447 // Hand back capability
2448 task->incall->suspended_tso = tso;
2449 task->incall->suspended_cap = cap;
2450
2451 // Otherwise allocate() will write to invalid memory.
2452 cap->r.rCurrentTSO = NULL;
2453
2454 ACQUIRE_LOCK(&cap->lock);
2455
2456 suspendTask(cap,task);
2457 cap->in_haskell = false;
2458 releaseCapability_(cap,false);
2459
2460 RELEASE_LOCK(&cap->lock);
2461
2462 errno = saved_errno;
2463 #if defined(mingw32_HOST_OS)
2464 SetLastError(saved_winerror);
2465 #endif
2466 return task;
2467 }
2468
2469 StgRegTable *
2470 resumeThread (void *task_)
2471 {
2472 StgTSO *tso;
2473 InCall *incall;
2474 Capability *cap;
2475 Task *task = task_;
2476 int saved_errno;
2477 #if defined(mingw32_HOST_OS)
2478 StgWord32 saved_winerror;
2479 #endif
2480
2481 saved_errno = errno;
2482 #if defined(mingw32_HOST_OS)
2483 saved_winerror = GetLastError();
2484 #endif
2485
2486 incall = task->incall;
2487 cap = incall->suspended_cap;
2488 task->cap = cap;
2489
2490 // Wait for permission to re-enter the RTS with the result.
2491 waitForCapability(&cap,task);
2492 // we might be on a different capability now... but if so, our
2493 // entry on the suspended_ccalls list will also have been
2494 // migrated.
2495
2496 // Remove the thread from the suspended list
2497 recoverSuspendedTask(cap,task);
2498
2499 tso = incall->suspended_tso;
2500 incall->suspended_tso = NULL;
2501 incall->suspended_cap = NULL;
2502 // we will modify tso->_link
2503 if (RTS_UNLIKELY(nonmoving_write_barrier_enabled)) {
2504 updateRemembSetPushClosure(cap, (StgClosure *)tso->_link);
2505 }
2506 tso->_link = END_TSO_QUEUE;
2507
2508 traceEventRunThread(cap, tso);
2509
2510 /* Reset blocking status */
2511 tso->why_blocked = NotBlocked;
2512
2513 if ((tso->flags & TSO_BLOCKEX) == 0) {
2514 // avoid locking the TSO if we don't have to
2515 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2516 maybePerformBlockedException(cap,tso);
2517 }
2518 }
2519
2520 cap->r.rCurrentTSO = tso;
2521 cap->in_haskell = true;
2522 errno = saved_errno;
2523 #if defined(mingw32_HOST_OS)
2524 SetLastError(saved_winerror);
2525 #endif
2526
2527 /* We might have GC'd, mark the TSO dirty again */
2528 dirty_TSO(cap,tso);
2529 dirty_STACK(cap,tso->stackobj);
2530
2531 IF_DEBUG(sanity, checkTSO(tso));
2532
2533 return &cap->r;
2534 }
2535
2536 /* ---------------------------------------------------------------------------
2537 * scheduleThread()
2538 *
2539 * scheduleThread puts a thread on the end of the runnable queue.
2540 * This will usually be done immediately after a thread is created.
2541 * The caller of scheduleThread must create the thread using e.g.
2542 * createThread and push an appropriate closure
2543 * on this thread's stack before the scheduler is invoked.
2544 * ------------------------------------------------------------------------ */
2545
2546 void
2547 scheduleThread(Capability *cap, StgTSO *tso)
2548 {
2549 // The thread goes at the *end* of the run-queue, to avoid possible
2550 // starvation of any threads already on the queue.
2551 appendToRunQueue(cap,tso);
2552 }
2553
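// Pin a thread to a particular capability (taken modulo
// enabled_capabilities). This is the path taken by, e.g.,
// Control.Concurrent.forkOn; the TSO_LOCKED flag set below stops the
// scheduler from migrating the thread later.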
2554 void
2555 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2556 {
2557 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2558 // move this thread from now on.
2559 #if defined(THREADED_RTS)
2560 cpu %= enabled_capabilities;
2561 if (cpu == cap->no) {
2562 appendToRunQueue(cap,tso);
2563 } else {
2564 migrateThread(cap, tso, capabilities[cpu]);
2565 }
2566 #else
2567 appendToRunQueue(cap,tso);
2568 #endif
2569 }
2570
2571 void
2572 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2573 {
2574 Task *task;
2575 DEBUG_ONLY( StgThreadID id );
2576 Capability *cap;
2577
2578 cap = *pcap;
2579
2580 // We already created/initialised the Task
2581 task = cap->running_task;
2582
2583 // This TSO is now a bound thread; make the Task and TSO
2584 // point to each other.
2585 tso->bound = task->incall;
2586 tso->cap = cap;
2587
2588 task->incall->tso = tso;
2589 task->incall->ret = ret;
2590 task->incall->rstat = NoStatus;
2591
2592 appendToRunQueue(cap,tso);
2593
2594 DEBUG_ONLY( id = tso->id );
2595 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2596
2597 cap = schedule(cap,task);
2598
2599 ASSERT(task->incall->rstat != NoStatus);
2600 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2601
2602 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2603 *pcap = cap;
2604 }
2605
2606 /* ----------------------------------------------------------------------------
2607 * Starting Tasks
2608 * ------------------------------------------------------------------------- */
2609
2610 #if defined(THREADED_RTS)
2611 void scheduleWorker (Capability *cap, Task *task)
2612 {
2613 // schedule() runs without a lock.
2614 cap = schedule(cap,task);
2615
2616 // On exit from schedule(), we have a Capability, but possibly not
2617 // the same one we started with.
2618
2619 // During shutdown, the requirement is that after all the
2620 // Capabilities are shut down, all workers that are shutting down
2621 // have finished workerTaskStop(). This is why we hold on to
2622 // cap->lock until we've finished workerTaskStop() below.
2623 //
2624 // There may be workers still involved in foreign calls; those
2625 // will just block in waitForCapability() because the
2626 // Capability has been shut down.
2627 //
2628 ACQUIRE_LOCK(&cap->lock);
2629 releaseCapability_(cap,false);
2630 workerTaskStop(task);
2631 RELEASE_LOCK(&cap->lock);
2632 }
2633 #endif
2634
2635 /* ---------------------------------------------------------------------------
2636  * Start new worker tasks on Capabilities in the range [from, to)
2637 * -------------------------------------------------------------------------- */
2638
2639 static void
2640 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2641 {
2642 #if defined(THREADED_RTS)
2643 uint32_t i;
2644 Capability *cap;
2645
2646 for (i = from; i < to; i++) {
2647 cap = capabilities[i];
2648 ACQUIRE_LOCK(&cap->lock);
2649 startWorkerTask(cap);
2650 RELEASE_LOCK(&cap->lock);
2651 }
2652 #endif
2653 }
2654
2655 /* ---------------------------------------------------------------------------
2656 * initScheduler()
2657 *
2658 * Initialise the scheduler. This resets all the queues - if the
2659 * queues contained any threads, they'll be garbage collected at the
2660 * next pass.
2661 *
2662 * ------------------------------------------------------------------------ */
2663
2664 void
2665 initScheduler(void)
2666 {
2667 #if !defined(THREADED_RTS)
2668 blocked_queue_hd = END_TSO_QUEUE;
2669 blocked_queue_tl = END_TSO_QUEUE;
2670 sleeping_queue = END_TSO_QUEUE;
2671 #endif
2672
2673 sched_state = SCHED_RUNNING;
2674 recent_activity = ACTIVITY_YES;
2675
2676 #if defined(THREADED_RTS)
2677 /* Initialise the mutex and condition variables used by
2678 * the scheduler. */
2679 initMutex(&sched_mutex);
2680 initMutex(&sync_finished_mutex);
2681 initCondition(&sync_finished_cond);
2682 #endif
2683
2684 ACQUIRE_LOCK(&sched_mutex);
2685
2686 allocated_bytes_at_heapoverflow = 0;
2687
2688 /* A capability holds the state a native thread needs in
2689 * order to execute STG code. At least one capability is
2690 * floating around (only THREADED_RTS builds have more than one).
2691 */
2692 initCapabilities();
2693
2694 initTaskManager();
2695
2696 /*
2697 * Eagerly start one worker to run each Capability, except for
2698 * Capability 0. The idea is that we're probably going to start a
2699 * bound thread on Capability 0 pretty soon, so we don't want a
2700 * worker task hogging it.
2701 */
2702 startWorkerTasks(1, n_capabilities);
2703
2704 RELEASE_LOCK(&sched_mutex);
2705
2706 }
2707
2708 void
2709 exitScheduler (bool wait_foreign USED_IF_THREADS)
2710 /* see Capability.c, shutdownCapability() */
2711 {
2712 Task *task = newBoundTask();
2713
2714 // If we haven't killed all the threads yet, do it now.
2715 if (sched_state < SCHED_SHUTTING_DOWN) {
2716 sched_state = SCHED_INTERRUPTING;
2717 nonmovingExit();
2718 Capability *cap = task->cap;
2719 waitForCapability(&cap,task);
2720 scheduleDoGC(&cap,task,true);
2721 ASSERT(task->incall->tso == NULL);
2722 releaseCapability(cap);
2723 }
2724 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
2725
2726 shutdownCapabilities(task, wait_foreign);
2727
2728 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2729 // n_failed_trygrab_idles, n_idle_caps);
2730
2731 boundTaskExiting(task);
2732 }
2733
2734 void
2735 freeScheduler( void )
2736 {
2737 uint32_t still_running;
2738
2739 ACQUIRE_LOCK(&sched_mutex);
2740 still_running = freeTaskManager();
2741 // We can only free the Capabilities if there are no Tasks still
2742 // running. We might have a Task about to return from a foreign
2743 // call into waitForCapability(), for example (actually,
2744 // this should be the *only* thing that a still-running Task can
2745 // do at this point, and it will block waiting for the
2746 // Capability).
2747 if (still_running == 0) {
2748 freeCapabilities();
2749 }
2750 RELEASE_LOCK(&sched_mutex);
2751 #if defined(THREADED_RTS)
2752 closeMutex(&sched_mutex);
2753 #endif
2754 }
2755
2756 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2757 void *user USED_IF_NOT_THREADS)
2758 {
2759 #if !defined(THREADED_RTS)
2760 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2761 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2762 evac(user, (StgClosure **)(void *)&sleeping_queue);
2763 #endif
2764 }
2765
2766 /* -----------------------------------------------------------------------------
2767 performGC
2768
2769 This is the interface to the garbage collector from Haskell land.
2770 We provide this so that external C code can allocate and garbage
2771 collect when called from Haskell via _ccall_GC.
2772 -------------------------------------------------------------------------- */
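
/*
 * Illustrative usage from foreign code (the Haskell-level entry points
 * live in System.Mem):
 *
 *     performGC();        // request a GC; generation chosen as usual
 *     performMajorGC();   // force a major (full-heap) collection
 */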
2773
2774 static void
2775 performGC_(bool force_major)
2776 {
2777 Task *task;
2778 Capability *cap = NULL;
2779
2780 // We must grab a new Task here, because the existing Task may be
2781 // associated with a particular Capability, and chained onto the
2782 // suspended_ccalls queue.
2783 task = newBoundTask();
2784
2785 // TODO: do we need to traceTask*() here?
2786
2787 waitForCapability(&cap,task);
2788 scheduleDoGC(&cap,task,force_major);
2789 releaseCapability(cap);
2790 boundTaskExiting(task);
2791 }
2792
2793 void
2794 performGC(void)
2795 {
2796 performGC_(false);
2797 }
2798
2799 void
2800 performMajorGC(void)
2801 {
2802 performGC_(true);
2803 }
2804
2805 /* ---------------------------------------------------------------------------
2806 Interrupt execution.
2807 Might be called inside a signal handler so it mustn't do anything fancy.
2808 ------------------------------------------------------------------------ */
2809
2810 void
2811 interruptStgRts(void)
2812 {
2813 ASSERT(sched_state != SCHED_SHUTTING_DOWN);
2814 sched_state = SCHED_INTERRUPTING;
2815 interruptAllCapabilities();
2816 #if defined(THREADED_RTS)
2817 wakeUpRts();
2818 #endif
2819 }
2820
2821 /* -----------------------------------------------------------------------------
2822 Wake up the RTS
2823
2824 This function causes at least one OS thread to wake up and run the
2825 scheduler loop. It is invoked when the RTS might be deadlocked, or
2826 an external event has arrived that may need servicing (eg. a
2827 keyboard interrupt).
2828
2829 In the single-threaded RTS we don't do anything here; we only have
2830 one thread anyway, and the event that caused us to want to wake up
2831 will have interrupted any blocking system call in progress anyway.
2832 -------------------------------------------------------------------------- */
2833
2834 #if defined(THREADED_RTS)
2835 void wakeUpRts(void)
2836 {
2837 // This forces the IO Manager thread to wakeup, which will
2838 // in turn ensure that some OS thread wakes up and runs the
2839 // scheduler loop, which will cause a GC and deadlock check.
2840 ioManagerWakeup();
2841 }
2842 #endif
2843
2844 /* -----------------------------------------------------------------------------
2845 Deleting threads
2846
2847 This is used for interruption (^C) and forking, and corresponds to
2848 raising an exception but without letting the thread catch the
2849 exception.
2850 -------------------------------------------------------------------------- */
2851
2852 static void
2853 deleteThread (StgTSO *tso)
2854 {
2855 // NOTE: must only be called on a TSO that we have exclusive
2856 // access to, because we will call throwToSingleThreaded() below.
2857 // The TSO must be on the run queue of the Capability we own, or
2858 // we must own all Capabilities.
2859
2860 if (tso->why_blocked != BlockedOnCCall &&
2861 tso->why_blocked != BlockedOnCCall_Interruptible) {
2862 throwToSingleThreaded(tso->cap,tso,NULL);
2863 }
2864 }
2865
2866 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
2867 static void
2868 deleteThread_(StgTSO *tso)
2869 { // for forkProcess only:
2870 // like deleteThread(), but we delete threads in foreign calls, too.
2871
2872 if (tso->why_blocked == BlockedOnCCall ||
2873 tso->why_blocked == BlockedOnCCall_Interruptible) {
2874 tso->what_next = ThreadKilled;
2875 appendToRunQueue(tso->cap, tso);
2876 } else {
2877 deleteThread(tso);
2878 }
2879 }
2880 #endif
2881
2882 /* -----------------------------------------------------------------------------
2883 raiseExceptionHelper
2884
2885 This function is called by the raise# primitive, just so that we can
2886 move some of the tricky bits of raising an exception from C-- into
2887    C. Who knows, it might be a useful, reusable thing here too.
2888 -------------------------------------------------------------------------- */
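
/*
 * The return value is the type of the frame at which the walk below
 * stopped (CATCH_FRAME, ATOMICALLY_FRAME, CATCH_STM_FRAME or STOP_FRAME);
 * tso->stackobj->sp is left pointing at that frame so that the caller can
 * dispatch on it.
 */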
2889
2890 StgWord
2891 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2892 {
2893 Capability *cap = regTableToCapability(reg);
2894 StgThunk *raise_closure = NULL;
2895 StgPtr p, next;
2896 const StgRetInfoTable *info;
2897 //
2898 // This closure represents the expression 'raise# E' where E
2899    // is the exception raised. It is used to overwrite all the
2900 // thunks which are currently under evaluation.
2901 //
2902
2903 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2904 // LDV profiling: stg_raise_info has THUNK as its closure
2905 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2906    // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2907 // 1 does not cause any problem unless profiling is performed.
2908 // However, when LDV profiling goes on, we need to linearly scan
2909 // small object pool, where raise_closure is stored, so we should
2910 // use MIN_UPD_SIZE.
2911 //
2912 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2913 // sizeofW(StgClosure)+1);
2914 //
2915
2916 //
2917 // Walk up the stack, looking for the catch frame. On the way,
2918 // we update any closures pointed to from update frames with the
2919 // raise closure that we just built.
2920 //
2921 p = tso->stackobj->sp;
2922 while(1) {
2923 info = get_ret_itbl((StgClosure *)p);
2924 next = p + stack_frame_sizeW((StgClosure *)p);
2925 switch (info->i.type) {
2926
2927 case UPDATE_FRAME:
2928 // Only create raise_closure if we need to.
2929 if (raise_closure == NULL) {
2930 raise_closure =
2931 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2932 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2933 raise_closure->payload[0] = exception;
2934 }
2935 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2936 (StgClosure *)raise_closure);
2937 p = next;
2938 continue;
2939
2940 case ATOMICALLY_FRAME:
2941 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2942 tso->stackobj->sp = p;
2943 return ATOMICALLY_FRAME;
2944
2945 case CATCH_FRAME:
2946 tso->stackobj->sp = p;
2947 return CATCH_FRAME;
2948
2949 case CATCH_STM_FRAME:
2950 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2951 tso->stackobj->sp = p;
2952 return CATCH_STM_FRAME;
2953
2954 case UNDERFLOW_FRAME:
2955 tso->stackobj->sp = p;
2956 threadStackUnderflow(cap,tso);
2957 p = tso->stackobj->sp;
2958 continue;
2959
2960 case STOP_FRAME:
2961 tso->stackobj->sp = p;
2962 return STOP_FRAME;
2963
2964 case CATCH_RETRY_FRAME: {
2965 StgTRecHeader *trec = tso -> trec;
2966 StgTRecHeader *outer = trec -> enclosing_trec;
2967 debugTrace(DEBUG_stm,
2968 "found CATCH_RETRY_FRAME at %p during raise", p);
2969 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2970 stmAbortTransaction(cap, trec);
2971 stmFreeAbortedTRec(cap, trec);
2972 tso -> trec = outer;
2973 p = next;
2974 continue;
2975 }
2976
2977 default:
2978 p = next;
2979 continue;
2980 }
2981 }
2982 }
2983
2984
2985 /* -----------------------------------------------------------------------------
2986 findRetryFrameHelper
2987
2988 This function is called by the retry# primitive. It traverses the stack
2989 leaving tso->sp referring to the frame which should handle the retry.
2990
2991 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2992    or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2993
2994 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2995 create) because retries are not considered to be exceptions, despite the
2996 similar implementation.
2997
2998 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2999 not be created within memory transactions.
3000 -------------------------------------------------------------------------- */
3001
3002 StgWord
3003 findRetryFrameHelper (Capability *cap, StgTSO *tso)
3004 {
3005 const StgRetInfoTable *info;
3006 StgPtr p, next;
3007
3008 p = tso->stackobj->sp;
3009 while (1) {
3010 info = get_ret_itbl((const StgClosure *)p);
3011 next = p + stack_frame_sizeW((StgClosure *)p);
3012 switch (info->i.type) {
3013
3014 case ATOMICALLY_FRAME:
3015 debugTrace(DEBUG_stm,
3016 "found ATOMICALLY_FRAME at %p during retry", p);
3017 tso->stackobj->sp = p;
3018 return ATOMICALLY_FRAME;
3019
3020 case CATCH_RETRY_FRAME:
3021 debugTrace(DEBUG_stm,
3022 "found CATCH_RETRY_FRAME at %p during retry", p);
3023 tso->stackobj->sp = p;
3024 return CATCH_RETRY_FRAME;
3025
3026 case CATCH_STM_FRAME: {
3027 StgTRecHeader *trec = tso -> trec;
3028 StgTRecHeader *outer = trec -> enclosing_trec;
3029 debugTrace(DEBUG_stm,
3030 "found CATCH_STM_FRAME at %p during retry", p);
3031 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3032 stmAbortTransaction(cap, trec);
3033 stmFreeAbortedTRec(cap, trec);
3034 tso -> trec = outer;
3035 p = next;
3036 continue;
3037 }
3038
3039 case UNDERFLOW_FRAME:
3040 tso->stackobj->sp = p;
3041 threadStackUnderflow(cap,tso);
3042 p = tso->stackobj->sp;
3043 continue;
3044
3045 default:
3046 ASSERT(info->i.type != CATCH_FRAME);
3047 ASSERT(info->i.type != STOP_FRAME);
3048 p = next;
3049 continue;
3050 }
3051 }
3052 }
3053
3054 /* -----------------------------------------------------------------------------
3055 resurrectThreads is called after garbage collection on the list of
3056 threads found to be garbage. Each of these threads will be woken
3057    up and sent an exception: BlockedIndefinitelyOnMVar if the thread was
3058    blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked on STM,
3059    or NonTermination if the thread was blocked on a Black Hole.
3060
3061 Locks: assumes we hold *all* the capabilities.
3062 -------------------------------------------------------------------------- */
3063
3064 void
3065 resurrectThreads (StgTSO *threads)
3066 {
3067 StgTSO *tso, *next;
3068 Capability *cap;
3069 generation *gen;
3070
3071 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3072 next = tso->global_link;
3073
3074 gen = Bdescr((P_)tso)->gen;
3075 tso->global_link = gen->threads;
3076 gen->threads = tso;
3077
3078 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3079
3080 // Wake up the thread on the Capability it was last on
3081 cap = tso->cap;
3082
3083 switch (tso->why_blocked) {
3084 case BlockedOnMVar:
3085 case BlockedOnMVarRead:
3086 /* Called by GC - sched_mutex lock is currently held. */
3087 throwToSingleThreaded(cap, tso,
3088 (StgClosure *)blockedIndefinitelyOnMVar_closure);
3089 break;
3090 case BlockedOnBlackHole:
3091 throwToSingleThreaded(cap, tso,
3092 (StgClosure *)nonTermination_closure);
3093 break;
3094 case BlockedOnSTM:
3095 throwToSingleThreaded(cap, tso,
3096 (StgClosure *)blockedIndefinitelyOnSTM_closure);
3097 break;
3098 case NotBlocked:
3099 /* This might happen if the thread was blocked on a black hole
3100 * belonging to a thread that we've just woken up (raiseAsync
3101 * can wake up threads, remember...).
3102 */
3103 continue;
3104 case BlockedOnMsgThrowTo:
3105 // This can happen if the target is masking, blocks on a
3106 // black hole, and then is found to be unreachable. In
3107 // this case, we want to let the target wake up and carry
3108 // on, and do nothing to this thread.
3109 continue;
3110 default:
3111 barf("resurrectThreads: thread blocked in a strange way: %d",
3112 tso->why_blocked);
3113 }
3114 }
3115 }