rts/Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "StablePtr.h"
45 #include "StableName.h"
46 #include "TopHandler.h"
47
48 #if defined(HAVE_SYS_TYPES_H)
49 #include <sys/types.h>
50 #endif
51 #if defined(HAVE_UNISTD_H)
52 #include <unistd.h>
53 #endif
54
55 #include <string.h>
56 #include <stdlib.h>
57 #include <stdarg.h>
58
59 #if defined(HAVE_ERRNO_H)
60 #include <errno.h>
61 #endif
62
63 #if defined(TRACING)
64 #include "eventlog/EventLog.h"
65 #endif
66 /* -----------------------------------------------------------------------------
67 * Global variables
68 * -------------------------------------------------------------------------- */
69
70 #if !defined(THREADED_RTS)
71 // Blocked/sleeping threads
72 StgTSO *blocked_queue_hd = NULL;
73 StgTSO *blocked_queue_tl = NULL;
74 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
75 #endif
76
77 // Bytes allocated since the last time a HeapOverflow exception was thrown by
78 // the RTS
79 uint64_t allocated_bytes_at_heapoverflow = 0;
80
81 /* Set to true when the latest garbage collection failed to reclaim enough
82 * space, and the runtime should proceed to shut itself down in an orderly
83 * fashion (emitting profiling info etc.), OR throw an exception to the main
84 * thread, if it is still alive.
85 */
86 bool heap_overflow = false;
87
88 /* flag that tracks whether we have done any execution in this time slice.
89 * LOCK: currently none, perhaps we should lock (but needs to be
90 * updated in the fast path of the scheduler).
91 *
92 * NB. must be StgWord, we do xchg() on it.
93 */
94 volatile StgWord recent_activity = ACTIVITY_YES;
95
96 /* if this flag is set as well, give up execution
97 * LOCK: none (changes monotonically)
98 */
99 volatile StgWord sched_state = SCHED_RUNNING;
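// (The possible values, in the order in which they increase:
//  SCHED_RUNNING -> SCHED_INTERRUPTING -> SCHED_SHUTTING_DOWN; see
//  Note [shutdown] in schedule() below.)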
100
101 /*
102 * This mutex protects most of the global scheduler data in
103 * the THREADED_RTS runtime.
104 */
105 #if defined(THREADED_RTS)
106 Mutex sched_mutex;
107 #endif
108
109 #if !defined(mingw32_HOST_OS)
110 #define FORKPROCESS_PRIMOP_SUPPORTED
111 #endif
112
113 /* -----------------------------------------------------------------------------
114 * static function prototypes
115 * -------------------------------------------------------------------------- */
116
117 static Capability *schedule (Capability *initialCapability, Task *task);
118
119 //
120 // These functions all encapsulate parts of the scheduler loop, and are
121 // abstracted only to make the structure and control flow of the
122 // scheduler clearer.
123 //
124 static void schedulePreLoop (void);
125 static void scheduleFindWork (Capability **pcap);
126 #if defined(THREADED_RTS)
127 static void scheduleYield (Capability **pcap, Task *task);
128 #endif
129 #if defined(THREADED_RTS)
130 static bool requestSync (Capability **pcap, Task *task,
131 PendingSync *sync_type, SyncType *prev_sync_type);
132 static void acquireAllCapabilities(Capability *cap, Task *task);
133 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
134 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
135 uint32_t to USED_IF_THREADS);
136 #endif
137 static void scheduleStartSignalHandlers (Capability *cap);
138 static void scheduleCheckBlockedThreads (Capability *cap);
139 static void scheduleProcessInbox(Capability **cap);
140 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
141 static void schedulePushWork(Capability *cap, Task *task);
142 #if defined(THREADED_RTS)
143 static void scheduleActivateSpark(Capability *cap);
144 #endif
145 static void schedulePostRunThread(Capability *cap, StgTSO *t);
146 static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
147 static bool scheduleHandleYield( Capability *cap, StgTSO *t,
148 uint32_t prev_what_next );
149 static void scheduleHandleThreadBlocked( StgTSO *t );
150 static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
151 StgTSO *t );
152 static bool scheduleNeedHeapProfile(bool ready_to_gc);
153 static void scheduleDoGC(Capability **pcap, Task *task, bool force_major);
154
155 static void deleteThread (StgTSO *tso);
156 static void deleteAllThreads (void);
157
158 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
159 static void deleteThread_(StgTSO *tso);
160 #endif
161
162 /* ---------------------------------------------------------------------------
163 Main scheduling loop.
164
165 We use round-robin scheduling, each thread returning to the
166 scheduler loop when one of these conditions is detected:
167
168 * out of heap space
169 * timer expires (thread yields)
170 * thread blocks
171 * thread ends
172 * stack overflow
173
174 ------------------------------------------------------------------------ */
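/* These conditions come back from the thread as the StgThreadReturnCode
 * values dispatched on further down in schedule(): HeapOverflow,
 * ThreadYielding, ThreadBlocked, ThreadFinished and StackOverflow
 * respectively.
 */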
175
176 static Capability *
177 schedule (Capability *initialCapability, Task *task)
178 {
179 StgTSO *t;
180 Capability *cap;
181 StgThreadReturnCode ret;
182 uint32_t prev_what_next;
183 bool ready_to_gc;
184
185 cap = initialCapability;
186
187 // Pre-condition: this task owns initialCapability.
188 // The sched_mutex is *NOT* held
189 // NB. on return, we still hold a capability.
190
191 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
192
193 schedulePreLoop();
194
195 // -----------------------------------------------------------
196 // Scheduler loop starts here:
197
198 while (1) {
199
200 // Check whether we have re-entered the RTS from Haskell without
201 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
202 // call).
203 if (cap->in_haskell) {
204 errorBelch("schedule: re-entered unsafely.\n"
205 " Perhaps a 'foreign import unsafe' should be 'safe'?");
206 stg_exit(EXIT_FAILURE);
207 }
208
209 // Note [shutdown]: The interruption / shutdown sequence.
210 //
211 // In order to cleanly shut down the runtime, we want to:
212 // * make sure that all main threads return to their callers
213 // with the state 'Interrupted'.
214 // * clean up all OS threads associated with the runtime
215 // * free all memory etc.
216 //
217 // So the sequence goes like this:
218 //
219 // * The shutdown sequence is initiated by calling hs_exit(),
220 // interruptStgRts(), or running out of memory in the GC.
221 //
222 // * Set sched_state = SCHED_INTERRUPTING
223 //
224 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
225 // scheduleDoGC(), which halts the whole runtime by acquiring all the
226 // capabilities, does a GC and then calls deleteAllThreads() to kill all
227 // the remaining threads. The zombies are left on the run queue for
228 // cleaning up. We can't kill threads involved in foreign calls.
229 //
230 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
231 //
232 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
233 // enforced by the scheduler, which won't run any Haskell code when
234 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
235 // other capabilities by doing the GC earlier.
236 //
237 // * all workers exit when the run queue on their capability
238 // drains. All main threads will also exit when their TSO
239 // reaches the head of the run queue and they can return.
240 //
241 // * eventually all Capabilities will shut down, and the RTS can
242 // exit.
243 //
244 // * We might be left with threads blocked in foreign calls,
245 // we should really attempt to kill these somehow (TODO).
246
247 switch (sched_state) {
248 case SCHED_RUNNING:
249 break;
250 case SCHED_INTERRUPTING:
251 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
252 /* scheduleDoGC() deletes all the threads */
253 scheduleDoGC(&cap,task,true);
254
255 // after scheduleDoGC(), we must be shutting down. Either some
256 // other Capability did the final GC, or we did it above,
257 // either way we can fall through to the SCHED_SHUTTING_DOWN
258 // case now.
259 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
260 // fall through
261
262 case SCHED_SHUTTING_DOWN:
263 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
264 // If we are a worker, just exit. If we're a bound thread
265 // then we will exit below when we've removed our TSO from
266 // the run queue.
267 if (!isBoundTask(task) && emptyRunQueue(cap)) {
268 return cap;
269 }
270 break;
271 default:
272 barf("sched_state: %" FMT_Word, sched_state);
273 }
274
275 scheduleFindWork(&cap);
276
277 /* work pushing, currently relevant only for THREADED_RTS:
278 (pushes threads, wakes up idle capabilities for stealing) */
279 schedulePushWork(cap,task);
280
281 scheduleDetectDeadlock(&cap,task);
282
283 // Normally, the only way we can get here with no threads to
284 // run is if a keyboard interrupt was received during
285 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
286 // Additionally, it is not fatal for the
287 // threaded RTS to reach here with no threads to run.
288 //
289 // win32: might be here due to awaitEvent() being abandoned
290 // as a result of a console event having been delivered.
291
292 #if defined(THREADED_RTS)
293 scheduleYield(&cap,task);
294
295 if (emptyRunQueue(cap)) continue; // look for work again
296 #endif
297
298 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
299 if ( emptyRunQueue(cap) ) {
300 ASSERT(sched_state >= SCHED_INTERRUPTING);
301 }
302 #endif
303
304 //
305 // Get a thread to run
306 //
307 t = popRunQueue(cap);
308
309 // Sanity check the thread we're about to run. This can be
310 // expensive if there is lots of thread switching going on...
311 IF_DEBUG(sanity,checkTSO(t));
312
313 #if defined(THREADED_RTS)
314 // Check whether we can run this thread in the current task.
315 // If not, we have to pass our capability to the right task.
316 {
317 InCall *bound = t->bound;
318
319 if (bound) {
320 if (bound->task == task) {
321 // yes, the Haskell thread is bound to the current native thread
322 } else {
323 debugTrace(DEBUG_sched,
324 "thread %lu bound to another OS thread",
325 (unsigned long)t->id);
326 // no, bound to a different OS thread: put it back so that Task can run it
327 pushOnRunQueue(cap,t);
328 continue;
329 }
330 } else {
331 // The thread we want to run is unbound.
332 if (task->incall->tso) {
333 debugTrace(DEBUG_sched,
334 "this OS thread cannot run thread %lu",
335 (unsigned long)t->id);
336 // no, the current native thread is bound to a different
337 // Haskell thread, so pass it to any worker thread
338 pushOnRunQueue(cap,t);
339 continue;
340 }
341 }
342 }
343 #endif
344
345 // If we're shutting down, and this thread has not yet been
346 // killed, kill it now. This sometimes happens when a finalizer
347 // thread is created by the final GC, or a thread previously
348 // in a foreign call returns.
349 if (sched_state >= SCHED_INTERRUPTING &&
350 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
351 deleteThread(t);
352 }
353
354 // If this capability is disabled, migrate the thread away rather
355 // than running it. NB. but not if the thread is bound: it is
356 // really hard for a bound thread to migrate itself. Believe me,
357 // I tried several ways and couldn't find a way to do it.
358 // Instead, when everything is stopped for GC, we migrate all the
359 // threads on the run queue then (see scheduleDoGC()).
360 //
361 // ToDo: what about TSO_LOCKED? Currently we're migrating those
362 // when the number of capabilities drops, but we never migrate
363 // them back if it rises again. Presumably we should, but after
364 // the thread has been migrated we no longer know what capability
365 // it was originally on.
366 #if defined(THREADED_RTS)
367 if (cap->disabled && !t->bound) {
368 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
369 migrateThread(cap, t, dest_cap);
370 continue;
371 }
372 #endif
373
374 /* context switches are initiated by the timer signal, unless
375 * the user specified "context switch as often as possible", with
376 * +RTS -C0
377 */
378 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
379 && !emptyThreadQueues(cap)) {
380 cap->context_switch = 1;
381 }
382
383 run_thread:
384
385 // CurrentTSO is the thread to run. It might be different if we
386 // loop back to run_thread, so make sure to set CurrentTSO after
387 // that.
388 cap->r.rCurrentTSO = t;
389
390 startHeapProfTimer();
391
392 // ----------------------------------------------------------------------
393 // Run the current thread
394
395 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
396 ASSERT(t->cap == cap);
397 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
398
399 prev_what_next = t->what_next;
400
401 errno = t->saved_errno;
402 #if defined(mingw32_HOST_OS)
403 SetLastError(t->saved_winerror);
404 #endif
405
406 // reset the interrupt flag before running Haskell code
407 cap->interrupt = 0;
408
409 cap->in_haskell = true;
410 cap->idle = 0;
411
412 dirty_TSO(cap,t);
413 dirty_STACK(cap,t->stackobj);
414
415 switch (recent_activity)
416 {
417 case ACTIVITY_DONE_GC: {
418 // ACTIVITY_DONE_GC means we turned off the timer signal to
419 // conserve power (see #1623). Re-enable it here.
420 uint32_t prev;
421 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
422 if (prev == ACTIVITY_DONE_GC) {
423 #if !defined(PROFILING)
424 startTimer();
425 #endif
426 }
427 break;
428 }
429 case ACTIVITY_INACTIVE:
430 // If we reached ACTIVITY_INACTIVE, then don't reset it until
431 // we've done the GC. The thread running here might just be
432 // the IO manager thread that handle_tick() woke up via
433 // wakeUpRts().
434 break;
435 default:
436 recent_activity = ACTIVITY_YES;
437 }
438
439 traceEventRunThread(cap, t);
440
441 switch (prev_what_next) {
442
443 case ThreadKilled:
444 case ThreadComplete:
445 /* Thread already finished, return to scheduler. */
446 ret = ThreadFinished;
447 break;
448
449 case ThreadRunGHC:
450 {
451 StgRegTable *r;
452 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
453 cap = regTableToCapability(r);
454 ret = r->rRet;
455 break;
456 }
457
458 case ThreadInterpret:
459 cap = interpretBCO(cap);
460 ret = cap->r.rRet;
461 break;
462
463 default:
464 barf("schedule: invalid prev_what_next=%u field", prev_what_next);
465 }
466
467 cap->in_haskell = false;
468
469 // The TSO might have moved, eg. if it re-entered the RTS and a GC
470 // happened. So find the new location:
471 t = cap->r.rCurrentTSO;
472
473 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
474 // don't want it set when not running a Haskell thread.
475 cap->r.rCurrentTSO = NULL;
476
477 // And save the current errno in this thread.
478 // XXX: possibly bogus for SMP because this thread might already
479 // be running again, see code below.
480 t->saved_errno = errno;
481 #if defined(mingw32_HOST_OS)
482 // Similarly for Windows error code
483 t->saved_winerror = GetLastError();
484 #endif
485
486 if (ret == ThreadBlocked) {
487 if (t->why_blocked == BlockedOnBlackHole) {
488 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
489 traceEventStopThread(cap, t, t->why_blocked + 6,
490 owner != NULL ? owner->id : 0);
491 } else {
492 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
493 }
494 } else {
495 traceEventStopThread(cap, t, ret, 0);
496 }
497
498 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
499 ASSERT(t->cap == cap);
500
501 // ----------------------------------------------------------------------
502
503 // Costs for the scheduler are assigned to CCS_SYSTEM
504 stopHeapProfTimer();
505 #if defined(PROFILING)
506 cap->r.rCCCS = CCS_SYSTEM;
507 #endif
508
509 schedulePostRunThread(cap,t);
510
511 ready_to_gc = false;
512
513 switch (ret) {
514 case HeapOverflow:
515 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
516 break;
517
518 case StackOverflow:
519 // just adjust the stack for this thread, then pop it back
520 // on the run queue.
521 threadStackOverflow(cap, t);
522 pushOnRunQueue(cap,t);
523 break;
524
525 case ThreadYielding:
526 if (scheduleHandleYield(cap, t, prev_what_next)) {
527 // shortcut for switching between compiler/interpreter:
528 goto run_thread;
529 }
530 break;
531
532 case ThreadBlocked:
533 scheduleHandleThreadBlocked(t);
534 break;
535
536 case ThreadFinished:
537 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
538 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
539 break;
540
541 default:
542 barf("schedule: invalid thread return code %d", (int)ret);
543 }
544
545 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
546 scheduleDoGC(&cap,task,false);
547 }
548 } /* end of while() */
549 }
550
551 /* -----------------------------------------------------------------------------
552 * Run queue operations
553 * -------------------------------------------------------------------------- */
554
555 static void
556 removeFromRunQueue (Capability *cap, StgTSO *tso)
557 {
558 if (tso->block_info.prev == END_TSO_QUEUE) {
559 ASSERT(cap->run_queue_hd == tso);
560 cap->run_queue_hd = tso->_link;
561 } else {
562 setTSOLink(cap, tso->block_info.prev, tso->_link);
563 }
564 if (tso->_link == END_TSO_QUEUE) {
565 ASSERT(cap->run_queue_tl == tso);
566 cap->run_queue_tl = tso->block_info.prev;
567 } else {
568 setTSOPrev(cap, tso->_link, tso->block_info.prev);
569 }
570 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
571 cap->n_run_queue--;
572
573 IF_DEBUG(sanity, checkRunQueue(cap));
574 }
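// A sketch of the queue representation manipulated above (derived from the
// code in this file, not a definitive spec): the run queue is a
// doubly-linked list threaded through the TSOs themselves,
//
//     run_queue_hd -> tso1 <-> tso2 <-> tso3 <- run_queue_tl
//
// with tso->_link pointing towards the tail, tso->block_info.prev pointing
// towards the head, and both ends terminated by END_TSO_QUEUE.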
575
576 void
577 promoteInRunQueue (Capability *cap, StgTSO *tso)
578 {
579 removeFromRunQueue(cap, tso);
580 pushOnRunQueue(cap, tso);
581 }
582
583 /* ----------------------------------------------------------------------------
584 * Setting up the scheduler loop
585 * ------------------------------------------------------------------------- */
586
587 static void
588 schedulePreLoop(void)
589 {
590 // initialisation for scheduler - what cannot go into initScheduler()
591
592 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
593 win32AllocStack();
594 #endif
595 }
596
597 /* -----------------------------------------------------------------------------
598 * scheduleFindWork()
599 *
600 * Search for work to do, and handle messages from elsewhere.
601 * -------------------------------------------------------------------------- */
602
603 static void
604 scheduleFindWork (Capability **pcap)
605 {
606 scheduleStartSignalHandlers(*pcap);
607
608 scheduleProcessInbox(pcap);
609
610 scheduleCheckBlockedThreads(*pcap);
611
612 #if defined(THREADED_RTS)
613 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
614 #endif
615 }
616
617 #if defined(THREADED_RTS)
618 STATIC_INLINE bool
619 shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
620 {
621 // we need to yield this capability to someone else if..
622 // - another thread is initiating a GC, and we didn't just do a GC
623 // (see Note [GC livelock])
624 // - another Task is returning from a foreign call
625 // - the thread at the head of the run queue cannot be run
626 // by this Task (it is bound to another Task, or it is unbound
627 // and this task is bound).
628 //
629 // Note [GC livelock]
630 //
631 // If we are interrupted to do a GC, then we do not immediately do
632 // another one. This avoids a starvation situation where one
633 // Capability keeps forcing a GC and the other Capabilities make no
634 // progress at all.
635
636 return ((pending_sync && !didGcLast) ||
637 cap->n_returning_tasks != 0 ||
638 (!emptyRunQueue(cap) && (task->incall->tso == NULL
639 ? peekRunQueue(cap)->bound != NULL
640 : peekRunQueue(cap)->bound != task->incall)));
641 }
642
643 // This is the single place where a Task goes to sleep. There are
644 // two reasons it might need to sleep:
645 // - there are no threads to run
646 // - we need to yield this Capability to someone else
647 // (see shouldYieldCapability())
648 //
649 // Careful: the scheduler loop is quite delicate. Make sure you run
650 // the tests in testsuite/concurrent (all ways) after modifying this,
651 // and also check the benchmarks in nofib/parallel for regressions.
652
653 static void
654 scheduleYield (Capability **pcap, Task *task)
655 {
656 Capability *cap = *pcap;
657 bool didGcLast = false;
658
659 // if we have work, and we don't need to give up the Capability, continue.
660 //
661 if (!shouldYieldCapability(cap,task,false) &&
662 (!emptyRunQueue(cap) ||
663 !emptyInbox(cap) ||
664 sched_state >= SCHED_INTERRUPTING)) {
665 return;
666 }
667
668 // otherwise yield (sleep), and keep yielding if necessary.
669 do {
670 if (doIdleGCWork(cap, false)) {
671 // there's more idle GC work to do
672 didGcLast = false;
673 } else {
674 // no more idle GC work to do
675 didGcLast = yieldCapability(&cap,task, !didGcLast);
676 }
677 }
678 while (shouldYieldCapability(cap,task,didGcLast));
679
680 // note there may still be no threads on the run queue at this
681 // point, the caller has to check.
682
683 *pcap = cap;
684 return;
685 }
686 #endif
687
688 /* -----------------------------------------------------------------------------
689 * schedulePushWork()
690 *
691 * Push work to other Capabilities if we have some.
692 * -------------------------------------------------------------------------- */
693
694 static void
695 schedulePushWork(Capability *cap USED_IF_THREADS,
696 Task *task USED_IF_THREADS)
697 {
698 #if defined(THREADED_RTS)
699
700 Capability *free_caps[n_capabilities], *cap0;
701 uint32_t i, n_wanted_caps, n_free_caps;
702
703 uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
704
705 // migration can be turned off with +RTS -qm
706 if (!RtsFlags.ParFlags.migrate) {
707 spare_threads = 0;
708 }
709
710 // Figure out how many capabilities we want to wake up. We need at least
711 // sparkPoolSize(cap) plus the number of spare threads we have.
712 n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
713 if (n_wanted_caps == 0) return;
714
715 // First grab as many free Capabilities as we can. ToDo: we should use
716 // capabilities on the same NUMA node preferably, but not exclusively.
717 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
718 n_free_caps < n_wanted_caps && i != cap->no;
719 i = (i + 1) % n_capabilities) {
720 cap0 = capabilities[i];
721 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
722 if (!emptyRunQueue(cap0)
723 || cap0->n_returning_tasks != 0
724 || !emptyInbox(cap0)) {
725 // it already has some work, we just grabbed it at
726 // the wrong moment. Or maybe it's deadlocked!
727 releaseCapability(cap0);
728 } else {
729 free_caps[n_free_caps++] = cap0;
730 }
731 }
732 }
733
734 // We now have n_free_caps free capabilities stashed in
735 // free_caps[]. Attempt to share our run queue equally with them.
736 // This is complicated slightly by the fact that we can't move
737 // some threads:
738 //
739 // - threads that have TSO_LOCKED cannot migrate
740 // - a thread that is bound to the current Task cannot be migrated
741 //
742 // This is about the simplest thing we could do; improvements we
743 // might want to do include:
744 //
745 // - giving high priority to moving relatively new threads, on
746 // the grounds that they haven't had time to build up a
747 // working set in the cache on this CPU/Capability.
748 //
749 // - giving low priority to moving long-lived threads
750
751 if (n_free_caps > 0) {
752 StgTSO *prev, *t, *next;
753
754 debugTrace(DEBUG_sched,
755 "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
756 cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
757 n_free_caps);
758
759 // There are n_free_caps+1 caps in total. We will share the threads
760 // evenly between them, *except* that if the run queue does not divide
761 // evenly by n_free_caps+1 then we bias towards the current capability.
762 // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
763 uint32_t keep_threads =
764 (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
765
766 // This also ensures that we don't give away all our threads, since
767 // (x + y) / (y + 1) >= 1 when x >= 1.
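// (Checking that claim at the boundary: with x = 1 runnable thread and
//  y free capabilities, (1 + y) / (y + 1) = 1, so we keep at least one.)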
768
769 // The number of threads we have left.
770 uint32_t n = cap->n_run_queue;
771
772 // prev = the previous thread on this cap's run queue
773 prev = END_TSO_QUEUE;
774
775 // We're going to walk through the run queue, migrating threads to other
776 // capabilities until we have only keep_threads left. We might
777 // encounter a thread that cannot be migrated, in which case we add it
778 // to the current run queue and decrement keep_threads.
779 for (t = cap->run_queue_hd, i = 0;
780 t != END_TSO_QUEUE && n > keep_threads;
781 t = next)
782 {
783 next = t->_link;
784 t->_link = END_TSO_QUEUE;
785
786 // Should we keep this thread?
787 if (t->bound == task->incall // don't move my bound thread
788 || tsoLocked(t) // don't move a locked thread
789 ) {
790 if (prev == END_TSO_QUEUE) {
791 cap->run_queue_hd = t;
792 } else {
793 setTSOLink(cap, prev, t);
794 }
795 setTSOPrev(cap, t, prev);
796 prev = t;
797 if (keep_threads > 0) keep_threads--;
798 }
799
800 // Or migrate it?
801 else {
802 appendToRunQueue(free_caps[i],t);
803 traceEventMigrateThread (cap, t, free_caps[i]->no);
804
805 if (t->bound) { t->bound->task->cap = free_caps[i]; }
806 t->cap = free_caps[i];
807 n--; // we have one fewer thread now
808 i++; // move on to the next free_cap
809 if (i == n_free_caps) i = 0;
810 }
811 }
812
813 // Join up the beginning of the queue (prev)
814 // with the rest of the queue (t)
815 if (t == END_TSO_QUEUE) {
816 cap->run_queue_tl = prev;
817 } else {
818 setTSOPrev(cap, t, prev);
819 }
820 if (prev == END_TSO_QUEUE) {
821 cap->run_queue_hd = t;
822 } else {
823 setTSOLink(cap, prev, t);
824 }
825 cap->n_run_queue = n;
826
827 IF_DEBUG(sanity, checkRunQueue(cap));
828
829 // release the capabilities
830 for (i = 0; i < n_free_caps; i++) {
831 task->cap = free_caps[i];
832 if (sparkPoolSizeCap(cap) > 0) {
833 // If we have sparks to steal, wake up a worker on the
834 // capability, even if it has no threads to run.
835 releaseAndWakeupCapability(free_caps[i]);
836 } else {
837 releaseCapability(free_caps[i]);
838 }
839 }
840 }
841 task->cap = cap; // reset to point to our Capability.
842
843 #endif /* THREADED_RTS */
844
845 }
846
847 /* ----------------------------------------------------------------------------
848 * Start any pending signal handlers
849 * ------------------------------------------------------------------------- */
850
851 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
852 static void
853 scheduleStartSignalHandlers(Capability *cap)
854 {
855 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
856 // safe outside the lock
857 startSignalHandlers(cap);
858 }
859 }
860 #else
861 static void
862 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
863 {
864 }
865 #endif
866
867 /* ----------------------------------------------------------------------------
868 * Check for blocked threads that can be woken up.
869 * ------------------------------------------------------------------------- */
870
871 static void
872 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
873 {
874 #if !defined(THREADED_RTS)
875 //
876 // Check whether any waiting threads need to be woken up. If the
877 // run queue is empty, and there are no other tasks running, we
878 // can wait indefinitely for something to happen.
879 //
880 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
881 {
882 awaitEvent (emptyRunQueue(cap));
883 }
884 #endif
885 }
886
887 /* ----------------------------------------------------------------------------
888 * Detect deadlock conditions and attempt to resolve them.
889 * ------------------------------------------------------------------------- */
890
891 static void
892 scheduleDetectDeadlock (Capability **pcap, Task *task)
893 {
894 Capability *cap = *pcap;
895 /*
896 * Detect deadlock: when we have no threads to run, there are no
897 * threads blocked, waiting for I/O, or sleeping, and all the
898 * other tasks are waiting for work, we must have a deadlock of
899 * some description.
900 */
901 if ( emptyThreadQueues(cap) )
902 {
903 #if defined(THREADED_RTS)
904 /*
905 * In the threaded RTS, we only check for deadlock if there
906 * has been no activity in a complete timeslice. This means
907 * we won't eagerly start a full GC just because we don't have
908 * any threads to run currently.
909 */
910 if (recent_activity != ACTIVITY_INACTIVE) return;
911 #endif
912
913 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
914
915 // Garbage collection can release some new threads due to
916 // either (a) finalizers or (b) threads resurrected because
917 // they are unreachable and will therefore be sent an
918 // exception. Any threads thus released will be immediately
919 // runnable.
920 scheduleDoGC (pcap, task, true/*force major GC*/);
921 cap = *pcap;
922 // when force_major == true, scheduleDoGC sets
923 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
924 // signal.
925
926 if ( !emptyRunQueue(cap) ) return;
927
928 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
929 /* If we have user-installed signal handlers, then wait
930 * for signals to arrive rather than bombing out with a
931 * deadlock.
932 */
933 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
934 debugTrace(DEBUG_sched,
935 "still deadlocked, waiting for signals...");
936
937 awaitUserSignals();
938
939 if (signals_pending()) {
940 startSignalHandlers(cap);
941 }
942
943 // either we have threads to run, or we were interrupted:
944 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
945
946 return;
947 }
948 #endif
949
950 #if !defined(THREADED_RTS)
951 /* Probably a real deadlock. Send the current main thread the
952 * Deadlock exception.
953 */
954 if (task->incall->tso) {
955 switch (task->incall->tso->why_blocked) {
956 case BlockedOnSTM:
957 case BlockedOnBlackHole:
958 case BlockedOnMsgThrowTo:
959 case BlockedOnMVar:
960 case BlockedOnMVarRead:
961 throwToSingleThreaded(cap, task->incall->tso,
962 (StgClosure *)nonTermination_closure);
963 return;
964 default:
965 barf("deadlock: main thread blocked in a strange way");
966 }
967 }
968 return;
969 #endif
970 }
971 }
972
973
974 /* ----------------------------------------------------------------------------
975 * Process messages in the current Capability's inbox
976 * ------------------------------------------------------------------------- */
977
978 static void
979 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
980 {
981 #if defined(THREADED_RTS)
982 Message *m, *next;
983 PutMVar *p, *pnext;
984 int r;
985 Capability *cap = *pcap;
986
987 while (!emptyInbox(cap)) {
988 // Executing messages might use heap, so we should check for GC.
989 if (doYouWantToGC(cap)) {
990 scheduleDoGC(pcap, cap->running_task, false);
991 cap = *pcap;
992 }
993
994 // don't use a blocking acquire; if the lock is held by
995 // another thread then just carry on. This seems to avoid
996 // getting stuck in a message ping-pong situation with other
997 // processors. We'll check the inbox again later anyway.
998 //
999 // We should really use a more efficient queue data structure
1000 // here. The trickiness is that we must ensure a Capability
1001 // never goes idle if the inbox is non-empty, which is why we
1002 // use cap->lock (cap->lock is released as the last thing
1003 // before going idle; see Capability.c:releaseCapability()).
1004 r = TRY_ACQUIRE_LOCK(&cap->lock);
1005 if (r != 0) return;
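// (TRY_ACQUIRE_LOCK returns 0 on success, so a non-zero r means the lock
// is held by someone else and we give up for now; the comment above
// explains why this is safe.)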
1006
1007 m = cap->inbox;
1008 p = cap->putMVars;
1009 cap->inbox = (Message*)END_TSO_QUEUE;
1010 cap->putMVars = NULL;
1011
1012 RELEASE_LOCK(&cap->lock);
1013
1014 while (m != (Message*)END_TSO_QUEUE) {
1015 next = m->link;
1016 executeMessage(cap, m);
1017 m = next;
1018 }
1019
1020 while (p != NULL) {
1021 pnext = p->link;
1022 performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
1023 Unit_closure);
1024 freeStablePtr(p->mvar);
1025 stgFree(p);
1026 p = pnext;
1027 }
1028 }
1029 #endif
1030 }
1031
1032
1033 /* ----------------------------------------------------------------------------
1034 * Activate spark threads (THREADED_RTS)
1035 * ------------------------------------------------------------------------- */
1036
1037 #if defined(THREADED_RTS)
1038 static void
1039 scheduleActivateSpark(Capability *cap)
1040 {
1041 if (anySparks() && !cap->disabled)
1042 {
1043 createSparkThread(cap);
1044 debugTrace(DEBUG_sched, "creating a spark thread");
1045 }
1046 }
1047 #endif // THREADED_RTS
1048
1049 /* ----------------------------------------------------------------------------
1050 * After running a thread...
1051 * ------------------------------------------------------------------------- */
1052
1053 static void
1054 schedulePostRunThread (Capability *cap, StgTSO *t)
1055 {
1056 // We have to be able to catch transactions that are in an
1057 // infinite loop as a result of seeing an inconsistent view of
1058 // memory, e.g.
1059 //
1060 // atomically $ do
1061 // [a,b] <- mapM readTVar [ta,tb]
1062 // when (a == b) loop
1063 //
1064 // and a is never equal to b given a consistent view of memory.
1065 //
1066 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1067 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1068 debugTrace(DEBUG_sched | DEBUG_stm,
1069 "trec %p found wasting its time", t);
1070
1071 // strip the stack back to the
1072 // ATOMICALLY_FRAME, aborting the (nested)
1073 // transaction, and saving the stack of any
1074 // partially-evaluated thunks on the heap.
1075 throwToSingleThreaded_(cap, t, NULL, true);
1076
1077 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1078 }
1079 }
1080
1081 //
1082 // If the current thread's allocation limit has run out, send it
1083 // the AllocationLimitExceeded exception.
1084
1085 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1086 // Use a throwToSelf rather than a throwToSingleThreaded, because
1087 // it correctly handles the case where the thread is currently
1088 // inside mask. Also the thread might be blocked (e.g. on an
1089 // MVar), and throwToSingleThreaded doesn't unblock it
1090 // correctly in that case.
1091 throwToSelf(cap, t, allocationLimitExceeded_closure);
1092 ASSIGN_Int64((W_*)&(t->alloc_limit),
1093 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1094 }
1095
1096 /* some statistics gathering in the parallel case */
1097 }
1098
1099 /* -----------------------------------------------------------------------------
1100 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1101 * -------------------------------------------------------------------------- */
1102
1103 static bool
1104 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1105 {
1106 if (cap->r.rHpLim == NULL || cap->context_switch) {
1107 // Sometimes we miss a context switch, e.g. when calling
1108 // primitives in a tight loop, MAYBE_GC() doesn't check the
1109 // context switch flag, and we end up waiting for a GC.
1110 // See #1984, and concurrent/should_run/1984
1111 cap->context_switch = 0;
1112 appendToRunQueue(cap,t);
1113 } else {
1114 pushOnRunQueue(cap,t);
1115 }
1116
1117 // did the task ask for a large block?
1118 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1119 // if so, get one and push it on the front of the nursery.
1120 bdescr *bd;
1121 W_ blocks;
1122
1123 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
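// Worked example, assuming the default 4Kbyte BLOCK_SIZE: a request of
// rHpAlloc = 10000 bytes rounds up to 12288, i.e. blocks = 3.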
1124
1125 if (blocks > BLOCKS_PER_MBLOCK) {
1126 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1127 }
1128
1129 debugTrace(DEBUG_sched,
1130 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1131 (long)t->id, what_next_strs[t->what_next], blocks);
1132
1133 // don't do this if the nursery is (nearly) full, we'll GC first.
1134 if (cap->r.rCurrentNursery->link != NULL ||
1135 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1136 // infinite loop if the
1137 // nursery has only one
1138 // block.
1139
1140 bd = allocGroupOnNode_lock(cap->node,blocks);
1141 cap->r.rNursery->n_blocks += blocks;
1142
1143 // link the new group after CurrentNursery
1144 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1145
1146 // initialise it as a nursery block. We initialise the
1147 // step, gen_no, and flags field of *every* sub-block in
1148 // this large block, because this is easier than making
1149 // sure that we always find the block head of a large
1150 // block whenever we call Bdescr() (eg. evacuate() and
1151 // isAlive() in the GC would both have to do this, at
1152 // least).
1153 {
1154 bdescr *x;
1155 for (x = bd; x < bd + blocks; x++) {
1156 initBdescr(x,g0,g0);
1157 x->free = x->start;
1158 x->flags = 0;
1159 }
1160 }
1161
1162 // This assert can be a killer if the app is doing lots
1163 // of large block allocations.
1164 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1165
1166 // now update the nursery to point to the new block
1167 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1168 cap->r.rCurrentNursery = bd;
1169
1170 // we might be unlucky and have another thread get on the
1171 // run queue before us and steal the large block, but in that
1172 // case the thread will just end up requesting another large
1173 // block.
1174 return false; /* not actually GC'ing */
1175 }
1176 }
1177
1178 return doYouWantToGC(cap);
1179 /* actual GC is done at the end of the while loop in schedule() */
1180 }
1181
1182 /* -----------------------------------------------------------------------------
1183 * Handle a thread that returned to the scheduler with ThreadYielding
1184 * -------------------------------------------------------------------------- */
1185
1186 static bool
1187 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1188 {
1189 /* put the thread back on the run queue. Then, if we're ready to
1190 * GC, check whether this is the last task to stop. If so, wake
1191 * up the GC thread. getThread will block during a GC until the
1192 * GC is finished.
1193 */
1194
1195 ASSERT(t->_link == END_TSO_QUEUE);
1196
1197 // Shortcut if we're just switching evaluators: just run the thread. See
1198 // Note [avoiding threadPaused] in Interpreter.c.
1199 if (t->what_next != prev_what_next) {
1200 debugTrace(DEBUG_sched,
1201 "--<< thread %ld (%s) stopped to switch evaluators",
1202 (long)t->id, what_next_strs[t->what_next]);
1203 return true;
1204 }
1205
1206 // Reset the context switch flag. We don't do this just before
1207 // running the thread, because that would mean we would lose ticks
1208 // during GC, which can lead to unfair scheduling (a thread hogs
1209 // the CPU because the tick always arrives during GC). This way
1210 // penalises threads that do a lot of allocation, but that seems
1211 // better than the alternative.
1212 if (cap->context_switch != 0) {
1213 cap->context_switch = 0;
1214 appendToRunQueue(cap,t);
1215 } else {
1216 pushOnRunQueue(cap,t);
1217 }
1218
1219 IF_DEBUG(sanity,
1220 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1221 checkTSO(t));
1222
1223 return false;
1224 }
1225
1226 /* -----------------------------------------------------------------------------
1227 * Handle a thread that returned to the scheduler with ThreadBlocked
1228 * -------------------------------------------------------------------------- */
1229
1230 static void
1231 scheduleHandleThreadBlocked( StgTSO *t
1232 #if !defined(DEBUG)
1233 STG_UNUSED
1234 #endif
1235 )
1236 {
1237
1238 // We don't need to do anything. The thread is blocked, and it
1239 // has tidied up its stack and placed itself on whatever queue
1240 // it needs to be on.
1241
1242 // ASSERT(t->why_blocked != NotBlocked);
1243 // Not true: for example,
1244 // - the thread may have woken itself up already, because
1245 // threadPaused() might have raised a blocked throwTo
1246 // exception, see maybePerformBlockedException().
1247
1248 #if defined(DEBUG)
1249 traceThreadStatus(DEBUG_sched, t);
1250 #endif
1251 }
1252
1253 /* -----------------------------------------------------------------------------
1254 * Handle a thread that returned to the scheduler with ThreadFinished
1255 * -------------------------------------------------------------------------- */
1256
1257 static bool
1258 scheduleHandleThreadFinished (Capability *cap, Task *task, StgTSO *t)
1259 {
1260 /* Need to check whether this was a main thread, and if so,
1261 * return with the return value.
1262 *
1263 * We also end up here if the thread kills itself with an
1264 * uncaught exception, see Exception.cmm.
1265 */
1266
1267 // blocked exceptions can now complete, even if the thread was in
1268 // blocked mode (see #2910).
1269 awakenBlockedExceptionQueue (cap, t);
1270
1271 //
1272 // Check whether the thread that just completed was a bound
1273 // thread, and if so return with the result.
1274 //
1275 // There is an assumption here that all thread completion goes
1276 // through this point; we need to make sure that if a thread
1277 // ends up in the ThreadKilled state, that it stays on the run
1278 // queue so it can be dealt with here.
1279 //
1280
1281 if (t->bound) {
1282
1283 if (t->bound != task->incall) {
1284 #if !defined(THREADED_RTS)
1285 // Must be a bound thread that is not the topmost one. Leave
1286 // it on the run queue until the stack has unwound to the
1287 // point where we can deal with this. Leaving it on the run
1288 // queue also ensures that the garbage collector knows about
1289 // this thread and its return value (it gets dropped from the
1290 // step->threads list so there's no other way to find it).
1291 appendToRunQueue(cap,t);
1292 return false;
1293 #else
1294 // this cannot happen in the threaded RTS, because a
1295 // bound thread can only be run by the appropriate Task.
1296 barf("finished bound thread that isn't mine");
1297 #endif
1298 }
1299
1300 ASSERT(task->incall->tso == t);
1301
1302 if (t->what_next == ThreadComplete) {
1303 if (task->incall->ret) {
1304 // NOTE: return val is stack->sp[1] (see StgStartup.cmm)
1305 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1306 }
1307 task->incall->rstat = Success;
1308 } else {
1309 if (task->incall->ret) {
1310 *(task->incall->ret) = NULL;
1311 }
1312 if (sched_state >= SCHED_INTERRUPTING) {
1313 if (heap_overflow) {
1314 task->incall->rstat = HeapExhausted;
1315 } else {
1316 task->incall->rstat = Interrupted;
1317 }
1318 } else {
1319 task->incall->rstat = Killed;
1320 }
1321 }
1322 #if defined(DEBUG)
1323 removeThreadLabel((StgWord)task->incall->tso->id);
1324 #endif
1325
1326 // We no longer consider this thread and task to be bound to
1327 // each other. The TSO lives on until it is GC'd, but the
1328 // task is about to be released by the caller, and we don't
1329 // want anyone following the pointer from the TSO to the
1330 // defunct task (which might have already been
1331 // re-used). This was a real bug: the GC updated
1332 // tso->bound->tso which led to a deadlock.
1333 t->bound = NULL;
1334 task->incall->tso = NULL;
1335
1336 return true; // tells schedule() to return
1337 }
1338
1339 return false;
1340 }
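
// A sketch of how the rstat recorded above reaches a foreign caller, for a
// hypothetical client of the public RtsAPI.h entry points (illustrative
// only, not code from this file):
//
//     Capability *cap = rts_lock();
//     HaskellObj ret;
//     rts_evalIO(&cap, closure, &ret);
//     rts_checkSchedStatus("my_call", cap);  // fails unless rstat == Success
//     rts_unlock(cap);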
1341
1342 /* -----------------------------------------------------------------------------
1343 * Perform a heap census
1344 * -------------------------------------------------------------------------- */
1345
1346 static bool
1347 scheduleNeedHeapProfile( bool ready_to_gc )
1348 {
1349 // When we have +RTS -i0 and we're heap profiling, do a census at
1350 // every GC. This lets us get repeatable runs for debugging.
1351 if (performHeapProfile ||
1352 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1353 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1354 return true;
1355 } else {
1356 return false;
1357 }
1358 }
1359
1360 /* -----------------------------------------------------------------------------
1361 * stopAllCapabilities()
1362 *
1363 * Stop all Haskell execution. This is used when we need to make some global
1364 * change to the system, such as altering the number of capabilities, or
1365 * forking.
1366 *
1367 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1368 * -------------------------------------------------------------------------- */
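// A minimal usage sketch for a hypothetical caller (e.g. code altering the
// number of capabilities, as mentioned above):
//
//     stopAllCapabilities(&cap, task);       // we now hold every Capability
//     /* ... make the global change ... */
//     releaseAllCapabilities(n_capabilities, cap, task);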
1369
1370 #if defined(THREADED_RTS)
1371 static void stopAllCapabilities (Capability **pCap, Task *task)
1372 {
1373 bool was_syncing;
1374 SyncType prev_sync_type;
1375
1376 PendingSync sync = {
1377 .type = SYNC_OTHER,
1378 .idle = NULL,
1379 .task = task
1380 };
1381
1382 do {
1383 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1384 } while (was_syncing);
1385
1386 acquireAllCapabilities(*pCap,task);
1387
1388 pending_sync = 0;
1389 }
1390 #endif
1391
1392 /* -----------------------------------------------------------------------------
1393 * requestSync()
1394 *
1395 * Commence a synchronisation between all capabilities. Normally not called
1396 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1397 * has some special synchronisation requirements.
1398 *
1399 * Returns:
1400 * false if we successfully got a sync
1401 * true if there was another sync request in progress,
1402 * and we yielded to it. The type of the other sync request
1403 * is returned in *prev_sync_type.
1404 * -------------------------------------------------------------------------- */
1405
1406 #if defined(THREADED_RTS)
1407 static bool requestSync (
1408 Capability **pcap, Task *task, PendingSync *new_sync,
1409 SyncType *prev_sync_type)
1410 {
1411 PendingSync *sync;
1412
1413 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1414 (StgWord)NULL,
1415 (StgWord)new_sync);
1416
1417 if (sync != NULL)
1418 {
1419 // sync is valid until we have called yieldCapability().
1420 // After the sync is completed, we cannot read that struct any
1421 // more because it has been freed.
1422 *prev_sync_type = sync->type;
1423 do {
1424 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1425 sync->type);
1426 ASSERT(*pcap);
1427 yieldCapability(pcap,task,true);
1428 sync = pending_sync;
1429 } while (sync != NULL);
1430
1431 // NOTE: task->cap might have changed now
1432 return true;
1433 }
1434 else
1435 {
1436 return false;
1437 }
1438 }
1439 #endif
1440
1441 /* -----------------------------------------------------------------------------
1442 * acquireAllCapabilities()
1443 *
1444 * Grab all the capabilities except the one we already hold. Used
1445 * when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1446 * before a fork (SYNC_OTHER).
1447 *
1448 * Only call this after requestSync(), otherwise a deadlock might
1449 * ensue if another thread is trying to synchronise.
1450 * -------------------------------------------------------------------------- */
1451
1452 #if defined(THREADED_RTS)
1453 static void acquireAllCapabilities(Capability *cap, Task *task)
1454 {
1455 Capability *tmpcap;
1456 uint32_t i;
1457
1458 ASSERT(pending_sync != NULL);
1459 for (i=0; i < n_capabilities; i++) {
1460 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1461 i, n_capabilities);
1462 tmpcap = capabilities[i];
1463 if (tmpcap != cap) {
1464 // we better hope this task doesn't get migrated to
1465 // another Capability while we're waiting for this one.
1466 // It won't, because load balancing happens while we have
1467 // all the Capabilities, but even so it's a slightly
1468 // unsavoury invariant.
1469 task->cap = tmpcap;
1470 waitForCapability(&tmpcap, task);
1471 if (tmpcap->no != i) {
1472 barf("acquireAllCapabilities: got the wrong capability");
1473 }
1474 }
1475 }
1476 task->cap = cap;
1477 }
1478 #endif
1479
1480 /* -----------------------------------------------------------------------------
1481 * releaseAllCapabilities()
1482 *
1483 * Assuming this thread holds all the capabilities, release them all except for
1484 * the one passed in as cap.
1485 * -------------------------------------------------------------------------- */
1486
1487 #if defined(THREADED_RTS)
1488 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task)
1489 {
1490 uint32_t i;
1491
1492 for (i = 0; i < n; i++) {
1493 if (cap->no != i) {
1494 task->cap = capabilities[i];
1495 releaseCapability(capabilities[i]);
1496 }
1497 }
1498 task->cap = cap;
1499 }
1500 #endif
1501
1502 /* -----------------------------------------------------------------------------
1503 * Perform a garbage collection if necessary
1504 * -------------------------------------------------------------------------- */
1505
1506 static void
1507 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1508 bool force_major)
1509 {
1510 Capability *cap = *pcap;
1511 bool heap_census;
1512 uint32_t collect_gen;
1513 bool major_gc;
1514 #if defined(THREADED_RTS)
1515 uint32_t gc_type;
1516 uint32_t i;
1517 uint32_t need_idle;
1518 uint32_t n_gc_threads;
1519 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1520 StgTSO *tso;
1521 bool *idle_cap;
1522 // idle_cap is an array (allocated later) of size n_capabilities, where
1523 // idle_cap[i] is true if capability i will be idle during this GC
1524 // cycle.
1525 #endif
1526
1527 if (sched_state == SCHED_SHUTTING_DOWN) {
1528 // The final GC has already been done, and the system is
1529 // shutting down. We'll probably deadlock if we try to GC
1530 // now.
1531 return;
1532 }
1533
1534 heap_census = scheduleNeedHeapProfile(true);
1535
1536 // Figure out which generation we are collecting, so that we can
1537 // decide whether this is a parallel GC or not.
1538 collect_gen = calcNeeded(force_major || heap_census, NULL);
1539 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1540
1541 #if defined(THREADED_RTS)
1542 if (sched_state < SCHED_INTERRUPTING
1543 && RtsFlags.ParFlags.parGcEnabled
1544 && collect_gen >= RtsFlags.ParFlags.parGcGen
1545 && ! oldest_gen->mark)
1546 {
1547 gc_type = SYNC_GC_PAR;
1548 } else {
1549 gc_type = SYNC_GC_SEQ;
1550 }
1551
1552 // In order to GC, there must be no threads running Haskell code.
1553 // Therefore, for single-threaded GC, the GC thread needs to hold *all* the
1554 // capabilities, and release them after the GC has completed. For parallel
1555 // GC, we synchronise all the running threads using requestSync().
1556 //
1557 // Other capabilities are prevented from running yet more Haskell threads if
1558 // pending_sync is set. Tested inside yieldCapability() and
1559 // releaseCapability() in Capability.c
1560
1561 PendingSync sync = {
1562 .type = gc_type,
1563 .idle = NULL,
1564 .task = task
1565 };
1566
1567 {
1568 SyncType prev_sync = 0;
1569 bool was_syncing;
1570 do {
1571 // If -qn is not set and we have more capabilities than cores, set
1572 // the number of GC threads to #cores. We do this here rather than
1573 // in normaliseRtsOpts() because here it will work if the program
1574 // calls setNumCapabilities.
1575 //
1576 n_gc_threads = RtsFlags.ParFlags.parGcThreads;
1577 if (n_gc_threads == 0 &&
1578 enabled_capabilities > getNumberOfProcessors()) {
1579 n_gc_threads = getNumberOfProcessors();
1580 }
1581
1582 // This calculation must be inside the loop because
1583 // enabled_capabilities may change if requestSync() below fails and
1584 // we retry.
1585 if (gc_type == SYNC_GC_PAR && n_gc_threads > 0) {
1586 if (n_gc_threads >= enabled_capabilities) {
1587 need_idle = 0;
1588 } else {
1589 need_idle = enabled_capabilities - n_gc_threads;
1590 }
1591 } else {
1592 need_idle = 0;
1593 }
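// Worked example (hypothetical configuration): with +RTS -N8, no -qn, and
// 4 processors reported, n_gc_threads becomes 4 above, so need_idle =
// 8 - 4 = 4 capabilities will be asked to sit this GC out.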
1594
1595 // We need an array of size n_capabilities, but since this may
1596 // change each time around the loop we must allocate it afresh.
1597 idle_cap = (bool *)stgMallocBytes(n_capabilities *
1598 sizeof(bool),
1599 "scheduleDoGC");
1600 sync.idle = idle_cap;
1601
1602 // When using +RTS -qn, we need some capabilities to be idle during
1603 // GC. The best bet is to choose some inactive ones, so we look for
1604 // those first:
1605 uint32_t n_idle = need_idle;
1606 for (i=0; i < n_capabilities; i++) {
1607 if (capabilities[i]->disabled) {
1608 idle_cap[i] = true;
1609 } else if (n_idle > 0 &&
1610 capabilities[i]->running_task == NULL) {
1611 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1612 n_idle--;
1613 idle_cap[i] = true;
1614 } else {
1615 idle_cap[i] = false;
1616 }
1617 }
1618 // If we didn't find enough inactive capabilities, just pick some
1619 // more to be idle.
1620 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1621 if (!idle_cap[i] && i != cap->no) {
1622 idle_cap[i] = true;
1623 n_idle--;
1624 }
1625 }
1626 ASSERT(n_idle == 0);
1627
1628 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1629 cap = *pcap;
1630 if (was_syncing) {
1631 stgFree(idle_cap);
1632 }
1633 if (was_syncing &&
1634 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1635 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1636 // someone else had a pending sync request for a GC, so
1637 // let's assume GC has been done and we don't need to GC
1638 // again.
1639 // Exception to this: if SCHED_INTERRUPTING, then we still
1640 // need to do the final GC.
1641 return;
1642 }
1643 if (sched_state == SCHED_SHUTTING_DOWN) {
1644 // The scheduler might now be shutting down. We tested
1645 // this above, but it might have become true since then as
1646 // we yielded the capability in requestSync().
1647 return;
1648 }
1649 } while (was_syncing);
1650 }
1651
1652 stat_startGCSync(gc_threads[cap->no]);
1653
1654 #if defined(DEBUG)
1655 unsigned int old_n_capabilities = n_capabilities;
1656 #endif
1657
1658 interruptAllCapabilities();
1659
1660 // The final shutdown GC is always single-threaded, because it's
1661 // possible that some of the Capabilities have no worker threads.
1662
1663 if (gc_type == SYNC_GC_SEQ) {
1664 traceEventRequestSeqGc(cap);
1665 } else {
1666 traceEventRequestParGc(cap);
1667 }
1668
1669 if (gc_type == SYNC_GC_SEQ) {
1670 // single-threaded GC: grab all the capabilities
1671 acquireAllCapabilities(cap,task);
1672 }
1673 else
1674 {
1675 // If we are load-balancing collections in this
1676 // generation, then we require all GC threads to participate
1677 // in the collection. Otherwise, we only require active
1678 // threads to participate, and we set gc_threads[i]->idle for
1679 // any idle capabilities. The rationale here is that waking
1680 // up an idle Capability takes much longer than just doing any
1681 // GC work on its behalf.
1682
1683 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1684 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1685 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1686 {
1687 for (i=0; i < n_capabilities; i++) {
1688 if (capabilities[i]->disabled) {
1689 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1690 if (idle_cap[i]) {
1691 n_idle_caps++;
1692 }
1693 } else {
1694 if (i != cap->no && idle_cap[i]) {
1695 Capability *tmpcap = capabilities[i];
1696 task->cap = tmpcap;
1697 waitForCapability(&tmpcap, task);
1698 n_idle_caps++;
1699 }
1700 }
1701 }
1702 }
1703 else
1704 {
1705 for (i=0; i < n_capabilities; i++) {
1706 if (capabilities[i]->disabled) {
1707 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1708 if (idle_cap[i]) {
1709 n_idle_caps++;
1710 }
1711 } else if (i != cap->no &&
1712 capabilities[i]->idle >=
1713 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1714 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1715 if (idle_cap[i]) {
1716 n_idle_caps++;
1717 } else {
1718 n_failed_trygrab_idles++;
1719 }
1720 }
1721 }
1722 }
1723 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1724
1725 for (i=0; i < n_capabilities; i++) {
1726 capabilities[i]->idle++;
1727 }
1728
1729 // For all capabilities participating in this GC, wait until
1730 // they have stopped mutating and are standing by for GC.
1731 waitForGcThreads(cap, idle_cap);
1732
1733 // Stable point where we can do a global check on our spark counters
1734 ASSERT(checkSparkCountInvariant());
1735 }
1736
1737 #endif
1738
1739 IF_DEBUG(scheduler, printAllThreads());
1740
1741 delete_threads_and_gc:
1742 /*
1743 * We now have all the capabilities; if we're in an interrupting
1744 * state, then we should take the opportunity to delete all the
1745 * threads in the system.
1746 * Checking for major_gc ensures that the last GC is major.
1747 */
1748 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1749 deleteAllThreads();
1750 #if defined(THREADED_RTS)
1751 // Discard all the sparks from every Capability. Why?
1752 // They'll probably be GC'd anyway since we've killed all the
1753 // threads. It just avoids the GC having to do any work to
1754 // figure out that any remaining sparks are garbage.
1755 for (i = 0; i < n_capabilities; i++) {
1756 capabilities[i]->spark_stats.gcd +=
1757 sparkPoolSize(capabilities[i]->sparks);
1758 // No race here since all Caps are stopped.
1759 discardSparksCap(capabilities[i]);
1760 }
1761 #endif
1762 sched_state = SCHED_SHUTTING_DOWN;
1763 }
1764
1765 /*
1766 * When there are disabled capabilities, we want to migrate any
1767 * threads away from them. Normally this happens in the
1768 * scheduler's loop, but only for unbound threads - it's really
1769 * hard for a bound thread to migrate itself. So we have another
1770 * go here.
1771 */
1772 #if defined(THREADED_RTS)
1773 for (i = enabled_capabilities; i < n_capabilities; i++) {
1774 Capability *tmp_cap, *dest_cap;
1775 tmp_cap = capabilities[i];
1776 ASSERT(tmp_cap->disabled);
1777 if (i != cap->no) {
1778 dest_cap = capabilities[i % enabled_capabilities];
1779 while (!emptyRunQueue(tmp_cap)) {
1780 tso = popRunQueue(tmp_cap);
1781 migrateThread(tmp_cap, tso, dest_cap);
1782 if (tso->bound) {
1783 traceTaskMigrate(tso->bound->task,
1784 tso->bound->task->cap,
1785 dest_cap);
1786 tso->bound->task->cap = dest_cap;
1787 }
1788 }
1789 }
1790 }
1791 #endif
1792
1793 // Do any remaining idle GC work from the previous GC
1794 doIdleGCWork(cap, true /* all of it */);
1795
1796 #if defined(THREADED_RTS)
1797 // reset pending_sync *before* GC, so that when the GC threads
1798 // emerge they don't immediately re-enter the GC.
1799 pending_sync = 0;
1800 GarbageCollect(collect_gen, heap_census, gc_type, cap, idle_cap);
1801 #else
1802 GarbageCollect(collect_gen, heap_census, 0, cap, NULL);
1803 #endif
1804
1805 // If we're shutting down, don't leave any idle GC work to do.
1806 if (sched_state == SCHED_SHUTTING_DOWN) {
1807 doIdleGCWork(cap, true /* all of it */);
1808 }
1809
1810 traceSparkCounters(cap);
1811
1812 switch (recent_activity) {
1813 case ACTIVITY_INACTIVE:
1814 if (force_major) {
1815 // We are doing a GC because the system has been idle for a
1816 // timeslice and we need to check for deadlock. Record the
1817 // fact that we've done a GC and turn off the timer signal;
1818 // it will get re-enabled if we run any threads after the GC.
1819 recent_activity = ACTIVITY_DONE_GC;
1820 #if !defined(PROFILING)
1821 stopTimer();
1822 #endif
1823 break;
1824 }
1825 // fall through...
1826
1827 case ACTIVITY_MAYBE_NO:
1828 // the GC might have taken long enough for the timer to set
1829 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1830 // but we aren't necessarily deadlocked:
1831 recent_activity = ACTIVITY_YES;
1832 break;
1833
1834 case ACTIVITY_DONE_GC:
1835 // If we are actually active, the scheduler will reset the
1836 // recent_activity flag and re-enable the timer.
1837 break;
1838 }
1839
1840 #if defined(THREADED_RTS)
1841 // Stable point where we can do a global check on our spark counters
1842 ASSERT(checkSparkCountInvariant());
1843 #endif
1844
1845 // The heap census itself is done during GarbageCollect().
1846 if (heap_census) {
1847 performHeapProfile = false;
1848 }
1849
1850 #if defined(THREADED_RTS)
1851
1852 // If n_capabilities has changed during GC, we're in trouble.
1853 ASSERT(n_capabilities == old_n_capabilities);
1854
1855 if (gc_type == SYNC_GC_PAR)
1856 {
1857 for (i = 0; i < n_capabilities; i++) {
1858 if (i != cap->no) {
1859 if (idle_cap[i]) {
1860 ASSERT(capabilities[i]->running_task == task);
1861 task->cap = capabilities[i];
1862 releaseCapability(capabilities[i]);
1863 } else {
1864 ASSERT(capabilities[i]->running_task != task);
1865 }
1866 }
1867 }
1868 task->cap = cap;
1869
1870 // releaseGCThreads() happens *after* we have released idle
1871 // capabilities. Otherwise what can happen is one of the released
1872 // threads starts a new GC, and finds that it can't acquire some of
1873 // the disabled capabilities, because the previous GC still holds
1874 // them, so those disabled capabilities will not be idle during the
1875 // next GC round. However, if we release the capabilities first,
1876 // then they will be free (because they're disabled) when the next
1877 // GC cycle happens.
1878 releaseGCThreads(cap, idle_cap);
1879 }
1880 #endif
1881 if (heap_overflow && sched_state == SCHED_RUNNING) {
1882 // GC set the heap_overflow flag. We should throw an exception if we
1883 // can, or shut down otherwise.
1884
1885 // Get the thread to which Ctrl-C is thrown
1886 StgTSO *main_thread = getTopHandlerThread();
1887 if (main_thread == NULL) {
1888 // GC set the heap_overflow flag, and there is no main thread to
1889 // throw an exception to, so we should proceed with an orderly
1890 // shutdown now. Ultimately we want the main thread to return to
1891 // its caller with HeapExhausted, at which point the caller should
1892 // call hs_exit(). The first step is to delete all the threads.
1893 sched_state = SCHED_INTERRUPTING;
1894 goto delete_threads_and_gc;
1895 }
1896
1897 heap_overflow = false;
1898 const uint64_t allocation_count = getAllocations();
1899 if (RtsFlags.GcFlags.heapLimitGrace <
1900 allocation_count - allocated_bytes_at_heapoverflow ||
1901 allocated_bytes_at_heapoverflow == 0) {
1902 allocated_bytes_at_heapoverflow = allocation_count;
1903 // We used to simply exit, but throwing an exception gives the
1904 // program a chance to clean up. It also lets the exception be
1905 // caught.
1906
1907 // FIXME this is not a good way to tell a program to release
1908 // resources. It is neither reliable (the RTS crashes if it fails
1909 // to allocate memory from the OS) nor very usable (it is always
1910 // thrown to the main thread, which might not be able to do anything
1911 // useful with it). We really should have a more general way to
1912 // release resources in low-memory conditions. Nevertheless, this
1913 // is still a big improvement over just exiting.
1914
1915 // FIXME again: perhaps we should throw a synchronous exception
1916             // instead of an asynchronous one, or have a way for the program to
1917 // register a handler to be called when heap overflow happens.
1918 throwToSelf(cap, main_thread, heapOverflow_closure);
1919 }
1920 }
1921
1922 #if defined(THREADED_RTS)
1923 stgFree(idle_cap);
1924
1925 if (gc_type == SYNC_GC_SEQ) {
1926 // release our stash of capabilities.
1927 releaseAllCapabilities(n_capabilities, cap, task);
1928 }
1929 #endif
1930
1931 return;
1932 }
1933
1934 /* ---------------------------------------------------------------------------
1935 * Singleton fork(). Do not copy any running threads.
1936 * ------------------------------------------------------------------------- */
1937
1938 pid_t
1939 forkProcess(HsStablePtr *entry
1940 #if !defined(FORKPROCESS_PRIMOP_SUPPORTED)
1941 STG_UNUSED
1942 #endif
1943 )
1944 {
1945 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
1946 pid_t pid;
1947 StgTSO* t,*next;
1948 Capability *cap;
1949 uint32_t g;
1950 Task *task = NULL;
1951 uint32_t i;
1952
1953 debugTrace(DEBUG_sched, "forking!");
1954
1955 task = newBoundTask();
1956
1957 cap = NULL;
1958 waitForCapability(&cap, task);
1959
1960 #if defined(THREADED_RTS)
1961 stopAllCapabilities(&cap, task);
1962 #endif
1963
1964 // no funny business: hold locks while we fork, otherwise if some
1965 // other thread is holding a lock when the fork happens, the data
1966 // structure protected by the lock will forever be in an
1967 // inconsistent state in the child. See also #1391.
1968 ACQUIRE_LOCK(&sched_mutex);
1969 ACQUIRE_LOCK(&sm_mutex);
1970 ACQUIRE_LOCK(&stable_ptr_mutex);
1971 ACQUIRE_LOCK(&stable_name_mutex);
1972 ACQUIRE_LOCK(&task->lock);
1973
1974 for (i=0; i < n_capabilities; i++) {
1975 ACQUIRE_LOCK(&capabilities[i]->lock);
1976 }
1977
1978 #if defined(THREADED_RTS)
1979 ACQUIRE_LOCK(&all_tasks_mutex);
1980 #endif
1981
1982 stopTimer(); // See #4074
1983
1984 #if defined(TRACING)
1985 flushEventLog(); // so that child won't inherit dirty file buffers
1986 #endif
1987
1988 pid = fork();
1989
1990 if (pid) { // parent
1991
1992 startTimer(); // #4074
1993
1994 RELEASE_LOCK(&sched_mutex);
1995 RELEASE_LOCK(&sm_mutex);
1996 RELEASE_LOCK(&stable_ptr_mutex);
1997 RELEASE_LOCK(&stable_name_mutex);
1998 RELEASE_LOCK(&task->lock);
1999
2000 #if defined(THREADED_RTS)
2001 /* N.B. releaseCapability_ below may need to take all_tasks_mutex */
2002 RELEASE_LOCK(&all_tasks_mutex);
2003 #endif
2004
2005 for (i=0; i < n_capabilities; i++) {
2006 releaseCapability_(capabilities[i],false);
2007 RELEASE_LOCK(&capabilities[i]->lock);
2008 }
2009
2010 boundTaskExiting(task);
2011
2012 // just return the pid
2013 return pid;
2014
2015 } else { // child
2016
2017 #if defined(THREADED_RTS)
2018 initMutex(&sched_mutex);
2019 initMutex(&sm_mutex);
2020 initMutex(&stable_ptr_mutex);
2021 initMutex(&stable_name_mutex);
2022 initMutex(&task->lock);
2023
2024 for (i=0; i < n_capabilities; i++) {
2025 initMutex(&capabilities[i]->lock);
2026 }
2027
2028 initMutex(&all_tasks_mutex);
2029 #endif
2030
2031 #if defined(TRACING)
2032 resetTracing();
2033 #endif
2034
2035 // Now, all OS threads except the thread that forked are
2036 // stopped. We need to stop all Haskell threads, including
2037 // those involved in foreign calls. Also we need to delete
2038 // all Tasks, because they correspond to OS threads that are
2039 // now gone.
2040
2041 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2042 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2043 next = t->global_link;
2044 // don't allow threads to catch the ThreadKilled
2045 // exception, but we do want to raiseAsync() because these
2046 // threads may be evaluating thunks that we need later.
2047 deleteThread_(t);
2048
2049 // stop the GC from updating the InCall to point to
2050 // the TSO. This is only necessary because the
2051 // OSThread bound to the TSO has been killed, and
2052 // won't get a chance to exit in the usual way (see
2053 // also scheduleHandleThreadFinished).
2054 t->bound = NULL;
2055 }
2056 }
2057
2058 discardTasksExcept(task);
2059
2060 for (i=0; i < n_capabilities; i++) {
2061 cap = capabilities[i];
2062
2063 // Empty the run queue. It seems tempting to let all the
2064 // killed threads stay on the run queue as zombies to be
2065 // cleaned up later, but some of them may correspond to
2066 // bound threads for which the corresponding Task does not
2067 // exist.
2068 truncateRunQueue(cap);
2069 cap->n_run_queue = 0;
2070
2071         // Any suspended C-calling Tasks are no more; their OS threads
2072 // don't exist now:
2073 cap->suspended_ccalls = NULL;
2074 cap->n_suspended_ccalls = 0;
2075
2076 #if defined(THREADED_RTS)
2077 // Wipe our spare workers list, they no longer exist. New
2078 // workers will be created if necessary.
2079 cap->spare_workers = NULL;
2080 cap->n_spare_workers = 0;
2081 cap->returning_tasks_hd = NULL;
2082 cap->returning_tasks_tl = NULL;
2083 cap->n_returning_tasks = 0;
2084 #endif
2085
2086         // Release all caps except 0; we'll use that for starting
2087 // the IO manager and running the client action below.
2088 if (cap->no != 0) {
2089 task->cap = cap;
2090 releaseCapability(cap);
2091 }
2092 }
2093 cap = capabilities[0];
2094 task->cap = cap;
2095
2096 // Empty the threads lists. Otherwise, the garbage
2097 // collector may attempt to resurrect some of these threads.
2098 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2099 generations[g].threads = END_TSO_QUEUE;
2100 }
2101
2102 // On Unix, all timers are reset in the child, so we need to start
2103 // the timer again.
2104 initTimer();
2105 startTimer();
2106
2107 // TODO: need to trace various other things in the child
2108 // like startup event, capabilities, process info etc
2109 traceTaskCreate(task, cap);
2110
2111 #if defined(THREADED_RTS)
2112 ioManagerStartCap(&cap);
2113 #endif
2114
2115         // Install toplevel exception handlers, so the interruption
2116 // signal will be sent to the main thread.
2117 // See Trac #12903
2118 rts_evalStableIOMain(&cap, entry, NULL); // run the action
2119 rts_checkSchedStatus("forkProcess",cap);
2120
2121 rts_unlock(cap);
2122 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2123 }
2124 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2125 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2126 #endif
2127 }
2128
2129 /* ---------------------------------------------------------------------------
2130 * Changing the number of Capabilities
2131 *
2132 * Changing the number of Capabilities is very tricky! We can only do
2133 * it with the system fully stopped, so we do a full sync with
2134 * requestSync(SYNC_OTHER) and grab all the capabilities.
2135 *
2136 * Then we resize the appropriate data structures, and update all
2137 * references to the old data structures which have now moved.
2138 * Finally we release the Capabilities we are holding, and start
2139 * worker Tasks on the new Capabilities we created.
2140 *
2141 * ------------------------------------------------------------------------- */
2142
2143 void
2144 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2145 {
2146 #if !defined(THREADED_RTS)
2147 if (new_n_capabilities != 1) {
2148 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2149 }
2150 return;
2151 #elif defined(NOSMP)
2152 if (new_n_capabilities != 1) {
2153 errorBelch("setNumCapabilities: not supported on this platform");
2154 }
2155 return;
2156 #else
2157 Task *task;
2158 Capability *cap;
2159 uint32_t n;
2160 Capability *old_capabilities = NULL;
2161 uint32_t old_n_capabilities = n_capabilities;
2162
2163 if (new_n_capabilities == enabled_capabilities) {
2164 return;
2165 } else if (new_n_capabilities <= 0) {
2166 errorBelch("setNumCapabilities: Capability count must be positive");
2167 return;
2168 }
2169
2170
2171 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2172 enabled_capabilities, new_n_capabilities);
2173
2174 cap = rts_lock();
2175 task = cap->running_task;
2176
2177 stopAllCapabilities(&cap, task);
2178
2179 if (new_n_capabilities < enabled_capabilities)
2180 {
2181 // Reducing the number of capabilities: we do not actually
2182 // remove the extra capabilities, we just mark them as
2183 // "disabled". This has the following effects:
2184 //
2185 // - threads on a disabled capability are migrated away by the
2186 // scheduler loop
2187 //
2188 // - disabled capabilities do not participate in GC
2189 // (see scheduleDoGC())
2190 //
2191 // - No spark threads are created on this capability
2192 // (see scheduleActivateSpark())
2193 //
2194 // - We do not attempt to migrate threads *to* a disabled
2195 // capability (see schedulePushWork()).
2196 //
2197 // but in other respects, a disabled capability remains
2198 // alive. Threads may be woken up on a disabled capability,
2199 // but they will be immediately migrated away.
2200 //
2201 // This approach is much easier than trying to actually remove
2202 // the capability; we don't have to worry about GC data
2203 // structures, the nursery, etc.
2204 //
2205 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2206 capabilities[n]->disabled = true;
2207 traceCapDisable(capabilities[n]);
2208 }
2209 enabled_capabilities = new_n_capabilities;
2210 }
2211 else
2212 {
2213 // Increasing the number of enabled capabilities.
2214 //
2215 // enable any disabled capabilities, up to the required number
2216 for (n = enabled_capabilities;
2217 n < new_n_capabilities && n < n_capabilities; n++) {
2218 capabilities[n]->disabled = false;
2219 traceCapEnable(capabilities[n]);
2220 }
2221 enabled_capabilities = n;
2222
2223 if (new_n_capabilities > n_capabilities) {
2224 #if defined(TRACING)
2225 // Allocate eventlog buffers for the new capabilities. Note this
2226 // must be done before calling moreCapabilities(), because that
2227 // will emit events about creating the new capabilities and adding
2228 // them to existing capsets.
2229 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2230 #endif
2231
2232 // Resize the capabilities array
2233 // NB. after this, capabilities points somewhere new. Any pointers
2234 // of type (Capability *) are now invalid.
2235 moreCapabilities(n_capabilities, new_n_capabilities);
2236
2237 // Resize and update storage manager data structures
2238 storageAddCapabilities(n_capabilities, new_n_capabilities);
2239 }
2240 }
2241
2242 // update n_capabilities before things start running
2243 if (new_n_capabilities > n_capabilities) {
2244 n_capabilities = enabled_capabilities = new_n_capabilities;
2245 }
2246
2247 // We're done: release the original Capabilities
2248 releaseAllCapabilities(old_n_capabilities, cap,task);
2249
2250 // We can't free the old array until now, because we access it
2251 // while updating pointers in updateCapabilityRefs().
2252 if (old_capabilities) {
2253 stgFree(old_capabilities);
2254 }
2255
2256 // Notify IO manager that the number of capabilities has changed.
2257 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2258
2259 rts_unlock(cap);
2260
2261 #endif // THREADED_RTS
2262 }
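
/* A hedged, compiled-out usage sketch (not part of the RTS): foreign code
 * that has initialised the RTS can resize the capability pool directly; this
 * is the entry point that GHC.Conc.setNumCapabilities reaches via a foreign
 * call.
 */
#if 0
static void exampleResizeCapabilities (void)
{
    // Grow (or shrink) to four capabilities; threads on any capabilities
    // that become disabled are migrated away as described above.
    setNumCapabilities(4);
}
#endif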
2263
2264
2265
2266 /* ---------------------------------------------------------------------------
2267 * Delete all the threads in the system
2268 * ------------------------------------------------------------------------- */
2269
2270 static void
2271 deleteAllThreads ()
2272 {
2273 // NOTE: only safe to call if we own all capabilities.
2274
2275 StgTSO* t, *next;
2276 uint32_t g;
2277
2278 debugTrace(DEBUG_sched,"deleting all threads");
2279 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2280 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2281 next = t->global_link;
2282 deleteThread(t);
2283 }
2284 }
2285
2286 // The run queue now contains a bunch of ThreadKilled threads. We
2287 // must not throw these away: the main thread(s) will be in there
2288 // somewhere, and the main scheduler loop has to deal with it.
2289 // Also, the run queue is the only thing keeping these threads from
2290 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2291
2292 #if !defined(THREADED_RTS)
2293 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2294 ASSERT(sleeping_queue == END_TSO_QUEUE);
2295 #endif
2296 }
2297
2298 /* -----------------------------------------------------------------------------
2299 Managing the suspended_ccalls list.
2300 Locks required: sched_mutex
2301 -------------------------------------------------------------------------- */
2302
2303 STATIC_INLINE void
2304 suspendTask (Capability *cap, Task *task)
2305 {
2306 InCall *incall;
2307
2308 incall = task->incall;
2309 ASSERT(incall->next == NULL && incall->prev == NULL);
2310 incall->next = cap->suspended_ccalls;
2311 incall->prev = NULL;
2312 if (cap->suspended_ccalls) {
2313 cap->suspended_ccalls->prev = incall;
2314 }
2315 cap->suspended_ccalls = incall;
2316 cap->n_suspended_ccalls++;
2317 }
2318
2319 STATIC_INLINE void
2320 recoverSuspendedTask (Capability *cap, Task *task)
2321 {
2322 InCall *incall;
2323
2324 incall = task->incall;
2325 if (incall->prev) {
2326 incall->prev->next = incall->next;
2327 } else {
2328 ASSERT(cap->suspended_ccalls == incall);
2329 cap->suspended_ccalls = incall->next;
2330 }
2331 if (incall->next) {
2332 incall->next->prev = incall->prev;
2333 }
2334 incall->next = incall->prev = NULL;
2335 cap->n_suspended_ccalls--;
2336 }
2337
2338 /* ---------------------------------------------------------------------------
2339 * Suspending & resuming Haskell threads.
2340 *
2341 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2342 * its capability before calling the C function. This allows another
2343 * task to pick up the capability and carry on running Haskell
2344 * threads. It also means that if the C call blocks, it won't lock
2345 * the whole system.
2346 *
2347 * The Haskell thread making the C call is put to sleep for the
2348 * duration of the call, on the suspended_ccalling_threads queue. We
2349 * give out a token to the task, which it can use to resume the thread
2350 * on return from the C function.
2351 *
2352 * If this is an interruptible C call, this means that the FFI call may be
2353 * unceremoniously terminated and should be scheduled on an
2354 * unbound worker thread.
2355 * ------------------------------------------------------------------------- */
2356
2357 void *
2358 suspendThread (StgRegTable *reg, bool interruptible)
2359 {
2360 Capability *cap;
2361 int saved_errno;
2362 StgTSO *tso;
2363 Task *task;
2364 #if defined(mingw32_HOST_OS)
2365 StgWord32 saved_winerror;
2366 #endif
2367
2368 saved_errno = errno;
2369 #if defined(mingw32_HOST_OS)
2370 saved_winerror = GetLastError();
2371 #endif
2372
2373 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2374 */
2375 cap = regTableToCapability(reg);
2376
2377 task = cap->running_task;
2378 tso = cap->r.rCurrentTSO;
2379
2380 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2381
2382 // XXX this might not be necessary --SDM
2383 tso->what_next = ThreadRunGHC;
2384
2385 threadPaused(cap,tso);
2386
2387 if (interruptible) {
2388 tso->why_blocked = BlockedOnCCall_Interruptible;
2389 } else {
2390 tso->why_blocked = BlockedOnCCall;
2391 }
2392
2393 // Hand back capability
2394 task->incall->suspended_tso = tso;
2395 task->incall->suspended_cap = cap;
2396
2397 // Otherwise allocate() will write to invalid memory.
2398 cap->r.rCurrentTSO = NULL;
2399
2400 ACQUIRE_LOCK(&cap->lock);
2401
2402 suspendTask(cap,task);
2403 cap->in_haskell = false;
2404 releaseCapability_(cap,false);
2405
2406 RELEASE_LOCK(&cap->lock);
2407
2408 errno = saved_errno;
2409 #if defined(mingw32_HOST_OS)
2410 SetLastError(saved_winerror);
2411 #endif
2412 return task;
2413 }
2414
2415 StgRegTable *
2416 resumeThread (void *task_)
2417 {
2418 StgTSO *tso;
2419 InCall *incall;
2420 Capability *cap;
2421 Task *task = task_;
2422 int saved_errno;
2423 #if defined(mingw32_HOST_OS)
2424 StgWord32 saved_winerror;
2425 #endif
2426
2427 saved_errno = errno;
2428 #if defined(mingw32_HOST_OS)
2429 saved_winerror = GetLastError();
2430 #endif
2431
2432 incall = task->incall;
2433 cap = incall->suspended_cap;
2434 task->cap = cap;
2435
2436 // Wait for permission to re-enter the RTS with the result.
2437 waitForCapability(&cap,task);
2438 // we might be on a different capability now... but if so, our
2439 // entry on the suspended_ccalls list will also have been
2440 // migrated.
2441
2442 // Remove the thread from the suspended list
2443 recoverSuspendedTask(cap,task);
2444
2445 tso = incall->suspended_tso;
2446 incall->suspended_tso = NULL;
2447 incall->suspended_cap = NULL;
2448 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2449
2450 traceEventRunThread(cap, tso);
2451
2452 /* Reset blocking status */
2453 tso->why_blocked = NotBlocked;
2454
2455 if ((tso->flags & TSO_BLOCKEX) == 0) {
2456 // avoid locking the TSO if we don't have to
2457 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2458 maybePerformBlockedException(cap,tso);
2459 }
2460 }
2461
2462 cap->r.rCurrentTSO = tso;
2463 cap->in_haskell = true;
2464 errno = saved_errno;
2465 #if defined(mingw32_HOST_OS)
2466 SetLastError(saved_winerror);
2467 #endif
2468
2469 /* We might have GC'd, mark the TSO dirty again */
2470 dirty_TSO(cap,tso);
2471 dirty_STACK(cap,tso->stackobj);
2472
2473 IF_DEBUG(sanity, checkTSO(tso));
2474
2475 return &cap->r;
2476 }
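
/* A hedged, compiled-out sketch of how a "safe" foreign call brackets the C
 * work with suspendThread()/resumeThread() above. my_blocking_read() and fd
 * are placeholders, not RTS functions; compiled Haskell code emits the
 * equivalent of this pattern.
 */
#if 0
static int exampleSafeForeignCall (StgRegTable *reg, int fd)
{
    void *tok = suspendThread(reg, false); // hand back the Capability; the token is the Task
    int r = my_blocking_read(fd);          // may block; other capabilities keep running
    reg = resumeThread(tok);               // wait until we hold a Capability again
    return r;
}
#endif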
2477
2478 /* ---------------------------------------------------------------------------
2479 * scheduleThread()
2480 *
2481 * scheduleThread puts a thread on the end of the runnable queue.
2482 * This will usually be done immediately after a thread is created.
2483 * The caller of scheduleThread must create the thread using e.g.
2484 * createThread and push an appropriate closure
2485 * on this thread's stack before the scheduler is invoked.
2486 * ------------------------------------------------------------------------ */
2487
2488 void
2489 scheduleThread(Capability *cap, StgTSO *tso)
2490 {
2491 // The thread goes at the *end* of the run-queue, to avoid possible
2492 // starvation of any threads already on the queue.
2493 appendToRunQueue(cap,tso);
2494 }
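
/* A hedged example of the convention described above, assuming `p` is a
 * closure of type IO (): the RtsAPI helpers build the TSO and push the
 * closure before handing it to the scheduler, roughly
 *
 *     StgTSO *t = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
 *     scheduleThread(cap, t);
 *
 * (rts_eval* in RtsAPI.c do essentially this, but go through
 * scheduleWaitThread() below so the caller can collect the result.)
 */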
2495
2496 void
2497 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2498 {
2499 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2500 // move this thread from now on.
2501 #if defined(THREADED_RTS)
2502 cpu %= enabled_capabilities;
2503 if (cpu == cap->no) {
2504 appendToRunQueue(cap,tso);
2505 } else {
2506 migrateThread(cap, tso, capabilities[cpu]);
2507 }
2508 #else
2509 appendToRunQueue(cap,tso);
2510 #endif
2511 }
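
/* A hedged usage note: this is the entry point reached from the forkOn#
 * primop, so e.g.
 *
 *     scheduleThreadOn(cap, 3, tso);
 *
 * pins `tso` to capability 3 (mod enabled_capabilities), and TSO_LOCKED
 * ensures the scheduler never migrates it afterwards.
 */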
2512
2513 void
2514 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2515 {
2516 Task *task;
2517 DEBUG_ONLY( StgThreadID id );
2518 Capability *cap;
2519
2520 cap = *pcap;
2521
2522 // We already created/initialised the Task
2523 task = cap->running_task;
2524
2525 // This TSO is now a bound thread; make the Task and TSO
2526 // point to each other.
2527 tso->bound = task->incall;
2528 tso->cap = cap;
2529
2530 task->incall->tso = tso;
2531 task->incall->ret = ret;
2532 task->incall->rstat = NoStatus;
2533
2534 appendToRunQueue(cap,tso);
2535
2536 DEBUG_ONLY( id = tso->id );
2537 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2538
2539 cap = schedule(cap,task);
2540
2541 ASSERT(task->incall->rstat != NoStatus);
2542 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2543
2544 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2545 *pcap = cap;
2546 }
2547
2548 /* ----------------------------------------------------------------------------
2549 * Starting Tasks
2550 * ------------------------------------------------------------------------- */
2551
2552 #if defined(THREADED_RTS)
2553 void scheduleWorker (Capability *cap, Task *task)
2554 {
2555 // schedule() runs without a lock.
2556 cap = schedule(cap,task);
2557
2558 // On exit from schedule(), we have a Capability, but possibly not
2559 // the same one we started with.
2560
2561 // During shutdown, the requirement is that after all the
2562 // Capabilities are shut down, all workers that are shutting down
2563 // have finished workerTaskStop(). This is why we hold on to
2564 // cap->lock until we've finished workerTaskStop() below.
2565 //
2566 // There may be workers still involved in foreign calls; those
2567 // will just block in waitForCapability() because the
2568 // Capability has been shut down.
2569 //
2570 ACQUIRE_LOCK(&cap->lock);
2571 releaseCapability_(cap,false);
2572 workerTaskStop(task);
2573 RELEASE_LOCK(&cap->lock);
2574 }
2575 #endif
2576
2577 /* ---------------------------------------------------------------------------
2578 * Start new worker tasks on Capabilities from--to
2579 * -------------------------------------------------------------------------- */
2580
2581 static void
2582 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2583 {
2584 #if defined(THREADED_RTS)
2585 uint32_t i;
2586 Capability *cap;
2587
2588 for (i = from; i < to; i++) {
2589 cap = capabilities[i];
2590 ACQUIRE_LOCK(&cap->lock);
2591 startWorkerTask(cap);
2592 RELEASE_LOCK(&cap->lock);
2593 }
2594 #endif
2595 }
2596
2597 /* ---------------------------------------------------------------------------
2598 * initScheduler()
2599 *
2600 * Initialise the scheduler. This resets all the queues - if the
2601 * queues contained any threads, they'll be garbage collected at the
2602 * next pass.
2603 *
2604 * ------------------------------------------------------------------------ */
2605
2606 void
2607 initScheduler(void)
2608 {
2609 #if !defined(THREADED_RTS)
2610 blocked_queue_hd = END_TSO_QUEUE;
2611 blocked_queue_tl = END_TSO_QUEUE;
2612 sleeping_queue = END_TSO_QUEUE;
2613 #endif
2614
2615 sched_state = SCHED_RUNNING;
2616 recent_activity = ACTIVITY_YES;
2617
2618 #if defined(THREADED_RTS)
2619 /* Initialise the mutex and condition variables used by
2620 * the scheduler. */
2621 initMutex(&sched_mutex);
2622 #endif
2623
2624 ACQUIRE_LOCK(&sched_mutex);
2625
2626 allocated_bytes_at_heapoverflow = 0;
2627
2628 /* A capability holds the state a native thread needs in
2629 * order to execute STG code. At least one capability is
2630 * floating around (only THREADED_RTS builds have more than one).
2631 */
2632 initCapabilities();
2633
2634 initTaskManager();
2635
2636 /*
2637 * Eagerly start one worker to run each Capability, except for
2638 * Capability 0. The idea is that we're probably going to start a
2639 * bound thread on Capability 0 pretty soon, so we don't want a
2640 * worker task hogging it.
2641 */
2642 startWorkerTasks(1, n_capabilities);
2643
2644 RELEASE_LOCK(&sched_mutex);
2645
2646 }
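
/* Hedged sketch: initScheduler() is not called by client code directly; it
 * runs as part of RTS startup. A program embedding the RTS just does the
 * usual hs_init()/hs_exit() dance, e.g.
 */
#if 0
int main (int argc, char *argv[])
{
    hs_init(&argc, &argv);  // RTS startup calls initScheduler() internally
    /* ... call exported Haskell functions here ... */
    hs_exit();              // the shutdown path reaches exitScheduler() below
    return 0;
}
#endif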
2647
2648 void
2649 exitScheduler (bool wait_foreign USED_IF_THREADS)
2650 /* see Capability.c, shutdownCapability() */
2651 {
2652 Task *task = NULL;
2653
2654 task = newBoundTask();
2655
2656 // If we haven't killed all the threads yet, do it now.
2657 if (sched_state < SCHED_SHUTTING_DOWN) {
2658 sched_state = SCHED_INTERRUPTING;
2659 Capability *cap = task->cap;
2660 waitForCapability(&cap,task);
2661 scheduleDoGC(&cap,task,true);
2662 ASSERT(task->incall->tso == NULL);
2663 releaseCapability(cap);
2664 }
2665 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
2666
2667 shutdownCapabilities(task, wait_foreign);
2668
2669 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2670 // n_failed_trygrab_idles, n_idle_caps);
2671
2672 boundTaskExiting(task);
2673 }
2674
2675 void
2676 freeScheduler( void )
2677 {
2678 uint32_t still_running;
2679
2680 ACQUIRE_LOCK(&sched_mutex);
2681 still_running = freeTaskManager();
2682 // We can only free the Capabilities if there are no Tasks still
2683 // running. We might have a Task about to return from a foreign
2684 // call into waitForCapability(), for example (actually,
2685 // this should be the *only* thing that a still-running Task can
2686 // do at this point, and it will block waiting for the
2687 // Capability).
2688 if (still_running == 0) {
2689 freeCapabilities();
2690 }
2691 RELEASE_LOCK(&sched_mutex);
2692 #if defined(THREADED_RTS)
2693 closeMutex(&sched_mutex);
2694 #endif
2695 }
2696
2697 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2698 void *user USED_IF_NOT_THREADS)
2699 {
2700 #if !defined(THREADED_RTS)
2701 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2702 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2703 evac(user, (StgClosure **)(void *)&sleeping_queue);
2704 #endif
2705 }
2706
2707 /* -----------------------------------------------------------------------------
2708 performGC
2709
2710 This is the interface to the garbage collector from Haskell land.
2711 We provide this so that external C code can allocate and garbage
2712 collect when called from Haskell via _ccall_GC.
2713 -------------------------------------------------------------------------- */
2714
2715 static void
2716 performGC_(bool force_major)
2717 {
2718 Task *task;
2719 Capability *cap = NULL;
2720
2721 // We must grab a new Task here, because the existing Task may be
2722 // associated with a particular Capability, and chained onto the
2723 // suspended_ccalls queue.
2724 task = newBoundTask();
2725
2726 // TODO: do we need to traceTask*() here?
2727
2728 waitForCapability(&cap,task);
2729 scheduleDoGC(&cap,task,force_major);
2730 releaseCapability(cap);
2731 boundTaskExiting(task);
2732 }
2733
2734 void
2735 performGC(void)
2736 {
2737 performGC_(false);
2738 }
2739
2740 void
2741 performMajorGC(void)
2742 {
2743 performGC_(true);
2744 }
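
/* A hedged, compiled-out usage sketch: these are the C entry points used by
 * System.Mem via foreign imports, and external C code may call them the same
 * way. No Capability needs to be held; a bound Task is created internally.
 */
#if 0
static void exampleExplicitGC (void)
{
    performMajorGC();   // returns once the major collection has completed
}
#endif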
2745
2746 /* ---------------------------------------------------------------------------
2747 Interrupt execution.
2748 Might be called inside a signal handler so it mustn't do anything fancy.
2749 ------------------------------------------------------------------------ */
2750
2751 void
2752 interruptStgRts(void)
2753 {
2754 ASSERT(sched_state != SCHED_SHUTTING_DOWN);
2755 sched_state = SCHED_INTERRUPTING;
2756 interruptAllCapabilities();
2757 #if defined(THREADED_RTS)
2758 wakeUpRts();
2759 #endif
2760 }
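
/* A hedged usage sketch: an application embedding the RTS can request an
 * orderly interrupt from its own signal handler, much as the default RTS
 * signal handlers do (the handler name below is a placeholder).
 */
#if 0
static void my_sigint_handler (int sig)
{
    (void)sig;
    interruptStgRts();  // deliberately does nothing fancy; see comment above
}
#endif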
2761
2762 /* -----------------------------------------------------------------------------
2763 Wake up the RTS
2764
2765 This function causes at least one OS thread to wake up and run the
2766 scheduler loop. It is invoked when the RTS might be deadlocked, or
2767     an external event has arrived that may need servicing (e.g. a
2768 keyboard interrupt).
2769
2770 In the single-threaded RTS we don't do anything here; we only have
2771 one thread anyway, and the event that caused us to want to wake up
2772 will have interrupted any blocking system call in progress anyway.
2773 -------------------------------------------------------------------------- */
2774
2775 #if defined(THREADED_RTS)
2776 void wakeUpRts(void)
2777 {
2778 // This forces the IO Manager thread to wakeup, which will
2779 // in turn ensure that some OS thread wakes up and runs the
2780 // scheduler loop, which will cause a GC and deadlock check.
2781 ioManagerWakeup();
2782 }
2783 #endif
2784
2785 /* -----------------------------------------------------------------------------
2786 Deleting threads
2787
2788 This is used for interruption (^C) and forking, and corresponds to
2789 raising an exception but without letting the thread catch the
2790 exception.
2791 -------------------------------------------------------------------------- */
2792
2793 static void
2794 deleteThread (StgTSO *tso)
2795 {
2796 // NOTE: must only be called on a TSO that we have exclusive
2797 // access to, because we will call throwToSingleThreaded() below.
2798 // The TSO must be on the run queue of the Capability we own, or
2799 // we must own all Capabilities.
2800
2801 if (tso->why_blocked != BlockedOnCCall &&
2802 tso->why_blocked != BlockedOnCCall_Interruptible) {
2803 throwToSingleThreaded(tso->cap,tso,NULL);
2804 }
2805 }
2806
2807 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
2808 static void
2809 deleteThread_(StgTSO *tso)
2810 { // for forkProcess only:
2811 // like deleteThread(), but we delete threads in foreign calls, too.
2812
2813 if (tso->why_blocked == BlockedOnCCall ||
2814 tso->why_blocked == BlockedOnCCall_Interruptible) {
2815 tso->what_next = ThreadKilled;
2816 appendToRunQueue(tso->cap, tso);
2817 } else {
2818 deleteThread(tso);
2819 }
2820 }
2821 #endif
2822
2823 /* -----------------------------------------------------------------------------
2824 raiseExceptionHelper
2825
2826 This function is called by the raise# primitive, just so that we can
2827 move some of the tricky bits of raising an exception from C-- into
2828     C. Who knows, it might be a useful re-usable thing here too.
2829 -------------------------------------------------------------------------- */
2830
2831 StgWord
2832 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2833 {
2834 Capability *cap = regTableToCapability(reg);
2835 StgThunk *raise_closure = NULL;
2836 StgPtr p, next;
2837 const StgRetInfoTable *info;
2838 //
2839 // This closure represents the expression 'raise# E' where E
2840     // is the exception raised. It is used to overwrite all the
2841 // thunks which are currently under evaluation.
2842 //
2843
2844 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2845 // LDV profiling: stg_raise_info has THUNK as its closure
2846 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2847     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2848 // 1 does not cause any problem unless profiling is performed.
2849 // However, when LDV profiling goes on, we need to linearly scan
2850 // small object pool, where raise_closure is stored, so we should
2851 // use MIN_UPD_SIZE.
2852 //
2853 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2854 // sizeofW(StgClosure)+1);
2855 //
2856
2857 //
2858 // Walk up the stack, looking for the catch frame. On the way,
2859 // we update any closures pointed to from update frames with the
2860 // raise closure that we just built.
2861 //
2862 p = tso->stackobj->sp;
2863 while(1) {
2864 info = get_ret_itbl((StgClosure *)p);
2865 next = p + stack_frame_sizeW((StgClosure *)p);
2866 switch (info->i.type) {
2867
2868 case UPDATE_FRAME:
2869 // Only create raise_closure if we need to.
2870 if (raise_closure == NULL) {
2871 raise_closure =
2872 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2873 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2874 raise_closure->payload[0] = exception;
2875 }
2876 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2877 (StgClosure *)raise_closure);
2878 p = next;
2879 continue;
2880
2881 case ATOMICALLY_FRAME:
2882 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2883 tso->stackobj->sp = p;
2884 return ATOMICALLY_FRAME;
2885
2886 case CATCH_FRAME:
2887 tso->stackobj->sp = p;
2888 return CATCH_FRAME;
2889
2890 case CATCH_STM_FRAME:
2891 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2892 tso->stackobj->sp = p;
2893 return CATCH_STM_FRAME;
2894
2895 case UNDERFLOW_FRAME:
2896 tso->stackobj->sp = p;
2897 threadStackUnderflow(cap,tso);
2898 p = tso->stackobj->sp;
2899 continue;
2900
2901 case STOP_FRAME:
2902 tso->stackobj->sp = p;
2903 return STOP_FRAME;
2904
2905 case CATCH_RETRY_FRAME: {
2906 StgTRecHeader *trec = tso -> trec;
2907 StgTRecHeader *outer = trec -> enclosing_trec;
2908 debugTrace(DEBUG_stm,
2909 "found CATCH_RETRY_FRAME at %p during raise", p);
2910 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2911 stmAbortTransaction(cap, trec);
2912 stmFreeAbortedTRec(cap, trec);
2913 tso -> trec = outer;
2914 p = next;
2915 continue;
2916 }
2917
2918 default:
2919 p = next;
2920 continue;
2921 }
2922 }
2923 }
2924
2925
2926 /* -----------------------------------------------------------------------------
2927 findRetryFrameHelper
2928
2929 This function is called by the retry# primitive. It traverses the stack
2930 leaving tso->sp referring to the frame which should handle the retry.
2931
2932 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2933    or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2934
2935 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2936 create) because retries are not considered to be exceptions, despite the
2937 similar implementation.
2938
2939 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2940 not be created within memory transactions.
2941 -------------------------------------------------------------------------- */
2942
2943 StgWord
2944 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2945 {
2946 const StgRetInfoTable *info;
2947 StgPtr p, next;
2948
2949 p = tso->stackobj->sp;
2950 while (1) {
2951 info = get_ret_itbl((const StgClosure *)p);
2952 next = p + stack_frame_sizeW((StgClosure *)p);
2953 switch (info->i.type) {
2954
2955 case ATOMICALLY_FRAME:
2956 debugTrace(DEBUG_stm,
2957 "found ATOMICALLY_FRAME at %p during retry", p);
2958 tso->stackobj->sp = p;
2959 return ATOMICALLY_FRAME;
2960
2961 case CATCH_RETRY_FRAME:
2962 debugTrace(DEBUG_stm,
2963 "found CATCH_RETRY_FRAME at %p during retry", p);
2964 tso->stackobj->sp = p;
2965 return CATCH_RETRY_FRAME;
2966
2967 case CATCH_STM_FRAME: {
2968 StgTRecHeader *trec = tso -> trec;
2969 StgTRecHeader *outer = trec -> enclosing_trec;
2970 debugTrace(DEBUG_stm,
2971 "found CATCH_STM_FRAME at %p during retry", p);
2972 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2973 stmAbortTransaction(cap, trec);
2974 stmFreeAbortedTRec(cap, trec);
2975 tso -> trec = outer;
2976 p = next;
2977 continue;
2978 }
2979
2980 case UNDERFLOW_FRAME:
2981 tso->stackobj->sp = p;
2982 threadStackUnderflow(cap,tso);
2983 p = tso->stackobj->sp;
2984 continue;
2985
2986 default:
2987 ASSERT(info->i.type != CATCH_FRAME);
2988 ASSERT(info->i.type != STOP_FRAME);
2989 p = next;
2990 continue;
2991 }
2992 }
2993 }
2994
2995 /* -----------------------------------------------------------------------------
2996 resurrectThreads is called after garbage collection on the list of
2997 threads found to be garbage. Each of these threads will be woken
2998 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2999 on an MVar, or NonTermination if the thread was blocked on a Black
3000 Hole.
3001
3002 Locks: assumes we hold *all* the capabilities.
3003 -------------------------------------------------------------------------- */
3004
3005 void
3006 resurrectThreads (StgTSO *threads)
3007 {
3008 StgTSO *tso, *next;
3009 Capability *cap;
3010 generation *gen;
3011
3012 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3013 next = tso->global_link;
3014
3015 gen = Bdescr((P_)tso)->gen;
3016 tso->global_link = gen->threads;
3017 gen->threads = tso;
3018
3019 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3020
3021 // Wake up the thread on the Capability it was last on
3022 cap = tso->cap;
3023
3024 switch (tso->why_blocked) {
3025 case BlockedOnMVar:
3026 case BlockedOnMVarRead:
3027 /* Called by GC - sched_mutex lock is currently held. */
3028 throwToSingleThreaded(cap, tso,
3029 (StgClosure *)blockedIndefinitelyOnMVar_closure);
3030 break;
3031 case BlockedOnBlackHole:
3032 throwToSingleThreaded(cap, tso,
3033 (StgClosure *)nonTermination_closure);
3034 break;
3035 case BlockedOnSTM:
3036 throwToSingleThreaded(cap, tso,
3037 (StgClosure *)blockedIndefinitelyOnSTM_closure);
3038 break;
3039 case NotBlocked:
3040 /* This might happen if the thread was blocked on a black hole
3041 * belonging to a thread that we've just woken up (raiseAsync
3042 * can wake up threads, remember...).
3043 */
3044 continue;
3045 case BlockedOnMsgThrowTo:
3046 // This can happen if the target is masking, blocks on a
3047 // black hole, and then is found to be unreachable. In
3048 // this case, we want to let the target wake up and carry
3049 // on, and do nothing to this thread.
3050 continue;
3051 default:
3052 barf("resurrectThreads: thread blocked in a strange way: %d",
3053 tso->why_blocked);
3054 }
3055 }
3056 }