rts/Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45 #include "TopHandler.h"
46
47 #if defined(HAVE_SYS_TYPES_H)
48 #include <sys/types.h>
49 #endif
50 #if defined(HAVE_UNISTD_H)
51 #include <unistd.h>
52 #endif
53
54 #include <string.h>
55 #include <stdlib.h>
56 #include <stdarg.h>
57
58 #if defined(HAVE_ERRNO_H)
59 #include <errno.h>
60 #endif
61
62 #if defined(TRACING)
63 #include "eventlog/EventLog.h"
64 #endif
65 /* -----------------------------------------------------------------------------
66 * Global variables
67 * -------------------------------------------------------------------------- */
68
69 #if !defined(THREADED_RTS)
70 // Blocked/sleeping threads
71 StgTSO *blocked_queue_hd = NULL;
72 StgTSO *blocked_queue_tl = NULL;
73 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
74 #endif
75
76 // Bytes allocated since the last time a HeapOverflow exception was thrown by
77 // the RTS
78 uint64_t allocated_bytes_at_heapoverflow = 0;
79
80 /* Set to true when the latest garbage collection failed to reclaim enough
81 * space, and the runtime should proceed to shut itself down in an orderly
82 * fashion (emitting profiling info etc.), OR throw an exception to the main
83 * thread, if it is still alive.
84 */
85 bool heap_overflow = false;
86
87 /* flag that tracks whether we have done any execution in this time slice.
88 * LOCK: currently none, perhaps we should lock (but needs to be
89 * updated in the fast path of the scheduler).
90 *
91 * NB. must be StgWord, we do xchg() on it.
92 */
93 volatile StgWord recent_activity = ACTIVITY_YES;
94
95 /* if this flag is set as well, give up execution
96 * LOCK: none (changes monotonically)
97 */
98 volatile StgWord sched_state = SCHED_RUNNING;
99
100 /*
101 * This mutex protects most of the global scheduler data in
102 * the THREADED_RTS runtime.
103 */
104 #if defined(THREADED_RTS)
105 Mutex sched_mutex;
106 #endif
107
108 #if !defined(mingw32_HOST_OS)
109 #define FORKPROCESS_PRIMOP_SUPPORTED
110 #endif
111
112 /* -----------------------------------------------------------------------------
113 * static function prototypes
114 * -------------------------------------------------------------------------- */
115
116 static Capability *schedule (Capability *initialCapability, Task *task);
117
118 //
119 // These functions all encapsulate parts of the scheduler loop, and are
120 // abstracted only to make the structure and control flow of the
121 // scheduler clearer.
122 //
123 static void schedulePreLoop (void);
124 static void scheduleFindWork (Capability **pcap);
125 #if defined(THREADED_RTS)
126 static void scheduleYield (Capability **pcap, Task *task);
127 #endif
128 #if defined(THREADED_RTS)
129 static bool requestSync (Capability **pcap, Task *task,
130 PendingSync *sync_type, SyncType *prev_sync_type);
131 static void acquireAllCapabilities(Capability *cap, Task *task);
132 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
133 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
134 uint32_t to USED_IF_THREADS);
135 #endif
136 static void scheduleStartSignalHandlers (Capability *cap);
137 static void scheduleCheckBlockedThreads (Capability *cap);
138 static void scheduleProcessInbox(Capability **cap);
139 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
140 static void schedulePushWork(Capability *cap, Task *task);
141 #if defined(THREADED_RTS)
142 static void scheduleActivateSpark(Capability *cap);
143 #endif
144 static void schedulePostRunThread(Capability *cap, StgTSO *t);
145 static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
146 static bool scheduleHandleYield( Capability *cap, StgTSO *t,
147 uint32_t prev_what_next );
148 static void scheduleHandleThreadBlocked( StgTSO *t );
149 static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
150 StgTSO *t );
151 static bool scheduleNeedHeapProfile(bool ready_to_gc);
152 static void scheduleDoGC(Capability **pcap, Task *task, bool force_major);
153
154 static void deleteThread (Capability *cap, StgTSO *tso);
155 static void deleteAllThreads (Capability *cap);
156
157 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
158 static void deleteThread_(Capability *cap, StgTSO *tso);
159 #endif
160
161 /* ---------------------------------------------------------------------------
162 Main scheduling loop.
163
164 We use round-robin scheduling, each thread returning to the
165 scheduler loop when one of these conditions is detected:
166
167 * out of heap space
168 * timer expires (thread yields)
169 * thread blocks
170 * thread ends
171 * stack overflow
172
173 ------------------------------------------------------------------------ */
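/* For orientation: these conditions correspond to the StgThreadReturnCode
 * values handled in the switch on 'ret' further down in schedule():
 *   out of heap space     -> HeapOverflow
 *   timer expiry (yield)  -> ThreadYielding
 *   thread blocks         -> ThreadBlocked
 *   thread ends           -> ThreadFinished
 *   stack overflow        -> StackOverflow
 */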
174
175 static Capability *
176 schedule (Capability *initialCapability, Task *task)
177 {
178 StgTSO *t;
179 Capability *cap;
180 StgThreadReturnCode ret;
181 uint32_t prev_what_next;
182 bool ready_to_gc;
183 #if defined(THREADED_RTS)
184 bool first = true;
185 #endif
186
187 cap = initialCapability;
188
189 // Pre-condition: this task owns initialCapability.
190 // The sched_mutex is *NOT* held
191 // NB. on return, we still hold a capability.
192
193 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
194
195 schedulePreLoop();
196
197 // -----------------------------------------------------------
198 // Scheduler loop starts here:
199
200 while (1) {
201
202 // Check whether we have re-entered the RTS from Haskell without
203 // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
204 // call).
205 if (cap->in_haskell) {
206 errorBelch("schedule: re-entered unsafely.\n"
207 " Perhaps a 'foreign import unsafe' should be 'safe'?");
208 stg_exit(EXIT_FAILURE);
209 }
210
211 // Note [shutdown]: The interruption / shutdown sequence.
212 //
213 // In order to cleanly shut down the runtime, we want to:
214 // * make sure that all main threads return to their callers
215 // with the state 'Interrupted'.
216 // * clean up all OS threads associated with the runtime
217 // * free all memory etc.
218 //
219 // So the sequence goes like this:
220 //
221 // * The shutdown sequence is initiated by calling hs_exit(),
222 // interruptStgRts(), or running out of memory in the GC.
223 //
224 // * Set sched_state = SCHED_INTERRUPTING
225 //
226 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
227 // scheduleDoGC(), which halts the whole runtime by acquiring all the
228 // capabilities, does a GC and then calls deleteAllThreads() to kill all
229 // the remaining threads. The zombies are left on the run queue for
230 // cleaning up. We can't kill threads involved in foreign calls.
231 //
232 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
233 //
234 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
235 // enforced by the scheduler, which won't run any Haskell code when
236 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
237 // other capabilities by doing the GC earlier.
238 //
239 // * all workers exit when the run queue on their capability
240 // drains. All main threads will also exit when their TSO
241 // reaches the head of the run queue and they can return.
242 //
243 // * eventually all Capabilities will shut down, and the RTS can
244 // exit.
245 //
246 // * We might be left with threads blocked in foreign calls,
247 // we should really attempt to kill these somehow (TODO).
248
249 switch (sched_state) {
250 case SCHED_RUNNING:
251 break;
252 case SCHED_INTERRUPTING:
253 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
254 /* scheduleDoGC() deletes all the threads */
255 scheduleDoGC(&cap,task,true);
256
257 // after scheduleDoGC(), we must be shutting down. Either some
258 // other Capability did the final GC, or we did it above,
259 // either way we can fall through to the SCHED_SHUTTING_DOWN
260 // case now.
261 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
262 // fall through
263
264 case SCHED_SHUTTING_DOWN:
265 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
266 // If we are a worker, just exit. If we're a bound thread
267 // then we will exit below when we've removed our TSO from
268 // the run queue.
269 if (!isBoundTask(task) && emptyRunQueue(cap)) {
270 return cap;
271 }
272 break;
273 default:
274 barf("sched_state: %d", sched_state);
275 }
276
277 scheduleFindWork(&cap);
278
279 /* work pushing, currently relevant only for THREADED_RTS:
280 (pushes threads, wakes up idle capabilities for stealing) */
281 schedulePushWork(cap,task);
282
283 scheduleDetectDeadlock(&cap,task);
284
285 // Normally, the only way we can get here with no threads to
286 // run is if a keyboard interrupt was received during
287 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
288 // Additionally, it is not fatal for the
289 // threaded RTS to reach here with no threads to run.
290 //
291 // win32: might be here due to awaitEvent() being abandoned
292 // as a result of a console event having been delivered.
293
294 #if defined(THREADED_RTS)
295 if (first)
296 {
297 // XXX: ToDo
298 // // don't yield the first time, we want a chance to run this
299 // // thread for a bit, even if there are others banging at the
300 // // door.
301 // first = false;
302 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
303 }
304
305 scheduleYield(&cap,task);
306
307 if (emptyRunQueue(cap)) continue; // look for work again
308 #endif
309
310 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
311 if ( emptyRunQueue(cap) ) {
312 ASSERT(sched_state >= SCHED_INTERRUPTING);
313 }
314 #endif
315
316 //
317 // Get a thread to run
318 //
319 t = popRunQueue(cap);
320
321 // Sanity check the thread we're about to run. This can be
322 // expensive if there is lots of thread switching going on...
323 IF_DEBUG(sanity,checkTSO(t));
324
325 #if defined(THREADED_RTS)
326 // Check whether we can run this thread in the current task.
327 // If not, we have to pass our capability to the right task.
328 {
329 InCall *bound = t->bound;
330
331 if (bound) {
332 if (bound->task == task) {
333 // yes, the Haskell thread is bound to the current native thread
334 } else {
335 debugTrace(DEBUG_sched,
336 "thread %lu bound to another OS thread",
337 (unsigned long)t->id);
338 // no, bound to a different Haskell thread: pass to that thread
339 pushOnRunQueue(cap,t);
340 continue;
341 }
342 } else {
343 // The thread we want to run is unbound.
344 if (task->incall->tso) {
345 debugTrace(DEBUG_sched,
346 "this OS thread cannot run thread %lu",
347 (unsigned long)t->id);
348 // no, the current native thread is bound to a different
349 // Haskell thread, so pass it to any worker thread
350 pushOnRunQueue(cap,t);
351 continue;
352 }
353 }
354 }
355 #endif
356
357 // If we're shutting down, and this thread has not yet been
358 // killed, kill it now. This sometimes happens when a finalizer
359 // thread is created by the final GC, or a thread previously
360 // in a foreign call returns.
361 if (sched_state >= SCHED_INTERRUPTING &&
362 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
363 deleteThread(cap,t);
364 }
365
366 // If this capability is disabled, migrate the thread away rather
367 // than running it. NB. but not if the thread is bound: it is
368 // really hard for a bound thread to migrate itself. Believe me,
369 // I tried several ways and couldn't find a way to do it.
370 // Instead, when everything is stopped for GC, we migrate all the
371 // threads on the run queue then (see scheduleDoGC()).
372 //
373 // ToDo: what about TSO_LOCKED? Currently we're migrating those
374 // when the number of capabilities drops, but we never migrate
375 // them back if it rises again. Presumably we should, but after
376 // the thread has been migrated we no longer know what capability
377 // it was originally on.
378 #if defined(THREADED_RTS)
379 if (cap->disabled && !t->bound) {
380 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
381 migrateThread(cap, t, dest_cap);
382 continue;
383 }
384 #endif
385
386 /* context switches are initiated by the timer signal, unless
387 * the user specified "context switch as often as possible", with
388 * +RTS -C0
389 */
390 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
391 && !emptyThreadQueues(cap)) {
392 cap->context_switch = 1;
393 }
394
395 run_thread:
396
397 // CurrentTSO is the thread to run. It might be different if we
398 // loop back to run_thread, so make sure to set CurrentTSO after
399 // that.
400 cap->r.rCurrentTSO = t;
401
402 startHeapProfTimer();
403
404 // ----------------------------------------------------------------------
405 // Run the current thread
406
407 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
408 ASSERT(t->cap == cap);
409 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
410
411 prev_what_next = t->what_next;
412
413 errno = t->saved_errno;
414 #if defined(mingw32_HOST_OS)
415 SetLastError(t->saved_winerror);
416 #endif
417
418 // reset the interrupt flag before running Haskell code
419 cap->interrupt = 0;
420
421 cap->in_haskell = true;
422 cap->idle = 0;
423
424 dirty_TSO(cap,t);
425 dirty_STACK(cap,t->stackobj);
426
427 switch (recent_activity)
428 {
429 case ACTIVITY_DONE_GC: {
430 // ACTIVITY_DONE_GC means we turned off the timer signal to
431 // conserve power (see #1623). Re-enable it here.
432 uint32_t prev;
433 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
434 if (prev == ACTIVITY_DONE_GC) {
435 #if !defined(PROFILING)
436 startTimer();
437 #endif
438 }
439 break;
440 }
441 case ACTIVITY_INACTIVE:
442 // If we reached ACTIVITY_INACTIVE, then don't reset it until
443 // we've done the GC. The thread running here might just be
444 // the IO manager thread that handle_tick() woke up via
445 // wakeUpRts().
446 break;
447 default:
448 recent_activity = ACTIVITY_YES;
449 }
450
451 traceEventRunThread(cap, t);
452
453 switch (prev_what_next) {
454
455 case ThreadKilled:
456 case ThreadComplete:
457 /* Thread already finished, return to scheduler. */
458 ret = ThreadFinished;
459 break;
460
461 case ThreadRunGHC:
462 {
463 StgRegTable *r;
464 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
465 cap = regTableToCapability(r);
466 ret = r->rRet;
467 break;
468 }
469
470 case ThreadInterpret:
471 cap = interpretBCO(cap);
472 ret = cap->r.rRet;
473 break;
474
475 default:
476 barf("schedule: invalid prev_what_next=%u field", prev_what_next);
477 }
478
479 cap->in_haskell = false;
480
481 // The TSO might have moved, eg. if it re-entered the RTS and a GC
482 // happened. So find the new location:
483 t = cap->r.rCurrentTSO;
484
485 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
486 // don't want it set when not running a Haskell thread.
487 cap->r.rCurrentTSO = NULL;
488
489 // And save the current errno in this thread.
490 // XXX: possibly bogus for SMP because this thread might already
491 // be running again, see code below.
492 t->saved_errno = errno;
493 #if defined(mingw32_HOST_OS)
494 // Similarly for Windows error code
495 t->saved_winerror = GetLastError();
496 #endif
497
498 if (ret == ThreadBlocked) {
499 if (t->why_blocked == BlockedOnBlackHole) {
500 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
501 traceEventStopThread(cap, t, t->why_blocked + 6,
502 owner != NULL ? owner->id : 0);
503 } else {
504 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
505 }
506 } else {
507 traceEventStopThread(cap, t, ret, 0);
508 }
509
510 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
511 ASSERT(t->cap == cap);
512
513 // ----------------------------------------------------------------------
514
515 // Costs for the scheduler are assigned to CCS_SYSTEM
516 stopHeapProfTimer();
517 #if defined(PROFILING)
518 cap->r.rCCCS = CCS_SYSTEM;
519 #endif
520
521 schedulePostRunThread(cap,t);
522
523 ready_to_gc = false;
524
525 switch (ret) {
526 case HeapOverflow:
527 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
528 break;
529
530 case StackOverflow:
531 // just adjust the stack for this thread, then pop it back
532 // on the run queue.
533 threadStackOverflow(cap, t);
534 pushOnRunQueue(cap,t);
535 break;
536
537 case ThreadYielding:
538 if (scheduleHandleYield(cap, t, prev_what_next)) {
539 // shortcut for switching between compiler/interpreter:
540 goto run_thread;
541 }
542 break;
543
544 case ThreadBlocked:
545 scheduleHandleThreadBlocked(t);
546 break;
547
548 case ThreadFinished:
549 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
550 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
551 break;
552
553 default:
554 barf("schedule: invalid thread return code %d", (int)ret);
555 }
556
557 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
558 scheduleDoGC(&cap,task,false);
559 }
560 } /* end of while() */
561 }
562
563 /* -----------------------------------------------------------------------------
564 * Run queue operations
565 * -------------------------------------------------------------------------- */
566
567 static void
568 removeFromRunQueue (Capability *cap, StgTSO *tso)
569 {
570 if (tso->block_info.prev == END_TSO_QUEUE) {
571 ASSERT(cap->run_queue_hd == tso);
572 cap->run_queue_hd = tso->_link;
573 } else {
574 setTSOLink(cap, tso->block_info.prev, tso->_link);
575 }
576 if (tso->_link == END_TSO_QUEUE) {
577 ASSERT(cap->run_queue_tl == tso);
578 cap->run_queue_tl = tso->block_info.prev;
579 } else {
580 setTSOPrev(cap, tso->_link, tso->block_info.prev);
581 }
582 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
583 cap->n_run_queue--;
584
585 IF_DEBUG(sanity, checkRunQueue(cap));
586 }
587
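// A short note on intent (using only the helpers already used in this
// file): pushOnRunQueue places a thread at the head of the run queue and
// appendToRunQueue at the tail, so promoteInRunQueue below effectively
// moves an already-queued thread to the front, making it the next thread
// popped by the scheduler.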
588 void
589 promoteInRunQueue (Capability *cap, StgTSO *tso)
590 {
591 removeFromRunQueue(cap, tso);
592 pushOnRunQueue(cap, tso);
593 }
594
595 /* ----------------------------------------------------------------------------
596 * Setting up the scheduler loop
597 * ------------------------------------------------------------------------- */
598
599 static void
600 schedulePreLoop(void)
601 {
602 // initialisation for scheduler - what cannot go into initScheduler()
603
604 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
605 win32AllocStack();
606 #endif
607 }
608
609 /* -----------------------------------------------------------------------------
610 * scheduleFindWork()
611 *
612 * Search for work to do, and handle messages from elsewhere.
613 * -------------------------------------------------------------------------- */
614
615 static void
616 scheduleFindWork (Capability **pcap)
617 {
618 scheduleStartSignalHandlers(*pcap);
619
620 scheduleProcessInbox(pcap);
621
622 scheduleCheckBlockedThreads(*pcap);
623
624 #if defined(THREADED_RTS)
625 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
626 #endif
627 }
628
629 #if defined(THREADED_RTS)
630 STATIC_INLINE bool
631 shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
632 {
633 // we need to yield this capability to someone else if..
634 // - another thread is initiating a GC, and we didn't just do a GC
635 // (see Note [GC livelock])
636 // - another Task is returning from a foreign call
637 // - the thread at the head of the run queue cannot be run
638 // by this Task (it is bound to another Task, or it is unbound
639 // and this Task is bound).
640 //
641 // Note [GC livelock]
642 //
643 // If we are interrupted to do a GC, then we do not immediately do
644 // another one. This avoids a starvation situation where one
645 // Capability keeps forcing a GC and the other Capabilities make no
646 // progress at all.
647
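// The three disjuncts below correspond, in order, to the three bullet
// points above: a pending sync we did not just serve, a Task returning
// from a foreign call, and a head-of-queue thread that is bound to
// another Task (or unbound while this Task is bound).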
648 return ((pending_sync && !didGcLast) ||
649 cap->n_returning_tasks != 0 ||
650 (!emptyRunQueue(cap) && (task->incall->tso == NULL
651 ? peekRunQueue(cap)->bound != NULL
652 : peekRunQueue(cap)->bound != task->incall)));
653 }
654
655 // This is the single place where a Task goes to sleep. There are
656 // two reasons it might need to sleep:
657 // - there are no threads to run
658 // - we need to yield this Capability to someone else
659 // (see shouldYieldCapability())
660 //
661 // Careful: the scheduler loop is quite delicate. Make sure you run
662 // the tests in testsuite/concurrent (all ways) after modifying this,
663 // and also check the benchmarks in nofib/parallel for regressions.
664
665 static void
666 scheduleYield (Capability **pcap, Task *task)
667 {
668 Capability *cap = *pcap;
669 bool didGcLast = false;
670
671 // if we have work, and we don't need to give up the Capability, continue.
672 //
673 if (!shouldYieldCapability(cap,task,false) &&
674 (!emptyRunQueue(cap) ||
675 !emptyInbox(cap) ||
676 sched_state >= SCHED_INTERRUPTING)) {
677 return;
678 }
679
680 // otherwise yield (sleep), and keep yielding if necessary.
681 do {
682 didGcLast = yieldCapability(&cap,task, !didGcLast);
683 }
684 while (shouldYieldCapability(cap,task,didGcLast));
685
686 // note there may still be no threads on the run queue at this
687 // point, the caller has to check.
688
689 *pcap = cap;
690 return;
691 }
692 #endif
693
694 /* -----------------------------------------------------------------------------
695 * schedulePushWork()
696 *
697 * Push work to other Capabilities if we have some.
698 * -------------------------------------------------------------------------- */
699
700 static void
701 schedulePushWork(Capability *cap USED_IF_THREADS,
702 Task *task USED_IF_THREADS)
703 {
704 /* The following code is not for PARALLEL_HASKELL. The call is kept general;
705 future GUM versions might use pushing in a distributed setup */
706 #if defined(THREADED_RTS)
707
708 Capability *free_caps[n_capabilities], *cap0;
709 uint32_t i, n_wanted_caps, n_free_caps;
710
711 uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
712
713 // migration can be turned off with +RTS -qm
714 if (!RtsFlags.ParFlags.migrate) {
715 spare_threads = 0;
716 }
717
718 // Figure out how many capabilities we want to wake up. We need at least
719 // sparkPoolSizeCap(cap) plus the number of spare threads we have.
720 n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
721 if (n_wanted_caps == 0) return;
722
723 // First grab as many free Capabilities as we can. ToDo: we should use
724 // capabilities on the same NUMA node preferably, but not exclusively.
725 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
726 n_free_caps < n_wanted_caps && i != cap->no;
727 i = (i + 1) % n_capabilities) {
728 cap0 = capabilities[i];
729 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
730 if (!emptyRunQueue(cap0)
731 || cap0->n_returning_tasks != 0
732 || !emptyInbox(cap0)) {
733 // it already has some work, we just grabbed it at
734 // the wrong moment. Or maybe it's deadlocked!
735 releaseCapability(cap0);
736 } else {
737 free_caps[n_free_caps++] = cap0;
738 }
739 }
740 }
741
742 // We now have n_free_caps free capabilities stashed in
743 // free_caps[]. Attempt to share our run queue equally with them.
744 // This is complicated slightly by the fact that we can't move
745 // some threads:
746 //
747 // - threads that have TSO_LOCKED cannot migrate
748 // - a thread that is bound to the current Task cannot be migrated
749 //
750 // This is about the simplest thing we could do; improvements we
751 // might want to do include:
752 //
753 // - giving high priority to moving relatively new threads, on
754 // the grounds that they haven't had time to build up a
755 // working set in the cache on this CPU/Capability.
756 //
757 // - giving low priority to moving long-lived threads
758
759 if (n_free_caps > 0) {
760 StgTSO *prev, *t, *next;
761
762 debugTrace(DEBUG_sched,
763 "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
764 cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
765 n_free_caps);
766
767 // There are n_free_caps+1 caps in total. We will share the threads
768 // evenly between them, *except* that if the run queue does not divide
769 // evenly by n_free_caps+1 then we bias towards the current capability.
770 // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
771 uint32_t keep_threads =
772 (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
773
774 // This also ensures that we don't give away all our threads, since
775 // (x + y) / (y + 1) >= 1 when x >= 1.
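// Another worked instance of the same formula: with n_run_queue=7 and
// n_free_caps=3 we get keep_threads = (7+3)/(3+1) = 2, so five threads
// are handed out round-robin to the three free capabilities below.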
776
777 // The number of threads we have left.
778 uint32_t n = cap->n_run_queue;
779
780 // prev = the previous thread on this cap's run queue
781 prev = END_TSO_QUEUE;
782
783 // We're going to walk through the run queue, migrating threads to other
784 // capabilities until we have only keep_threads left. We might
785 // encounter a thread that cannot be migrated, in which case we add it
786 // to the current run queue and decrement keep_threads.
787 for (t = cap->run_queue_hd, i = 0;
788 t != END_TSO_QUEUE && n > keep_threads;
789 t = next)
790 {
791 next = t->_link;
792 t->_link = END_TSO_QUEUE;
793
794 // Should we keep this thread?
795 if (t->bound == task->incall // don't move my bound thread
796 || tsoLocked(t) // don't move a locked thread
797 ) {
798 if (prev == END_TSO_QUEUE) {
799 cap->run_queue_hd = t;
800 } else {
801 setTSOLink(cap, prev, t);
802 }
803 setTSOPrev(cap, t, prev);
804 prev = t;
805 if (keep_threads > 0) keep_threads--;
806 }
807
808 // Or migrate it?
809 else {
810 appendToRunQueue(free_caps[i],t);
811 traceEventMigrateThread (cap, t, free_caps[i]->no);
812
813 if (t->bound) { t->bound->task->cap = free_caps[i]; }
814 t->cap = free_caps[i];
815 n--; // we have one fewer threads now
816 i++; // move on to the next free_cap
817 if (i == n_free_caps) i = 0;
818 }
819 }
820
821 // Join up the beginning of the queue (prev)
822 // with the rest of the queue (t)
823 if (t == END_TSO_QUEUE) {
824 cap->run_queue_tl = prev;
825 } else {
826 setTSOPrev(cap, t, prev);
827 }
828 if (prev == END_TSO_QUEUE) {
829 cap->run_queue_hd = t;
830 } else {
831 setTSOLink(cap, prev, t);
832 }
833 cap->n_run_queue = n;
834
835 IF_DEBUG(sanity, checkRunQueue(cap));
836
837 // release the capabilities
838 for (i = 0; i < n_free_caps; i++) {
839 task->cap = free_caps[i];
840 if (sparkPoolSizeCap(cap) > 0) {
841 // If we have sparks to steal, wake up a worker on the
842 // capability, even if it has no threads to run.
843 releaseAndWakeupCapability(free_caps[i]);
844 } else {
845 releaseCapability(free_caps[i]);
846 }
847 }
848 }
849 task->cap = cap; // reset to point to our Capability.
850
851 #endif /* THREADED_RTS */
852
853 }
854
855 /* ----------------------------------------------------------------------------
856 * Start any pending signal handlers
857 * ------------------------------------------------------------------------- */
858
859 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
860 static void
861 scheduleStartSignalHandlers(Capability *cap)
862 {
863 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
864 // safe outside the lock
865 startSignalHandlers(cap);
866 }
867 }
868 #else
869 static void
870 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
871 {
872 }
873 #endif
874
875 /* ----------------------------------------------------------------------------
876 * Check for blocked threads that can be woken up.
877 * ------------------------------------------------------------------------- */
878
879 static void
880 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
881 {
882 #if !defined(THREADED_RTS)
883 //
884 // Check whether any waiting threads need to be woken up. If the
885 // run queue is empty, and there are no other tasks running, we
886 // can wait indefinitely for something to happen.
887 //
888 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
889 {
890 awaitEvent (emptyRunQueue(cap));
891 }
892 #endif
893 }
894
895 /* ----------------------------------------------------------------------------
896 * Detect deadlock conditions and attempt to resolve them.
897 * ------------------------------------------------------------------------- */
898
899 static void
900 scheduleDetectDeadlock (Capability **pcap, Task *task)
901 {
902 Capability *cap = *pcap;
903 /*
904 * Detect deadlock: when we have no threads to run, there are no
905 * threads blocked, waiting for I/O, or sleeping, and all the
906 * other tasks are waiting for work, we must have a deadlock of
907 * some description.
908 */
909 if ( emptyThreadQueues(cap) )
910 {
911 #if defined(THREADED_RTS)
912 /*
913 * In the threaded RTS, we only check for deadlock if there
914 * has been no activity in a complete timeslice. This means
915 * we won't eagerly start a full GC just because we don't have
916 * any threads to run currently.
917 */
918 if (recent_activity != ACTIVITY_INACTIVE) return;
919 #endif
920
921 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
922
923 // Garbage collection can release some new threads due to
924 // either (a) finalizers or (b) threads resurrected because
925 // they are unreachable and will therefore be sent an
926 // exception. Any threads thus released will be immediately
927 // runnable.
928 scheduleDoGC (pcap, task, true/*force major GC*/);
929 cap = *pcap;
930 // when force_major == true, scheduleDoGC sets
931 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
932 // signal.
933
934 if ( !emptyRunQueue(cap) ) return;
935
936 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
937 /* If we have user-installed signal handlers, then wait
938 * for signals to arrive rather than bombing out with a
939 * deadlock.
940 */
941 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
942 debugTrace(DEBUG_sched,
943 "still deadlocked, waiting for signals...");
944
945 awaitUserSignals();
946
947 if (signals_pending()) {
948 startSignalHandlers(cap);
949 }
950
951 // either we have threads to run, or we were interrupted:
952 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
953
954 return;
955 }
956 #endif
957
958 #if !defined(THREADED_RTS)
959 /* Probably a real deadlock. Send the current main thread the
960 * Deadlock exception.
961 */
962 if (task->incall->tso) {
963 switch (task->incall->tso->why_blocked) {
964 case BlockedOnSTM:
965 case BlockedOnBlackHole:
966 case BlockedOnMsgThrowTo:
967 case BlockedOnMVar:
968 case BlockedOnMVarRead:
969 throwToSingleThreaded(cap, task->incall->tso,
970 (StgClosure *)nonTermination_closure);
971 return;
972 default:
973 barf("deadlock: main thread blocked in a strange way");
974 }
975 }
976 return;
977 #endif
978 }
979 }
980
981
982 /* ----------------------------------------------------------------------------
983 * Process message in the current Capability's inbox
984 * ------------------------------------------------------------------------- */
985
986 static void
987 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
988 {
989 #if defined(THREADED_RTS)
990 Message *m, *next;
991 PutMVar *p, *pnext;
992 int r;
993 Capability *cap = *pcap;
994
995 while (!emptyInbox(cap)) {
996 if (cap->r.rCurrentNursery->link == NULL ||
997 g0->n_new_large_words >= large_alloc_lim) {
998 scheduleDoGC(pcap, cap->running_task, false);
999 cap = *pcap;
1000 }
1001
1002 // don't use a blocking acquire; if the lock is held by
1003 // another thread then just carry on. This seems to avoid
1004 // getting stuck in a message ping-pong situation with other
1005 // processors. We'll check the inbox again later anyway.
1006 //
1007 // We should really use a more efficient queue data structure
1008 // here. The trickiness is that we must ensure a Capability
1009 // never goes idle if the inbox is non-empty, which is why we
1010 // use cap->lock (cap->lock is released as the last thing
1011 // before going idle; see Capability.c:releaseCapability()).
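// (As used here, a non-zero result from TRY_ACQUIRE_LOCK means the lock
// was contended, so we give up for now and pick the messages up on a
// later pass through the scheduler loop.)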
1012 r = TRY_ACQUIRE_LOCK(&cap->lock);
1013 if (r != 0) return;
1014
1015 m = cap->inbox;
1016 p = cap->putMVars;
1017 cap->inbox = (Message*)END_TSO_QUEUE;
1018 cap->putMVars = NULL;
1019
1020 RELEASE_LOCK(&cap->lock);
1021
1022 while (m != (Message*)END_TSO_QUEUE) {
1023 next = m->link;
1024 executeMessage(cap, m);
1025 m = next;
1026 }
1027
1028 while (p != NULL) {
1029 pnext = p->link;
1030 performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
1031 Unit_closure);
1032 freeStablePtr(p->mvar);
1033 stgFree(p);
1034 p = pnext;
1035 }
1036 }
1037 #endif
1038 }
1039
1040
1041 /* ----------------------------------------------------------------------------
1042 * Activate spark threads (THREADED_RTS)
1043 * ------------------------------------------------------------------------- */
1044
1045 #if defined(THREADED_RTS)
1046 static void
1047 scheduleActivateSpark(Capability *cap)
1048 {
1049 if (anySparks() && !cap->disabled)
1050 {
1051 createSparkThread(cap);
1052 debugTrace(DEBUG_sched, "creating a spark thread");
1053 }
1054 }
1055 #endif // THREADED_RTS
1056
1057 /* ----------------------------------------------------------------------------
1058 * After running a thread...
1059 * ------------------------------------------------------------------------- */
1060
1061 static void
1062 schedulePostRunThread (Capability *cap, StgTSO *t)
1063 {
1064 // We have to be able to catch transactions that are in an
1065 // infinite loop as a result of seeing an inconsistent view of
1066 // memory, e.g.
1067 //
1068 // atomically $ do
1069 // [a,b] <- mapM readTVar [ta,tb]
1070 // when (a == b) loop
1071 //
1072 // and a is never equal to b given a consistent view of memory.
1073 //
1074 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1075 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1076 debugTrace(DEBUG_sched | DEBUG_stm,
1077 "trec %p found wasting its time", t);
1078
1079 // strip the stack back to the
1080 // ATOMICALLY_FRAME, aborting the (nested)
1081 // transaction, and saving the stack of any
1082 // partially-evaluated thunks on the heap.
1083 throwToSingleThreaded_(cap, t, NULL, true);
1084
1085 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1086 }
1087 }
1088
1089 //
1090 // If the current thread's allocation limit has run out, send it
1091 // the AllocationLimitExceeded exception.
1092
1093 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1094 // Use a throwToSelf rather than a throwToSingleThreaded, because
1095 // it correctly handles the case where the thread is currently
1096 // inside mask. Also the thread might be blocked (e.g. on an
1097 // MVar), and throwToSingleThreaded doesn't unblock it
1098 // correctly in that case.
1099 throwToSelf(cap, t, allocationLimitExceeded_closure);
1100 ASSIGN_Int64((W_*)&(t->alloc_limit),
1101 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
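// The limit is topped up by allocLimitGrace blocks (presumably so that
// the exception handler itself has some allocation headroom before the
// limit can trip again).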
1102 }
1103
1104 /* some statistics gathering in the parallel case */
1105 }
1106
1107 /* -----------------------------------------------------------------------------
1108 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1109 * -------------------------------------------------------------------------- */
1110
1111 static bool
1112 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1113 {
1114 if (cap->r.rHpLim == NULL || cap->context_switch) {
1115 // Sometimes we miss a context switch, e.g. when calling
1116 // primitives in a tight loop, MAYBE_GC() doesn't check the
1117 // context switch flag, and we end up waiting for a GC.
1118 // See #1984, and concurrent/should_run/1984
1119 cap->context_switch = 0;
1120 appendToRunQueue(cap,t);
1121 } else {
1122 pushOnRunQueue(cap,t);
1123 }
1124
1125 // did the task ask for a large block?
1126 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1127 // if so, get one and push it on the front of the nursery.
1128 bdescr *bd;
1129 W_ blocks;
1130
1131 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
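// For example, with the usual 4-kbyte BLOCK_SIZE an rHpAlloc of 10000
// bytes rounds up to 12288, giving blocks = 3.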
1132
1133 if (blocks > BLOCKS_PER_MBLOCK) {
1134 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1135 }
1136
1137 debugTrace(DEBUG_sched,
1138 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1139 (long)t->id, what_next_strs[t->what_next], blocks);
1140
1141 // don't do this if the nursery is (nearly) full, we'll GC first.
1142 if (cap->r.rCurrentNursery->link != NULL ||
1143 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1144 // infinite loop if the
1145 // nursery has only one
1146 // block.
1147
1148 bd = allocGroupOnNode_lock(cap->node,blocks);
1149 cap->r.rNursery->n_blocks += blocks;
1150
1151 // link the new group after CurrentNursery
1152 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1153
1154 // initialise it as a nursery block. We initialise the
1155 // step, gen_no, and flags field of *every* sub-block in
1156 // this large block, because this is easier than making
1157 // sure that we always find the block head of a large
1158 // block whenever we call Bdescr() (eg. evacuate() and
1159 // isAlive() in the GC would both have to do this, at
1160 // least).
1161 {
1162 bdescr *x;
1163 for (x = bd; x < bd + blocks; x++) {
1164 initBdescr(x,g0,g0);
1165 x->free = x->start;
1166 x->flags = 0;
1167 }
1168 }
1169
1170 // This assert can be a killer if the app is doing lots
1171 // of large block allocations.
1172 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1173
1174 // now update the nursery to point to the new block
1175 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1176 cap->r.rCurrentNursery = bd;
1177
1178 // we might be unlucky and have another thread get on the
1179 // run queue before us and steal the large block, but in that
1180 // case the thread will just end up requesting another large
1181 // block.
1182 return false; /* not actually GC'ing */
1183 }
1184 }
1185
1186 // if we got here because we exceeded large_alloc_lim, then
1187 // proceed straight to GC.
1188 if (g0->n_new_large_words >= large_alloc_lim) {
1189 return true;
1190 }
1191
1192 // Otherwise, we just ran out of space in the current nursery.
1193 // Grab another nursery if we can.
1194 if (getNewNursery(cap)) {
1195 debugTrace(DEBUG_sched, "thread %ld got a new nursery", t->id);
1196 return false;
1197 }
1198
1199 return true;
1200 /* actual GC is done at the end of the while loop in schedule() */
1201 }
1202
1203 /* -----------------------------------------------------------------------------
1204 * Handle a thread that returned to the scheduler with ThreadYielding
1205 * -------------------------------------------------------------------------- */
1206
1207 static bool
1208 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1209 {
1210 /* put the thread back on the run queue. Then, if we're ready to
1211 * GC, check whether this is the last task to stop. If so, wake
1212 * up the GC thread. getThread will block during a GC until the
1213 * GC is finished.
1214 */
1215
1216 ASSERT(t->_link == END_TSO_QUEUE);
1217
1218 // Shortcut if we're just switching evaluators: just run the thread. See
1219 // Note [avoiding threadPaused] in Interpreter.c.
1220 if (t->what_next != prev_what_next) {
1221 debugTrace(DEBUG_sched,
1222 "--<< thread %ld (%s) stopped to switch evaluators",
1223 (long)t->id, what_next_strs[t->what_next]);
1224 return true;
1225 }
1226
1227 // Reset the context switch flag. We don't do this just before
1228 // running the thread, because that would mean we would lose ticks
1229 // during GC, which can lead to unfair scheduling (a thread hogs
1230 // the CPU because the tick always arrives during GC). This way
1231 // penalises threads that do a lot of allocation, but that seems
1232 // better than the alternative.
1233 if (cap->context_switch != 0) {
1234 cap->context_switch = 0;
1235 appendToRunQueue(cap,t);
1236 } else {
1237 pushOnRunQueue(cap,t);
1238 }
1239
1240 IF_DEBUG(sanity,
1241 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1242 checkTSO(t));
1243
1244 return false;
1245 }
1246
1247 /* -----------------------------------------------------------------------------
1248 * Handle a thread that returned to the scheduler with ThreadBlocked
1249 * -------------------------------------------------------------------------- */
1250
1251 static void
1252 scheduleHandleThreadBlocked( StgTSO *t
1253 #if !defined(DEBUG)
1254 STG_UNUSED
1255 #endif
1256 )
1257 {
1258
1259 // We don't need to do anything. The thread is blocked, and it
1260 // has tidied up its stack and placed itself on whatever queue
1261 // it needs to be on.
1262
1263 // ASSERT(t->why_blocked != NotBlocked);
1264 // Not true: for example,
1265 // - the thread may have woken itself up already, because
1266 // threadPaused() might have raised a blocked throwTo
1267 // exception, see maybePerformBlockedException().
1268
1269 #if defined(DEBUG)
1270 traceThreadStatus(DEBUG_sched, t);
1271 #endif
1272 }
1273
1274 /* -----------------------------------------------------------------------------
1275 * Handle a thread that returned to the scheduler with ThreadFinished
1276 * -------------------------------------------------------------------------- */
1277
1278 static bool
1279 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1280 {
1281 /* Need to check whether this was a main thread, and if so,
1282 * return with the return value.
1283 *
1284 * We also end up here if the thread kills itself with an
1285 * uncaught exception, see Exception.cmm.
1286 */
1287
1288 // blocked exceptions can now complete, even if the thread was in
1289 // blocked mode (see #2910).
1290 awakenBlockedExceptionQueue (cap, t);
1291
1292 //
1293 // Check whether the thread that just completed was a bound
1294 // thread, and if so return with the result.
1295 //
1296 // There is an assumption here that all thread completion goes
1297 // through this point; we need to make sure that if a thread
1298 // ends up in the ThreadKilled state, that it stays on the run
1299 // queue so it can be dealt with here.
1300 //
1301
1302 if (t->bound) {
1303
1304 if (t->bound != task->incall) {
1305 #if !defined(THREADED_RTS)
1306 // Must be a bound thread that is not the topmost one. Leave
1307 // it on the run queue until the stack has unwound to the
1308 // point where we can deal with this. Leaving it on the run
1309 // queue also ensures that the garbage collector knows about
1310 // this thread and its return value (it gets dropped from the
1311 // step->threads list so there's no other way to find it).
1312 appendToRunQueue(cap,t);
1313 return false;
1314 #else
1315 // this cannot happen in the threaded RTS, because a
1316 // bound thread can only be run by the appropriate Task.
1317 barf("finished bound thread that isn't mine");
1318 #endif
1319 }
1320
1321 ASSERT(task->incall->tso == t);
1322
1323 if (t->what_next == ThreadComplete) {
1324 if (task->incall->ret) {
1325 // NOTE: return val is stack->sp[1] (see StgStartup.cmm)
1326 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1327 }
1328 task->incall->rstat = Success;
1329 } else {
1330 if (task->incall->ret) {
1331 *(task->incall->ret) = NULL;
1332 }
1333 if (sched_state >= SCHED_INTERRUPTING) {
1334 if (heap_overflow) {
1335 task->incall->rstat = HeapExhausted;
1336 } else {
1337 task->incall->rstat = Interrupted;
1338 }
1339 } else {
1340 task->incall->rstat = Killed;
1341 }
1342 }
1343 #if defined(DEBUG)
1344 removeThreadLabel((StgWord)task->incall->tso->id);
1345 #endif
1346
1347 // We no longer consider this thread and task to be bound to
1348 // each other. The TSO lives on until it is GC'd, but the
1349 // task is about to be released by the caller, and we don't
1350 // want anyone following the pointer from the TSO to the
1351 // defunct task (which might have already been
1352 // re-used). This was a real bug: the GC updated
1353 // tso->bound->tso which led to a deadlock.
1354 t->bound = NULL;
1355 task->incall->tso = NULL;
1356
1357 return true; // tells schedule() to return
1358 }
1359
1360 return false;
1361 }
1362
1363 /* -----------------------------------------------------------------------------
1364 * Perform a heap census
1365 * -------------------------------------------------------------------------- */
1366
1367 static bool
1368 scheduleNeedHeapProfile( bool ready_to_gc STG_UNUSED )
1369 {
1370 // When we have +RTS -i0 and we're heap profiling, do a census at
1371 // every GC. This lets us get repeatable runs for debugging.
1372 if (performHeapProfile ||
1373 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1374 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1375 return true;
1376 } else {
1377 return false;
1378 }
1379 }
1380
1381 /* -----------------------------------------------------------------------------
1382 * stopAllCapabilities()
1383 *
1384 * Stop all Haskell execution. This is used when we need to make some global
1385 * change to the system, such as altering the number of capabilities, or
1386 * forking.
1387 *
1388 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1389 * -------------------------------------------------------------------------- */
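/* A minimal usage sketch (cap and task assumed to be in scope; this is
 * how a caller that alters the number of capabilities would pair the two
 * calls, not a new API):
 *
 *     stopAllCapabilities(&cap, task);   // world stopped; we hold all caps
 *     ... make the global change ...
 *     releaseAllCapabilities(n_capabilities, cap, task);
 */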
1390
1391 #if defined(THREADED_RTS)
1392 static void stopAllCapabilities (Capability **pCap, Task *task)
1393 {
1394 bool was_syncing;
1395 SyncType prev_sync_type;
1396
1397 PendingSync sync = {
1398 .type = SYNC_OTHER,
1399 .idle = NULL,
1400 .task = task
1401 };
1402
1403 do {
1404 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1405 } while (was_syncing);
1406
1407 acquireAllCapabilities(*pCap,task);
1408
1409 pending_sync = 0;
1410 }
1411 #endif
1412
1413 /* -----------------------------------------------------------------------------
1414 * requestSync()
1415 *
1416 * Commence a synchronisation between all capabilities. Normally not called
1417 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1418 * has some special synchronisation requirements.
1419 *
1420 * Returns:
1421 * false if we successfully got a sync
1422 * true if there was another sync request in progress,
1423 * and we yielded to it; the type of the other sync request
1424 * is returned in *prev_sync_type.
1425 * -------------------------------------------------------------------------- */
1426
1427 #if defined(THREADED_RTS)
1428 static bool requestSync (
1429 Capability **pcap, Task *task, PendingSync *new_sync,
1430 SyncType *prev_sync_type)
1431 {
1432 PendingSync *sync;
1433
1434 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1435 (StgWord)NULL,
1436 (StgWord)new_sync);
1437
1438 if (sync != NULL)
1439 {
1440 // sync is valid until we have called yieldCapability().
1441 // After the sync is completed, we cannot read that struct any
1442 // more because it has been freed.
1443 *prev_sync_type = sync->type;
1444 do {
1445 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1446 sync->type);
1447 ASSERT(*pcap);
1448 yieldCapability(pcap,task,true);
1449 sync = pending_sync;
1450 } while (sync != NULL);
1451
1452 // NOTE: task->cap might have changed now
1453 return true;
1454 }
1455 else
1456 {
1457 return false;
1458 }
1459 }
1460 #endif
1461
1462 /* -----------------------------------------------------------------------------
1463 * acquireAllCapabilities()
1464 *
1465 * Grab all the capabilities except the one we already hold. Used
1466 * when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1467 * before a fork (SYNC_OTHER).
1468 *
1469 * Only call this after requestSync(), otherwise a deadlock might
1470 * ensue if another thread is trying to synchronise.
1471 * -------------------------------------------------------------------------- */
1472
1473 #if defined(THREADED_RTS)
1474 static void acquireAllCapabilities(Capability *cap, Task *task)
1475 {
1476 Capability *tmpcap;
1477 uint32_t i;
1478
1479 ASSERT(pending_sync != NULL);
1480 for (i=0; i < n_capabilities; i++) {
1481 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1482 i, n_capabilities);
1483 tmpcap = capabilities[i];
1484 if (tmpcap != cap) {
1485 // we better hope this task doesn't get migrated to
1486 // another Capability while we're waiting for this one.
1487 // It won't, because load balancing happens while we have
1488 // all the Capabilities, but even so it's a slightly
1489 // unsavoury invariant.
1490 task->cap = tmpcap;
1491 waitForCapability(&tmpcap, task);
1492 if (tmpcap->no != i) {
1493 barf("acquireAllCapabilities: got the wrong capability");
1494 }
1495 }
1496 }
1497 task->cap = cap;
1498 }
1499 #endif
1500
1501 /* -----------------------------------------------------------------------------
1502 * releaseAllCapabilities()
1503 *
1504 * Assuming this thread holds all the capabilities, release them all except for
1505 * the one passed in as cap.
1506 * -------------------------------------------------------------------------- */
1507
1508 #if defined(THREADED_RTS)
1509 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task)
1510 {
1511 uint32_t i;
1512
1513 for (i = 0; i < n; i++) {
1514 if (cap->no != i) {
1515 task->cap = capabilities[i];
1516 releaseCapability(capabilities[i]);
1517 }
1518 }
1519 task->cap = cap;
1520 }
1521 #endif
1522
1523 /* -----------------------------------------------------------------------------
1524 * Perform a garbage collection if necessary
1525 * -------------------------------------------------------------------------- */
1526
1527 static void
1528 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1529 bool force_major)
1530 {
1531 Capability *cap = *pcap;
1532 bool heap_census;
1533 uint32_t collect_gen;
1534 bool major_gc;
1535 #if defined(THREADED_RTS)
1536 uint32_t gc_type;
1537 uint32_t i;
1538 uint32_t need_idle;
1539 uint32_t n_gc_threads;
1540 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1541 StgTSO *tso;
1542 bool *idle_cap;
1543 // idle_cap is an array (allocated later) of size n_capabilities, where
1544 // idle_cap[i] is true if capability i will be idle during this GC
1545 // cycle.
1546 #endif
1547
1548 if (sched_state == SCHED_SHUTTING_DOWN) {
1549 // The final GC has already been done, and the system is
1550 // shutting down. We'll probably deadlock if we try to GC
1551 // now.
1552 return;
1553 }
1554
1555 heap_census = scheduleNeedHeapProfile(true);
1556
1557 // Figure out which generation we are collecting, so that we can
1558 // decide whether this is a parallel GC or not.
1559 collect_gen = calcNeeded(force_major || heap_census, NULL);
1560 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1561
1562 #if defined(THREADED_RTS)
1563 if (sched_state < SCHED_INTERRUPTING
1564 && RtsFlags.ParFlags.parGcEnabled
1565 && collect_gen >= RtsFlags.ParFlags.parGcGen
1566 && ! oldest_gen->mark)
1567 {
1568 gc_type = SYNC_GC_PAR;
1569 } else {
1570 gc_type = SYNC_GC_SEQ;
1571 }
1572
1573 // In order to GC, there must be no threads running Haskell code.
1574 // Therefore, for single-threaded GC, the GC thread needs to hold *all* the
1575 // capabilities, and release them after the GC has completed. For parallel
1576 // GC, we synchronise all the running threads using requestSync().
1577 //
1578 // Other capabilities are prevented from running yet more Haskell threads if
1579 // pending_sync is set. Tested inside yieldCapability() and
1580 // releaseCapability() in Capability.c
1581
1582 PendingSync sync = {
1583 .type = gc_type,
1584 .idle = NULL,
1585 .task = task
1586 };
1587
1588 {
1589 SyncType prev_sync = 0;
1590 bool was_syncing;
1591 do {
1592 // If -qn is not set and we have more capabilities than cores, set
1593 // the number of GC threads to #cores. We do this here rather than
1594 // in normaliseRtsOpts() because here it will work if the program
1595 // calls setNumCapabilities.
1596 //
1597 n_gc_threads = RtsFlags.ParFlags.parGcThreads;
1598 if (n_gc_threads == 0 &&
1599 enabled_capabilities > getNumberOfProcessors()) {
1600 n_gc_threads = getNumberOfProcessors();
1601 }
1602
1603 // This calculation must be inside the loop because
1604 // enabled_capabilities may change if requestSync() below fails and
1605 // we retry.
1606 if (gc_type == SYNC_GC_PAR && n_gc_threads > 0) {
1607 if (n_gc_threads >= enabled_capabilities) {
1608 need_idle = 0;
1609 } else {
1610 need_idle = enabled_capabilities - n_gc_threads;
1611 }
1612 } else {
1613 need_idle = 0;
1614 }
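// Concrete instance: with enabled_capabilities == 8 and +RTS -qn4
// (i.e. parGcThreads == 4) in a parallel GC, need_idle == 4; the loops
// below then pick four capabilities, preferring disabled or inactive
// ones, to sit out this GC cycle.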
1615
1616 // We need an array of size n_capabilities, but since this may
1617 // change each time around the loop we must allocate it afresh.
1618 idle_cap = (bool *)stgMallocBytes(n_capabilities *
1619 sizeof(bool),
1620 "scheduleDoGC");
1621 sync.idle = idle_cap;
1622
1623 // When using +RTS -qn, we need some capabilities to be idle during
1624 // GC. The best bet is to choose some inactive ones, so we look for
1625 // those first:
1626 uint32_t n_idle = need_idle;
1627 for (i=0; i < n_capabilities; i++) {
1628 if (capabilities[i]->disabled) {
1629 idle_cap[i] = true;
1630 } else if (n_idle > 0 &&
1631 capabilities[i]->running_task == NULL) {
1632 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1633 n_idle--;
1634 idle_cap[i] = true;
1635 } else {
1636 idle_cap[i] = false;
1637 }
1638 }
1639 // If we didn't find enough inactive capabilities, just pick some
1640 // more to be idle.
1641 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1642 if (!idle_cap[i] && i != cap->no) {
1643 idle_cap[i] = true;
1644 n_idle--;
1645 }
1646 }
1647 ASSERT(n_idle == 0);
1648
1649 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1650 cap = *pcap;
1651 if (was_syncing) {
1652 stgFree(idle_cap);
1653 }
1654 if (was_syncing &&
1655 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1656 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1657 // someone else had a pending sync request for a GC, so
1658 // let's assume GC has been done and we don't need to GC
1659 // again.
1660 // Exception to this: if SCHED_INTERRUPTING, then we still
1661 // need to do the final GC.
1662 return;
1663 }
1664 if (sched_state == SCHED_SHUTTING_DOWN) {
1665 // The scheduler might now be shutting down. We tested
1666 // this above, but it might have become true since then as
1667 // we yielded the capability in requestSync().
1668 return;
1669 }
1670 } while (was_syncing);
1671 }
1672
1673 stat_startGCSync(gc_threads[cap->no]);
1674
1675 #if defined(DEBUG)
1676 unsigned int old_n_capabilities = n_capabilities;
1677 #endif
1678
1679 interruptAllCapabilities();
1680
1681 // The final shutdown GC is always single-threaded, because it's
1682 // possible that some of the Capabilities have no worker threads.
1683
1684 if (gc_type == SYNC_GC_SEQ) {
1685 traceEventRequestSeqGc(cap);
1686 } else {
1687 traceEventRequestParGc(cap);
1688 }
1689
1690 if (gc_type == SYNC_GC_SEQ) {
1691 // single-threaded GC: grab all the capabilities
1692 acquireAllCapabilities(cap,task);
1693 }
1694 else
1695 {
1696 // If we are load-balancing collections in this
1697 // generation, then we require all GC threads to participate
1698 // in the collection. Otherwise, we only require active
1699 // threads to participate, and we set gc_threads[i]->idle for
1700 // any idle capabilities. The rationale here is that waking
1701 // up an idle Capability takes much longer than just doing any
1702 // GC work on its behalf.
1703
1704 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1705 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1706 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1707 {
1708 for (i=0; i < n_capabilities; i++) {
1709 if (capabilities[i]->disabled) {
1710 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1711 if (idle_cap[i]) {
1712 n_idle_caps++;
1713 }
1714 } else {
1715 if (i != cap->no && idle_cap[i]) {
1716 Capability *tmpcap = capabilities[i];
1717 task->cap = tmpcap;
1718 waitForCapability(&tmpcap, task);
1719 n_idle_caps++;
1720 }
1721 }
1722 }
1723 }
1724 else
1725 {
1726 for (i=0; i < n_capabilities; i++) {
1727 if (capabilities[i]->disabled) {
1728 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1729 if (idle_cap[i]) {
1730 n_idle_caps++;
1731 }
1732 } else if (i != cap->no &&
1733 capabilities[i]->idle >=
1734 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1735 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1736 if (idle_cap[i]) {
1737 n_idle_caps++;
1738 } else {
1739 n_failed_trygrab_idles++;
1740 }
1741 }
1742 }
1743 }
1744 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1745
1746 for (i=0; i < n_capabilities; i++) {
1747 capabilities[i]->idle++;
1748 }
1749
1750 // For all capabilities participating in this GC, wait until
1751 // they have stopped mutating and are standing by for GC.
1752 waitForGcThreads(cap, idle_cap);
1753
1754 #if defined(THREADED_RTS)
1755 // Stable point where we can do a global check on our spark counters
1756 ASSERT(checkSparkCountInvariant());
1757 #endif
1758 }
1759
1760 #endif
1761
1762 IF_DEBUG(scheduler, printAllThreads());
1763
1764 delete_threads_and_gc:
1765 /*
1766 * We now have all the capabilities; if we're in an interrupting
1767 * state, then we should take the opportunity to delete all the
1768 * threads in the system.
1769 * Checking for major_gc ensures that the last GC is major.
1770 */
1771 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1772 deleteAllThreads(cap);
1773 #if defined(THREADED_RTS)
1774 // Discard all the sparks from every Capability. Why?
1775 // They'll probably be GC'd anyway since we've killed all the
1776 // threads. It just avoids the GC having to do any work to
1777 // figure out that any remaining sparks are garbage.
1778 for (i = 0; i < n_capabilities; i++) {
1779 capabilities[i]->spark_stats.gcd +=
1780 sparkPoolSize(capabilities[i]->sparks);
1781 // No race here since all Caps are stopped.
1782 discardSparksCap(capabilities[i]);
1783 }
1784 #endif
1785 sched_state = SCHED_SHUTTING_DOWN;
1786 }
1787
1788 /*
1789 * When there are disabled capabilities, we want to migrate any
1790 * threads away from them. Normally this happens in the
1791 * scheduler's loop, but only for unbound threads - it's really
1792 * hard for a bound thread to migrate itself. So we have another
1793 * go here.
1794 */
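    // Each disabled capability i hands its run queue over to capability
    // (i % enabled_capabilities), which is guaranteed to be an enabled one;
    // for bound threads we also repoint the bound Task at the destination
    // capability.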
1795 #if defined(THREADED_RTS)
1796 for (i = enabled_capabilities; i < n_capabilities; i++) {
1797 Capability *tmp_cap, *dest_cap;
1798 tmp_cap = capabilities[i];
1799 ASSERT(tmp_cap->disabled);
1800 if (i != cap->no) {
1801 dest_cap = capabilities[i % enabled_capabilities];
1802 while (!emptyRunQueue(tmp_cap)) {
1803 tso = popRunQueue(tmp_cap);
1804 migrateThread(tmp_cap, tso, dest_cap);
1805 if (tso->bound) {
1806 traceTaskMigrate(tso->bound->task,
1807 tso->bound->task->cap,
1808 dest_cap);
1809 tso->bound->task->cap = dest_cap;
1810 }
1811 }
1812 }
1813 }
1814 #endif
1815
1816 #if defined(THREADED_RTS)
1817 // reset pending_sync *before* GC, so that when the GC threads
1818 // emerge they don't immediately re-enter the GC.
1819 pending_sync = 0;
1820 GarbageCollect(collect_gen, heap_census, gc_type, cap, idle_cap);
1821 #else
1822 GarbageCollect(collect_gen, heap_census, 0, cap, NULL);
1823 #endif
1824
1825 traceSparkCounters(cap);
1826
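    // recent_activity is the deadlock-detection flag driven by the timer:
    // ACTIVITY_YES while threads have run recently, ACTIVITY_MAYBE_NO and
    // then ACTIVITY_INACTIVE after idle ticks, and ACTIVITY_DONE_GC once the
    // idle GC has been performed (a rough summary; see the comments on
    // recent_activity near the top of this file).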
1827 switch (recent_activity) {
1828 case ACTIVITY_INACTIVE:
1829 if (force_major) {
1830 // We are doing a GC because the system has been idle for a
1831 // timeslice and we need to check for deadlock. Record the
1832 // fact that we've done a GC and turn off the timer signal;
1833 // it will get re-enabled if we run any threads after the GC.
1834 recent_activity = ACTIVITY_DONE_GC;
1835 #if !defined(PROFILING)
1836 stopTimer();
1837 #endif
1838 break;
1839 }
1840 // fall through...
1841
1842 case ACTIVITY_MAYBE_NO:
1843 // the GC might have taken long enough for the timer to set
1844 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1845 // but we aren't necessarily deadlocked:
1846 recent_activity = ACTIVITY_YES;
1847 break;
1848
1849 case ACTIVITY_DONE_GC:
1850 // If we are actually active, the scheduler will reset the
1851 // recent_activity flag and re-enable the timer.
1852 break;
1853 }
1854
1855 #if defined(THREADED_RTS)
1856 // Stable point where we can do a global check on our spark counters
1857 ASSERT(checkSparkCountInvariant());
1858 #endif
1859
1860 // The heap census itself is done during GarbageCollect().
1861 if (heap_census) {
1862 performHeapProfile = false;
1863 }
1864
1865 #if defined(THREADED_RTS)
1866
1867 // If n_capabilities has changed during GC, we're in trouble.
1868 ASSERT(n_capabilities == old_n_capabilities);
1869
1870 if (gc_type == SYNC_GC_PAR)
1871 {
1872 for (i = 0; i < n_capabilities; i++) {
1873 if (i != cap->no) {
1874 if (idle_cap[i]) {
1875 ASSERT(capabilities[i]->running_task == task);
1876 task->cap = capabilities[i];
1877 releaseCapability(capabilities[i]);
1878 } else {
1879 ASSERT(capabilities[i]->running_task != task);
1880 }
1881 }
1882 }
1883 task->cap = cap;
1884
1885 // releaseGCThreads() happens *after* we have released idle
1886 // capabilities. Otherwise what can happen is one of the released
1887 // threads starts a new GC, and finds that it can't acquire some of
1888 // the disabled capabilities, because the previous GC still holds
1889 // them, so those disabled capabilities will not be idle during the
1890 // next GC round. However, if we release the capabilities first,
1891 // then they will be free (because they're disabled) when the next
1892 // GC cycle happens.
1893 releaseGCThreads(cap, idle_cap);
1894 }
1895 #endif
1896 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1897 // GC set the heap_overflow flag. We should throw an exception if we
1898 // can, or shut down otherwise.
1899
1900 // Get the thread to which Ctrl-C is thrown
1901 StgTSO *main_thread = getTopHandlerThread();
1902 if (main_thread == NULL) {
1903 // GC set the heap_overflow flag, and there is no main thread to
1904 // throw an exception to, so we should proceed with an orderly
1905 // shutdown now. Ultimately we want the main thread to return to
1906 // its caller with HeapExhausted, at which point the caller should
1907 // call hs_exit(). The first step is to delete all the threads.
1908 sched_state = SCHED_INTERRUPTING;
1909 goto delete_threads_and_gc;
1910 }
1911
1912 heap_overflow = false;
1913 const uint64_t allocation_count = getAllocations();
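        // heapLimitGrace gives the program some headroom to run its exception
        // handlers: we only throw another HeapOverflow once at least that many
        // bytes have been allocated since the previous one (or if this is the
        // first overflow).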
1914 if (RtsFlags.GcFlags.heapLimitGrace <
1915 allocation_count - allocated_bytes_at_heapoverflow ||
1916 allocated_bytes_at_heapoverflow == 0) {
1917 allocated_bytes_at_heapoverflow = allocation_count;
1918 // We used to simply exit, but throwing an exception gives the
1919 // program a chance to clean up. It also lets the exception be
1920 // caught.
1921
1922 // FIXME this is not a good way to tell a program to release
1923 // resources. It is neither reliable (the RTS crashes if it fails
1924 // to allocate memory from the OS) nor very usable (it is always
1925 // thrown to the main thread, which might not be able to do anything
1926 // useful with it). We really should have a more general way to
1927 // release resources in low-memory conditions. Nevertheless, this
1928 // is still a big improvement over just exiting.
1929
1930 // FIXME again: perhaps we should throw a synchronous exception
1931 // instead of an asynchronous one, or have a way for the program to
1932 // register a handler to be called when heap overflow happens.
1933 throwToSelf(cap, main_thread, heapOverflow_closure);
1934 }
1935 }
1936 #if defined(SPARKBALANCE)
1937 /* JB
1938 Once we are all together... this would be the place to balance all
1939 spark pools. No concurrent stealing or adding of new sparks can
1940 occur. Should be defined in Sparks.c. */
1941 balanceSparkPoolsCaps(n_capabilities, capabilities);
1942 #endif
1943
1944 #if defined(THREADED_RTS)
1945 stgFree(idle_cap);
1946
1947 if (gc_type == SYNC_GC_SEQ) {
1948 // release our stash of capabilities.
1949 releaseAllCapabilities(n_capabilities, cap, task);
1950 }
1951 #endif
1952
1953 return;
1954 }
1955
1956 /* ---------------------------------------------------------------------------
1957 * Singleton fork(). Do not copy any running threads.
1958 * ------------------------------------------------------------------------- */
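// This backs the forkProcess# primop; the usual Haskell-level entry point is
// System.Posix.Process.forkProcess in the unix package (the exact wrapper
// lives outside this file).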
1959
1960 pid_t
1961 forkProcess(HsStablePtr *entry
1962 #if !defined(FORKPROCESS_PRIMOP_SUPPORTED)
1963 STG_UNUSED
1964 #endif
1965 )
1966 {
1967 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
1968 pid_t pid;
1969 StgTSO* t,*next;
1970 Capability *cap;
1971 uint32_t g;
1972 Task *task = NULL;
1973 uint32_t i;
1974
1975 debugTrace(DEBUG_sched, "forking!");
1976
1977 task = newBoundTask();
1978
1979 cap = NULL;
1980 waitForCapability(&cap, task);
1981
1982 #if defined(THREADED_RTS)
1983 stopAllCapabilities(&cap, task);
1984 #endif
1985
1986 // no funny business: hold locks while we fork, otherwise if some
1987 // other thread is holding a lock when the fork happens, the data
1988 // structure protected by the lock will forever be in an
1989 // inconsistent state in the child. See also #1391.
1990 ACQUIRE_LOCK(&sched_mutex);
1991 ACQUIRE_LOCK(&sm_mutex);
1992 ACQUIRE_LOCK(&stable_mutex);
1993 ACQUIRE_LOCK(&task->lock);
1994
1995 for (i=0; i < n_capabilities; i++) {
1996 ACQUIRE_LOCK(&capabilities[i]->lock);
1997 }
1998
1999 #if defined(THREADED_RTS)
2000 ACQUIRE_LOCK(&all_tasks_mutex);
2001 #endif
2002
2003 stopTimer(); // See #4074
2004
2005 #if defined(TRACING)
2006 flushEventLog(); // so that child won't inherit dirty file buffers
2007 #endif
2008
2009 pid = fork();
2010
2011 if (pid) { // parent
2012
2013 startTimer(); // #4074
2014
2015 RELEASE_LOCK(&sched_mutex);
2016 RELEASE_LOCK(&sm_mutex);
2017 RELEASE_LOCK(&stable_mutex);
2018 RELEASE_LOCK(&task->lock);
2019
2020 for (i=0; i < n_capabilities; i++) {
2021 releaseCapability_(capabilities[i],false);
2022 RELEASE_LOCK(&capabilities[i]->lock);
2023 }
2024
2025 #if defined(THREADED_RTS)
2026 RELEASE_LOCK(&all_tasks_mutex);
2027 #endif
2028
2029 boundTaskExiting(task);
2030
2031 // just return the pid
2032 return pid;
2033
2034 } else { // child
2035
2036 #if defined(THREADED_RTS)
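        // After fork() only the forking OS thread exists in the child, and it
        // still holds every lock acquired above; rather than unlocking them
        // one by one we simply re-initialise all the mutexes we know about.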
2037 initMutex(&sched_mutex);
2038 initMutex(&sm_mutex);
2039 initMutex(&stable_mutex);
2040 initMutex(&task->lock);
2041
2042 for (i=0; i < n_capabilities; i++) {
2043 initMutex(&capabilities[i]->lock);
2044 }
2045
2046 initMutex(&all_tasks_mutex);
2047 #endif
2048
2049 #if defined(TRACING)
2050 resetTracing();
2051 #endif
2052
2053 // Now, all OS threads except the thread that forked are
2054 // stopped. We need to stop all Haskell threads, including
2055 // those involved in foreign calls. Also we need to delete
2056 // all Tasks, because they correspond to OS threads that are
2057 // now gone.
2058
2059 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2060 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2061 next = t->global_link;
2062 // don't allow threads to catch the ThreadKilled
2063 // exception, but we do want to raiseAsync() because these
2064 // threads may be evaluating thunks that we need later.
2065 deleteThread_(t->cap,t);
2066
2067 // stop the GC from updating the InCall to point to
2068 // the TSO. This is only necessary because the
2069 // OSThread bound to the TSO has been killed, and
2070 // won't get a chance to exit in the usual way (see
2071 // also scheduleHandleThreadFinished).
2072 t->bound = NULL;
2073 }
2074 }
2075
2076 discardTasksExcept(task);
2077
2078 for (i=0; i < n_capabilities; i++) {
2079 cap = capabilities[i];
2080
2081 // Empty the run queue. It seems tempting to let all the
2082 // killed threads stay on the run queue as zombies to be
2083 // cleaned up later, but some of them may correspond to
2084 // bound threads for which the corresponding Task does not
2085 // exist.
2086 truncateRunQueue(cap);
2087 cap->n_run_queue = 0;
2088
2089 // Any suspended C-calling Tasks are no more; their OS threads
2090 // don't exist now:
2091 cap->suspended_ccalls = NULL;
2092 cap->n_suspended_ccalls = 0;
2093
2094 #if defined(THREADED_RTS)
2095 // Wipe our spare workers list; they no longer exist. New
2096 // workers will be created if necessary.
2097 cap->spare_workers = NULL;
2098 cap->n_spare_workers = 0;
2099 cap->returning_tasks_hd = NULL;
2100 cap->returning_tasks_tl = NULL;
2101 cap->n_returning_tasks = 0;
2102 #endif
2103
2104 // Release all caps except 0; we'll use that for starting
2105 // the IO manager and running the client action below.
2106 if (cap->no != 0) {
2107 task->cap = cap;
2108 releaseCapability(cap);
2109 }
2110 }
2111 cap = capabilities[0];
2112 task->cap = cap;
2113
2114 // Empty the threads lists. Otherwise, the garbage
2115 // collector may attempt to resurrect some of these threads.
2116 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2117 generations[g].threads = END_TSO_QUEUE;
2118 }
2119
2120 // On Unix, all timers are reset in the child, so we need to start
2121 // the timer again.
2122 initTimer();
2123 startTimer();
2124
2125 // TODO: need to trace various other things in the child
2126 // like startup event, capabilities, process info etc
2127 traceTaskCreate(task, cap);
2128
2129 #if defined(THREADED_RTS)
2130 ioManagerStartCap(&cap);
2131 #endif
2132
2133 // Install toplevel exception handlers, so that the interruption
2134 // signal will be sent to the main thread.
2135 // See Trac #12903
2136 rts_evalStableIOMain(&cap, entry, NULL); // run the action
2137 rts_checkSchedStatus("forkProcess",cap);
2138
2139 rts_unlock(cap);
2140 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2141 }
2142 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2143 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2144 #endif
2145 }
2146
2147 /* ---------------------------------------------------------------------------
2148 * Changing the number of Capabilities
2149 *
2150 * Changing the number of Capabilities is very tricky! We can only do
2151 * it with the system fully stopped, so we do a full sync with
2152 * requestSync(SYNC_OTHER) and grab all the capabilities.
2153 *
2154 * Then we resize the appropriate data structures, and update all
2155 * references to the old data structures which have now moved.
2156 * Finally we release the Capabilities we are holding, and start
2157 * worker Tasks on the new Capabilities we created.
2158 *
2159 * ------------------------------------------------------------------------- */
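/*
 * At the Haskell level this is reached through
 * Control.Concurrent.setNumCapabilities (via GHC.Conc); the initial value
 * comes from the +RTS -N<n> flag.
 */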
2160
2161 void
2162 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2163 {
2164 #if !defined(THREADED_RTS)
2165 if (new_n_capabilities != 1) {
2166 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2167 }
2168 return;
2169 #elif defined(NOSMP)
2170 if (new_n_capabilities != 1) {
2171 errorBelch("setNumCapabilities: not supported on this platform");
2172 }
2173 return;
2174 #else
2175 Task *task;
2176 Capability *cap;
2177 uint32_t n;
2178 Capability *old_capabilities = NULL;
2179 uint32_t old_n_capabilities = n_capabilities;
2180
2181 if (new_n_capabilities == enabled_capabilities) {
2182 return;
2183 } else if (new_n_capabilities <= 0) {
2184 errorBelch("setNumCapabilities: Capability count must be positive");
2185 return;
2186 }
2187
2188
2189 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2190 enabled_capabilities, new_n_capabilities);
2191
2192 cap = rts_lock();
2193 task = cap->running_task;
2194
2195 stopAllCapabilities(&cap, task);
2196
2197 if (new_n_capabilities < enabled_capabilities)
2198 {
2199 // Reducing the number of capabilities: we do not actually
2200 // remove the extra capabilities, we just mark them as
2201 // "disabled". This has the following effects:
2202 //
2203 // - threads on a disabled capability are migrated away by the
2204 // scheduler loop
2205 //
2206 // - disabled capabilities do not participate in GC
2207 // (see scheduleDoGC())
2208 //
2209 // - No spark threads are created on this capability
2210 // (see scheduleActivateSpark())
2211 //
2212 // - We do not attempt to migrate threads *to* a disabled
2213 // capability (see schedulePushWork()).
2214 //
2215 // but in other respects, a disabled capability remains
2216 // alive. Threads may be woken up on a disabled capability,
2217 // but they will be immediately migrated away.
2218 //
2219 // This approach is much easier than trying to actually remove
2220 // the capability; we don't have to worry about GC data
2221 // structures, the nursery, etc.
2222 //
2223 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2224 capabilities[n]->disabled = true;
2225 traceCapDisable(capabilities[n]);
2226 }
2227 enabled_capabilities = new_n_capabilities;
2228 }
2229 else
2230 {
2231 // Increasing the number of enabled capabilities.
2232 //
2233 // enable any disabled capabilities, up to the required number
2234 for (n = enabled_capabilities;
2235 n < new_n_capabilities && n < n_capabilities; n++) {
2236 capabilities[n]->disabled = false;
2237 traceCapEnable(capabilities[n]);
2238 }
2239 enabled_capabilities = n;
2240
2241 if (new_n_capabilities > n_capabilities) {
2242 #if defined(TRACING)
2243 // Allocate eventlog buffers for the new capabilities. Note this
2244 // must be done before calling moreCapabilities(), because that
2245 // will emit events about creating the new capabilities and adding
2246 // them to existing capsets.
2247 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2248 #endif
2249
2250 // Resize the capabilities array
2251 // NB. after this, capabilities points somewhere new. Any pointers
2252 // of type (Capability *) are now invalid.
2253 moreCapabilities(n_capabilities, new_n_capabilities);
2254
2255 // Resize and update storage manager data structures
2256 storageAddCapabilities(n_capabilities, new_n_capabilities);
2257 }
2258 }
2259
2260 // update n_capabilities before things start running
2261 if (new_n_capabilities > n_capabilities) {
2262 n_capabilities = enabled_capabilities = new_n_capabilities;
2263 }
2264
2265 // We're done: release the original Capabilities
2266 releaseAllCapabilities(old_n_capabilities, cap,task);
2267
2268 // We can't free the old array until now, because we access it
2269 // while updating pointers in updateCapabilityRefs().
2270 if (old_capabilities) {
2271 stgFree(old_capabilities);
2272 }
2273
2274 // Notify IO manager that the number of capabilities has changed.
2275 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2276
2277 rts_unlock(cap);
2278
2279 #endif // THREADED_RTS
2280 }
2281
2282
2283
2284 /* ---------------------------------------------------------------------------
2285 * Delete all the threads in the system
2286 * ------------------------------------------------------------------------- */
2287
2288 static void
2289 deleteAllThreads ( Capability *cap )
2290 {
2291 // NOTE: only safe to call if we own all capabilities.
2292
2293 StgTSO* t, *next;
2294 uint32_t g;
2295
2296 debugTrace(DEBUG_sched,"deleting all threads");
2297 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2298 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2299 next = t->global_link;
2300 deleteThread(cap,t);
2301 }
2302 }
2303
2304 // The run queue now contains a bunch of ThreadKilled threads. We
2305 // must not throw these away: the main thread(s) will be in there
2306 // somewhere, and the main scheduler loop has to deal with it.
2307 // Also, the run queue is the only thing keeping these threads from
2308 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2309
2310 #if !defined(THREADED_RTS)
2311 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2312 ASSERT(sleeping_queue == END_TSO_QUEUE);
2313 #endif
2314 }
2315
2316 /* -----------------------------------------------------------------------------
2317 Managing the suspended_ccalls list.
2318 Locks required: sched_mutex
2319 -------------------------------------------------------------------------- */
2320
2321 STATIC_INLINE void
2322 suspendTask (Capability *cap, Task *task)
2323 {
2324 InCall *incall;
2325
2326 incall = task->incall;
2327 ASSERT(incall->next == NULL && incall->prev == NULL);
2328 incall->next = cap->suspended_ccalls;
2329 incall->prev = NULL;
2330 if (cap->suspended_ccalls) {
2331 cap->suspended_ccalls->prev = incall;
2332 }
2333 cap->suspended_ccalls = incall;
2334 cap->n_suspended_ccalls++;
2335 }
2336
2337 STATIC_INLINE void
2338 recoverSuspendedTask (Capability *cap, Task *task)
2339 {
2340 InCall *incall;
2341
2342 incall = task->incall;
2343 if (incall->prev) {
2344 incall->prev->next = incall->next;
2345 } else {
2346 ASSERT(cap->suspended_ccalls == incall);
2347 cap->suspended_ccalls = incall->next;
2348 }
2349 if (incall->next) {
2350 incall->next->prev = incall->prev;
2351 }
2352 incall->next = incall->prev = NULL;
2353 cap->n_suspended_ccalls--;
2354 }
2355
2356 /* ---------------------------------------------------------------------------
2357 * Suspending & resuming Haskell threads.
2358 *
2359 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2360 * its capability before calling the C function. This allows another
2361 * task to pick up the capability and carry on running Haskell
2362 * threads. It also means that if the C call blocks, it won't lock
2363 * the whole system.
2364 *
2365 * The Haskell thread making the C call is put to sleep for the
2366 * duration of the call, on the suspended_ccalling_threads queue. We
2367 * give out a token to the task, which it can use to resume the thread
2368 * on return from the C function.
2369 *
2370 * If this is an interruptible C call, this means that the FFI call may be
2371 * unceremoniously terminated and should be scheduled on an
2372 * unbound worker thread.
2373 * ------------------------------------------------------------------------- */
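/*
 * A rough sketch of how the compiler-generated code for a "safe" foreign call
 * uses this pair of functions (illustrative only; the real code is emitted by
 * the code generator):
 *
 *     token = suspendThread(BaseReg, interruptible);
 *     r = foreign_function(args);      // may block or call back into Haskell
 *     BaseReg = resumeThread(token);
 *
 * Here "token" is the Task returned by suspendThread(), and BaseReg stands
 * for the capability's register table (the StgRegTable argument).
 */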
2374
2375 void *
2376 suspendThread (StgRegTable *reg, bool interruptible)
2377 {
2378 Capability *cap;
2379 int saved_errno;
2380 StgTSO *tso;
2381 Task *task;
2382 #if defined(mingw32_HOST_OS)
2383 StgWord32 saved_winerror;
2384 #endif
2385
2386 saved_errno = errno;
2387 #if defined(mingw32_HOST_OS)
2388 saved_winerror = GetLastError();
2389 #endif
2390
2391 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2392 */
2393 cap = regTableToCapability(reg);
2394
2395 task = cap->running_task;
2396 tso = cap->r.rCurrentTSO;
2397
2398 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2399
2400 // XXX this might not be necessary --SDM
2401 tso->what_next = ThreadRunGHC;
2402
2403 threadPaused(cap,tso);
2404
2405 if (interruptible) {
2406 tso->why_blocked = BlockedOnCCall_Interruptible;
2407 } else {
2408 tso->why_blocked = BlockedOnCCall;
2409 }
2410
2411 // Hand back capability
2412 task->incall->suspended_tso = tso;
2413 task->incall->suspended_cap = cap;
2414
2415 // Otherwise allocate() will write to invalid memory.
2416 cap->r.rCurrentTSO = NULL;
2417
2418 ACQUIRE_LOCK(&cap->lock);
2419
2420 suspendTask(cap,task);
2421 cap->in_haskell = false;
2422 releaseCapability_(cap,false);
2423
2424 RELEASE_LOCK(&cap->lock);
2425
2426 errno = saved_errno;
2427 #if defined(mingw32_HOST_OS)
2428 SetLastError(saved_winerror);
2429 #endif
2430 return task;
2431 }
2432
2433 StgRegTable *
2434 resumeThread (void *task_)
2435 {
2436 StgTSO *tso;
2437 InCall *incall;
2438 Capability *cap;
2439 Task *task = task_;
2440 int saved_errno;
2441 #if defined(mingw32_HOST_OS)
2442 StgWord32 saved_winerror;
2443 #endif
2444
2445 saved_errno = errno;
2446 #if defined(mingw32_HOST_OS)
2447 saved_winerror = GetLastError();
2448 #endif
2449
2450 incall = task->incall;
2451 cap = incall->suspended_cap;
2452 task->cap = cap;
2453
2454 // Wait for permission to re-enter the RTS with the result.
2455 waitForCapability(&cap,task);
2456 // we might be on a different capability now... but if so, our
2457 // entry on the suspended_ccalls list will also have been
2458 // migrated.
2459
2460 // Remove the thread from the suspended list
2461 recoverSuspendedTask(cap,task);
2462
2463 tso = incall->suspended_tso;
2464 incall->suspended_tso = NULL;
2465 incall->suspended_cap = NULL;
2466 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2467
2468 traceEventRunThread(cap, tso);
2469
2470 /* Reset blocking status */
2471 tso->why_blocked = NotBlocked;
2472
2473 if ((tso->flags & TSO_BLOCKEX) == 0) {
2474 // avoid locking the TSO if we don't have to
2475 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2476 maybePerformBlockedException(cap,tso);
2477 }
2478 }
2479
2480 cap->r.rCurrentTSO = tso;
2481 cap->in_haskell = true;
2482 errno = saved_errno;
2483 #if defined(mingw32_HOST_OS)
2484 SetLastError(saved_winerror);
2485 #endif
2486
2487 /* We might have GC'd, mark the TSO dirty again */
2488 dirty_TSO(cap,tso);
2489 dirty_STACK(cap,tso->stackobj);
2490
2491 IF_DEBUG(sanity, checkTSO(tso));
2492
2493 return &cap->r;
2494 }
2495
2496 /* ---------------------------------------------------------------------------
2497 * scheduleThread()
2498 *
2499 * scheduleThread puts a thread on the end of the runnable queue.
2500 * This will usually be done immediately after a thread is created.
2501 * The caller of scheduleThread must create the thread using e.g.
2502 * createThread and push an appropriate closure
2503 * on this thread's stack before the scheduler is invoked.
2504 * ------------------------------------------------------------------------ */
2505
2506 void
2507 scheduleThread(Capability *cap, StgTSO *tso)
2508 {
2509 // The thread goes at the *end* of the run-queue, to avoid possible
2510 // starvation of any threads already on the queue.
2511 appendToRunQueue(cap,tso);
2512 }
2513
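// scheduleThreadOn() is like scheduleThread(), but pins the thread to a
// particular capability (modulo enabled_capabilities); it backs the forkOn#
// primop used by GHC.Conc.forkOn.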
2514 void
2515 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2516 {
2517 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2518 // move this thread from now on.
2519 #if defined(THREADED_RTS)
2520 cpu %= enabled_capabilities;
2521 if (cpu == cap->no) {
2522 appendToRunQueue(cap,tso);
2523 } else {
2524 migrateThread(cap, tso, capabilities[cpu]);
2525 }
2526 #else
2527 appendToRunQueue(cap,tso);
2528 #endif
2529 }
2530
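// scheduleWaitThread() runs a freshly created bound thread to completion; it
// is what the rts_eval*() entry points in RtsAPI.c use to run Haskell code
// from C.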
2531 void
2532 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2533 {
2534 Task *task;
2535 DEBUG_ONLY( StgThreadID id );
2536 Capability *cap;
2537
2538 cap = *pcap;
2539
2540 // We already created/initialised the Task
2541 task = cap->running_task;
2542
2543 // This TSO is now a bound thread; make the Task and TSO
2544 // point to each other.
2545 tso->bound = task->incall;
2546 tso->cap = cap;
2547
2548 task->incall->tso = tso;
2549 task->incall->ret = ret;
2550 task->incall->rstat = NoStatus;
2551
2552 appendToRunQueue(cap,tso);
2553
2554 DEBUG_ONLY( id = tso->id );
2555 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2556
2557 cap = schedule(cap,task);
2558
2559 ASSERT(task->incall->rstat != NoStatus);
2560 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2561
2562 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2563 *pcap = cap;
2564 }
2565
2566 /* ----------------------------------------------------------------------------
2567 * Starting Tasks
2568 * ------------------------------------------------------------------------- */
2569
2570 #if defined(THREADED_RTS)
2571 void scheduleWorker (Capability *cap, Task *task)
2572 {
2573 // schedule() runs without a lock.
2574 cap = schedule(cap,task);
2575
2576 // On exit from schedule(), we have a Capability, but possibly not
2577 // the same one we started with.
2578
2579 // During shutdown, the requirement is that after all the
2580 // Capabilities are shut down, all workers that are shutting down
2581 // have finished workerTaskStop(). This is why we hold on to
2582 // cap->lock until we've finished workerTaskStop() below.
2583 //
2584 // There may be workers still involved in foreign calls; those
2585 // will just block in waitForCapability() because the
2586 // Capability has been shut down.
2587 //
2588 ACQUIRE_LOCK(&cap->lock);
2589 releaseCapability_(cap,false);
2590 workerTaskStop(task);
2591 RELEASE_LOCK(&cap->lock);
2592 }
2593 #endif
2594
2595 /* ---------------------------------------------------------------------------
2596 * Start new worker tasks on Capabilities in the range [from, to)
2597 * -------------------------------------------------------------------------- */
2598
2599 static void
2600 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2601 {
2602 #if defined(THREADED_RTS)
2603 uint32_t i;
2604 Capability *cap;
2605
2606 for (i = from; i < to; i++) {
2607 cap = capabilities[i];
2608 ACQUIRE_LOCK(&cap->lock);
2609 startWorkerTask(cap);
2610 RELEASE_LOCK(&cap->lock);
2611 }
2612 #endif
2613 }
2614
2615 /* ---------------------------------------------------------------------------
2616 * initScheduler()
2617 *
2618 * Initialise the scheduler. This resets all the queues - if the
2619 * queues contained any threads, they'll be garbage collected at the
2620 * next pass.
2621 *
2622 * ------------------------------------------------------------------------ */
2623
2624 void
2625 initScheduler(void)
2626 {
2627 #if !defined(THREADED_RTS)
2628 blocked_queue_hd = END_TSO_QUEUE;
2629 blocked_queue_tl = END_TSO_QUEUE;
2630 sleeping_queue = END_TSO_QUEUE;
2631 #endif
2632
2633 sched_state = SCHED_RUNNING;
2634 recent_activity = ACTIVITY_YES;
2635
2636 #if defined(THREADED_RTS)
2637 /* Initialise the mutex and condition variables used by
2638 * the scheduler. */
2639 initMutex(&sched_mutex);
2640 #endif
2641
2642 ACQUIRE_LOCK(&sched_mutex);
2643
2644 allocated_bytes_at_heapoverflow = 0;
2645
2646 /* A capability holds the state a native thread needs in
2647 * order to execute STG code. At least one capability is
2648 * floating around (only THREADED_RTS builds have more than one).
2649 */
2650 initCapabilities();
2651
2652 initTaskManager();
2653
2654 /*
2655 * Eagerly start one worker to run each Capability, except for
2656 * Capability 0. The idea is that we're probably going to start a
2657 * bound thread on Capability 0 pretty soon, so we don't want a
2658 * worker task hogging it.
2659 */
2660 startWorkerTasks(1, n_capabilities);
2661
2662 RELEASE_LOCK(&sched_mutex);
2663
2664 }
2665
2666 void
2667 exitScheduler (bool wait_foreign USED_IF_THREADS)
2668 /* see Capability.c, shutdownCapability() */
2669 {
2670 Task *task = NULL;
2671
2672 task = newBoundTask();
2673
2674 // If we haven't killed all the threads yet, do it now.
2675 if (sched_state < SCHED_SHUTTING_DOWN) {
2676 sched_state = SCHED_INTERRUPTING;
2677 Capability *cap = task->cap;
2678 waitForCapability(&cap,task);
2679 scheduleDoGC(&cap,task,true);
2680 ASSERT(task->incall->tso == NULL);
2681 releaseCapability(cap);
2682 }
2683 sched_state = SCHED_SHUTTING_DOWN;
2684
2685 shutdownCapabilities(task, wait_foreign);
2686
2687 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2688 // n_failed_trygrab_idles, n_idle_caps);
2689
2690 boundTaskExiting(task);
2691 }
2692
2693 void
2694 freeScheduler( void )
2695 {
2696 uint32_t still_running;
2697
2698 ACQUIRE_LOCK(&sched_mutex);
2699 still_running = freeTaskManager();
2700 // We can only free the Capabilities if there are no Tasks still
2701 // running. We might have a Task about to return from a foreign
2702 // call into waitForCapability(), for example (actually,
2703 // this should be the *only* thing that a still-running Task can
2704 // do at this point, and it will block waiting for the
2705 // Capability).
2706 if (still_running == 0) {
2707 freeCapabilities();
2708 }
2709 RELEASE_LOCK(&sched_mutex);
2710 #if defined(THREADED_RTS)
2711 closeMutex(&sched_mutex);
2712 #endif
2713 }
2714
2715 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2716 void *user USED_IF_NOT_THREADS)
2717 {
2718 #if !defined(THREADED_RTS)
2719 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2720 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2721 evac(user, (StgClosure **)(void *)&sleeping_queue);
2722 #endif
2723 }
2724
2725 /* -----------------------------------------------------------------------------
2726 performGC
2727
2728 This is the interface to the garbage collector from Haskell land.
2729 We provide this so that external C code can allocate and garbage
2730 collect when called from Haskell via _ccall_GC.
2731 -------------------------------------------------------------------------- */
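// At the Haskell level these are reached from System.Mem: performGC there
// calls performMajorGC() below, and performMinorGC calls performGC()
// (roughly; the wrappers live in base, not here).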
2732
2733 static void
2734 performGC_(bool force_major)
2735 {
2736 Task *task;
2737 Capability *cap = NULL;
2738
2739 // We must grab a new Task here, because the existing Task may be
2740 // associated with a particular Capability, and chained onto the
2741 // suspended_ccalls queue.
2742 task = newBoundTask();
2743
2744 // TODO: do we need to traceTask*() here?
2745
2746 waitForCapability(&cap,task);
2747 scheduleDoGC(&cap,task,force_major);
2748 releaseCapability(cap);
2749 boundTaskExiting(task);
2750 }
2751
2752 void
2753 performGC(void)
2754 {
2755 performGC_(false);
2756 }
2757
2758 void
2759 performMajorGC(void)
2760 {
2761 performGC_(true);
2762 }
2763
2764 /* ---------------------------------------------------------------------------
2765 Interrupt execution.
2766 Might be called inside a signal handler so it mustn't do anything fancy.
2767 ------------------------------------------------------------------------ */
2768
2769 void
2770 interruptStgRts(void)
2771 {
2772 sched_state = SCHED_INTERRUPTING;
2773 interruptAllCapabilities();
2774 #if defined(THREADED_RTS)
2775 wakeUpRts();
2776 #endif
2777 }
2778
2779 /* -----------------------------------------------------------------------------
2780 Wake up the RTS
2781
2782 This function causes at least one OS thread to wake up and run the
2783 scheduler loop. It is invoked when the RTS might be deadlocked, or
2784 an external event has arrived that may need servicing (eg. a
2785 keyboard interrupt).
2786
2787 In the single-threaded RTS we don't do anything here; we only have
2788 one thread anyway, and the event that caused us to want to wake up
2789 will have interrupted any blocking system call in progress anyway.
2790 -------------------------------------------------------------------------- */
2791
2792 #if defined(THREADED_RTS)
2793 void wakeUpRts(void)
2794 {
2795 // This forces the IO Manager thread to wakeup, which will
2796 // in turn ensure that some OS thread wakes up and runs the
2797 // scheduler loop, which will cause a GC and deadlock check.
2798 ioManagerWakeup();
2799 }
2800 #endif
2801
2802 /* -----------------------------------------------------------------------------
2803 Deleting threads
2804
2805 This is used for interruption (^C) and forking, and corresponds to
2806 raising an exception but without letting the thread catch the
2807 exception.
2808 -------------------------------------------------------------------------- */
2809
2810 static void
2811 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2812 {
2813 // NOTE: must only be called on a TSO that we have exclusive
2814 // access to, because we will call throwToSingleThreaded() below.
2815 // The TSO must be on the run queue of the Capability we own, or
2816 // we must own all Capabilities.
2817
2818 if (tso->why_blocked != BlockedOnCCall &&
2819 tso->why_blocked != BlockedOnCCall_Interruptible) {
2820 throwToSingleThreaded(tso->cap,tso,NULL);
2821 }
2822 }
2823
2824 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
2825 static void
2826 deleteThread_(Capability *cap, StgTSO *tso)
2827 { // for forkProcess only:
2828 // like deleteThread(), but we delete threads in foreign calls, too.
2829
2830 if (tso->why_blocked == BlockedOnCCall ||
2831 tso->why_blocked == BlockedOnCCall_Interruptible) {
2832 tso->what_next = ThreadKilled;
2833 appendToRunQueue(tso->cap, tso);
2834 } else {
2835 deleteThread(cap,tso);
2836 }
2837 }
2838 #endif
2839
2840 /* -----------------------------------------------------------------------------
2841 raiseExceptionHelper
2842
2843 This function is called by the raise# primitive, just so that we can
2844 move some of the tricky bits of raising an exception from C-- into
2845 C. Who knows, it might be a useful reusable thing here too.
2846 -------------------------------------------------------------------------- */
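//
// The return value is the type of the frame we stopped at (CATCH_FRAME,
// ATOMICALLY_FRAME, CATCH_STM_FRAME or STOP_FRAME); the Cmm code for raise#
// inspects it to decide how to continue.  Along the way, every UPDATE_FRAME
// is updated with a 'raise# exception' thunk, so a thunk that was under
// evaluation will re-raise the exception if it is entered later.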
2847
2848 StgWord
2849 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2850 {
2851 Capability *cap = regTableToCapability(reg);
2852 StgThunk *raise_closure = NULL;
2853 StgPtr p, next;
2854 const StgRetInfoTable *info;
2855 //
2856 // This closure represents the expression 'raise# E' where E
2857 // is the exception raised. It is used to overwrite all the
2858 // thunks which are currently under evaluation.
2859 //
2860
2861 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2862 // LDV profiling: stg_raise_info has THUNK as its closure
2863 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2864 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2865 // 1 does not cause any problem unless profiling is performed.
2866 // However, when LDV profiling goes on, we need to linearly scan
2867 // the small object pool, where raise_closure is stored, so we should
2868 // use MIN_UPD_SIZE.
2869 //
2870 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2871 // sizeofW(StgClosure)+1);
2872 //
2873
2874 //
2875 // Walk up the stack, looking for the catch frame. On the way,
2876 // we update any closures pointed to from update frames with the
2877 // raise closure that we just built.
2878 //
2879 p = tso->stackobj->sp;
2880 while(1) {
2881 info = get_ret_itbl((StgClosure *)p);
2882 next = p + stack_frame_sizeW((StgClosure *)p);
2883 switch (info->i.type) {
2884
2885 case UPDATE_FRAME:
2886 // Only create raise_closure if we need to.
2887 if (raise_closure == NULL) {
2888 raise_closure =
2889 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2890 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2891 raise_closure->payload[0] = exception;
2892 }
2893 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2894 (StgClosure *)raise_closure);
2895 p = next;
2896 continue;
2897
2898 case ATOMICALLY_FRAME:
2899 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2900 tso->stackobj->sp = p;
2901 return ATOMICALLY_FRAME;
2902
2903 case CATCH_FRAME:
2904 tso->stackobj->sp = p;
2905 return CATCH_FRAME;
2906
2907 case CATCH_STM_FRAME:
2908 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2909 tso->stackobj->sp = p;
2910 return CATCH_STM_FRAME;
2911
2912 case UNDERFLOW_FRAME:
2913 tso->stackobj->sp = p;
2914 threadStackUnderflow(cap,tso);
2915 p = tso->stackobj->sp;
2916 continue;
2917
2918 case STOP_FRAME:
2919 tso->stackobj->sp = p;
2920 return STOP_FRAME;
2921
2922 case CATCH_RETRY_FRAME: {
2923 StgTRecHeader *trec = tso -> trec;
2924 StgTRecHeader *outer = trec -> enclosing_trec;
2925 debugTrace(DEBUG_stm,
2926 "found CATCH_RETRY_FRAME at %p during raise", p);
2927 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2928 stmAbortTransaction(cap, trec);
2929 stmFreeAbortedTRec(cap, trec);
2930 tso -> trec = outer;
2931 p = next;
2932 continue;
2933 }
2934
2935 default:
2936 p = next;
2937 continue;
2938 }
2939 }
2940 }
2941
2942
2943 /* -----------------------------------------------------------------------------
2944 findRetryFrameHelper
2945
2946 This function is called by the retry# primitive. It traverses the stack
2947 leaving tso->sp referring to the frame which should handle the retry.
2948
2949 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2950 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2951
2952 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested transactions that they
2953 create) because retries are not considered to be exceptions, despite the
2954 similar implementation.
2955
2956 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2957 not be created within memory transactions.
2958 -------------------------------------------------------------------------- */
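//
// As with raiseExceptionHelper, the returned frame type tells the Cmm code
// for retry# how to continue: retrying the transaction at an
// ATOMICALLY_FRAME, or running the alternative branch of an orElse# at a
// CATCH_RETRY_FRAME.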
2959
2960 StgWord
2961 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2962 {
2963 const StgRetInfoTable *info;
2964 StgPtr p, next;
2965
2966 p = tso->stackobj->sp;
2967 while (1) {
2968 info = get_ret_itbl((const StgClosure *)p);
2969 next = p + stack_frame_sizeW((StgClosure *)p);
2970 switch (info->i.type) {
2971
2972 case ATOMICALLY_FRAME:
2973 debugTrace(DEBUG_stm,
2974 "found ATOMICALLY_FRAME at %p during retry", p);
2975 tso->stackobj->sp = p;
2976 return ATOMICALLY_FRAME;
2977
2978 case CATCH_RETRY_FRAME:
2979 debugTrace(DEBUG_stm,
2980 "found CATCH_RETRY_FRAME at %p during retry", p);
2981 tso->stackobj->sp = p;
2982 return CATCH_RETRY_FRAME;
2983
2984 case CATCH_STM_FRAME: {
2985 StgTRecHeader *trec = tso -> trec;
2986 StgTRecHeader *outer = trec -> enclosing_trec;
2987 debugTrace(DEBUG_stm,
2988 "found CATCH_STM_FRAME at %p during retry", p);
2989 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2990 stmAbortTransaction(cap, trec);
2991 stmFreeAbortedTRec(cap, trec);
2992 tso -> trec = outer;
2993 p = next;
2994 continue;
2995 }
2996
2997 case UNDERFLOW_FRAME:
2998 tso->stackobj->sp = p;
2999 threadStackUnderflow(cap,tso);
3000 p = tso->stackobj->sp;
3001 continue;
3002
3003 default:
3004 ASSERT(info->i.type != CATCH_FRAME);
3005 ASSERT(info->i.type != STOP_FRAME);
3006 p = next;
3007 continue;
3008 }
3009 }
3010 }
3011
3012 /* -----------------------------------------------------------------------------
3013 resurrectThreads is called after garbage collection on the list of
3014 threads found to be garbage. Each of these threads will be woken up
3015 and sent an exception: BlockedIndefinitelyOnMVar if it was blocked on
3016 an MVar, BlockedIndefinitelyOnSTM if blocked on an STM transaction, or
3017 NonTermination if it was blocked on a black hole.
3018
3019 Locks: assumes we hold *all* the capabilities.
3020 -------------------------------------------------------------------------- */
3021
3022 void
3023 resurrectThreads (StgTSO *threads)
3024 {
3025 StgTSO *tso, *next;
3026 Capability *cap;
3027 generation *gen;
3028
3029 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3030 next = tso->global_link;
3031
3032 gen = Bdescr((P_)tso)->gen;
3033 tso->global_link = gen->threads;
3034 gen->threads = tso;
3035
3036 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3037
3038 // Wake up the thread on the Capability it was last on
3039 cap = tso->cap;
3040
3041 switch (tso->why_blocked) {
3042 case BlockedOnMVar:
3043 case BlockedOnMVarRead:
3044 /* Called by GC - sched_mutex lock is currently held. */
3045 throwToSingleThreaded(cap, tso,
3046 (StgClosure *)blockedIndefinitelyOnMVar_closure);
3047 break;
3048 case BlockedOnBlackHole:
3049 throwToSingleThreaded(cap, tso,
3050 (StgClosure *)nonTermination_closure);
3051 break;
3052 case BlockedOnSTM:
3053 throwToSingleThreaded(cap, tso,
3054 (StgClosure *)blockedIndefinitelyOnSTM_closure);
3055 break;
3056 case NotBlocked:
3057 /* This might happen if the thread was blocked on a black hole
3058 * belonging to a thread that we've just woken up (raiseAsync
3059 * can wake up threads, remember...).
3060 */
3061 continue;
3062 case BlockedOnMsgThrowTo:
3063 // This can happen if the target is masking, blocks on a
3064 // black hole, and then is found to be unreachable. In
3065 // this case, we want to let the target wake up and carry
3066 // on, and do nothing to this thread.
3067 continue;
3068 default:
3069 barf("resurrectThreads: thread blocked in a strange way: %d",
3070 tso->why_blocked);
3071 }
3072 }
3073 }