rts/Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45 #include "TopHandler.h"
46
47 #if defined(HAVE_SYS_TYPES_H)
48 #include <sys/types.h>
49 #endif
50 #if defined(HAVE_UNISTD_H)
51 #include <unistd.h>
52 #endif
53
54 #include <string.h>
55 #include <stdlib.h>
56 #include <stdarg.h>
57
58 #if defined(HAVE_ERRNO_H)
59 #include <errno.h>
60 #endif
61
62 #if defined(TRACING)
63 #include "eventlog/EventLog.h"
64 #endif
65 /* -----------------------------------------------------------------------------
66 * Global variables
67 * -------------------------------------------------------------------------- */
68
69 #if !defined(THREADED_RTS)
70 // Blocked/sleeping threads
71 StgTSO *blocked_queue_hd = NULL;
72 StgTSO *blocked_queue_tl = NULL;
73 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
74 #endif
75
76 // Bytes allocated since the last time a HeapOverflow exception was thrown by
77 // the RTS
78 uint64_t allocated_bytes_at_heapoverflow = 0;
79
80 /* Set to true when the latest garbage collection failed to reclaim enough
81 * space, and the runtime should proceed to shut itself down in an orderly
82 * fashion (emitting profiling info etc.), OR throw an exception to the main
83 * thread, if it is still alive.
84 */
85 bool heap_overflow = false;
86
87 /* flag that tracks whether we have done any execution in this time slice.
88 * LOCK: currently none, perhaps we should lock (but needs to be
89 * updated in the fast path of the scheduler).
90 *
91 * NB. must be StgWord, we do xchg() on it.
92 */
93 volatile StgWord recent_activity = ACTIVITY_YES;
94
95 /* if this flag is set as well, give up execution
96 * LOCK: none (changes monotonically)
97 */
98 volatile StgWord sched_state = SCHED_RUNNING;
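// The states form a monotonically increasing sequence,
// SCHED_RUNNING < SCHED_INTERRUPTING < SCHED_SHUTTING_DOWN, which is why
// the scheduler below can test sched_state with >= / < comparisons.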
99
100 /*
101 * This mutex protects most of the global scheduler data in
102 * the THREADED_RTS runtime.
103 */
104 #if defined(THREADED_RTS)
105 Mutex sched_mutex;
106 #endif
107
108 #if !defined(mingw32_HOST_OS)
109 #define FORKPROCESS_PRIMOP_SUPPORTED
110 #endif
111
112 /* -----------------------------------------------------------------------------
113 * static function prototypes
114 * -------------------------------------------------------------------------- */
115
116 static Capability *schedule (Capability *initialCapability, Task *task);
117
118 //
119 // These functions all encapsulate parts of the scheduler loop, and are
120 // abstracted only to make the structure and control flow of the
121 // scheduler clearer.
122 //
123 static void schedulePreLoop (void);
124 static void scheduleFindWork (Capability **pcap);
125 #if defined(THREADED_RTS)
126 static void scheduleYield (Capability **pcap, Task *task);
127 #endif
128 #if defined(THREADED_RTS)
129 static bool requestSync (Capability **pcap, Task *task,
130 PendingSync *sync_type, SyncType *prev_sync_type);
131 static void acquireAllCapabilities(Capability *cap, Task *task);
132 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
133 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
134 uint32_t to USED_IF_THREADS);
135 #endif
136 static void scheduleStartSignalHandlers (Capability *cap);
137 static void scheduleCheckBlockedThreads (Capability *cap);
138 static void scheduleProcessInbox(Capability **cap);
139 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
140 static void schedulePushWork(Capability *cap, Task *task);
141 #if defined(THREADED_RTS)
142 static void scheduleActivateSpark(Capability *cap);
143 #endif
144 static void schedulePostRunThread(Capability *cap, StgTSO *t);
145 static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
146 static bool scheduleHandleYield( Capability *cap, StgTSO *t,
147 uint32_t prev_what_next );
148 static void scheduleHandleThreadBlocked( StgTSO *t );
149 static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
150 StgTSO *t );
151 static bool scheduleNeedHeapProfile(bool ready_to_gc);
152 static void scheduleDoGC(Capability **pcap, Task *task, bool force_major);
153
154 static void deleteThread (Capability *cap, StgTSO *tso);
155 static void deleteAllThreads (Capability *cap);
156
157 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
158 static void deleteThread_(Capability *cap, StgTSO *tso);
159 #endif
160
161 /* ---------------------------------------------------------------------------
162 Main scheduling loop.
163
164 We use round-robin scheduling, each thread returning to the
165 scheduler loop when one of these conditions is detected:
166
167 * out of heap space
168 * timer expires (thread yields)
169 * thread blocks
170 * thread ends
171 * stack overflow
172
173 ------------------------------------------------------------------------ */
174
175 static Capability *
176 schedule (Capability *initialCapability, Task *task)
177 {
178 StgTSO *t;
179 Capability *cap;
180 StgThreadReturnCode ret;
181 uint32_t prev_what_next;
182 bool ready_to_gc;
183 #if defined(THREADED_RTS)
184 bool first = true;
185 #endif
186
187 cap = initialCapability;
188
189 // Pre-condition: this task owns initialCapability.
190 // The sched_mutex is *NOT* held
191 // NB. on return, we still hold a capability.
192
193 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
194
195 schedulePreLoop();
196
197 // -----------------------------------------------------------
198 // Scheduler loop starts here:
199
200 while (1) {
201
202 // Check whether we have re-entered the RTS from Haskell without
203 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
204 // call).
205 if (cap->in_haskell) {
206 errorBelch("schedule: re-entered unsafely.\n"
207 " Perhaps a 'foreign import unsafe' should be 'safe'?");
208 stg_exit(EXIT_FAILURE);
209 }
210
211 // Note [shutdown]: The interruption / shutdown sequence.
212 //
213 // In order to cleanly shut down the runtime, we want to:
214 // * make sure that all main threads return to their callers
215 // with the state 'Interrupted'.
216 // * clean up all OS threads associated with the runtime
217 // * free all memory etc.
218 //
219 // So the sequence goes like this:
220 //
221 // * The shutdown sequence is initiated by calling hs_exit(),
222 // interruptStgRts(), or running out of memory in the GC.
223 //
224 // * Set sched_state = SCHED_INTERRUPTING
225 //
226 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
227 // scheduleDoGC(), which halts the whole runtime by acquiring all the
228 // capabilities, does a GC and then calls deleteAllThreads() to kill all
229 // the remaining threads. The zombies are left on the run queue for
230 // cleaning up. We can't kill threads involved in foreign calls.
231 //
232 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
233 //
234 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
235 // enforced by the scheduler, which won't run any Haskell code when
236 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
237 // other capabilities by doing the GC earlier.
238 //
239 // * all workers exit when the run queue on their capability
240 // drains. All main threads will also exit when their TSO
241 // reaches the head of the run queue and they can return.
242 //
243 // * eventually all Capabilities will shut down, and the RTS can
244 // exit.
245 //
246 // * We might be left with threads blocked in foreign calls,
247 // we should really attempt to kill these somehow (TODO).
248
249 switch (sched_state) {
250 case SCHED_RUNNING:
251 break;
252 case SCHED_INTERRUPTING:
253 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
254 /* scheduleDoGC() deletes all the threads */
255 scheduleDoGC(&cap,task,true);
256
257 // after scheduleDoGC(), we must be shutting down. Either some
258 // other Capability did the final GC, or we did it above,
259 // either way we can fall through to the SCHED_SHUTTING_DOWN
260 // case now.
261 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
262 // fall through
263
264 case SCHED_SHUTTING_DOWN:
265 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
266 // If we are a worker, just exit. If we're a bound thread
267 // then we will exit below when we've removed our TSO from
268 // the run queue.
269 if (!isBoundTask(task) && emptyRunQueue(cap)) {
270 return cap;
271 }
272 break;
273 default:
274 barf("sched_state: %" FMT_Word, sched_state);
275 }
276
277 scheduleFindWork(&cap);
278
279 /* work pushing, currently relevant only for THREADED_RTS:
280 (pushes threads, wakes up idle capabilities for stealing) */
281 schedulePushWork(cap,task);
282
283 scheduleDetectDeadlock(&cap,task);
284
285 // Normally, the only way we can get here with no threads to
286 // run is if a keyboard interrupt was received during
287 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
288 // Additionally, it is not fatal for the
289 // threaded RTS to reach here with no threads to run.
290 //
291 // win32: might be here due to awaitEvent() being abandoned
292 // as a result of a console event having been delivered.
293
294 #if defined(THREADED_RTS)
295 if (first)
296 {
297 // XXX: ToDo
298 // // don't yield the first time, we want a chance to run this
299 // // thread for a bit, even if there are others banging at the
300 // // door.
301 // first = false;
302 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
303 }
304
305 scheduleYield(&cap,task);
306
307 if (emptyRunQueue(cap)) continue; // look for work again
308 #endif
309
310 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
311 if ( emptyRunQueue(cap) ) {
312 ASSERT(sched_state >= SCHED_INTERRUPTING);
313 }
314 #endif
315
316 //
317 // Get a thread to run
318 //
319 t = popRunQueue(cap);
320
321 // Sanity check the thread we're about to run. This can be
322 // expensive if there is lots of thread switching going on...
323 IF_DEBUG(sanity,checkTSO(t));
324
325 #if defined(THREADED_RTS)
326 // Check whether we can run this thread in the current task.
327 // If not, we have to pass our capability to the right task.
328 {
329 InCall *bound = t->bound;
330
331 if (bound) {
332 if (bound->task == task) {
333 // yes, the Haskell thread is bound to the current native thread
334 } else {
335 debugTrace(DEBUG_sched,
336 "thread %lu bound to another OS thread",
337 (unsigned long)t->id);
338 // no, bound to a different Haskell thread: pass to that thread
339 pushOnRunQueue(cap,t);
340 continue;
341 }
342 } else {
343 // The thread we want to run is unbound.
344 if (task->incall->tso) {
345 debugTrace(DEBUG_sched,
346 "this OS thread cannot run thread %lu",
347 (unsigned long)t->id);
348 // no, the current native thread is bound to a different
349 // Haskell thread, so pass it to any worker thread
350 pushOnRunQueue(cap,t);
351 continue;
352 }
353 }
354 }
355 #endif
356
357 // If we're shutting down, and this thread has not yet been
358 // killed, kill it now. This sometimes happens when a finalizer
359 // thread is created by the final GC, or a thread previously
360 // in a foreign call returns.
361 if (sched_state >= SCHED_INTERRUPTING &&
362 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
363 deleteThread(cap,t);
364 }
365
366 // If this capability is disabled, migrate the thread away rather
367 // than running it. NB. but not if the thread is bound: it is
368 // really hard for a bound thread to migrate itself. Believe me,
369 // I tried several ways and couldn't find a way to do it.
370 // Instead, when everything is stopped for GC, we migrate all the
371 // threads on the run queue then (see scheduleDoGC()).
372 //
373 // ToDo: what about TSO_LOCKED? Currently we're migrating those
374 // when the number of capabilities drops, but we never migrate
375 // them back if it rises again. Presumably we should, but after
376 // the thread has been migrated we no longer know what capability
377 // it was originally on.
378 #if defined(THREADED_RTS)
379 if (cap->disabled && !t->bound) {
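// A disabled capability always has index >= enabled_capabilities
// (see the migration loop in scheduleDoGC()), so this modulus
// picks one of the still-enabled capabilities.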
380 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
381 migrateThread(cap, t, dest_cap);
382 continue;
383 }
384 #endif
385
386 /* context switches are initiated by the timer signal, unless
387 * the user specified "context switch as often as possible", with
388 * +RTS -C0
389 */
390 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
391 && !emptyThreadQueues(cap)) {
392 cap->context_switch = 1;
393 }
394
395 run_thread:
396
397 // CurrentTSO is the thread to run. It might be different if we
398 // loop back to run_thread, so make sure to set CurrentTSO after
399 // that.
400 cap->r.rCurrentTSO = t;
401
402 startHeapProfTimer();
403
404 // ----------------------------------------------------------------------
405 // Run the current thread
406
407 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
408 ASSERT(t->cap == cap);
409 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
410
411 prev_what_next = t->what_next;
412
413 errno = t->saved_errno;
414 #if defined(mingw32_HOST_OS)
415 SetLastError(t->saved_winerror);
416 #endif
417
418 // reset the interrupt flag before running Haskell code
419 cap->interrupt = 0;
420
421 cap->in_haskell = true;
422 cap->idle = 0;
423
424 dirty_TSO(cap,t);
425 dirty_STACK(cap,t->stackobj);
426
427 switch (recent_activity)
428 {
429 case ACTIVITY_DONE_GC: {
430 // ACTIVITY_DONE_GC means we turned off the timer signal to
431 // conserve power (see #1623). Re-enable it here.
432 uint32_t prev;
433 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
434 if (prev == ACTIVITY_DONE_GC) {
435 #if !defined(PROFILING)
436 startTimer();
437 #endif
438 }
439 break;
440 }
441 case ACTIVITY_INACTIVE:
442 // If we reached ACTIVITY_INACTIVE, then don't reset it until
443 // we've done the GC. The thread running here might just be
444 // the IO manager thread that handle_tick() woke up via
445 // wakeUpRts().
446 break;
447 default:
448 recent_activity = ACTIVITY_YES;
449 }
450
451 traceEventRunThread(cap, t);
452
453 switch (prev_what_next) {
454
455 case ThreadKilled:
456 case ThreadComplete:
457 /* Thread already finished, return to scheduler. */
458 ret = ThreadFinished;
459 break;
460
461 case ThreadRunGHC:
462 {
463 StgRegTable *r;
464 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
465 cap = regTableToCapability(r);
466 ret = r->rRet;
467 break;
468 }
469
470 case ThreadInterpret:
471 cap = interpretBCO(cap);
472 ret = cap->r.rRet;
473 break;
474
475 default:
476 barf("schedule: invalid prev_what_next=%u field", prev_what_next);
477 }
478
479 cap->in_haskell = false;
480
481 // The TSO might have moved, eg. if it re-entered the RTS and a GC
482 // happened. So find the new location:
483 t = cap->r.rCurrentTSO;
484
485 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
486 // don't want it set when not running a Haskell thread.
487 cap->r.rCurrentTSO = NULL;
488
489 // And save the current errno in this thread.
490 // XXX: possibly bogus for SMP because this thread might already
491 // be running again, see code below.
492 t->saved_errno = errno;
493 #if defined(mingw32_HOST_OS)
494 // Similarly for Windows error code
495 t->saved_winerror = GetLastError();
496 #endif
497
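// For blocked threads the eventlog stop status is traced as
// why_blocked + 6; the offset appears to keep the blocked reasons
// clear of the plain StgThreadReturnCode values traced in the
// else branch below.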
498 if (ret == ThreadBlocked) {
499 if (t->why_blocked == BlockedOnBlackHole) {
500 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
501 traceEventStopThread(cap, t, t->why_blocked + 6,
502 owner != NULL ? owner->id : 0);
503 } else {
504 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
505 }
506 } else {
507 traceEventStopThread(cap, t, ret, 0);
508 }
509
510 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
511 ASSERT(t->cap == cap);
512
513 // ----------------------------------------------------------------------
514
515 // Costs for the scheduler are assigned to CCS_SYSTEM
516 stopHeapProfTimer();
517 #if defined(PROFILING)
518 cap->r.rCCCS = CCS_SYSTEM;
519 #endif
520
521 schedulePostRunThread(cap,t);
522
523 ready_to_gc = false;
524
525 switch (ret) {
526 case HeapOverflow:
527 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
528 break;
529
530 case StackOverflow:
531 // just adjust the stack for this thread, then pop it back
532 // on the run queue.
533 threadStackOverflow(cap, t);
534 pushOnRunQueue(cap,t);
535 break;
536
537 case ThreadYielding:
538 if (scheduleHandleYield(cap, t, prev_what_next)) {
539 // shortcut for switching between compiler/interpreter:
540 goto run_thread;
541 }
542 break;
543
544 case ThreadBlocked:
545 scheduleHandleThreadBlocked(t);
546 break;
547
548 case ThreadFinished:
549 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
550 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
551 break;
552
553 default:
554 barf("schedule: invalid thread return code %d", (int)ret);
555 }
556
557 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
558 scheduleDoGC(&cap,task,false);
559 }
560 } /* end of while() */
561 }
562
563 /* -----------------------------------------------------------------------------
564 * Run queue operations
565 * -------------------------------------------------------------------------- */
566
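// The run queue is a doubly-linked list: tso->_link is the forward
// pointer and tso->block_info.prev the backward pointer (maintained
// via setTSOLink()/setTSOPrev()), which is what the unlinking below
// relies on.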
567 static void
568 removeFromRunQueue (Capability *cap, StgTSO *tso)
569 {
570 if (tso->block_info.prev == END_TSO_QUEUE) {
571 ASSERT(cap->run_queue_hd == tso);
572 cap->run_queue_hd = tso->_link;
573 } else {
574 setTSOLink(cap, tso->block_info.prev, tso->_link);
575 }
576 if (tso->_link == END_TSO_QUEUE) {
577 ASSERT(cap->run_queue_tl == tso);
578 cap->run_queue_tl = tso->block_info.prev;
579 } else {
580 setTSOPrev(cap, tso->_link, tso->block_info.prev);
581 }
582 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
583 cap->n_run_queue--;
584
585 IF_DEBUG(sanity, checkRunQueue(cap));
586 }
587
588 void
589 promoteInRunQueue (Capability *cap, StgTSO *tso)
590 {
591 removeFromRunQueue(cap, tso);
592 pushOnRunQueue(cap, tso);
593 }
594
595 /* ----------------------------------------------------------------------------
596 * Setting up the scheduler loop
597 * ------------------------------------------------------------------------- */
598
599 static void
600 schedulePreLoop(void)
601 {
602 // initialisation for scheduler - what cannot go into initScheduler()
603
604 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
605 win32AllocStack();
606 #endif
607 }
608
609 /* -----------------------------------------------------------------------------
610 * scheduleFindWork()
611 *
612 * Search for work to do, and handle messages from elsewhere.
613 * -------------------------------------------------------------------------- */
614
615 static void
616 scheduleFindWork (Capability **pcap)
617 {
618 scheduleStartSignalHandlers(*pcap);
619
620 scheduleProcessInbox(pcap);
621
622 scheduleCheckBlockedThreads(*pcap);
623
624 #if defined(THREADED_RTS)
625 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
626 #endif
627 }
628
629 #if defined(THREADED_RTS)
630 STATIC_INLINE bool
631 shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
632 {
633 // we need to yield this capability to someone else if..
634 // - another thread is initiating a GC, and we didn't just do a GC
635 // (see Note [GC livelock])
636 // - another Task is returning from a foreign call
637 // - the thread at the head of the run queue cannot be run
638 // by this Task (it is bound to another Task, or it is unbound
639 // and this task is bound).
640 //
641 // Note [GC livelock]
642 //
643 // If we are interrupted to do a GC, then we do not immediately do
644 // another one. This avoids a starvation situation where one
645 // Capability keeps forcing a GC and the other Capabilities make no
646 // progress at all.
647
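// Spelling out the last condition: an unbound worker (incall->tso ==
// NULL) must yield if the next thread is bound to some other Task,
// and a bound Task must yield unless the next thread is bound to
// this very InCall.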
648 return ((pending_sync && !didGcLast) ||
649 cap->n_returning_tasks != 0 ||
650 (!emptyRunQueue(cap) && (task->incall->tso == NULL
651 ? peekRunQueue(cap)->bound != NULL
652 : peekRunQueue(cap)->bound != task->incall)));
653 }
654
655 // This is the single place where a Task goes to sleep. There are
656 // two reasons it might need to sleep:
657 // - there are no threads to run
658 // - we need to yield this Capability to someone else
659 // (see shouldYieldCapability())
660 //
661 // Careful: the scheduler loop is quite delicate. Make sure you run
662 // the tests in testsuite/concurrent (all ways) after modifying this,
663 // and also check the benchmarks in nofib/parallel for regressions.
664
665 static void
666 scheduleYield (Capability **pcap, Task *task)
667 {
668 Capability *cap = *pcap;
669 bool didGcLast = false;
670
671 // if we have work, and we don't need to give up the Capability, continue.
672 //
673 if (!shouldYieldCapability(cap,task,false) &&
674 (!emptyRunQueue(cap) ||
675 !emptyInbox(cap) ||
676 sched_state >= SCHED_INTERRUPTING)) {
677 return;
678 }
679
680 // otherwise yield (sleep), and keep yielding if necessary.
681 do {
682 if (doIdleGCWork(cap, false)) {
683 didGcLast = false;
684 } else {
685 didGcLast = yieldCapability(&cap,task, !didGcLast);
686 }
687 }
688 while (shouldYieldCapability(cap,task,didGcLast));
689
690 // note there may still be no threads on the run queue at this
691 // point, the caller has to check.
692
693 *pcap = cap;
694 return;
695 }
696 #endif
697
698 /* -----------------------------------------------------------------------------
699 * schedulePushWork()
700 *
701 * Push work to other Capabilities if we have some.
702 * -------------------------------------------------------------------------- */
703
704 static void
705 schedulePushWork(Capability *cap USED_IF_THREADS,
706 Task *task USED_IF_THREADS)
707 {
708 /* following code not for PARALLEL_HASKELL. I kept the call general,
709 future GUM versions might use pushing in a distributed setup */
710 #if defined(THREADED_RTS)
711
712 Capability *free_caps[n_capabilities], *cap0;
713 uint32_t i, n_wanted_caps, n_free_caps;
714
715 uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
716
717 // migration can be turned off with +RTS -qm
718 if (!RtsFlags.ParFlags.migrate) {
719 spare_threads = 0;
720 }
721
722 // Figure out how many capabilities we want to wake up. We need at least
723 // sparkPoolSizeCap(cap) plus the number of spare threads we have.
724 n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
725 if (n_wanted_caps == 0) return;
726
727 // First grab as many free Capabilities as we can. ToDo: we should use
728 // capabilities on the same NUMA node preferably, but not exclusively.
729 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
730 n_free_caps < n_wanted_caps && i != cap->no;
731 i = (i + 1) % n_capabilities) {
732 cap0 = capabilities[i];
733 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
734 if (!emptyRunQueue(cap0)
735 || cap0->n_returning_tasks != 0
736 || !emptyInbox(cap0)) {
737 // it already has some work, we just grabbed it at
738 // the wrong moment. Or maybe it's deadlocked!
739 releaseCapability(cap0);
740 } else {
741 free_caps[n_free_caps++] = cap0;
742 }
743 }
744 }
745
746 // We now have n_free_caps free capabilities stashed in
747 // free_caps[]. Attempt to share our run queue equally with them.
748 // This is complicated slightly by the fact that we can't move
749 // some threads:
750 //
751 // - threads that have TSO_LOCKED cannot migrate
752 // - a thread that is bound to the current Task cannot be migrated
753 //
754 // This is about the simplest thing we could do; improvements we
755 // might want to do include:
756 //
757 // - giving high priority to moving relatively new threads, on
758 // the grounds that they haven't had time to build up a
759 // working set in the cache on this CPU/Capability.
760 //
761 // - giving low priority to moving long-lived threads
762
763 if (n_free_caps > 0) {
764 StgTSO *prev, *t, *next;
765
766 debugTrace(DEBUG_sched,
767 "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
768 cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
769 n_free_caps);
770
771 // There are n_free_caps+1 caps in total. We will share the threads
772 // evenly between them, *except* that if the run queue does not divide
773 // evenly by n_free_caps+1 then we bias towards the current capability.
774 // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
775 uint32_t keep_threads =
776 (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
777
778 // This also ensures that we don't give away all our threads, since
779 // (x + y) / (y + 1) >= 1 when x >= 1.
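// Another worked instance: n_run_queue=7, n_free_caps=3 gives
// keep_threads = (7+3)/4 = 2, so up to 5 threads are offered to the
// three free capabilities.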
780
781 // The number of threads we have left.
782 uint32_t n = cap->n_run_queue;
783
784 // prev = the previous thread on this cap's run queue
785 prev = END_TSO_QUEUE;
786
787 // We're going to walk through the run queue, migrating threads to other
788 // capabilities until we have only keep_threads left. We might
789 // encounter a thread that cannot be migrated, in which case we add it
790 // to the current run queue and decrement keep_threads.
791 for (t = cap->run_queue_hd, i = 0;
792 t != END_TSO_QUEUE && n > keep_threads;
793 t = next)
794 {
795 next = t->_link;
796 t->_link = END_TSO_QUEUE;
797
798 // Should we keep this thread?
799 if (t->bound == task->incall // don't move my bound thread
800 || tsoLocked(t) // don't move a locked thread
801 ) {
802 if (prev == END_TSO_QUEUE) {
803 cap->run_queue_hd = t;
804 } else {
805 setTSOLink(cap, prev, t);
806 }
807 setTSOPrev(cap, t, prev);
808 prev = t;
809 if (keep_threads > 0) keep_threads--;
810 }
811
812 // Or migrate it?
813 else {
814 appendToRunQueue(free_caps[i],t);
815 traceEventMigrateThread (cap, t, free_caps[i]->no);
816
817 if (t->bound) { t->bound->task->cap = free_caps[i]; }
818 t->cap = free_caps[i];
819 n--; // we have one fewer threads now
820 i++; // move on to the next free_cap
821 if (i == n_free_caps) i = 0;
822 }
823 }
824
825 // Join up the beginning of the queue (prev)
826 // with the rest of the queue (t)
827 if (t == END_TSO_QUEUE) {
828 cap->run_queue_tl = prev;
829 } else {
830 setTSOPrev(cap, t, prev);
831 }
832 if (prev == END_TSO_QUEUE) {
833 cap->run_queue_hd = t;
834 } else {
835 setTSOLink(cap, prev, t);
836 }
837 cap->n_run_queue = n;
838
839 IF_DEBUG(sanity, checkRunQueue(cap));
840
841 // release the capabilities
842 for (i = 0; i < n_free_caps; i++) {
843 task->cap = free_caps[i];
844 if (sparkPoolSizeCap(cap) > 0) {
845 // If we have sparks to steal, wake up a worker on the
846 // capability, even if it has no threads to run.
847 releaseAndWakeupCapability(free_caps[i]);
848 } else {
849 releaseCapability(free_caps[i]);
850 }
851 }
852 }
853 task->cap = cap; // reset to point to our Capability.
854
855 #endif /* THREADED_RTS */
856
857 }
858
859 /* ----------------------------------------------------------------------------
860 * Start any pending signal handlers
861 * ------------------------------------------------------------------------- */
862
863 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
864 static void
865 scheduleStartSignalHandlers(Capability *cap)
866 {
867 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
868 // safe outside the lock
869 startSignalHandlers(cap);
870 }
871 }
872 #else
873 static void
874 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
875 {
876 }
877 #endif
878
879 /* ----------------------------------------------------------------------------
880 * Check for blocked threads that can be woken up.
881 * ------------------------------------------------------------------------- */
882
883 static void
884 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
885 {
886 #if !defined(THREADED_RTS)
887 //
888 // Check whether any waiting threads need to be woken up. If the
889 // run queue is empty, and there are no other tasks running, we
890 // can wait indefinitely for something to happen.
891 //
892 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
893 {
894 awaitEvent (emptyRunQueue(cap));
895 }
896 #endif
897 }
898
899 /* ----------------------------------------------------------------------------
900 * Detect deadlock conditions and attempt to resolve them.
901 * ------------------------------------------------------------------------- */
902
903 static void
904 scheduleDetectDeadlock (Capability **pcap, Task *task)
905 {
906 Capability *cap = *pcap;
907 /*
908 * Detect deadlock: when we have no threads to run, there are no
909 * threads blocked, waiting for I/O, or sleeping, and all the
910 * other tasks are waiting for work, we must have a deadlock of
911 * some description.
912 */
913 if ( emptyThreadQueues(cap) )
914 {
915 #if defined(THREADED_RTS)
916 /*
917 * In the threaded RTS, we only check for deadlock if there
918 * has been no activity in a complete timeslice. This means
919 * we won't eagerly start a full GC just because we don't have
920 * any threads to run currently.
921 */
922 if (recent_activity != ACTIVITY_INACTIVE) return;
923 #endif
924
925 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
926
927 // Garbage collection can release some new threads due to
928 // either (a) finalizers or (b) threads resurrected because
929 // they are unreachable and will therefore be sent an
930 // exception. Any threads thus released will be immediately
931 // runnable.
932 scheduleDoGC (pcap, task, true/*force major GC*/);
933 cap = *pcap;
934 // when force_major == true, scheduleDoGC sets
935 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
936 // signal.
937
938 if ( !emptyRunQueue(cap) ) return;
939
940 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
941 /* If we have user-installed signal handlers, then wait
942 * for signals to arrive rather than bombing out with a
943 * deadlock.
944 */
945 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
946 debugTrace(DEBUG_sched,
947 "still deadlocked, waiting for signals...");
948
949 awaitUserSignals();
950
951 if (signals_pending()) {
952 startSignalHandlers(cap);
953 }
954
955 // either we have threads to run, or we were interrupted:
956 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
957
958 return;
959 }
960 #endif
961
962 #if !defined(THREADED_RTS)
963 /* Probably a real deadlock. Send the current main thread a
964 * NonTermination exception (nonTermination_closure).
965 */
966 if (task->incall->tso) {
967 switch (task->incall->tso->why_blocked) {
968 case BlockedOnSTM:
969 case BlockedOnBlackHole:
970 case BlockedOnMsgThrowTo:
971 case BlockedOnMVar:
972 case BlockedOnMVarRead:
973 throwToSingleThreaded(cap, task->incall->tso,
974 (StgClosure *)nonTermination_closure);
975 return;
976 default:
977 barf("deadlock: main thread blocked in a strange way");
978 }
979 }
980 return;
981 #endif
982 }
983 }
984
985
986 /* ----------------------------------------------------------------------------
987 * Process messages in the current Capability's inbox
988 * ------------------------------------------------------------------------- */
989
990 static void
991 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
992 {
993 #if defined(THREADED_RTS)
994 Message *m, *next;
995 PutMVar *p, *pnext;
996 int r;
997 Capability *cap = *pcap;
998
999 while (!emptyInbox(cap)) {
1000 // Executing messages might use heap, so we should check for GC.
1001 if (doYouWantToGC(cap)) {
1002 scheduleDoGC(pcap, cap->running_task, false);
1003 cap = *pcap;
1004 }
1005
1006 // don't use a blocking acquire; if the lock is held by
1007 // another thread then just carry on. This seems to avoid
1008 // getting stuck in a message ping-pong situation with other
1009 // processors. We'll check the inbox again later anyway.
1010 //
1011 // We should really use a more efficient queue data structure
1012 // here. The trickiness is that we must ensure a Capability
1013 // never goes idle if the inbox is non-empty, which is why we
1014 // use cap->lock (cap->lock is released as the last thing
1015 // before going idle; see Capability.c:releaseCapability()).
1016 r = TRY_ACQUIRE_LOCK(&cap->lock);
1017 if (r != 0) return;
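// the lock is currently held by another task: skip the inbox for
// now, we'll come back to it on a later pass round the scheduler.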
1018
1019 m = cap->inbox;
1020 p = cap->putMVars;
1021 cap->inbox = (Message*)END_TSO_QUEUE;
1022 cap->putMVars = NULL;
1023
1024 RELEASE_LOCK(&cap->lock);
1025
1026 while (m != (Message*)END_TSO_QUEUE) {
1027 next = m->link;
1028 executeMessage(cap, m);
1029 m = next;
1030 }
1031
1032 while (p != NULL) {
1033 pnext = p->link;
1034 performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
1035 Unit_closure);
1036 freeStablePtr(p->mvar);
1037 stgFree(p);
1038 p = pnext;
1039 }
1040 }
1041 #endif
1042 }
1043
1044
1045 /* ----------------------------------------------------------------------------
1046 * Activate spark threads (THREADED_RTS)
1047 * ------------------------------------------------------------------------- */
1048
1049 #if defined(THREADED_RTS)
1050 static void
1051 scheduleActivateSpark(Capability *cap)
1052 {
1053 if (anySparks() && !cap->disabled)
1054 {
1055 createSparkThread(cap);
1056 debugTrace(DEBUG_sched, "creating a spark thread");
1057 }
1058 }
1059 #endif // THREADED_RTS
1060
1061 /* ----------------------------------------------------------------------------
1062 * After running a thread...
1063 * ------------------------------------------------------------------------- */
1064
1065 static void
1066 schedulePostRunThread (Capability *cap, StgTSO *t)
1067 {
1068 // We have to be able to catch transactions that are in an
1069 // infinite loop as a result of seeing an inconsistent view of
1070 // memory, e.g.
1071 //
1072 // atomically $ do
1073 // [a,b] <- mapM readTVar [ta,tb]
1074 // when (a == b) loop
1075 //
1076 // and a is never equal to b given a consistent view of memory.
1077 //
1078 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1079 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1080 debugTrace(DEBUG_sched | DEBUG_stm,
1081 "trec %p found wasting its time", t);
1082
1083 // strip the stack back to the
1084 // ATOMICALLY_FRAME, aborting the (nested)
1085 // transaction, and saving the stack of any
1086 // partially-evaluated thunks on the heap.
1087 throwToSingleThreaded_(cap, t, NULL, true);
1088
1089 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1090 }
1091 }
1092
1093 //
1094 // If the current thread's allocation limit has run out, send it
1095 // the AllocationLimitExceeded exception.
1096
1097 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1098 // Use a throwToSelf rather than a throwToSingleThreaded, because
1099 // it correctly handles the case where the thread is currently
1100 // inside mask. Also the thread might be blocked (e.g. on an
1101 // MVar), and throwToSingleThreaded doesn't unblock it
1102 // correctly in that case.
1103 throwToSelf(cap, t, allocationLimitExceeded_closure);
1104 ASSIGN_Int64((W_*)&(t->alloc_limit),
1105 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
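// The limit is topped up by allocLimitGrace blocks (set with
// +RTS -xq) so that the exception handler itself has some headroom
// to allocate in.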
1106 }
1107
1108 /* some statistics gathering in the parallel case */
1109 }
1110
1111 /* -----------------------------------------------------------------------------
1112 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1113 * -------------------------------------------------------------------------- */
1114
1115 static bool
1116 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1117 {
1118 if (cap->r.rHpLim == NULL || cap->context_switch) {
1119 // Sometimes we miss a context switch, e.g. when calling
1120 // primitives in a tight loop, MAYBE_GC() doesn't check the
1121 // context switch flag, and we end up waiting for a GC.
1122 // See #1984, and concurrent/should_run/1984
1123 cap->context_switch = 0;
1124 appendToRunQueue(cap,t);
1125 } else {
1126 pushOnRunQueue(cap,t);
1127 }
1128
1129 // did the task ask for a large block?
1130 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1131 // if so, get one and push it on the front of the nursery.
1132 bdescr *bd;
1133 W_ blocks;
1134
1135 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
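// e.g. assuming the default 4Kbyte block size, an 18000-byte
// request rounds up to 5 blocks here.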
1136
1137 if (blocks > BLOCKS_PER_MBLOCK) {
1138 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1139 }
1140
1141 debugTrace(DEBUG_sched,
1142 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1143 (long)t->id, what_next_strs[t->what_next], blocks);
1144
1145 // don't do this if the nursery is (nearly) full, we'll GC first.
1146 if (cap->r.rCurrentNursery->link != NULL ||
1147 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1148 // infinite loop if the
1149 // nursery has only one
1150 // block.
1151
1152 bd = allocGroupOnNode_lock(cap->node,blocks);
1153 cap->r.rNursery->n_blocks += blocks;
1154
1155 // link the new group after CurrentNursery
1156 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1157
1158 // initialise it as a nursery block. We initialise the
1159 // step, gen_no, and flags field of *every* sub-block in
1160 // this large block, because this is easier than making
1161 // sure that we always find the block head of a large
1162 // block whenever we call Bdescr() (eg. evacuate() and
1163 // isAlive() in the GC would both have to do this, at
1164 // least).
1165 {
1166 bdescr *x;
1167 for (x = bd; x < bd + blocks; x++) {
1168 initBdescr(x,g0,g0);
1169 x->free = x->start;
1170 x->flags = 0;
1171 }
1172 }
1173
1174 // This assert can be a killer if the app is doing lots
1175 // of large block allocations.
1176 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1177
1178 // now update the nursery to point to the new block
1179 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1180 cap->r.rCurrentNursery = bd;
1181
1182 // we might be unlucky and have another thread get on the
1183 // run queue before us and steal the large block, but in that
1184 // case the thread will just end up requesting another large
1185 // block.
1186 return false; /* not actually GC'ing */
1187 }
1188 }
1189
1190 return doYouWantToGC(cap);
1191 /* actual GC is done at the end of the while loop in schedule() */
1192 }
1193
1194 /* -----------------------------------------------------------------------------
1195 * Handle a thread that returned to the scheduler with ThreadYielding
1196 * -------------------------------------------------------------------------- */
1197
1198 static bool
1199 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1200 {
1201 /* put the thread back on the run queue. Then, if we're ready to
1202 * GC, check whether this is the last task to stop. If so, wake
1203 * up the GC thread. getThread will block during a GC until the
1204 * GC is finished.
1205 */
1206
1207 ASSERT(t->_link == END_TSO_QUEUE);
1208
1209 // Shortcut if we're just switching evaluators: just run the thread. See
1210 // Note [avoiding threadPaused] in Interpreter.c.
1211 if (t->what_next != prev_what_next) {
1212 debugTrace(DEBUG_sched,
1213 "--<< thread %ld (%s) stopped to switch evaluators",
1214 (long)t->id, what_next_strs[t->what_next]);
1215 return true;
1216 }
1217
1218 // Reset the context switch flag. We don't do this just before
1219 // running the thread, because that would mean we would lose ticks
1220 // during GC, which can lead to unfair scheduling (a thread hogs
1221 // the CPU because the tick always arrives during GC). This way
1222 // penalises threads that do a lot of allocation, but that seems
1223 // better than the alternative.
1224 if (cap->context_switch != 0) {
1225 cap->context_switch = 0;
1226 appendToRunQueue(cap,t);
1227 } else {
1228 pushOnRunQueue(cap,t);
1229 }
1230
1231 IF_DEBUG(sanity,
1232 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1233 checkTSO(t));
1234
1235 return false;
1236 }
1237
1238 /* -----------------------------------------------------------------------------
1239 * Handle a thread that returned to the scheduler with ThreadBlocked
1240 * -------------------------------------------------------------------------- */
1241
1242 static void
1243 scheduleHandleThreadBlocked( StgTSO *t
1244 #if !defined(DEBUG)
1245 STG_UNUSED
1246 #endif
1247 )
1248 {
1249
1250 // We don't need to do anything. The thread is blocked, and it
1251 // has tidied up its stack and placed itself on whatever queue
1252 // it needs to be on.
1253
1254 // ASSERT(t->why_blocked != NotBlocked);
1255 // Not true: for example,
1256 // - the thread may have woken itself up already, because
1257 // threadPaused() might have raised a blocked throwTo
1258 // exception, see maybePerformBlockedException().
1259
1260 #if defined(DEBUG)
1261 traceThreadStatus(DEBUG_sched, t);
1262 #endif
1263 }
1264
1265 /* -----------------------------------------------------------------------------
1266 * Handle a thread that returned to the scheduler with ThreadFinished
1267 * -------------------------------------------------------------------------- */
1268
1269 static bool
1270 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1271 {
1272 /* Need to check whether this was a main thread, and if so,
1273 * return with the return value.
1274 *
1275 * We also end up here if the thread kills itself with an
1276 * uncaught exception, see Exception.cmm.
1277 */
1278
1279 // blocked exceptions can now complete, even if the thread was in
1280 // blocked mode (see #2910).
1281 awakenBlockedExceptionQueue (cap, t);
1282
1283 //
1284 // Check whether the thread that just completed was a bound
1285 // thread, and if so return with the result.
1286 //
1287 // There is an assumption here that all thread completion goes
1288 // through this point; we need to make sure that if a thread
1289 // ends up in the ThreadKilled state, that it stays on the run
1290 // queue so it can be dealt with here.
1291 //
1292
1293 if (t->bound) {
1294
1295 if (t->bound != task->incall) {
1296 #if !defined(THREADED_RTS)
1297 // Must be a bound thread that is not the topmost one. Leave
1298 // it on the run queue until the stack has unwound to the
1299 // point where we can deal with this. Leaving it on the run
1300 // queue also ensures that the garbage collector knows about
1301 // this thread and its return value (it gets dropped from the
1302 // step->threads list so there's no other way to find it).
1303 appendToRunQueue(cap,t);
1304 return false;
1305 #else
1306 // this cannot happen in the threaded RTS, because a
1307 // bound thread can only be run by the appropriate Task.
1308 barf("finished bound thread that isn't mine");
1309 #endif
1310 }
1311
1312 ASSERT(task->incall->tso == t);
1313
1314 if (t->what_next == ThreadComplete) {
1315 if (task->incall->ret) {
1316 // NOTE: return val is stack->sp[1] (see StgStartup.cmm)
1317 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1318 }
1319 task->incall->rstat = Success;
1320 } else {
1321 if (task->incall->ret) {
1322 *(task->incall->ret) = NULL;
1323 }
1324 if (sched_state >= SCHED_INTERRUPTING) {
1325 if (heap_overflow) {
1326 task->incall->rstat = HeapExhausted;
1327 } else {
1328 task->incall->rstat = Interrupted;
1329 }
1330 } else {
1331 task->incall->rstat = Killed;
1332 }
1333 }
1334 #if defined(DEBUG)
1335 removeThreadLabel((StgWord)task->incall->tso->id);
1336 #endif
1337
1338 // We no longer consider this thread and task to be bound to
1339 // each other. The TSO lives on until it is GC'd, but the
1340 // task is about to be released by the caller, and we don't
1341 // want anyone following the pointer from the TSO to the
1342 // defunct task (which might have already been
1343 // re-used). This was a real bug: the GC updated
1344 // tso->bound->tso which led to a deadlock.
1345 t->bound = NULL;
1346 task->incall->tso = NULL;
1347
1348 return true; // tells schedule() to return
1349 }
1350
1351 return false;
1352 }
1353
1354 /* -----------------------------------------------------------------------------
1355 * Perform a heap census
1356 * -------------------------------------------------------------------------- */
1357
1358 static bool
1359 scheduleNeedHeapProfile( bool ready_to_gc STG_UNUSED )
1360 {
1361 // When we have +RTS -i0 and we're heap profiling, do a census at
1362 // every GC. This lets us get repeatable runs for debugging.
1363 if (performHeapProfile ||
1364 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1365 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1366 return true;
1367 } else {
1368 return false;
1369 }
1370 }
1371
1372 /* -----------------------------------------------------------------------------
1373 * stopAllCapabilities()
1374 *
1375 * Stop all Haskell execution. This is used when we need to make some global
1376 * change to the system, such as altering the number of capabilities, or
1377 * forking.
1378 *
1379 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1380 * -------------------------------------------------------------------------- */
1381
1382 #if defined(THREADED_RTS)
1383 static void stopAllCapabilities (Capability **pCap, Task *task)
1384 {
1385 bool was_syncing;
1386 SyncType prev_sync_type;
1387
1388 PendingSync sync = {
1389 .type = SYNC_OTHER,
1390 .idle = NULL,
1391 .task = task
1392 };
1393
1394 do {
1395 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1396 } while (was_syncing);
1397
1398 acquireAllCapabilities(*pCap,task);
1399
1400 pending_sync = 0;
1401 }
1402 #endif
1403
1404 /* -----------------------------------------------------------------------------
1405 * requestSync()
1406 *
1407 * Commence a synchronisation between all capabilities. Normally not called
1408 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1409 * has some special synchronisation requirements.
1410 *
1411 * Returns:
1412 * false if we successfully got a sync
1413 * true if there was another sync request in progress,
1414 * and we yielded to it. The type of the other sync
1415 * request is returned in *prev_sync_type.
1416 * -------------------------------------------------------------------------- */
1417
1418 #if defined(THREADED_RTS)
1419 static bool requestSync (
1420 Capability **pcap, Task *task, PendingSync *new_sync,
1421 SyncType *prev_sync_type)
1422 {
1423 PendingSync *sync;
1424
1425 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1426 (StgWord)NULL,
1427 (StgWord)new_sync);
1428
1429 if (sync != NULL)
1430 {
1431 // sync is valid until we have called yieldCapability().
1432 // After the sync is completed, we cannot read that struct any
1433 // more because it has been freed.
1434 *prev_sync_type = sync->type;
1435 do {
1436 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1437 sync->type);
1438 ASSERT(*pcap);
1439 yieldCapability(pcap,task,true);
1440 sync = pending_sync;
1441 } while (sync != NULL);
1442
1443 // NOTE: task->cap might have changed now
1444 return true;
1445 }
1446 else
1447 {
1448 return false;
1449 }
1450 }
1451 #endif
1452
1453 /* -----------------------------------------------------------------------------
1454 * acquireAllCapabilities()
1455 *
1456 * Grab all the capabilities except the one we already hold. Used
1457 * when synchronising before a single-threaded GC (SYNC_GC_SEQ), and
1458 * before a fork (SYNC_OTHER).
1459 *
1460 * Only call this after requestSync(), otherwise a deadlock might
1461 * ensue if another thread is trying to synchronise.
1462 * -------------------------------------------------------------------------- */
1463
1464 #if defined(THREADED_RTS)
1465 static void acquireAllCapabilities(Capability *cap, Task *task)
1466 {
1467 Capability *tmpcap;
1468 uint32_t i;
1469
1470 ASSERT(pending_sync != NULL);
1471 for (i=0; i < n_capabilities; i++) {
1472 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1473 i, n_capabilities);
1474 tmpcap = capabilities[i];
1475 if (tmpcap != cap) {
1476 // we better hope this task doesn't get migrated to
1477 // another Capability while we're waiting for this one.
1478 // It won't, because load balancing happens while we have
1479 // all the Capabilities, but even so it's a slightly
1480 // unsavoury invariant.
1481 task->cap = tmpcap;
1482 waitForCapability(&tmpcap, task);
1483 if (tmpcap->no != i) {
1484 barf("acquireAllCapabilities: got the wrong capability");
1485 }
1486 }
1487 }
1488 task->cap = cap;
1489 }
1490 #endif
1491
1492 /* -----------------------------------------------------------------------------
1493 * releaseAllCapabilities()
1494 *
1495 * Assuming this thread holds all the capabilities, release them all except for
1496 * the one passed in as cap.
1497 * -------------------------------------------------------------------------- */
1498
1499 #if defined(THREADED_RTS)
1500 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task)
1501 {
1502 uint32_t i;
1503
1504 for (i = 0; i < n; i++) {
1505 if (cap->no != i) {
1506 task->cap = capabilities[i];
1507 releaseCapability(capabilities[i]);
1508 }
1509 }
1510 task->cap = cap;
1511 }
1512 #endif
1513
1514 /* -----------------------------------------------------------------------------
1515 * Perform a garbage collection if necessary
1516 * -------------------------------------------------------------------------- */
1517
1518 static void
1519 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1520 bool force_major)
1521 {
1522 Capability *cap = *pcap;
1523 bool heap_census;
1524 uint32_t collect_gen;
1525 bool major_gc;
1526 #if defined(THREADED_RTS)
1527 uint32_t gc_type;
1528 uint32_t i;
1529 uint32_t need_idle;
1530 uint32_t n_gc_threads;
1531 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1532 StgTSO *tso;
1533 bool *idle_cap;
1534 // idle_cap is an array (allocated later) of size n_capabilities, where
1535 // idle_cap[i] is true if capability i will be idle during this GC
1536 // cycle.
1537 #endif
1538
1539 if (sched_state == SCHED_SHUTTING_DOWN) {
1540 // The final GC has already been done, and the system is
1541 // shutting down. We'll probably deadlock if we try to GC
1542 // now.
1543 return;
1544 }
1545
1546 heap_census = scheduleNeedHeapProfile(true);
1547
1548 // Figure out which generation we are collecting, so that we can
1549 // decide whether this is a parallel GC or not.
1550 collect_gen = calcNeeded(force_major || heap_census, NULL);
1551 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1552
1553 #if defined(THREADED_RTS)
1554 if (sched_state < SCHED_INTERRUPTING
1555 && RtsFlags.ParFlags.parGcEnabled
1556 && collect_gen >= RtsFlags.ParFlags.parGcGen
1557 && ! oldest_gen->mark)
1558 {
1559 gc_type = SYNC_GC_PAR;
1560 } else {
1561 gc_type = SYNC_GC_SEQ;
1562 }
1563
1564 // In order to GC, there must be no threads running Haskell code.
1565 // Therefore, for single-threaded GC, the GC thread needs to hold *all* the
1566 // capabilities, and release them after the GC has completed. For parallel
1567 // GC, we synchronise all the running threads using requestSync().
1568 //
1569 // Other capabilities are prevented from running yet more Haskell threads if
1570 // pending_sync is set. Tested inside yieldCapability() and
1571 // releaseCapability() in Capability.c
1572
1573 PendingSync sync = {
1574 .type = gc_type,
1575 .idle = NULL,
1576 .task = task
1577 };
1578
1579 {
1580 SyncType prev_sync = 0;
1581 bool was_syncing;
1582 do {
1583 // If -qn is not set and we have more capabilities than cores, set
1584 // the number of GC threads to #cores. We do this here rather than
1585 // in normaliseRtsOpts() because here it will work if the program
1586 // calls setNumCapabilities.
1587 //
1588 n_gc_threads = RtsFlags.ParFlags.parGcThreads;
1589 if (n_gc_threads == 0 &&
1590 enabled_capabilities > getNumberOfProcessors()) {
1591 n_gc_threads = getNumberOfProcessors();
1592 }
1593
1594 // This calculation must be inside the loop because
1595 // enabled_capabilities may change if requestSync() below fails and
1596 // we retry.
1597 if (gc_type == SYNC_GC_PAR && n_gc_threads > 0) {
1598 if (n_gc_threads >= enabled_capabilities) {
1599 need_idle = 0;
1600 } else {
1601 need_idle = enabled_capabilities - n_gc_threads;
1602 }
1603 } else {
1604 need_idle = 0;
1605 }
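// e.g. with 8 enabled capabilities and +RTS -qn4 (parallel GC),
// need_idle = 8 - 4 = 4 capabilities will be asked to sit out.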
1606
1607 // We need an array of size n_capabilities, but since this may
1608 // change each time around the loop we must allocate it afresh.
1609 idle_cap = (bool *)stgMallocBytes(n_capabilities *
1610 sizeof(bool),
1611 "scheduleDoGC");
1612 sync.idle = idle_cap;
1613
1614 // When using +RTS -qn, we need some capabilities to be idle during
1615 // GC. The best bet is to choose some inactive ones, so we look for
1616 // those first:
1617 uint32_t n_idle = need_idle;
1618 for (i=0; i < n_capabilities; i++) {
1619 if (capabilities[i]->disabled) {
1620 idle_cap[i] = true;
1621 } else if (n_idle > 0 &&
1622 capabilities[i]->running_task == NULL) {
1623 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1624 n_idle--;
1625 idle_cap[i] = true;
1626 } else {
1627 idle_cap[i] = false;
1628 }
1629 }
1630 // If we didn't find enough inactive capabilities, just pick some
1631 // more to be idle.
1632 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1633 if (!idle_cap[i] && i != cap->no) {
1634 idle_cap[i] = true;
1635 n_idle--;
1636 }
1637 }
1638 ASSERT(n_idle == 0);
1639
1640 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1641 cap = *pcap;
1642 if (was_syncing) {
1643 stgFree(idle_cap);
1644 }
1645 if (was_syncing &&
1646 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1647 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1648 // someone else had a pending sync request for a GC, so
1649 // let's assume GC has been done and we don't need to GC
1650 // again.
1651 // Exception to this: if SCHED_INTERRUPTING, then we still
1652 // need to do the final GC.
1653 return;
1654 }
1655 if (sched_state == SCHED_SHUTTING_DOWN) {
1656 // The scheduler might now be shutting down. We tested
1657 // this above, but it might have become true since then as
1658 // we yielded the capability in requestSync().
1659 return;
1660 }
1661 } while (was_syncing);
1662 }
1663
1664 stat_startGCSync(gc_threads[cap->no]);
1665
1666 #if defined(DEBUG)
1667 unsigned int old_n_capabilities = n_capabilities;
1668 #endif
1669
1670 interruptAllCapabilities();
1671
1672 // The final shutdown GC is always single-threaded, because it's
1673 // possible that some of the Capabilities have no worker threads.
1674
1675 if (gc_type == SYNC_GC_SEQ) {
1676 traceEventRequestSeqGc(cap);
1677 } else {
1678 traceEventRequestParGc(cap);
1679 }
1680
1681 if (gc_type == SYNC_GC_SEQ) {
1682 // single-threaded GC: grab all the capabilities
1683 acquireAllCapabilities(cap,task);
1684 }
1685 else
1686 {
1687 // If we are load-balancing collections in this
1688 // generation, then we require all GC threads to participate
1689 // in the collection. Otherwise, we only require active
1690 // threads to participate, and we set gc_threads[i]->idle for
1691 // any idle capabilities. The rationale here is that waking
1692 // up an idle Capability takes much longer than just doing any
1693 // GC work on its behalf.
1694
1695 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1696 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1697 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1698 {
1699 for (i=0; i < n_capabilities; i++) {
1700 if (capabilities[i]->disabled) {
1701 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1702 if (idle_cap[i]) {
1703 n_idle_caps++;
1704 }
1705 } else {
1706 if (i != cap->no && idle_cap[i]) {
1707 Capability *tmpcap = capabilities[i];
1708 task->cap = tmpcap;
1709 waitForCapability(&tmpcap, task);
1710 n_idle_caps++;
1711 }
1712 }
1713 }
1714 }
1715 else
1716 {
1717 for (i=0; i < n_capabilities; i++) {
1718 if (capabilities[i]->disabled) {
1719 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1720 if (idle_cap[i]) {
1721 n_idle_caps++;
1722 }
1723 } else if (i != cap->no &&
1724 capabilities[i]->idle >=
1725 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1726 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1727 if (idle_cap[i]) {
1728 n_idle_caps++;
1729 } else {
1730 n_failed_trygrab_idles++;
1731 }
1732 }
1733 }
1734 }
1735 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1736
1737 for (i=0; i < n_capabilities; i++) {
1738 capabilities[i]->idle++;
1739 }
1740
1741 // For all capabilities participating in this GC, wait until
1742 // they have stopped mutating and are standing by for GC.
1743 waitForGcThreads(cap, idle_cap);
1744
1745 // Stable point where we can do a global check on our spark counters
1746 ASSERT(checkSparkCountInvariant());
1747 }
1748
1749 #endif
1750
1751 IF_DEBUG(scheduler, printAllThreads());
1752
1753 delete_threads_and_gc:
1754 /*
1755 * We now have all the capabilities; if we're in an interrupting
1756 * state, then we should take the opportunity to delete all the
1757 * threads in the system.
1758 * Checking for major_gc ensures that the last GC is major.
1759 */
1760 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1761 deleteAllThreads(cap);
1762 #if defined(THREADED_RTS)
1763 // Discard all the sparks from every Capability. Why?
1764 // They'll probably be GC'd anyway since we've killed all the
1765 // threads. It just avoids the GC having to do any work to
1766 // figure out that any remaining sparks are garbage.
1767 for (i = 0; i < n_capabilities; i++) {
1768 capabilities[i]->spark_stats.gcd +=
1769 sparkPoolSize(capabilities[i]->sparks);
1770 // No race here since all Caps are stopped.
1771 discardSparksCap(capabilities[i]);
1772 }
1773 #endif
1774 sched_state = SCHED_SHUTTING_DOWN;
1775 }
1776
1777 /*
1778 * When there are disabled capabilities, we want to migrate any
1779 * threads away from them. Normally this happens in the
1780 * scheduler's loop, but only for unbound threads - it's really
1781 * hard for a bound thread to migrate itself. So we have another
1782 * go here.
1783 */
1784 #if defined(THREADED_RTS)
1785 for (i = enabled_capabilities; i < n_capabilities; i++) {
1786 Capability *tmp_cap, *dest_cap;
1787 tmp_cap = capabilities[i];
1788 ASSERT(tmp_cap->disabled);
1789 if (i != cap->no) {
1790 dest_cap = capabilities[i % enabled_capabilities];
1791 while (!emptyRunQueue(tmp_cap)) {
1792 tso = popRunQueue(tmp_cap);
1793 migrateThread(tmp_cap, tso, dest_cap);
1794 if (tso->bound) {
1795 traceTaskMigrate(tso->bound->task,
1796 tso->bound->task->cap,
1797 dest_cap);
1798 tso->bound->task->cap = dest_cap;
1799 }
1800 }
1801 }
1802 }
1803 #endif
1804
1805 // Do any remaining idle GC work from the previous GC
1806 doIdleGCWork(cap, true /* all of it */);
1807
1808 #if defined(THREADED_RTS)
1809 // reset pending_sync *before* GC, so that when the GC threads
1810 // emerge they don't immediately re-enter the GC.
1811 pending_sync = 0;
1812 GarbageCollect(collect_gen, heap_census, gc_type, cap, idle_cap);
1813 #else
1814 GarbageCollect(collect_gen, heap_census, 0, cap, NULL);
1815 #endif
1816
1817 // If we're shutting down, don't leave any idle GC work to do.
1818 if (sched_state == SCHED_SHUTTING_DOWN) {
1819 doIdleGCWork(cap, true /* all of it */);
1820 }
1821
1822 traceSparkCounters(cap);
1823
1824 switch (recent_activity) {
1825 case ACTIVITY_INACTIVE:
1826 if (force_major) {
1827 // We are doing a GC because the system has been idle for a
1828 // timeslice and we need to check for deadlock. Record the
1829 // fact that we've done a GC and turn off the timer signal;
1830 // it will get re-enabled if we run any threads after the GC.
1831 recent_activity = ACTIVITY_DONE_GC;
1832 #if !defined(PROFILING)
1833 stopTimer();
1834 #endif
1835 break;
1836 }
1837 // fall through...
1838
1839 case ACTIVITY_MAYBE_NO:
1840 // the GC might have taken long enough for the timer to set
1841 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1842 // but we aren't necessarily deadlocked:
1843 recent_activity = ACTIVITY_YES;
1844 break;
1845
1846 case ACTIVITY_DONE_GC:
1847 // If we are actually active, the scheduler will reset the
1848 // recent_activity flag and re-enable the timer.
1849 break;
1850 }
1851
1852 #if defined(THREADED_RTS)
1853 // Stable point where we can do a global check on our spark counters
1854 ASSERT(checkSparkCountInvariant());
1855 #endif
1856
1857 // The heap census itself is done during GarbageCollect().
1858 if (heap_census) {
1859 performHeapProfile = false;
1860 }
1861
1862 #if defined(THREADED_RTS)
1863
1864 // If n_capabilities has changed during GC, we're in trouble.
1865 ASSERT(n_capabilities == old_n_capabilities);
1866
1867 if (gc_type == SYNC_GC_PAR)
1868 {
1869 for (i = 0; i < n_capabilities; i++) {
1870 if (i != cap->no) {
1871 if (idle_cap[i]) {
1872 ASSERT(capabilities[i]->running_task == task);
1873 task->cap = capabilities[i];
1874 releaseCapability(capabilities[i]);
1875 } else {
1876 ASSERT(capabilities[i]->running_task != task);
1877 }
1878 }
1879 }
1880 task->cap = cap;
1881
1882 // releaseGCThreads() happens *after* we have released idle
1883 // capabilities. Otherwise what can happen is one of the released
1884 // threads starts a new GC, and finds that it can't acquire some of
1885 // the disabled capabilities, because the previous GC still holds
1886 // them, so those disabled capabilities will not be idle during the
1887 // next GC round. However, if we release the capabilities first,
1888 // then they will be free (because they're disabled) when the next
1889 // GC cycle happens.
1890 releaseGCThreads(cap, idle_cap);
1891 }
1892 #endif
1893 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1894 // GC set the heap_overflow flag. We should throw an exception if we
1895 // can, or shut down otherwise.
1896
1897 // Get the thread to which Ctrl-C is thrown
1898 StgTSO *main_thread = getTopHandlerThread();
1899 if (main_thread == NULL) {
1900 // GC set the heap_overflow flag, and there is no main thread to
1901 // throw an exception to, so we should proceed with an orderly
1902 // shutdown now. Ultimately we want the main thread to return to
1903 // its caller with HeapExhausted, at which point the caller should
1904 // call hs_exit(). The first step is to delete all the threads.
1905 sched_state = SCHED_INTERRUPTING;
1906 goto delete_threads_and_gc;
1907 }
1908
1909 heap_overflow = false;
1910 const uint64_t allocation_count = getAllocations();
1911 if (RtsFlags.GcFlags.heapLimitGrace <
1912 allocation_count - allocated_bytes_at_heapoverflow ||
1913 allocated_bytes_at_heapoverflow == 0) {
1914 allocated_bytes_at_heapoverflow = allocation_count;
1915 // We used to simply exit, but throwing an exception gives the
1916 // program a chance to clean up. It also lets the exception be
1917 // caught.
1918
1919 // FIXME this is not a good way to tell a program to release
1920 // resources. It is neither reliable (the RTS crashes if it fails
1921 // to allocate memory from the OS) nor very usable (it is always
1922 // thrown to the main thread, which might not be able to do anything
1923 // useful with it). We really should have a more general way to
1924 // release resources in low-memory conditions. Nevertheless, this
1925 // is still a big improvement over just exiting.
1926
1927 // FIXME again: perhaps we should throw a synchronous exception
1928 // instead of an asynchronous one, or have a way for the program to
1929 // register a handler to be called when heap overflow happens.
1930 throwToSelf(cap, main_thread, heapOverflow_closure);
1931 }
1932 }
1933
1934 #if defined(THREADED_RTS)
1935 stgFree(idle_cap);
1936
1937 if (gc_type == SYNC_GC_SEQ) {
1938 // release our stash of capabilities.
1939 releaseAllCapabilities(n_capabilities, cap, task);
1940 }
1941 #endif
1942
1943 return;
1944 }
1945
1946 /* ---------------------------------------------------------------------------
1947 * Singleton fork(). Do not copy any running threads.
1948 * ------------------------------------------------------------------------- */
1949
1950 pid_t
1951 forkProcess(HsStablePtr *entry
1952 #if !defined(FORKPROCESS_PRIMOP_SUPPORTED)
1953 STG_UNUSED
1954 #endif
1955 )
1956 {
1957 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
1958 pid_t pid;
1959 StgTSO* t,*next;
1960 Capability *cap;
1961 uint32_t g;
1962 Task *task = NULL;
1963 uint32_t i;
1964
1965 debugTrace(DEBUG_sched, "forking!");
1966
1967 task = newBoundTask();
1968
1969 cap = NULL;
1970 waitForCapability(&cap, task);
1971
1972 #if defined(THREADED_RTS)
1973 stopAllCapabilities(&cap, task);
1974 #endif
1975
1976 // no funny business: hold locks while we fork, otherwise if some
1977 // other thread is holding a lock when the fork happens, the data
1978 // structure protected by the lock will forever be in an
1979 // inconsistent state in the child. See also #1391.
1980 ACQUIRE_LOCK(&sched_mutex);
1981 ACQUIRE_LOCK(&sm_mutex);
1982 ACQUIRE_LOCK(&stable_mutex);
1983 ACQUIRE_LOCK(&task->lock);
1984
1985 for (i=0; i < n_capabilities; i++) {
1986 ACQUIRE_LOCK(&capabilities[i]->lock);
1987 }
1988
1989 #if defined(THREADED_RTS)
1990 ACQUIRE_LOCK(&all_tasks_mutex);
1991 #endif
1992
1993 stopTimer(); // See #4074
1994
1995 #if defined(TRACING)
1996 flushEventLog(); // so that child won't inherit dirty file buffers
1997 #endif
1998
1999 pid = fork();
2000
2001 if (pid) { // parent
2002
2003 startTimer(); // #4074
2004
2005 RELEASE_LOCK(&sched_mutex);
2006 RELEASE_LOCK(&sm_mutex);
2007 RELEASE_LOCK(&stable_mutex);
2008 RELEASE_LOCK(&task->lock);
2009
2010 #if defined(THREADED_RTS)
2011 /* N.B. releaseCapability_ below may need to take all_tasks_mutex */
2012 RELEASE_LOCK(&all_tasks_mutex);
2013 #endif
2014
2015 for (i=0; i < n_capabilities; i++) {
2016 releaseCapability_(capabilities[i],false);
2017 RELEASE_LOCK(&capabilities[i]->lock);
2018 }
2019
2020 boundTaskExiting(task);
2021
2022 // just return the pid
2023 return pid;
2024
2025 } else { // child
2026
2027 #if defined(THREADED_RTS)
2028 initMutex(&sched_mutex);
2029 initMutex(&sm_mutex);
2030 initMutex(&stable_mutex);
2031 initMutex(&task->lock);
2032
2033 for (i=0; i < n_capabilities; i++) {
2034 initMutex(&capabilities[i]->lock);
2035 }
2036
2037 initMutex(&all_tasks_mutex);
2038 #endif
2039
2040 #if defined(TRACING)
2041 resetTracing();
2042 #endif
2043
2044 // Now, all OS threads except the thread that forked are
2045 // stopped. We need to stop all Haskell threads, including
2046 // those involved in foreign calls. Also we need to delete
2047 // all Tasks, because they correspond to OS threads that are
2048 // now gone.
2049
2050 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2051 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2052 next = t->global_link;
2053 // don't allow threads to catch the ThreadKilled
2054 // exception, but we do want to raiseAsync() because these
2055 // threads may be evaluating thunks that we need later.
2056 deleteThread_(t->cap,t);
2057
2058 // stop the GC from updating the InCall to point to
2059 // the TSO. This is only necessary because the
2060 // OSThread bound to the TSO has been killed, and
2061 // won't get a chance to exit in the usual way (see
2062 // also scheduleHandleThreadFinished).
2063 t->bound = NULL;
2064 }
2065 }
2066
2067 discardTasksExcept(task);
2068
2069 for (i=0; i < n_capabilities; i++) {
2070 cap = capabilities[i];
2071
2072 // Empty the run queue. It seems tempting to let all the
2073 // killed threads stay on the run queue as zombies to be
2074 // cleaned up later, but some of them may correspond to
2075 // bound threads for which the corresponding Task does not
2076 // exist.
2077 truncateRunQueue(cap);
2078 cap->n_run_queue = 0;
2079
2080 // Any suspended C-calling Tasks are no more, their OS threads
2081 // don't exist now:
2082 cap->suspended_ccalls = NULL;
2083 cap->n_suspended_ccalls = 0;
2084
2085 #if defined(THREADED_RTS)
2086 // Wipe our spare workers list, they no longer exist. New
2087 // workers will be created if necessary.
2088 cap->spare_workers = NULL;
2089 cap->n_spare_workers = 0;
2090 cap->returning_tasks_hd = NULL;
2091 cap->returning_tasks_tl = NULL;
2092 cap->n_returning_tasks = 0;
2093 #endif
2094
2095 // Release all caps except 0, we'll use that for starting
2096 // the IO manager and running the client action below.
2097 if (cap->no != 0) {
2098 task->cap = cap;
2099 releaseCapability(cap);
2100 }
2101 }
2102 cap = capabilities[0];
2103 task->cap = cap;
2104
2105 // Empty the threads lists. Otherwise, the garbage
2106 // collector may attempt to resurrect some of these threads.
2107 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2108 generations[g].threads = END_TSO_QUEUE;
2109 }
2110
2111 // On Unix, all timers are reset in the child, so we need to start
2112 // the timer again.
2113 initTimer();
2114 startTimer();
2115
2116 // TODO: need to trace various other things in the child
2117 // like startup event, capabilities, process info etc
2118 traceTaskCreate(task, cap);
2119
2120 #if defined(THREADED_RTS)
2121 ioManagerStartCap(&cap);
2122 #endif
2123
2124 // Install toplevel exception handlers, so that the interruption
2125 // signal will be sent to the main thread.
2126 // See Trac #12903
2127 rts_evalStableIOMain(&cap, entry, NULL); // run the action
2128 rts_checkSchedStatus("forkProcess",cap);
2129
2130 rts_unlock(cap);
2131 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2132 }
2133 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2134 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2135 #endif
2136 }
2137
2138 /* ---------------------------------------------------------------------------
2139 * Changing the number of Capabilities
2140 *
2141 * Changing the number of Capabilities is very tricky! We can only do
2142 * it with the system fully stopped, so we do a full sync with
2143 * requestSync(SYNC_OTHER) and grab all the capabilities.
2144 *
2145 * Then we resize the appropriate data structures, and update all
2146 * references to the old data structures which have now moved.
2147 * Finally we release the Capabilities we are holding, and start
2148 * worker Tasks on the new Capabilities we created.
2149 *
2150 * ------------------------------------------------------------------------- */
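
// Illustrative sketch only: setNumCapabilities() is normally reached from
// Haskell via GHC.Conc.setNumCapabilities (which foreign-imports this
// function), but an application embedding the RTS can also call it directly
// from C, assuming the threaded RTS is in use:
//
//     setNumCapabilities(4);   // grow (or shrink) to 4 capabilities
//
// The call returns only once the whole system has been synchronised, as
// described above.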
2151
2152 void
2153 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2154 {
2155 #if !defined(THREADED_RTS)
2156 if (new_n_capabilities != 1) {
2157 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2158 }
2159 return;
2160 #elif defined(NOSMP)
2161 if (new_n_capabilities != 1) {
2162 errorBelch("setNumCapabilities: not supported on this platform");
2163 }
2164 return;
2165 #else
2166 Task *task;
2167 Capability *cap;
2168 uint32_t n;
2169 Capability *old_capabilities = NULL;
2170 uint32_t old_n_capabilities = n_capabilities;
2171
2172 if (new_n_capabilities == enabled_capabilities) {
2173 return;
2174 } else if (new_n_capabilities <= 0) {
2175 errorBelch("setNumCapabilities: Capability count must be positive");
2176 return;
2177 }
2178
2179
2180 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2181 enabled_capabilities, new_n_capabilities);
2182
2183 cap = rts_lock();
2184 task = cap->running_task;
2185
2186 stopAllCapabilities(&cap, task);
2187
2188 if (new_n_capabilities < enabled_capabilities)
2189 {
2190 // Reducing the number of capabilities: we do not actually
2191 // remove the extra capabilities, we just mark them as
2192 // "disabled". This has the following effects:
2193 //
2194 // - threads on a disabled capability are migrated away by the
2195 // scheduler loop
2196 //
2197 // - disabled capabilities do not participate in GC
2198 // (see scheduleDoGC())
2199 //
2200 // - No spark threads are created on this capability
2201 // (see scheduleActivateSpark())
2202 //
2203 // - We do not attempt to migrate threads *to* a disabled
2204 // capability (see schedulePushWork()).
2205 //
2206 // but in other respects, a disabled capability remains
2207 // alive. Threads may be woken up on a disabled capability,
2208 // but they will be immediately migrated away.
2209 //
2210 // This approach is much easier than trying to actually remove
2211 // the capability; we don't have to worry about GC data
2212 // structures, the nursery, etc.
2213 //
2214 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2215 capabilities[n]->disabled = true;
2216 traceCapDisable(capabilities[n]);
2217 }
2218 enabled_capabilities = new_n_capabilities;
2219 }
2220 else
2221 {
2222 // Increasing the number of enabled capabilities.
2223 //
2224 // enable any disabled capabilities, up to the required number
2225 for (n = enabled_capabilities;
2226 n < new_n_capabilities && n < n_capabilities; n++) {
2227 capabilities[n]->disabled = false;
2228 traceCapEnable(capabilities[n]);
2229 }
2230 enabled_capabilities = n;
2231
2232 if (new_n_capabilities > n_capabilities) {
2233 #if defined(TRACING)
2234 // Allocate eventlog buffers for the new capabilities. Note this
2235 // must be done before calling moreCapabilities(), because that
2236 // will emit events about creating the new capabilities and adding
2237 // them to existing capsets.
2238 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2239 #endif
2240
2241 // Resize the capabilities array
2242 // NB. after this, capabilities points somewhere new. Any pointers
2243 // of type (Capability *) are now invalid.
2244 moreCapabilities(n_capabilities, new_n_capabilities);
2245
2246 // Resize and update storage manager data structures
2247 storageAddCapabilities(n_capabilities, new_n_capabilities);
2248 }
2249 }
2250
2251 // update n_capabilities before things start running
2252 if (new_n_capabilities > n_capabilities) {
2253 n_capabilities = enabled_capabilities = new_n_capabilities;
2254 }
2255
2256 // We're done: release the original Capabilities
2257 releaseAllCapabilities(old_n_capabilities, cap,task);
2258
2259 // We can't free the old array until now, because we access it
2260 // while updating pointers in updateCapabilityRefs().
2261 if (old_capabilities) {
2262 stgFree(old_capabilities);
2263 }
2264
2265 // Notify IO manager that the number of capabilities has changed.
2266 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2267
2268 rts_unlock(cap);
2269
2270 #endif // THREADED_RTS
2271 }
2272
2273
2274
2275 /* ---------------------------------------------------------------------------
2276 * Delete all the threads in the system
2277 * ------------------------------------------------------------------------- */
2278
2279 static void
2280 deleteAllThreads ( Capability *cap )
2281 {
2282 // NOTE: only safe to call if we own all capabilities.
2283
2284 StgTSO* t, *next;
2285 uint32_t g;
2286
2287 debugTrace(DEBUG_sched,"deleting all threads");
2288 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2289 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2290 next = t->global_link;
2291 deleteThread(cap,t);
2292 }
2293 }
2294
2295 // The run queue now contains a bunch of ThreadKilled threads. We
2296 // must not throw these away: the main thread(s) will be in there
2297 // somewhere, and the main scheduler loop has to deal with it.
2298 // Also, the run queue is the only thing keeping these threads from
2299 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2300
2301 #if !defined(THREADED_RTS)
2302 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2303 ASSERT(sleeping_queue == END_TSO_QUEUE);
2304 #endif
2305 }
2306
2307 /* -----------------------------------------------------------------------------
2308 Managing the suspended_ccalls list.
2309 Locks required: sched_mutex
2310 -------------------------------------------------------------------------- */
2311
2312 STATIC_INLINE void
2313 suspendTask (Capability *cap, Task *task)
2314 {
2315 InCall *incall;
2316
2317 incall = task->incall;
2318 ASSERT(incall->next == NULL && incall->prev == NULL);
2319 incall->next = cap->suspended_ccalls;
2320 incall->prev = NULL;
2321 if (cap->suspended_ccalls) {
2322 cap->suspended_ccalls->prev = incall;
2323 }
2324 cap->suspended_ccalls = incall;
2325 cap->n_suspended_ccalls++;
2326 }
2327
2328 STATIC_INLINE void
2329 recoverSuspendedTask (Capability *cap, Task *task)
2330 {
2331 InCall *incall;
2332
2333 incall = task->incall;
2334 if (incall->prev) {
2335 incall->prev->next = incall->next;
2336 } else {
2337 ASSERT(cap->suspended_ccalls == incall);
2338 cap->suspended_ccalls = incall->next;
2339 }
2340 if (incall->next) {
2341 incall->next->prev = incall->prev;
2342 }
2343 incall->next = incall->prev = NULL;
2344 cap->n_suspended_ccalls--;
2345 }
2346
2347 /* ---------------------------------------------------------------------------
2348 * Suspending & resuming Haskell threads.
2349 *
2350 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2351 * its capability before calling the C function. This allows another
2352 * task to pick up the capability and carry on running Haskell
2353 * threads. It also means that if the C call blocks, it won't lock
2354 * the whole system.
2355 *
2356 * The Haskell thread making the C call is put to sleep for the
2357 * duration of the call, on the suspended_ccalling_threads queue. We
2358 * give out a token to the task, which it can use to resume the thread
2359 * on return from the C function.
2360 *
2361 * If this is an interruptible C call, this means that the FFI call may be
2362 * unceremoniously terminated and should be scheduled on an
2363 * unbound worker thread.
2364 * ------------------------------------------------------------------------- */
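
// Rough sketch (illustrative only) of how the two functions below are used:
// for a "safe" foreign call the compiler-generated code does, in effect,
//
//     void *token;
//     token   = suspendThread(baseReg, interruptible); // give up the Capability
//     result  = foo(arg);            // the actual C call; "foo" is hypothetical
//     baseReg = resumeThread(token);                   // reacquire a Capability
//
// so the Capability is free to run other Haskell threads while the C call
// is in progress.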
2365
2366 void *
2367 suspendThread (StgRegTable *reg, bool interruptible)
2368 {
2369 Capability *cap;
2370 int saved_errno;
2371 StgTSO *tso;
2372 Task *task;
2373 #if defined(mingw32_HOST_OS)
2374 StgWord32 saved_winerror;
2375 #endif
2376
2377 saved_errno = errno;
2378 #if defined(mingw32_HOST_OS)
2379 saved_winerror = GetLastError();
2380 #endif
2381
2382 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2383 */
2384 cap = regTableToCapability(reg);
2385
2386 task = cap->running_task;
2387 tso = cap->r.rCurrentTSO;
2388
2389 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2390
2391 // XXX this might not be necessary --SDM
2392 tso->what_next = ThreadRunGHC;
2393
2394 threadPaused(cap,tso);
2395
2396 if (interruptible) {
2397 tso->why_blocked = BlockedOnCCall_Interruptible;
2398 } else {
2399 tso->why_blocked = BlockedOnCCall;
2400 }
2401
2402 // Hand back capability
2403 task->incall->suspended_tso = tso;
2404 task->incall->suspended_cap = cap;
2405
2406 // Otherwise allocate() will write to invalid memory.
2407 cap->r.rCurrentTSO = NULL;
2408
2409 ACQUIRE_LOCK(&cap->lock);
2410
2411 suspendTask(cap,task);
2412 cap->in_haskell = false;
2413 releaseCapability_(cap,false);
2414
2415 RELEASE_LOCK(&cap->lock);
2416
2417 errno = saved_errno;
2418 #if defined(mingw32_HOST_OS)
2419 SetLastError(saved_winerror);
2420 #endif
2421 return task;
2422 }
2423
2424 StgRegTable *
2425 resumeThread (void *task_)
2426 {
2427 StgTSO *tso;
2428 InCall *incall;
2429 Capability *cap;
2430 Task *task = task_;
2431 int saved_errno;
2432 #if defined(mingw32_HOST_OS)
2433 StgWord32 saved_winerror;
2434 #endif
2435
2436 saved_errno = errno;
2437 #if defined(mingw32_HOST_OS)
2438 saved_winerror = GetLastError();
2439 #endif
2440
2441 incall = task->incall;
2442 cap = incall->suspended_cap;
2443 task->cap = cap;
2444
2445 // Wait for permission to re-enter the RTS with the result.
2446 waitForCapability(&cap,task);
2447 // we might be on a different capability now... but if so, our
2448 // entry on the suspended_ccalls list will also have been
2449 // migrated.
2450
2451 // Remove the thread from the suspended list
2452 recoverSuspendedTask(cap,task);
2453
2454 tso = incall->suspended_tso;
2455 incall->suspended_tso = NULL;
2456 incall->suspended_cap = NULL;
2457 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2458
2459 traceEventRunThread(cap, tso);
2460
2461 /* Reset blocking status */
2462 tso->why_blocked = NotBlocked;
2463
2464 if ((tso->flags & TSO_BLOCKEX) == 0) {
2465 // avoid locking the TSO if we don't have to
2466 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2467 maybePerformBlockedException(cap,tso);
2468 }
2469 }
2470
2471 cap->r.rCurrentTSO = tso;
2472 cap->in_haskell = true;
2473 errno = saved_errno;
2474 #if defined(mingw32_HOST_OS)
2475 SetLastError(saved_winerror);
2476 #endif
2477
2478 /* We might have GC'd, mark the TSO dirty again */
2479 dirty_TSO(cap,tso);
2480 dirty_STACK(cap,tso->stackobj);
2481
2482 IF_DEBUG(sanity, checkTSO(tso));
2483
2484 return &cap->r;
2485 }
2486
2487 /* ---------------------------------------------------------------------------
2488 * scheduleThread()
2489 *
2490 * scheduleThread puts a thread on the end of the runnable queue.
2491 * This will usually be done immediately after a thread is created.
2492 * The caller of scheduleThread must create the thread using e.g.
2493 * createThread and push an appropriate closure
2494 * on this thread's stack before the scheduler is invoked.
2495 * ------------------------------------------------------------------------ */
2496
2497 void
2498 scheduleThread(Capability *cap, StgTSO *tso)
2499 {
2500 // The thread goes at the *end* of the run-queue, to avoid possible
2501 // starvation of any threads already on the queue.
2502 appendToRunQueue(cap,tso);
2503 }
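
// Illustrative sketch only: the usual caller of scheduleThread() is the
// fork# primop (see PrimOps.cmm), which in C-like terms does roughly
//
//     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
//                                  closure);    // "closure" is the IO action
//     scheduleThread(cap, tso);                 // put it on the run queue
//
// i.e. create the thread with its entry closure already pushed, then hand
// it to the scheduler.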
2504
2505 void
2506 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2507 {
2508 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2509 // move this thread from now on.
2510 #if defined(THREADED_RTS)
2511 cpu %= enabled_capabilities;
2512 if (cpu == cap->no) {
2513 appendToRunQueue(cap,tso);
2514 } else {
2515 migrateThread(cap, tso, capabilities[cpu]);
2516 }
2517 #else
2518 appendToRunQueue(cap,tso);
2519 #endif
2520 }
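
// Illustrative sketch only: the forkOn# primop follows the fork# sketch
// above, except that it pins the new thread to a chosen capability:
//
//     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, closure);
//     scheduleThreadOn(cap, cpu, tso);   // sets TSO_LOCKED; migrates if cpu != cap->no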
2521
2522 void
2523 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2524 {
2525 Task *task;
2526 DEBUG_ONLY( StgThreadID id );
2527 Capability *cap;
2528
2529 cap = *pcap;
2530
2531 // We already created/initialised the Task
2532 task = cap->running_task;
2533
2534 // This TSO is now a bound thread; make the Task and TSO
2535 // point to each other.
2536 tso->bound = task->incall;
2537 tso->cap = cap;
2538
2539 task->incall->tso = tso;
2540 task->incall->ret = ret;
2541 task->incall->rstat = NoStatus;
2542
2543 appendToRunQueue(cap,tso);
2544
2545 DEBUG_ONLY( id = tso->id );
2546 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2547
2548 cap = schedule(cap,task);
2549
2550 ASSERT(task->incall->rstat != NoStatus);
2551 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2552
2553 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2554 *pcap = cap;
2555 }
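
// Illustrative sketch only: scheduleWaitThread() is the workhorse behind the
// rts_eval*() entry points in RtsAPI.c. A C caller goes roughly like this:
//
//     Capability *cap = rts_lock();
//     HaskellObj ret;
//     StgTSO *tso = createStrictIOThread(cap, RtsFlags.GcFlags.initialStkSize,
//                                        action);   // "action": an IO closure
//     scheduleWaitThread(tso, &ret, &cap);          // blocks until it finishes
//     rts_unlock(cap);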
2556
2557 /* ----------------------------------------------------------------------------
2558 * Starting Tasks
2559 * ------------------------------------------------------------------------- */
2560
2561 #if defined(THREADED_RTS)
2562 void scheduleWorker (Capability *cap, Task *task)
2563 {
2564 // schedule() runs without a lock.
2565 cap = schedule(cap,task);
2566
2567 // On exit from schedule(), we have a Capability, but possibly not
2568 // the same one we started with.
2569
2570 // During shutdown, the requirement is that after all the
2571 // Capabilities are shut down, all workers that are shutting down
2572 // have finished workerTaskStop(). This is why we hold on to
2573 // cap->lock until we've finished workerTaskStop() below.
2574 //
2575 // There may be workers still involved in foreign calls; those
2576 // will just block in waitForCapability() because the
2577 // Capability has been shut down.
2578 //
2579 ACQUIRE_LOCK(&cap->lock);
2580 releaseCapability_(cap,false);
2581 workerTaskStop(task);
2582 RELEASE_LOCK(&cap->lock);
2583 }
2584 #endif
2585
2586 /* ---------------------------------------------------------------------------
2587 * Start new worker tasks on Capabilities from--to
2588 * -------------------------------------------------------------------------- */
2589
2590 static void
2591 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2592 {
2593 #if defined(THREADED_RTS)
2594 uint32_t i;
2595 Capability *cap;
2596
2597 for (i = from; i < to; i++) {
2598 cap = capabilities[i];
2599 ACQUIRE_LOCK(&cap->lock);
2600 startWorkerTask(cap);
2601 RELEASE_LOCK(&cap->lock);
2602 }
2603 #endif
2604 }
2605
2606 /* ---------------------------------------------------------------------------
2607 * initScheduler()
2608 *
2609 * Initialise the scheduler. This resets all the queues - if the
2610 * queues contained any threads, they'll be garbage collected at the
2611 * next pass.
2612 *
2613 * ------------------------------------------------------------------------ */
2614
2615 void
2616 initScheduler(void)
2617 {
2618 #if !defined(THREADED_RTS)
2619 blocked_queue_hd = END_TSO_QUEUE;
2620 blocked_queue_tl = END_TSO_QUEUE;
2621 sleeping_queue = END_TSO_QUEUE;
2622 #endif
2623
2624 sched_state = SCHED_RUNNING;
2625 recent_activity = ACTIVITY_YES;
2626
2627 #if defined(THREADED_RTS)
2628 /* Initialise the mutex and condition variables used by
2629 * the scheduler. */
2630 initMutex(&sched_mutex);
2631 #endif
2632
2633 ACQUIRE_LOCK(&sched_mutex);
2634
2635 allocated_bytes_at_heapoverflow = 0;
2636
2637 /* A capability holds the state a native thread needs in
2638 * order to execute STG code. At least one capability is
2639 * floating around (only THREADED_RTS builds have more than one).
2640 */
2641 initCapabilities();
2642
2643 initTaskManager();
2644
2645 /*
2646 * Eagerly start one worker to run each Capability, except for
2647 * Capability 0. The idea is that we're probably going to start a
2648 * bound thread on Capability 0 pretty soon, so we don't want a
2649 * worker task hogging it.
2650 */
2651 startWorkerTasks(1, n_capabilities);
2652
2653 RELEASE_LOCK(&sched_mutex);
2654
2655 }
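
// Illustrative sketch only: initScheduler() is not called by user code; it
// runs as part of RTS startup. A C program embedding the RTS goes through
// hs_init()/hs_exit(), which initialise and shut down the scheduler along
// with the rest of the RTS:
//
//     #include "HsFFI.h"
//     int main (int argc, char *argv[]) {
//         hs_init(&argc, &argv);   // storage manager, scheduler, timers, ...
//         // ... call foreign-exported Haskell functions here ...
//         hs_exit();
//         return 0;
//     }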
2656
2657 void
2658 exitScheduler (bool wait_foreign USED_IF_THREADS)
2659 /* see Capability.c, shutdownCapability() */
2660 {
2661 Task *task = NULL;
2662
2663 task = newBoundTask();
2664
2665 // If we haven't killed all the threads yet, do it now.
2666 if (sched_state < SCHED_SHUTTING_DOWN) {
2667 sched_state = SCHED_INTERRUPTING;
2668 Capability *cap = task->cap;
2669 waitForCapability(&cap,task);
2670 scheduleDoGC(&cap,task,true);
2671 ASSERT(task->incall->tso == NULL);
2672 releaseCapability(cap);
2673 }
2674 sched_state = SCHED_SHUTTING_DOWN;
2675
2676 shutdownCapabilities(task, wait_foreign);
2677
2678 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2679 // n_failed_trygrab_idles, n_idle_caps);
2680
2681 boundTaskExiting(task);
2682 }
2683
2684 void
2685 freeScheduler( void )
2686 {
2687 uint32_t still_running;
2688
2689 ACQUIRE_LOCK(&sched_mutex);
2690 still_running = freeTaskManager();
2691 // We can only free the Capabilities if there are no Tasks still
2692 // running. We might have a Task about to return from a foreign
2693 // call into waitForCapability(), for example (actually,
2694 // this should be the *only* thing that a still-running Task can
2695 // do at this point, and it will block waiting for the
2696 // Capability).
2697 if (still_running == 0) {
2698 freeCapabilities();
2699 }
2700 RELEASE_LOCK(&sched_mutex);
2701 #if defined(THREADED_RTS)
2702 closeMutex(&sched_mutex);
2703 #endif
2704 }
2705
2706 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2707 void *user USED_IF_NOT_THREADS)
2708 {
2709 #if !defined(THREADED_RTS)
2710 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2711 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2712 evac(user, (StgClosure **)(void *)&sleeping_queue);
2713 #endif
2714 }
2715
2716 /* -----------------------------------------------------------------------------
2717 performGC
2718
2719 This is the interface to the garbage collector from Haskell land.
2720 We provide this so that external C code can allocate and garbage
2721 collect when called from Haskell via _ccall_GC.
2722 -------------------------------------------------------------------------- */
2723
2724 static void
2725 performGC_(bool force_major)
2726 {
2727 Task *task;
2728 Capability *cap = NULL;
2729
2730 // We must grab a new Task here, because the existing Task may be
2731 // associated with a particular Capability, and chained onto the
2732 // suspended_ccalls queue.
2733 task = newBoundTask();
2734
2735 // TODO: do we need to traceTask*() here?
2736
2737 waitForCapability(&cap,task);
2738 scheduleDoGC(&cap,task,force_major);
2739 releaseCapability(cap);
2740 boundTaskExiting(task);
2741 }
2742
2743 void
2744 performGC(void)
2745 {
2746 performGC_(false);
2747 }
2748
2749 void
2750 performMajorGC(void)
2751 {
2752 performGC_(true);
2753 }
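
// Illustrative sketch only: these entry points are what the Haskell-level
// wrappers in System.Mem foreign-import, and they can equally be called from
// C code embedding the RTS (or from a safe foreign call), e.g.:
//
//     performMajorGC();   // force a full collection before taking a snapshot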
2754
2755 /* ---------------------------------------------------------------------------
2756 Interrupt execution.
2757 Might be called inside a signal handler so it mustn't do anything fancy.
2758 ------------------------------------------------------------------------ */
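
// Illustrative sketch only: the RTS's own shutdown/interrupt handlers use
// this to start an orderly shutdown; a handler installed by an embedding
// application could do the same (the handler name below is hypothetical):
//
//     static void my_interrupt_handler (int sig STG_UNUSED) {
//         interruptStgRts();   // only sets flags and wakes capabilities
//     }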
2759
2760 void
2761 interruptStgRts(void)
2762 {
2763 sched_state = SCHED_INTERRUPTING;
2764 interruptAllCapabilities();
2765 #if defined(THREADED_RTS)
2766 wakeUpRts();
2767 #endif
2768 }
2769
2770 /* -----------------------------------------------------------------------------
2771 Wake up the RTS
2772
2773 This function causes at least one OS thread to wake up and run the
2774 scheduler loop. It is invoked when the RTS might be deadlocked, or
2775 an external event has arrived that may need servicing (eg. a
2776 keyboard interrupt).
2777
2778 In the single-threaded RTS we don't do anything here; we only have
2779 one thread anyway, and the event that caused us to want to wake up
2780 will have interrupted any blocking system call in progress anyway.
2781 -------------------------------------------------------------------------- */
2782
2783 #if defined(THREADED_RTS)
2784 void wakeUpRts(void)
2785 {
2786 // This forces the IO Manager thread to wake up, which will
2787 // in turn ensure that some OS thread wakes up and runs the
2788 // scheduler loop, which will cause a GC and deadlock check.
2789 ioManagerWakeup();
2790 }
2791 #endif
2792
2793 /* -----------------------------------------------------------------------------
2794 Deleting threads
2795
2796 This is used for interruption (^C) and forking, and corresponds to
2797 raising an exception but without letting the thread catch the
2798 exception.
2799 -------------------------------------------------------------------------- */
2800
2801 static void
2802 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2803 {
2804 // NOTE: must only be called on a TSO that we have exclusive
2805 // access to, because we will call throwToSingleThreaded() below.
2806 // The TSO must be on the run queue of the Capability we own, or
2807 // we must own all Capabilities.
2808
2809 if (tso->why_blocked != BlockedOnCCall &&
2810 tso->why_blocked != BlockedOnCCall_Interruptible) {
2811 throwToSingleThreaded(tso->cap,tso,NULL);
2812 }
2813 }
2814
2815 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
2816 static void
2817 deleteThread_(Capability *cap, StgTSO *tso)
2818 { // for forkProcess only:
2819 // like deleteThread(), but we delete threads in foreign calls, too.
2820
2821 if (tso->why_blocked == BlockedOnCCall ||
2822 tso->why_blocked == BlockedOnCCall_Interruptible) {
2823 tso->what_next = ThreadKilled;
2824 appendToRunQueue(tso->cap, tso);
2825 } else {
2826 deleteThread(cap,tso);
2827 }
2828 }
2829 #endif
2830
2831 /* -----------------------------------------------------------------------------
2832 raiseExceptionHelper
2833
2834 This function is called by the raise# primitive, just so that we can
2835 move some of the tricky bits of raising an exception from C-- into
2836 C. Who knows, it might be a useful re-useable thing here too.
2837 -------------------------------------------------------------------------- */
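
// Illustrative sketch only: the caller is the raise# primop (stg_raisezh in
// Exception.cmm), which in C-like terms does
//
//     StgWord frame_type = raiseExceptionHelper(reg, tso, exception);
//     // frame_type is one of CATCH_FRAME, CATCH_STM_FRAME, ATOMICALLY_FRAME
//     // or STOP_FRAME; the primop then enters the handler, aborts the
//     // transaction, or kills the thread accordingly, starting from the
//     // stack pointer we leave in tso->stackobj->sp.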
2838
2839 StgWord
2840 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2841 {
2842 Capability *cap = regTableToCapability(reg);
2843 StgThunk *raise_closure = NULL;
2844 StgPtr p, next;
2845 const StgRetInfoTable *info;
2846 //
2847 // This closure represents the expression 'raise# E' where E
2848 // is the exception being raised. It is used to overwrite all the
2849 // thunks which are currently under evaluation.
2850 //
2851
2852 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2853 // LDV profiling: stg_raise_info has THUNK as its closure
2854 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2855 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2856 // 1 does not cause any problem unless profiling is performed.
2857 // However, when LDV profiling goes on, we need to linearly scan the
2858 // small object pool, where raise_closure is stored, so we should
2859 // use MIN_UPD_SIZE.
2860 //
2861 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2862 // sizeofW(StgClosure)+1);
2863 //
2864
2865 //
2866 // Walk up the stack, looking for the catch frame. On the way,
2867 // we update any closures pointed to from update frames with the
2868 // raise closure that we just built.
2869 //
2870 p = tso->stackobj->sp;
2871 while(1) {
2872 info = get_ret_itbl((StgClosure *)p);
2873 next = p + stack_frame_sizeW((StgClosure *)p);
2874 switch (info->i.type) {
2875
2876 case UPDATE_FRAME:
2877 // Only create raise_closure if we need to.
2878 if (raise_closure == NULL) {
2879 raise_closure =
2880 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2881 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2882 raise_closure->payload[0] = exception;
2883 }
2884 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2885 (StgClosure *)raise_closure);
2886 p = next;
2887 continue;
2888
2889 case ATOMICALLY_FRAME:
2890 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2891 tso->stackobj->sp = p;
2892 return ATOMICALLY_FRAME;
2893
2894 case CATCH_FRAME:
2895 tso->stackobj->sp = p;
2896 return CATCH_FRAME;
2897
2898 case CATCH_STM_FRAME:
2899 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2900 tso->stackobj->sp = p;
2901 return CATCH_STM_FRAME;
2902
2903 case UNDERFLOW_FRAME:
2904 tso->stackobj->sp = p;
2905 threadStackUnderflow(cap,tso);
2906 p = tso->stackobj->sp;
2907 continue;
2908
2909 case STOP_FRAME:
2910 tso->stackobj->sp = p;
2911 return STOP_FRAME;
2912
2913 case CATCH_RETRY_FRAME: {
2914 StgTRecHeader *trec = tso -> trec;
2915 StgTRecHeader *outer = trec -> enclosing_trec;
2916 debugTrace(DEBUG_stm,
2917 "found CATCH_RETRY_FRAME at %p during raise", p);
2918 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2919 stmAbortTransaction(cap, trec);
2920 stmFreeAbortedTRec(cap, trec);
2921 tso -> trec = outer;
2922 p = next;
2923 continue;
2924 }
2925
2926 default:
2927 p = next;
2928 continue;
2929 }
2930 }
2931 }
2932
2933
2934 /* -----------------------------------------------------------------------------
2935 findRetryFrameHelper
2936
2937 This function is called by the retry# primitive. It traverses the stack
2938 leaving tso->sp referring to the frame which should handle the retry.
2939
2940 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2941 or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).
2942
2943 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2944 create) because retries are not considered to be exceptions, despite the
2945 similar implementation.
2946
2947 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2948 not be created within memory transactions.
2949 -------------------------------------------------------------------------- */
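
// Illustrative sketch only: the caller is the retry# primop (stg_retryzh in
// PrimOps.cmm), which in C-like terms does
//
//     StgWord frame_type = findRetryFrameHelper(cap, tso);
//     // CATCH_RETRY_FRAME: the retry is inside an orElse; run the
//     //                    alternative, or propagate the retry outwards.
//     // ATOMICALLY_FRAME:  a top-level retry; block the thread on the
//     //                    TVars it has read, to be woken when one changes.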
2950
2951 StgWord
2952 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2953 {
2954 const StgRetInfoTable *info;
2955 StgPtr p, next;
2956
2957 p = tso->stackobj->sp;
2958 while (1) {
2959 info = get_ret_itbl((const StgClosure *)p);
2960 next = p + stack_frame_sizeW((StgClosure *)p);
2961 switch (info->i.type) {
2962
2963 case ATOMICALLY_FRAME:
2964 debugTrace(DEBUG_stm,
2965 "found ATOMICALLY_FRAME at %p during retry", p);
2966 tso->stackobj->sp = p;
2967 return ATOMICALLY_FRAME;
2968
2969 case CATCH_RETRY_FRAME:
2970 debugTrace(DEBUG_stm,
2971 "found CATCH_RETRY_FRAME at %p during retry", p);
2972 tso->stackobj->sp = p;
2973 return CATCH_RETRY_FRAME;
2974
2975 case CATCH_STM_FRAME: {
2976 StgTRecHeader *trec = tso -> trec;
2977 StgTRecHeader *outer = trec -> enclosing_trec;
2978 debugTrace(DEBUG_stm,
2979 "found CATCH_STM_FRAME at %p during retry", p);
2980 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2981 stmAbortTransaction(cap, trec);
2982 stmFreeAbortedTRec(cap, trec);
2983 tso -> trec = outer;
2984 p = next;
2985 continue;
2986 }
2987
2988 case UNDERFLOW_FRAME:
2989 tso->stackobj->sp = p;
2990 threadStackUnderflow(cap,tso);
2991 p = tso->stackobj->sp;
2992 continue;
2993
2994 default:
2995 ASSERT(info->i.type != CATCH_FRAME);
2996 ASSERT(info->i.type != STOP_FRAME);
2997 p = next;
2998 continue;
2999 }
3000 }
3001 }
3002
3003 /* -----------------------------------------------------------------------------
3004 resurrectThreads is called after garbage collection on the list of
3005 threads found to be garbage. Each of these threads will be woken
3006 up and sent a signal: BlockedIndefinitelyOnMVar if the thread was blocked
3007 on an MVar, BlockedIndefinitelyOnSTM if blocked on an STM transaction, or
3008 NonTermination if the thread was blocked on a Black Hole.
3009
3010 Locks: assumes we hold *all* the capabilities.
3011 -------------------------------------------------------------------------- */
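
// Illustrative sketch only: the garbage collector invokes this at the end of
// a collection, roughly
//
//     // in GarbageCollect() (sm/GC.c), once liveness is known:
//     resurrectThreads(resurrected_threads);
//
// where resurrected_threads is the list of TSOs that were found to be
// unreachable yet still blocked.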
3012
3013 void
3014 resurrectThreads (StgTSO *threads)
3015 {
3016 StgTSO *tso, *next;
3017 Capability *cap;
3018 generation *gen;
3019
3020 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3021 next = tso->global_link;
3022
3023 gen = Bdescr((P_)tso)->gen;
3024 tso->global_link = gen->threads;
3025 gen->threads = tso;
3026
3027 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3028
3029 // Wake up the thread on the Capability it was last on
3030 cap = tso->cap;
3031
3032 switch (tso->why_blocked) {
3033 case BlockedOnMVar:
3034 case BlockedOnMVarRead:
3035 /* Called by GC - sched_mutex lock is currently held. */
3036 throwToSingleThreaded(cap, tso,
3037 (StgClosure *)blockedIndefinitelyOnMVar_closure);
3038 break;
3039 case BlockedOnBlackHole:
3040 throwToSingleThreaded(cap, tso,
3041 (StgClosure *)nonTermination_closure);
3042 break;
3043 case BlockedOnSTM:
3044 throwToSingleThreaded(cap, tso,
3045 (StgClosure *)blockedIndefinitelyOnSTM_closure);
3046 break;
3047 case NotBlocked:
3048 /* This might happen if the thread was blocked on a black hole
3049 * belonging to a thread that we've just woken up (raiseAsync
3050 * can wake up threads, remember...).
3051 */
3052 continue;
3053 case BlockedOnMsgThrowTo:
3054 // This can happen if the target is masking, blocks on a
3055 // black hole, and then is found to be unreachable. In
3056 // this case, we want to let the target wake up and carry
3057 // on, and do nothing to this thread.
3058 continue;
3059 default:
3060 barf("resurrectThreads: thread blocked in a strange way: %d",
3061 tso->why_blocked);
3062 }
3063 }
3064 }