1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /*
95 * This mutex protects most of the global scheduler data in
96 * the THREADED_RTS runtime.
97 */
98 #if defined(THREADED_RTS)
99 Mutex sched_mutex;
100 #endif
101
102 #if !defined(mingw32_HOST_OS)
103 #define FORKPROCESS_PRIMOP_SUPPORTED
104 #endif
105
106 /* -----------------------------------------------------------------------------
107 * static function prototypes
108 * -------------------------------------------------------------------------- */
109
110 static Capability *schedule (Capability *initialCapability, Task *task);
111
112 //
113 // These functions all encapsulate parts of the scheduler loop, and are
114 // abstracted only to make the structure and control flow of the
115 // scheduler clearer.
116 //
117 static void schedulePreLoop (void);
118 static void scheduleFindWork (Capability **pcap);
119 #if defined(THREADED_RTS)
120 static void scheduleYield (Capability **pcap, Task *task);
121 #endif
122 #if defined(THREADED_RTS)
123 static rtsBool requestSync (Capability **pcap, Task *task,
124 PendingSync *sync_type, SyncType *prev_sync_type);
125 static void acquireAllCapabilities(Capability *cap, Task *task);
126 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
127 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
128 uint32_t to USED_IF_THREADS);
129 #endif
130 static void scheduleStartSignalHandlers (Capability *cap);
131 static void scheduleCheckBlockedThreads (Capability *cap);
132 static void scheduleProcessInbox(Capability **cap);
133 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
134 static void schedulePushWork(Capability *cap, Task *task);
135 #if defined(THREADED_RTS)
136 static void scheduleActivateSpark(Capability *cap);
137 #endif
138 static void schedulePostRunThread(Capability *cap, StgTSO *t);
139 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
140 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
141 uint32_t prev_what_next );
142 static void scheduleHandleThreadBlocked( StgTSO *t );
143 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
144 StgTSO *t );
145 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
146 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
147
148 static void deleteThread (Capability *cap, StgTSO *tso);
149 static void deleteAllThreads (Capability *cap);
150
151 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
152 static void deleteThread_(Capability *cap, StgTSO *tso);
153 #endif
154
155 /* ---------------------------------------------------------------------------
156 Main scheduling loop.
157
158 We use round-robin scheduling, each thread returning to the
159 scheduler loop when one of these conditions is detected:
160
161 * out of heap space
162 * timer expires (thread yields)
163 * thread blocks
164 * thread ends
165 * stack overflow
166
167 ------------------------------------------------------------------------ */
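/* An illustrative, heavily condensed sketch of the loop implemented by
   schedule() below (shutdown handling, bound-thread checks and the
   THREADED_RTS-only synchronisation are omitted; "run t" stands for the
   StgRun()/interpretBCO() dispatch on t->what_next):

       while (1) {
           scheduleFindWork(&cap);             // signals, inbox, sparks
           schedulePushWork(cap, task);        // share work with idle caps
           scheduleDetectDeadlock(&cap, task);
           t   = popRunQueue(cap);
           ret = run t;                        // HeapOverflow, StackOverflow,
           handle ret;                         // ThreadYielding, ThreadBlocked,
                                               // or ThreadFinished
           if (ready_to_gc) scheduleDoGC(&cap, task, rtsFalse);
       }
*/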
168
169 static Capability *
170 schedule (Capability *initialCapability, Task *task)
171 {
172 StgTSO *t;
173 Capability *cap;
174 StgThreadReturnCode ret;
175 uint32_t prev_what_next;
176 rtsBool ready_to_gc;
177 #if defined(THREADED_RTS)
178 rtsBool first = rtsTrue;
179 #endif
180
181 cap = initialCapability;
182
183 // Pre-condition: this task owns initialCapability.
184 // The sched_mutex is *NOT* held
185 // NB. on return, we still hold a capability.
186
187 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
188
189 schedulePreLoop();
190
191 // -----------------------------------------------------------
192 // Scheduler loop starts here:
193
194 while (1) {
195
196 // Check whether we have re-entered the RTS from Haskell without
197 // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
198 // call).
199 if (cap->in_haskell) {
200 errorBelch("schedule: re-entered unsafely.\n"
201 " Perhaps a 'foreign import unsafe' should be 'safe'?");
202 stg_exit(EXIT_FAILURE);
203 }
204
205 // Note [shutdown]: The interruption / shutdown sequence.
206 //
207 // In order to cleanly shut down the runtime, we want to:
208 // * make sure that all main threads return to their callers
209 // with the state 'Interrupted'.
210 // * clean up all OS threads associated with the runtime
211 // * free all memory etc.
212 //
213 // So the sequence goes like this:
214 //
215 // * The shutdown sequence is initiated by calling hs_exit(),
216 // interruptStgRts(), or running out of memory in the GC.
217 //
218 // * Set sched_state = SCHED_INTERRUPTING
219 //
220 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
221 // scheduleDoGC(), which halts the whole runtime by acquiring all the
222 // capabilities, does a GC and then calls deleteAllThreads() to kill all
223 // the remaining threads. The zombies are left on the run queue for
224 // cleaning up. We can't kill threads involved in foreign calls.
225 //
226 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
227 //
228 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
229 // enforced by the scheduler, which won't run any Haskell code when
230 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
231 // other capabilities by doing the GC earlier.
232 //
233 // * all workers exit when the run queue on their capability
234 // drains. All main threads will also exit when their TSO
235 // reaches the head of the run queue and they can return.
236 //
237 // * eventually all Capabilities will shut down, and the RTS can
238 // exit.
239 //
240 // * We might be left with threads blocked in foreign calls,
241 // we should really attempt to kill these somehow (TODO).
242
243 switch (sched_state) {
244 case SCHED_RUNNING:
245 break;
246 case SCHED_INTERRUPTING:
247 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
248 /* scheduleDoGC() deletes all the threads */
249 scheduleDoGC(&cap,task,rtsTrue);
250
251 // after scheduleDoGC(), we must be shutting down. Either some
252 // other Capability did the final GC, or we did it above,
253 // either way we can fall through to the SCHED_SHUTTING_DOWN
254 // case now.
255 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
256 // fall through
257
258 case SCHED_SHUTTING_DOWN:
259 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
260 // If we are a worker, just exit. If we're a bound thread
261 // then we will exit below when we've removed our TSO from
262 // the run queue.
263 if (!isBoundTask(task) && emptyRunQueue(cap)) {
264 return cap;
265 }
266 break;
267 default:
268 barf("sched_state: %d", sched_state);
269 }
270
271 scheduleFindWork(&cap);
272
273 /* work pushing, currently relevant only for THREADED_RTS:
274 (pushes threads, wakes up idle capabilities for stealing) */
275 schedulePushWork(cap,task);
276
277 scheduleDetectDeadlock(&cap,task);
278
279 // Normally, the only way we can get here with no threads to
280 // run is if a keyboard interrupt was received during
281 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
282 // Additionally, it is not fatal for the
283 // threaded RTS to reach here with no threads to run.
284 //
285 // win32: might be here due to awaitEvent() being abandoned
286 // as a result of a console event having been delivered.
287
288 #if defined(THREADED_RTS)
289 if (first)
290 {
291 // XXX: ToDo
292 // // don't yield the first time, we want a chance to run this
293 // // thread for a bit, even if there are others banging at the
294 // // door.
295 // first = rtsFalse;
296 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
297 }
298
299 scheduleYield(&cap,task);
300
301 if (emptyRunQueue(cap)) continue; // look for work again
302 #endif
303
304 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
305 if ( emptyRunQueue(cap) ) {
306 ASSERT(sched_state >= SCHED_INTERRUPTING);
307 }
308 #endif
309
310 //
311 // Get a thread to run
312 //
313 t = popRunQueue(cap);
314
315 // Sanity check the thread we're about to run. This can be
316 // expensive if there is lots of thread switching going on...
317 IF_DEBUG(sanity,checkTSO(t));
318
319 #if defined(THREADED_RTS)
320 // Check whether we can run this thread in the current task.
321 // If not, we have to pass our capability to the right task.
322 {
323 InCall *bound = t->bound;
324
325 if (bound) {
326 if (bound->task == task) {
327 // yes, the Haskell thread is bound to the current native thread
328 } else {
329 debugTrace(DEBUG_sched,
330 "thread %lu bound to another OS thread",
331 (unsigned long)t->id);
332 // no, bound to a different Haskell thread: pass to that thread
333 pushOnRunQueue(cap,t);
334 continue;
335 }
336 } else {
337 // The thread we want to run is unbound.
338 if (task->incall->tso) {
339 debugTrace(DEBUG_sched,
340 "this OS thread cannot run thread %lu",
341 (unsigned long)t->id);
342 // no, the current native thread is bound to a different
343 // Haskell thread, so pass it to any worker thread
344 pushOnRunQueue(cap,t);
345 continue;
346 }
347 }
348 }
349 #endif
350
351 // If we're shutting down, and this thread has not yet been
352 // killed, kill it now. This sometimes happens when a finalizer
353 // thread is created by the final GC, or a thread previously
354 // in a foreign call returns.
355 if (sched_state >= SCHED_INTERRUPTING &&
356 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
357 deleteThread(cap,t);
358 }
359
360 // If this capability is disabled, migrate the thread away rather
361 // than running it. NB. but not if the thread is bound: it is
362 // really hard for a bound thread to migrate itself. Believe me,
363 // I tried several ways and couldn't find a way to do it.
364 // Instead, when everything is stopped for GC, we migrate all the
365 // threads on the run queue then (see scheduleDoGC()).
366 //
367 // ToDo: what about TSO_LOCKED? Currently we're migrating those
368 // when the number of capabilities drops, but we never migrate
369 // them back if it rises again. Presumably we should, but after
370 // the thread has been migrated we no longer know what capability
371 // it was originally on.
372 #ifdef THREADED_RTS
373 if (cap->disabled && !t->bound) {
374 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
375 migrateThread(cap, t, dest_cap);
376 continue;
377 }
378 #endif
379
380 /* context switches are initiated by the timer signal, unless
381 * the user specified "context switch as often as possible", with
382 * +RTS -C0
383 */
384 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
385 && !emptyThreadQueues(cap)) {
386 cap->context_switch = 1;
387 }
388
389 run_thread:
390
391 // CurrentTSO is the thread to run. It might be different if we
392 // loop back to run_thread, so make sure to set CurrentTSO after
393 // that.
394 cap->r.rCurrentTSO = t;
395
396 startHeapProfTimer();
397
398 // ----------------------------------------------------------------------
399 // Run the current thread
400
401 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
402 ASSERT(t->cap == cap);
403 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
404
405 prev_what_next = t->what_next;
406
407 errno = t->saved_errno;
408 #if mingw32_HOST_OS
409 SetLastError(t->saved_winerror);
410 #endif
411
412 // reset the interrupt flag before running Haskell code
413 cap->interrupt = 0;
414
415 cap->in_haskell = rtsTrue;
416 cap->idle = 0;
417
418 dirty_TSO(cap,t);
419 dirty_STACK(cap,t->stackobj);
420
421 switch (recent_activity)
422 {
423 case ACTIVITY_DONE_GC: {
424 // ACTIVITY_DONE_GC means we turned off the timer signal to
425 // conserve power (see #1623). Re-enable it here.
426 uint32_t prev;
427 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
428 if (prev == ACTIVITY_DONE_GC) {
429 #ifndef PROFILING
430 startTimer();
431 #endif
432 }
433 break;
434 }
435 case ACTIVITY_INACTIVE:
436 // If we reached ACTIVITY_INACTIVE, then don't reset it until
437 // we've done the GC. The thread running here might just be
438 // the IO manager thread that handle_tick() woke up via
439 // wakeUpRts().
440 break;
441 default:
442 recent_activity = ACTIVITY_YES;
443 }
444
445 traceEventRunThread(cap, t);
446
447 switch (prev_what_next) {
448
449 case ThreadKilled:
450 case ThreadComplete:
451 /* Thread already finished, return to scheduler. */
452 ret = ThreadFinished;
453 break;
454
455 case ThreadRunGHC:
456 {
457 StgRegTable *r;
458 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
459 cap = regTableToCapability(r);
460 ret = r->rRet;
461 break;
462 }
463
464 case ThreadInterpret:
465 cap = interpretBCO(cap);
466 ret = cap->r.rRet;
467 break;
468
469 default:
470 barf("schedule: invalid what_next field");
471 }
472
473 cap->in_haskell = rtsFalse;
474
475 // The TSO might have moved, eg. if it re-entered the RTS and a GC
476 // happened. So find the new location:
477 t = cap->r.rCurrentTSO;
478
479 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
480 // don't want it set when not running a Haskell thread.
481 cap->r.rCurrentTSO = NULL;
482
483 // And save the current errno in this thread.
484 // XXX: possibly bogus for SMP because this thread might already
485 // be running again, see code below.
486 t->saved_errno = errno;
487 #if mingw32_HOST_OS
488 // Similarly for Windows error code
489 t->saved_winerror = GetLastError();
490 #endif
491
492 if (ret == ThreadBlocked) {
493 if (t->why_blocked == BlockedOnBlackHole) {
494 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
495 traceEventStopThread(cap, t, t->why_blocked + 6,
496 owner != NULL ? owner->id : 0);
497 } else {
498 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
499 }
500 } else {
501 traceEventStopThread(cap, t, ret, 0);
502 }
503
504 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
505 ASSERT(t->cap == cap);
506
507 // ----------------------------------------------------------------------
508
509 // Costs for the scheduler are assigned to CCS_SYSTEM
510 stopHeapProfTimer();
511 #if defined(PROFILING)
512 cap->r.rCCCS = CCS_SYSTEM;
513 #endif
514
515 schedulePostRunThread(cap,t);
516
517 ready_to_gc = rtsFalse;
518
519 switch (ret) {
520 case HeapOverflow:
521 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
522 break;
523
524 case StackOverflow:
525 // just adjust the stack for this thread, then pop it back
526 // on the run queue.
527 threadStackOverflow(cap, t);
528 pushOnRunQueue(cap,t);
529 break;
530
531 case ThreadYielding:
532 if (scheduleHandleYield(cap, t, prev_what_next)) {
533 // shortcut for switching between compiler/interpreter:
534 goto run_thread;
535 }
536 break;
537
538 case ThreadBlocked:
539 scheduleHandleThreadBlocked(t);
540 break;
541
542 case ThreadFinished:
543 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
544 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
545 break;
546
547 default:
548 barf("schedule: invalid thread return code %d", (int)ret);
549 }
550
551 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
552 scheduleDoGC(&cap,task,rtsFalse);
553 }
554 } /* end of while() */
555 }
556
557 /* -----------------------------------------------------------------------------
558 * Run queue operations
559 * -------------------------------------------------------------------------- */
560
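// The run queue on each Capability is a doubly-linked list of TSOs:
// tso->_link is the "next" pointer and tso->block_info.prev the "previous"
// pointer, with END_TSO_QUEUE as the sentinel at both ends. For example,
// with three queued threads:
//
//     run_queue_hd -> t1 <-> t2 <-> t3 <- run_queue_tl
//     t1->block_info.prev == END_TSO_QUEUE, t3->_link == END_TSO_QUEUE
//
// This lets removeFromRunQueue() below unlink a TSO from anywhere in the
// queue in constant time.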
561 static void
562 removeFromRunQueue (Capability *cap, StgTSO *tso)
563 {
564 if (tso->block_info.prev == END_TSO_QUEUE) {
565 ASSERT(cap->run_queue_hd == tso);
566 cap->run_queue_hd = tso->_link;
567 } else {
568 setTSOLink(cap, tso->block_info.prev, tso->_link);
569 }
570 if (tso->_link == END_TSO_QUEUE) {
571 ASSERT(cap->run_queue_tl == tso);
572 cap->run_queue_tl = tso->block_info.prev;
573 } else {
574 setTSOPrev(cap, tso->_link, tso->block_info.prev);
575 }
576 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
577 cap->n_run_queue--;
578
579 IF_DEBUG(sanity, checkRunQueue(cap));
580 }
581
582 void
583 promoteInRunQueue (Capability *cap, StgTSO *tso)
584 {
585 removeFromRunQueue(cap, tso);
586 pushOnRunQueue(cap, tso);
587 }
588
589 /* ----------------------------------------------------------------------------
590 * Setting up the scheduler loop
591 * ------------------------------------------------------------------------- */
592
593 static void
594 schedulePreLoop(void)
595 {
596 // initialisation for scheduler - what cannot go into initScheduler()
597
598 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
599 win32AllocStack();
600 #endif
601 }
602
603 /* -----------------------------------------------------------------------------
604 * scheduleFindWork()
605 *
606 * Search for work to do, and handle messages from elsewhere.
607 * -------------------------------------------------------------------------- */
608
609 static void
610 scheduleFindWork (Capability **pcap)
611 {
612 scheduleStartSignalHandlers(*pcap);
613
614 scheduleProcessInbox(pcap);
615
616 scheduleCheckBlockedThreads(*pcap);
617
618 #if defined(THREADED_RTS)
619 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
620 #endif
621 }
622
623 #if defined(THREADED_RTS)
624 STATIC_INLINE rtsBool
625 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
626 {
627 // we need to yield this capability to someone else if..
628 // - another thread is initiating a GC, and we didn't just do a GC
629 // (see Note [GC livelock])
630 // - another Task is returning from a foreign call
631 // - the thread at the head of the run queue cannot be run
632 // by this Task (it is bound to another Task, or it is unbound
633 // and this Task is bound).
634 //
635 // Note [GC livelock]
636 //
637 // If we are interrupted to do a GC, then we do not immediately do
638 // another one. This avoids a starvation situation where one
639 // Capability keeps forcing a GC and the other Capabilities make no
640 // progress at all.
641
642 return ((pending_sync && !didGcLast) ||
643 cap->n_returning_tasks != 0 ||
644 (!emptyRunQueue(cap) && (task->incall->tso == NULL
645 ? peekRunQueue(cap)->bound != NULL
646 : peekRunQueue(cap)->bound != task->incall)));
647 }
648
649 // This is the single place where a Task goes to sleep. There are
650 // two reasons it might need to sleep:
651 // - there are no threads to run
652 // - we need to yield this Capability to someone else
653 // (see shouldYieldCapability())
654 //
655 // Careful: the scheduler loop is quite delicate. Make sure you run
656 // the tests in testsuite/concurrent (all ways) after modifying this,
657 // and also check the benchmarks in nofib/parallel for regressions.
658
659 static void
660 scheduleYield (Capability **pcap, Task *task)
661 {
662 Capability *cap = *pcap;
663 int didGcLast = rtsFalse;
664
665 // if we have work, and we don't need to give up the Capability, continue.
666 //
667 if (!shouldYieldCapability(cap,task,rtsFalse) &&
668 (!emptyRunQueue(cap) ||
669 !emptyInbox(cap) ||
670 sched_state >= SCHED_INTERRUPTING)) {
671 return;
672 }
673
674 // otherwise yield (sleep), and keep yielding if necessary.
675 do {
676 didGcLast = yieldCapability(&cap,task, !didGcLast);
677 }
678 while (shouldYieldCapability(cap,task,didGcLast));
679
680 // note there may still be no threads on the run queue at this
681 // point, the caller has to check.
682
683 *pcap = cap;
684 return;
685 }
686 #endif
687
688 /* -----------------------------------------------------------------------------
689 * schedulePushWork()
690 *
691 * Push work to other Capabilities if we have some.
692 * -------------------------------------------------------------------------- */
693
694 static void
695 schedulePushWork(Capability *cap USED_IF_THREADS,
696 Task *task USED_IF_THREADS)
697 {
698 /* The following code is not for PARALLEL_HASKELL. I kept the call general;
699 future GUM versions might use pushing in a distributed setup */
700 #if defined(THREADED_RTS)
701
702 Capability *free_caps[n_capabilities], *cap0;
703 uint32_t i, n_wanted_caps, n_free_caps;
704
705 uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
706
707 // migration can be turned off with +RTS -qm
708 if (!RtsFlags.ParFlags.migrate) {
709 spare_threads = 0;
710 }
711
712 // Figure out how many capabilities we want to wake up. We need at least
713 // sparkPoolSizeCap(cap) plus the number of spare threads we have.
714 n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
715 if (n_wanted_caps == 0) return;
716
717 // First grab as many free Capabilities as we can. ToDo: we should use
718 // capabilities on the same NUMA node preferably, but not exclusively.
719 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
720 n_free_caps < n_wanted_caps && i != cap->no;
721 i = (i + 1) % n_capabilities) {
722 cap0 = capabilities[i];
723 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
724 if (!emptyRunQueue(cap0)
725 || cap0->n_returning_tasks != 0
726 || !emptyInbox(cap0)) {
727 // it already has some work, we just grabbed it at
728 // the wrong moment. Or maybe it's deadlocked!
729 releaseCapability(cap0);
730 } else {
731 free_caps[n_free_caps++] = cap0;
732 }
733 }
734 }
735
736 // We now have n_free_caps free capabilities stashed in
737 // free_caps[]. Attempt to share our run queue equally with them.
738 // This is complicated slightly by the fact that we can't move
739 // some threads:
740 //
741 // - threads that have TSO_LOCKED cannot migrate
742 // - a thread that is bound to the current Task cannot be migrated
743 //
744 // This is about the simplest thing we could do; improvements we
745 // might want to do include:
746 //
747 // - giving high priority to moving relatively new threads, on
748 // the grounds that they haven't had time to build up a
749 // working set in the cache on this CPU/Capability.
750 //
751 // - giving low priority to moving long-lived threads
752
753 if (n_free_caps > 0) {
754 StgTSO *prev, *t, *next;
755
756 debugTrace(DEBUG_sched,
757 "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
758 cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
759 n_free_caps);
760
761 // There are n_free_caps+1 caps in total. We will share the threads
762 // evenly between them, *except* that if the run queue does not divide
763 // evenly by n_free_caps+1 then we bias towards the current capability.
764 // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
765 uint32_t keep_threads =
766 (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
767
768 // This also ensures that we don't give away all our threads, since
769 // (x + y) / (y + 1) >= 1 when x >= 1.
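// A few more worked examples of the division above:
//   n_run_queue=7, n_free_caps=2: keep (7+2)/3 = 3, give away 4 (2 each)
//   n_run_queue=6, n_free_caps=2: keep (6+2)/3 = 2, give away 4 (2 each)
//   n_run_queue=1, n_free_caps=3: keep (1+3)/4 = 1, give away none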
770
771 // The number of threads we have left.
772 uint32_t n = cap->n_run_queue;
773
774 // prev = the previous thread on this cap's run queue
775 prev = END_TSO_QUEUE;
776
777 // We're going to walk through the run queue, migrating threads to other
778 // capabilities until we have only keep_threads left. We might
779 // encounter a thread that cannot be migrated, in which case we add it
780 // to the current run queue and decrement keep_threads.
781 for (t = cap->run_queue_hd, i = 0;
782 t != END_TSO_QUEUE && n > keep_threads;
783 t = next)
784 {
785 next = t->_link;
786 t->_link = END_TSO_QUEUE;
787
788 // Should we keep this thread?
789 if (t->bound == task->incall // don't move my bound thread
790 || tsoLocked(t) // don't move a locked thread
791 ) {
792 if (prev == END_TSO_QUEUE) {
793 cap->run_queue_hd = t;
794 } else {
795 setTSOLink(cap, prev, t);
796 }
797 setTSOPrev(cap, t, prev);
798 prev = t;
799 if (keep_threads > 0) keep_threads--;
800 }
801
802 // Or migrate it?
803 else {
804 appendToRunQueue(free_caps[i],t);
805 traceEventMigrateThread (cap, t, free_caps[i]->no);
806
807 if (t->bound) { t->bound->task->cap = free_caps[i]; }
808 t->cap = free_caps[i];
809 n--; // we have one fewer thread now
810 i++; // move on to the next free_cap
811 if (i == n_free_caps) i = 0;
812 }
813 }
814
815 // Join up the beginning of the queue (prev)
816 // with the rest of the queue (t)
817 if (t == END_TSO_QUEUE) {
818 cap->run_queue_tl = prev;
819 } else {
820 setTSOPrev(cap, t, prev);
821 }
822 if (prev == END_TSO_QUEUE) {
823 cap->run_queue_hd = t;
824 } else {
825 setTSOLink(cap, prev, t);
826 }
827 cap->n_run_queue = n;
828
829 IF_DEBUG(sanity, checkRunQueue(cap));
830
831 // release the capabilities
832 for (i = 0; i < n_free_caps; i++) {
833 task->cap = free_caps[i];
834 if (sparkPoolSizeCap(cap) > 0) {
835 // If we have sparks to steal, wake up a worker on the
836 // capability, even if it has no threads to run.
837 releaseAndWakeupCapability(free_caps[i]);
838 } else {
839 releaseCapability(free_caps[i]);
840 }
841 }
842 }
843 task->cap = cap; // reset to point to our Capability.
844
845 #endif /* THREADED_RTS */
846
847 }
848
849 /* ----------------------------------------------------------------------------
850 * Start any pending signal handlers
851 * ------------------------------------------------------------------------- */
852
853 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
854 static void
855 scheduleStartSignalHandlers(Capability *cap)
856 {
857 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
858 // safe outside the lock
859 startSignalHandlers(cap);
860 }
861 }
862 #else
863 static void
864 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
865 {
866 }
867 #endif
868
869 /* ----------------------------------------------------------------------------
870 * Check for blocked threads that can be woken up.
871 * ------------------------------------------------------------------------- */
872
873 static void
874 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
875 {
876 #if !defined(THREADED_RTS)
877 //
878 // Check whether any waiting threads need to be woken up. If the
879 // run queue is empty, and there are no other tasks running, we
880 // can wait indefinitely for something to happen.
881 //
882 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
883 {
884 awaitEvent (emptyRunQueue(cap));
885 }
886 #endif
887 }
888
889 /* ----------------------------------------------------------------------------
890 * Detect deadlock conditions and attempt to resolve them.
891 * ------------------------------------------------------------------------- */
892
893 static void
894 scheduleDetectDeadlock (Capability **pcap, Task *task)
895 {
896 Capability *cap = *pcap;
897 /*
898 * Detect deadlock: when we have no threads to run, there are no
899 * threads blocked, waiting for I/O, or sleeping, and all the
900 * other tasks are waiting for work, we must have a deadlock of
901 * some description.
902 */
903 if ( emptyThreadQueues(cap) )
904 {
905 #if defined(THREADED_RTS)
906 /*
907 * In the threaded RTS, we only check for deadlock if there
908 * has been no activity in a complete timeslice. This means
909 * we won't eagerly start a full GC just because we don't have
910 * any threads to run currently.
911 */
912 if (recent_activity != ACTIVITY_INACTIVE) return;
913 #endif
914
915 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
916
917 // Garbage collection can release some new threads due to
918 // either (a) finalizers or (b) threads resurrected because
919 // they are unreachable and will therefore be sent an
920 // exception. Any threads thus released will be immediately
921 // runnable.
922 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
923 cap = *pcap;
924 // when force_major == rtsTrue, scheduleDoGC sets
925 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
926 // signal.
927
928 if ( !emptyRunQueue(cap) ) return;
929
930 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
931 /* If we have user-installed signal handlers, then wait
932 * for signals to arrive rather than bombing out with a
933 * deadlock.
934 */
935 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
936 debugTrace(DEBUG_sched,
937 "still deadlocked, waiting for signals...");
938
939 awaitUserSignals();
940
941 if (signals_pending()) {
942 startSignalHandlers(cap);
943 }
944
945 // either we have threads to run, or we were interrupted:
946 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
947
948 return;
949 }
950 #endif
951
952 #if !defined(THREADED_RTS)
953 /* Probably a real deadlock. Send the current main thread the
954 * Deadlock exception.
955 */
956 if (task->incall->tso) {
957 switch (task->incall->tso->why_blocked) {
958 case BlockedOnSTM:
959 case BlockedOnBlackHole:
960 case BlockedOnMsgThrowTo:
961 case BlockedOnMVar:
962 case BlockedOnMVarRead:
963 throwToSingleThreaded(cap, task->incall->tso,
964 (StgClosure *)nonTermination_closure);
965 return;
966 default:
967 barf("deadlock: main thread blocked in a strange way");
968 }
969 }
970 return;
971 #endif
972 }
973 }
974
975
976 /* ----------------------------------------------------------------------------
977 * Process messages in the current Capability's inbox
978 * ------------------------------------------------------------------------- */
979
980 static void
981 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
982 {
983 #if defined(THREADED_RTS)
984 Message *m, *next;
985 PutMVar *p, *pnext;
986 int r;
987 Capability *cap = *pcap;
988
989 while (!emptyInbox(cap)) {
990 if (cap->r.rCurrentNursery->link == NULL ||
991 g0->n_new_large_words >= large_alloc_lim) {
992 scheduleDoGC(pcap, cap->running_task, rtsFalse);
993 cap = *pcap;
994 }
995
996 // don't use a blocking acquire; if the lock is held by
997 // another thread then just carry on. This seems to avoid
998 // getting stuck in a message ping-pong situation with other
999 // processors. We'll check the inbox again later anyway.
1000 //
1001 // We should really use a more efficient queue data structure
1002 // here. The trickiness is that we must ensure a Capability
1003 // never goes idle if the inbox is non-empty, which is why we
1004 // use cap->lock (cap->lock is released as the last thing
1005 // before going idle; see Capability.c:releaseCapability()).
1006 r = TRY_ACQUIRE_LOCK(&cap->lock);
1007 if (r != 0) return;
1008
1009 m = cap->inbox;
1010 p = cap->putMVars;
1011 cap->inbox = (Message*)END_TSO_QUEUE;
1012 cap->putMVars = NULL;
1013
1014 RELEASE_LOCK(&cap->lock);
1015
1016 while (m != (Message*)END_TSO_QUEUE) {
1017 next = m->link;
1018 executeMessage(cap, m);
1019 m = next;
1020 }
1021
1022 while (p != NULL) {
1023 pnext = p->link;
1024 performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
1025 Unit_closure);
1026 freeStablePtr(p->mvar);
1027 stgFree(p);
1028 p = pnext;
1029 }
1030 }
1031 #endif
1032 }
1033
1034
1035 /* ----------------------------------------------------------------------------
1036 * Activate spark threads (THREADED_RTS)
1037 * ------------------------------------------------------------------------- */
1038
1039 #if defined(THREADED_RTS)
1040 static void
1041 scheduleActivateSpark(Capability *cap)
1042 {
1043 if (anySparks() && !cap->disabled)
1044 {
1045 createSparkThread(cap);
1046 debugTrace(DEBUG_sched, "creating a spark thread");
1047 }
1048 }
1049 #endif // THREADED_RTS
1050
1051 /* ----------------------------------------------------------------------------
1052 * After running a thread...
1053 * ------------------------------------------------------------------------- */
1054
1055 static void
1056 schedulePostRunThread (Capability *cap, StgTSO *t)
1057 {
1058 // We have to be able to catch transactions that are in an
1059 // infinite loop as a result of seeing an inconsistent view of
1060 // memory, e.g.
1061 //
1062 // atomically $ do
1063 // [a,b] <- mapM readTVar [ta,tb]
1064 // when (a == b) loop
1065 //
1066 // and a is never equal to b given a consistent view of memory.
1067 //
1068 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1069 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1070 debugTrace(DEBUG_sched | DEBUG_stm,
1071 "trec %p found wasting its time", t);
1072
1073 // strip the stack back to the
1074 // ATOMICALLY_FRAME, aborting the (nested)
1075 // transaction, and saving the stack of any
1076 // partially-evaluated thunks on the heap.
1077 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1078
1079 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1080 }
1081 }
1082
1083 //
1084 // If the current thread's allocation limit has run out, send it
1085 // the AllocationLimitExceeded exception.
1086
1087 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1088 // Use a throwToSelf rather than a throwToSingleThreaded, because
1089 // it correctly handles the case where the thread is currently
1090 // inside mask. Also the thread might be blocked (e.g. on an
1091 // MVar), and throwToSingleThreaded doesn't unblock it
1092 // correctly in that case.
1093 throwToSelf(cap, t, allocationLimitExceeded_closure);
1094 ASSIGN_Int64((W_*)&(t->alloc_limit),
1095 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1096 }
1097
1098 /* some statistics gathering in the parallel case */
1099 }
1100
1101 /* -----------------------------------------------------------------------------
1102 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1103 * -------------------------------------------------------------------------- */
1104
1105 static rtsBool
1106 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1107 {
1108 if (cap->r.rHpLim == NULL || cap->context_switch) {
1109 // Sometimes we miss a context switch, e.g. when calling
1110 // primitives in a tight loop, MAYBE_GC() doesn't check the
1111 // context switch flag, and we end up waiting for a GC.
1112 // See #1984, and concurrent/should_run/1984
1113 cap->context_switch = 0;
1114 appendToRunQueue(cap,t);
1115 } else {
1116 pushOnRunQueue(cap,t);
1117 }
1118
1119 // did the task ask for a large block?
1120 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1121 // if so, get one and push it on the front of the nursery.
1122 bdescr *bd;
1123 W_ blocks;
1124
1125 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
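// For example, assuming the default 4KB BLOCK_SIZE: a request of
// rHpAlloc = 20000 bytes rounds up to 20480 bytes, i.e. blocks = 5.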
1126
1127 if (blocks > BLOCKS_PER_MBLOCK) {
1128 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1129 }
1130
1131 debugTrace(DEBUG_sched,
1132 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1133 (long)t->id, what_next_strs[t->what_next], blocks);
1134
1135 // don't do this if the nursery is (nearly) full, we'll GC first.
1136 if (cap->r.rCurrentNursery->link != NULL ||
1137 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1138 // infinite loop if the
1139 // nursery has only one
1140 // block.
1141
1142 bd = allocGroupOnNode_lock(cap->node,blocks);
1143 cap->r.rNursery->n_blocks += blocks;
1144
1145 // link the new group after CurrentNursery
1146 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1147
1148 // initialise it as a nursery block. We initialise the
1149 // step, gen_no, and flags field of *every* sub-block in
1150 // this large block, because this is easier than making
1151 // sure that we always find the block head of a large
1152 // block whenever we call Bdescr() (eg. evacuate() and
1153 // isAlive() in the GC would both have to do this, at
1154 // least).
1155 {
1156 bdescr *x;
1157 for (x = bd; x < bd + blocks; x++) {
1158 initBdescr(x,g0,g0);
1159 x->free = x->start;
1160 x->flags = 0;
1161 }
1162 }
1163
1164 // This assert can be a killer if the app is doing lots
1165 // of large block allocations.
1166 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1167
1168 // now update the nursery to point to the new block
1169 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1170 cap->r.rCurrentNursery = bd;
1171
1172 // we might be unlucky and have another thread get on the
1173 // run queue before us and steal the large block, but in that
1174 // case the thread will just end up requesting another large
1175 // block.
1176 return rtsFalse; /* not actually GC'ing */
1177 }
1178 }
1179
1180 // if we got here because we exceeded large_alloc_lim, then
1181 // proceed straight to GC.
1182 if (g0->n_new_large_words >= large_alloc_lim) {
1183 return rtsTrue;
1184 }
1185
1186 // Otherwise, we just ran out of space in the current nursery.
1187 // Grab another nursery if we can.
1188 if (getNewNursery(cap)) {
1189 debugTrace(DEBUG_sched, "thread %ld got a new nursery", t->id);
1190 return rtsFalse;
1191 }
1192
1193 return rtsTrue;
1194 /* actual GC is done at the end of the while loop in schedule() */
1195 }
1196
1197 /* -----------------------------------------------------------------------------
1198 * Handle a thread that returned to the scheduler with ThreadYielding
1199 * -------------------------------------------------------------------------- */
1200
1201 static rtsBool
1202 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1203 {
1204 /* put the thread back on the run queue. Then, if we're ready to
1205 * GC, check whether this is the last task to stop. If so, wake
1206 * up the GC thread. getThread will block during a GC until the
1207 * GC is finished.
1208 */
1209
1210 ASSERT(t->_link == END_TSO_QUEUE);
1211
1212 // Shortcut if we're just switching evaluators: don't bother
1213 // doing stack squeezing (which can be expensive), just run the
1214 // thread.
1215 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1216 debugTrace(DEBUG_sched,
1217 "--<< thread %ld (%s) stopped to switch evaluators",
1218 (long)t->id, what_next_strs[t->what_next]);
1219 return rtsTrue;
1220 }
1221
1222 // Reset the context switch flag. We don't do this just before
1223 // running the thread, because that would mean we would lose ticks
1224 // during GC, which can lead to unfair scheduling (a thread hogs
1225 // the CPU because the tick always arrives during GC). This way
1226 // penalises threads that do a lot of allocation, but that seems
1227 // better than the alternative.
1228 if (cap->context_switch != 0) {
1229 cap->context_switch = 0;
1230 appendToRunQueue(cap,t);
1231 } else {
1232 pushOnRunQueue(cap,t);
1233 }
1234
1235 IF_DEBUG(sanity,
1236 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1237 checkTSO(t));
1238
1239 return rtsFalse;
1240 }
1241
1242 /* -----------------------------------------------------------------------------
1243 * Handle a thread that returned to the scheduler with ThreadBlocked
1244 * -------------------------------------------------------------------------- */
1245
1246 static void
1247 scheduleHandleThreadBlocked( StgTSO *t
1248 #if !defined(DEBUG)
1249 STG_UNUSED
1250 #endif
1251 )
1252 {
1253
1254 // We don't need to do anything. The thread is blocked, and it
1255 // has tidied up its stack and placed itself on whatever queue
1256 // it needs to be on.
1257
1258 // ASSERT(t->why_blocked != NotBlocked);
1259 // Not true: for example,
1260 // - the thread may have woken itself up already, because
1261 // threadPaused() might have raised a blocked throwTo
1262 // exception, see maybePerformBlockedException().
1263
1264 #ifdef DEBUG
1265 traceThreadStatus(DEBUG_sched, t);
1266 #endif
1267 }
1268
1269 /* -----------------------------------------------------------------------------
1270 * Handle a thread that returned to the scheduler with ThreadFinished
1271 * -------------------------------------------------------------------------- */
1272
1273 static rtsBool
1274 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1275 {
1276 /* Need to check whether this was a main thread, and if so,
1277 * return with the return value.
1278 *
1279 * We also end up here if the thread kills itself with an
1280 * uncaught exception, see Exception.cmm.
1281 */
1282
1283 // blocked exceptions can now complete, even if the thread was in
1284 // blocked mode (see #2910).
1285 awakenBlockedExceptionQueue (cap, t);
1286
1287 //
1288 // Check whether the thread that just completed was a bound
1289 // thread, and if so return with the result.
1290 //
1291 // There is an assumption here that all thread completion goes
1292 // through this point; we need to make sure that if a thread
1293 // ends up in the ThreadKilled state, it stays on the run
1294 // queue so it can be dealt with here.
1295 //
1296
1297 if (t->bound) {
1298
1299 if (t->bound != task->incall) {
1300 #if !defined(THREADED_RTS)
1301 // Must be a bound thread that is not the topmost one. Leave
1302 // it on the run queue until the stack has unwound to the
1303 // point where we can deal with this. Leaving it on the run
1304 // queue also ensures that the garbage collector knows about
1305 // this thread and its return value (it gets dropped from the
1306 // step->threads list so there's no other way to find it).
1307 appendToRunQueue(cap,t);
1308 return rtsFalse;
1309 #else
1310 // this cannot happen in the threaded RTS, because a
1311 // bound thread can only be run by the appropriate Task.
1312 barf("finished bound thread that isn't mine");
1313 #endif
1314 }
1315
1316 ASSERT(task->incall->tso == t);
1317
1318 if (t->what_next == ThreadComplete) {
1319 if (task->incall->ret) {
1320 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1321 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1322 }
1323 task->incall->rstat = Success;
1324 } else {
1325 if (task->incall->ret) {
1326 *(task->incall->ret) = NULL;
1327 }
1328 if (sched_state >= SCHED_INTERRUPTING) {
1329 if (heap_overflow) {
1330 task->incall->rstat = HeapExhausted;
1331 } else {
1332 task->incall->rstat = Interrupted;
1333 }
1334 } else {
1335 task->incall->rstat = Killed;
1336 }
1337 }
1338 #ifdef DEBUG
1339 removeThreadLabel((StgWord)task->incall->tso->id);
1340 #endif
1341
1342 // We no longer consider this thread and task to be bound to
1343 // each other. The TSO lives on until it is GC'd, but the
1344 // task is about to be released by the caller, and we don't
1345 // want anyone following the pointer from the TSO to the
1346 // defunct task (which might have already been
1347 // re-used). This was a real bug: the GC updated
1348 // tso->bound->tso, which led to a deadlock.
1349 t->bound = NULL;
1350 task->incall->tso = NULL;
1351
1352 return rtsTrue; // tells schedule() to return
1353 }
1354
1355 return rtsFalse;
1356 }
1357
1358 /* -----------------------------------------------------------------------------
1359 * Perform a heap census
1360 * -------------------------------------------------------------------------- */
1361
1362 static rtsBool
1363 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1364 {
1365 // When we have +RTS -i0 and we're heap profiling, do a census at
1366 // every GC. This lets us get repeatable runs for debugging.
1367 if (performHeapProfile ||
1368 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1369 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1370 return rtsTrue;
1371 } else {
1372 return rtsFalse;
1373 }
1374 }
1375
1376 /* -----------------------------------------------------------------------------
1377 * stopAllCapabilities()
1378 *
1379 * Stop all Haskell execution. This is used when we need to make some global
1380 * change to the system, such as altering the number of capabilities, or
1381 * forking.
1382 *
1383 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1384 * -------------------------------------------------------------------------- */
1385
1386 #if defined(THREADED_RTS)
1387 static void stopAllCapabilities (Capability **pCap, Task *task)
1388 {
1389 rtsBool was_syncing;
1390 SyncType prev_sync_type;
1391
1392 PendingSync sync = {
1393 .type = SYNC_OTHER,
1394 .idle = NULL,
1395 .task = task
1396 };
1397
1398 do {
1399 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1400 } while (was_syncing);
1401
1402 acquireAllCapabilities(*pCap,task);
1403
1404 pending_sync = 0;
1405 }
1406 #endif
1407
1408 /* -----------------------------------------------------------------------------
1409 * requestSync()
1410 *
1411 * Commence a synchronisation between all capabilities. Normally not called
1412 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1413 * has some special synchronisation requirements.
1414 *
1415 * Returns:
1416 * rtsFalse if we successfully got a sync
1417 * rtsTrue if there was another sync request in progress,
1418 * and we yielded to it. The value returned is the
1419 * type of the other sync request.
1420 * -------------------------------------------------------------------------- */
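// Typical call pattern (as in stopAllCapabilities() above and scheduleDoGC()
// below): keep retrying until no other sync is in progress, then proceed.
//
//     do {
//         was_syncing = requestSync(pcap, task, &sync, &prev_sync_type);
//     } while (was_syncing);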
1421
1422 #if defined(THREADED_RTS)
1423 static rtsBool requestSync (
1424 Capability **pcap, Task *task, PendingSync *new_sync,
1425 SyncType *prev_sync_type)
1426 {
1427 PendingSync *sync;
1428
1429 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1430 (StgWord)NULL,
1431 (StgWord)new_sync);
1432
1433 if (sync != NULL)
1434 {
1435 // sync is valid until we have called yieldCapability().
1436 // After the sync is completed, we cannot read that struct any
1437 // more because it has been freed.
1438 *prev_sync_type = sync->type;
1439 do {
1440 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1441 sync->type);
1442 ASSERT(*pcap);
1443 yieldCapability(pcap,task,rtsTrue);
1444 sync = pending_sync;
1445 } while (sync != NULL);
1446
1447 // NOTE: task->cap might have changed now
1448 return rtsTrue;
1449 }
1450 else
1451 {
1452 return rtsFalse;
1453 }
1454 }
1455 #endif
1456
1457 /* -----------------------------------------------------------------------------
1458 * acquireAllCapabilities()
1459 *
1460 * Grab all the capabilities except the one we already hold. Used
1461 * when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1462 * before a fork (SYNC_OTHER).
1463 *
1464 * Only call this after requestSync(), otherwise a deadlock might
1465 * ensue if another thread is trying to synchronise.
1466 * -------------------------------------------------------------------------- */
1467
1468 #ifdef THREADED_RTS
1469 static void acquireAllCapabilities(Capability *cap, Task *task)
1470 {
1471 Capability *tmpcap;
1472 uint32_t i;
1473
1474 ASSERT(pending_sync != NULL);
1475 for (i=0; i < n_capabilities; i++) {
1476 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1477 i, n_capabilities);
1478 tmpcap = capabilities[i];
1479 if (tmpcap != cap) {
1480 // we better hope this task doesn't get migrated to
1481 // another Capability while we're waiting for this one.
1482 // It won't, because load balancing happens while we have
1483 // all the Capabilities, but even so it's a slightly
1484 // unsavoury invariant.
1485 task->cap = tmpcap;
1486 waitForCapability(&tmpcap, task);
1487 if (tmpcap->no != i) {
1488 barf("acquireAllCapabilities: got the wrong capability");
1489 }
1490 }
1491 }
1492 task->cap = cap;
1493 }
1494 #endif
1495
1496 /* -----------------------------------------------------------------------------
1497 * releaseAllCapabilities()
1498 *
1499 * Assuming this thread holds all the capabilities, release them all except for
1500 * the one passed in as cap.
1501 * -------------------------------------------------------------------------- */
1502
1503 #ifdef THREADED_RTS
1504 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task)
1505 {
1506 uint32_t i;
1507
1508 for (i = 0; i < n; i++) {
1509 if (cap->no != i) {
1510 task->cap = capabilities[i];
1511 releaseCapability(capabilities[i]);
1512 }
1513 }
1514 task->cap = cap;
1515 }
1516 #endif
1517
1518 /* -----------------------------------------------------------------------------
1519 * Perform a garbage collection if necessary
1520 * -------------------------------------------------------------------------- */
1521
1522 static void
1523 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1524 rtsBool force_major)
1525 {
1526 Capability *cap = *pcap;
1527 rtsBool heap_census;
1528 uint32_t collect_gen;
1529 rtsBool major_gc;
1530 #ifdef THREADED_RTS
1531 uint32_t gc_type;
1532 uint32_t i;
1533 uint32_t need_idle;
1534 uint32_t n_gc_threads;
1535 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1536 StgTSO *tso;
1537 rtsBool *idle_cap;
1538 #endif
1539
1540 if (sched_state == SCHED_SHUTTING_DOWN) {
1541 // The final GC has already been done, and the system is
1542 // shutting down. We'll probably deadlock if we try to GC
1543 // now.
1544 return;
1545 }
1546
1547 heap_census = scheduleNeedHeapProfile(rtsTrue);
1548
1549 // Figure out which generation we are collecting, so that we can
1550 // decide whether this is a parallel GC or not.
1551 collect_gen = calcNeeded(force_major || heap_census, NULL);
1552 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1553
1554 #ifdef THREADED_RTS
1555 if (sched_state < SCHED_INTERRUPTING
1556 && RtsFlags.ParFlags.parGcEnabled
1557 && collect_gen >= RtsFlags.ParFlags.parGcGen
1558 && ! oldest_gen->mark)
1559 {
1560 gc_type = SYNC_GC_PAR;
1561 } else {
1562 gc_type = SYNC_GC_SEQ;
1563 }
1564
1565 // In order to GC, there must be no threads running Haskell code.
1566 // Therefore, for single-threaded GC, the GC thread needs to hold *all* the
1567 // capabilities, and release them after the GC has completed. For parallel
1568 // GC, we synchronise all the running threads using requestSync().
1569 //
1570 // Other capabilities are prevented from running yet more Haskell threads if
1571 // pending_sync is set. Tested inside yieldCapability() and
1572 // releaseCapability() in Capability.c
1573
1574 PendingSync sync = {
1575 .type = gc_type,
1576 .idle = NULL,
1577 .task = task
1578 };
1579
1580 {
1581 SyncType prev_sync = 0;
1582 rtsBool was_syncing;
1583 do {
1584 // If -qn is not set and we have more capabilities than cores, set
1585 // the number of GC threads to #cores. We do this here rather than
1586 // in normaliseRtsOpts() because here it will work if the program
1587 // calls setNumCapabilities.
1588 //
1589 n_gc_threads = RtsFlags.ParFlags.parGcThreads;
1590 if (n_gc_threads == 0 &&
1591 enabled_capabilities > getNumberOfProcessors()) {
1592 n_gc_threads = getNumberOfProcessors();
1593 }
1594
1595 // This calculation must be inside the loop because
1596 // enabled_capabilities may change if requestSync() below fails and
1597 // we retry.
1598 if (gc_type == SYNC_GC_PAR && n_gc_threads > 0) {
1599 need_idle = stg_max(0, enabled_capabilities - n_gc_threads);
1600 } else {
1601 need_idle = 0;
1602 }
1603
1604 // We need an array of size n_capabilities, but since this may
1605 // change each time around the loop we must allocate it afresh.
1606 idle_cap = (rtsBool *)stgMallocBytes(n_capabilities *
1607 sizeof(rtsBool),
1608 "scheduleDoGC");
1609 sync.idle = idle_cap;
1610
1611 // When using +RTS -qn, we need some capabilities to be idle during
1612 // GC. The best bet is to choose some inactive ones, so we look for
1613 // those first:
1614 uint32_t n_idle = need_idle;
1615 for (i=0; i < n_capabilities; i++) {
1616 if (capabilities[i]->disabled) {
1617 idle_cap[i] = rtsTrue;
1618 } else if (n_idle > 0 &&
1619 capabilities[i]->running_task == NULL) {
1620 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1621 n_idle--;
1622 idle_cap[i] = rtsTrue;
1623 } else {
1624 idle_cap[i] = rtsFalse;
1625 }
1626 }
1627 // If we didn't find enough inactive capabilities, just pick some
1628 // more to be idle.
1629 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1630 if (!idle_cap[i] && i != cap->no) {
1631 idle_cap[i] = rtsTrue;
1632 n_idle--;
1633 }
1634 }
1635 ASSERT(n_idle == 0);
1636
1637 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1638 cap = *pcap;
1639 if (was_syncing) {
1640 stgFree(idle_cap);
1641 }
1642 if (was_syncing &&
1643 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1644 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1645 // someone else had a pending sync request for a GC, so
1646 // let's assume GC has been done and we don't need to GC
1647 // again.
1648 // Exception to this: if SCHED_INTERRUPTING, then we still
1649 // need to do the final GC.
1650 return;
1651 }
1652 if (sched_state == SCHED_SHUTTING_DOWN) {
1653 // The scheduler might now be shutting down. We tested
1654 // this above, but it might have become true since then as
1655 // we yielded the capability in requestSync().
1656 return;
1657 }
1658 } while (was_syncing);
1659 }
1660
1661 stat_startGCSync(gc_threads[cap->no]);
1662
1663 #ifdef DEBUG
1664 unsigned int old_n_capabilities = n_capabilities;
1665 #endif
1666
1667 interruptAllCapabilities();
1668
1669 // The final shutdown GC is always single-threaded, because it's
1670 // possible that some of the Capabilities have no worker threads.
1671
1672 if (gc_type == SYNC_GC_SEQ) {
1673 traceEventRequestSeqGc(cap);
1674 } else {
1675 traceEventRequestParGc(cap);
1676 }
1677
1678 if (gc_type == SYNC_GC_SEQ) {
1679 // single-threaded GC: grab all the capabilities
1680 acquireAllCapabilities(cap,task);
1681 }
1682 else
1683 {
1684 // If we are load-balancing collections in this
1685 // generation, then we require all GC threads to participate
1686 // in the collection. Otherwise, we only require active
1687 // threads to participate, and we set gc_threads[i]->idle for
1688 // any idle capabilities. The rationale here is that waking
1689 // up an idle Capability takes much longer than just doing any
1690 // GC work on its behalf.
1691
1692 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1693 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1694 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1695 {
1696 for (i=0; i < n_capabilities; i++) {
1697 if (capabilities[i]->disabled) {
1698 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1699 if (idle_cap[i]) {
1700 n_idle_caps++;
1701 }
1702 } else {
1703 if (i != cap->no && idle_cap[i]) {
1704 Capability *tmpcap = capabilities[i];
1705 task->cap = tmpcap;
1706 waitForCapability(&tmpcap, task);
1707 n_idle_caps++;
1708 }
1709 }
1710 }
1711 }
1712 else
1713 {
1714 for (i=0; i < n_capabilities; i++) {
1715 if (capabilities[i]->disabled) {
1716 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1717 if (idle_cap[i]) {
1718 n_idle_caps++;
1719 }
1720 } else if (i != cap->no &&
1721 capabilities[i]->idle >=
1722 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1723 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1724 if (idle_cap[i]) {
1725 n_idle_caps++;
1726 } else {
1727 n_failed_trygrab_idles++;
1728 }
1729 }
1730 }
1731 }
1732 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1733
1734 // We set the gc_thread[i]->idle flag if that
1735 // capability/thread is not participating in this collection.
1736 // We also keep a local record of which capabilities are idle
1737 // in idle_cap[], because scheduleDoGC() is re-entrant:
1738 // another thread might start a GC as soon as we've finished
1739 // this one, and thus the gc_thread[]->idle flags are invalid
1740 // as soon as we release any threads after GC. Getting this
1741 // wrong leads to a rare and hard to debug deadlock!
1742
1743 for (i=0; i < n_capabilities; i++) {
1744 gc_threads[i]->idle = idle_cap[i];
1745 capabilities[i]->idle++;
1746 }
1747
1748 // For all capabilities participating in this GC, wait until
1749 // they have stopped mutating and are standing by for GC.
1750 waitForGcThreads(cap);
1751
1752 #if defined(THREADED_RTS)
1753 // Stable point where we can do a global check on our spark counters
1754 ASSERT(checkSparkCountInvariant());
1755 #endif
1756 }
1757
1758 #endif
1759
1760 IF_DEBUG(scheduler, printAllThreads());
1761
1762 delete_threads_and_gc:
1763 /*
1764 * We now have all the capabilities; if we're in an interrupting
1765 * state, then we should take the opportunity to delete all the
1766 * threads in the system.
1767 * Checking for major_gc ensures that the last GC is major.
1768 */
1769 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1770 deleteAllThreads(cap);
1771 #if defined(THREADED_RTS)
1772 // Discard all the sparks from every Capability. Why?
1773 // They'll probably be GC'd anyway since we've killed all the
1774 // threads. It just avoids the GC having to do any work to
1775 // figure out that any remaining sparks are garbage.
1776 for (i = 0; i < n_capabilities; i++) {
1777 capabilities[i]->spark_stats.gcd +=
1778 sparkPoolSize(capabilities[i]->sparks);
1779 // No race here since all Caps are stopped.
1780 discardSparksCap(capabilities[i]);
1781 }
1782 #endif
1783 sched_state = SCHED_SHUTTING_DOWN;
1784 }
1785
1786 /*
1787 * When there are disabled capabilities, we want to migrate any
1788 * threads away from them. Normally this happens in the
1789 * scheduler's loop, but only for unbound threads - it's really
1790 * hard for a bound thread to migrate itself. So we have another
1791 * go here.
1792 */
1793 #if defined(THREADED_RTS)
1794 for (i = enabled_capabilities; i < n_capabilities; i++) {
1795 Capability *tmp_cap, *dest_cap;
1796 tmp_cap = capabilities[i];
1797 ASSERT(tmp_cap->disabled);
1798 if (i != cap->no) {
1799 dest_cap = capabilities[i % enabled_capabilities];
1800 while (!emptyRunQueue(tmp_cap)) {
1801 tso = popRunQueue(tmp_cap);
1802 migrateThread(tmp_cap, tso, dest_cap);
1803 if (tso->bound) {
1804 traceTaskMigrate(tso->bound->task,
1805 tso->bound->task->cap,
1806 dest_cap);
1807 tso->bound->task->cap = dest_cap;
1808 }
1809 }
1810 }
1811 }
1812 #endif
1813
1814 #if defined(THREADED_RTS)
1815 // reset pending_sync *before* GC, so that when the GC threads
1816 // emerge they don't immediately re-enter the GC.
1817 pending_sync = 0;
1818 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1819 #else
1820 GarbageCollect(collect_gen, heap_census, 0, cap);
1821 #endif
1822
1823 traceSparkCounters(cap);
1824
1825 switch (recent_activity) {
1826 case ACTIVITY_INACTIVE:
1827 if (force_major) {
1828 // We are doing a GC because the system has been idle for a
1829 // timeslice and we need to check for deadlock. Record the
1830 // fact that we've done a GC and turn off the timer signal;
1831 // it will get re-enabled if we run any threads after the GC.
1832 recent_activity = ACTIVITY_DONE_GC;
1833 #ifndef PROFILING
1834 stopTimer();
1835 #endif
1836 break;
1837 }
1838 // fall through...
1839
1840 case ACTIVITY_MAYBE_NO:
1841 // the GC might have taken long enough for the timer to set
1842 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1843 // but we aren't necessarily deadlocked:
1844 recent_activity = ACTIVITY_YES;
1845 break;
1846
1847 case ACTIVITY_DONE_GC:
1848 // If we are actually active, the scheduler will reset the
1849 // recent_activity flag and re-enable the timer.
1850 break;
1851 }
1852
1853 #if defined(THREADED_RTS)
1854 // Stable point where we can do a global check on our spark counters
1855 ASSERT(checkSparkCountInvariant());
1856 #endif
1857
1858 // The heap census itself is done during GarbageCollect().
1859 if (heap_census) {
1860 performHeapProfile = rtsFalse;
1861 }
1862
1863 #if defined(THREADED_RTS)
1864
1865 // If n_capabilities has changed during GC, we're in trouble.
1866 ASSERT(n_capabilities == old_n_capabilities);
1867
1868 if (gc_type == SYNC_GC_PAR)
1869 {
1870 releaseGCThreads(cap);
1871 for (i = 0; i < n_capabilities; i++) {
1872 if (i != cap->no) {
1873 if (idle_cap[i]) {
1874 ASSERT(capabilities[i]->running_task == task);
1875 task->cap = capabilities[i];
1876 releaseCapability(capabilities[i]);
1877 } else {
1878 ASSERT(capabilities[i]->running_task != task);
1879 }
1880 }
1881 }
1882 task->cap = cap;
1883 }
1884 #endif
1885
1886 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1887 // GC set the heap_overflow flag, so we should proceed with
1888 // an orderly shutdown now. Ultimately we want the main
1889 // thread to return to its caller with HeapExhausted, at which
1890 // point the caller should call hs_exit(). The first step is
1891 // to delete all the threads.
1892 //
1893 // Another way to do this would be to raise an exception in
1894 // the main thread, which we really should do because it gives
1895 // the program a chance to clean up. But how do we find the
1896 // main thread? It should presumably be the same one that
1897 // gets ^C exceptions, but that's all done on the Haskell side
1898 // (GHC.TopHandler).
1899 sched_state = SCHED_INTERRUPTING;
1900 goto delete_threads_and_gc;
1901 }
1902
1903 #ifdef SPARKBALANCE
1904 /* JB
1905 Once we are all together... this would be the place to balance all
1906 spark pools. No concurrent stealing or adding of new sparks can
1907 occur. Should be defined in Sparks.c. */
1908 balanceSparkPoolsCaps(n_capabilities, capabilities);
1909 #endif
1910
1911 #if defined(THREADED_RTS)
1912 stgFree(idle_cap);
1913
1914 if (gc_type == SYNC_GC_SEQ) {
1915 // release our stash of capabilities.
1916 releaseAllCapabilities(n_capabilities, cap, task);
1917 }
1918 #endif
1919
1920 return;
1921 }
1922
1923 /* ---------------------------------------------------------------------------
1924 * Singleton fork(). Do not copy any running threads.
1925 * ------------------------------------------------------------------------- */
1926
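/*
 * A summary of the code below, for orientation (descriptive only; the code
 * is authoritative): stop all Capabilities, take every RTS lock so that no
 * lock ends up held by a thread that will not exist in the child, then
 * fork().  The parent restarts the timer, releases the locks and
 * Capabilities, and returns the child's pid.  The child re-initialises its
 * mutexes, deletes every Haskell thread and Task belonging to OS threads
 * that did not survive the fork, keeps only capability 0, and runs the
 * 'entry' action (a StablePtr to an IO action, supplied by the Haskell-side
 * forkProcess wrapper as far as I can tell) via rts_evalStableIO() before
 * exiting.
 */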
1927 pid_t
1928 forkProcess(HsStablePtr *entry
1929 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1930 STG_UNUSED
1931 #endif
1932 )
1933 {
1934 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1935 pid_t pid;
1936 StgTSO* t,*next;
1937 Capability *cap;
1938 uint32_t g;
1939 Task *task = NULL;
1940 uint32_t i;
1941
1942 debugTrace(DEBUG_sched, "forking!");
1943
1944 task = newBoundTask();
1945
1946 cap = NULL;
1947 waitForCapability(&cap, task);
1948
1949 #ifdef THREADED_RTS
1950 stopAllCapabilities(&cap, task);
1951 #endif
1952
1953 // no funny business: hold locks while we fork, otherwise if some
1954 // other thread is holding a lock when the fork happens, the data
1955 // structure protected by the lock will forever be in an
1956 // inconsistent state in the child. See also #1391.
1957 ACQUIRE_LOCK(&sched_mutex);
1958 ACQUIRE_LOCK(&sm_mutex);
1959 ACQUIRE_LOCK(&stable_mutex);
1960 ACQUIRE_LOCK(&task->lock);
1961
1962 for (i=0; i < n_capabilities; i++) {
1963 ACQUIRE_LOCK(&capabilities[i]->lock);
1964 }
1965
1966 #ifdef THREADED_RTS
1967 ACQUIRE_LOCK(&all_tasks_mutex);
1968 #endif
1969
1970 stopTimer(); // See #4074
1971
1972 #if defined(TRACING)
1973 flushEventLog(); // so that child won't inherit dirty file buffers
1974 #endif
1975
1976 pid = fork();
1977
1978 if (pid) { // parent
1979
1980 startTimer(); // #4074
1981
1982 RELEASE_LOCK(&sched_mutex);
1983 RELEASE_LOCK(&sm_mutex);
1984 RELEASE_LOCK(&stable_mutex);
1985 RELEASE_LOCK(&task->lock);
1986
1987 for (i=0; i < n_capabilities; i++) {
1988 releaseCapability_(capabilities[i],rtsFalse);
1989 RELEASE_LOCK(&capabilities[i]->lock);
1990 }
1991
1992 #ifdef THREADED_RTS
1993 RELEASE_LOCK(&all_tasks_mutex);
1994 #endif
1995
1996 boundTaskExiting(task);
1997
1998 // just return the pid
1999 return pid;
2000
2001 } else { // child
2002
2003 #if defined(THREADED_RTS)
2004 initMutex(&sched_mutex);
2005 initMutex(&sm_mutex);
2006 initMutex(&stable_mutex);
2007 initMutex(&task->lock);
2008
2009 for (i=0; i < n_capabilities; i++) {
2010 initMutex(&capabilities[i]->lock);
2011 }
2012
2013 initMutex(&all_tasks_mutex);
2014 #endif
2015
2016 #ifdef TRACING
2017 resetTracing();
2018 #endif
2019
2020 // Now, all OS threads except the thread that forked are
2021 // stopped. We need to stop all Haskell threads, including
2022 // those involved in foreign calls. Also we need to delete
2023 // all Tasks, because they correspond to OS threads that are
2024 // now gone.
2025
2026 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2027 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2028 next = t->global_link;
2029 // don't allow threads to catch the ThreadKilled
2030 // exception, but we do want to raiseAsync() because these
2031 // threads may be evaluating thunks that we need later.
2032 deleteThread_(t->cap,t);
2033
2034 // stop the GC from updating the InCall to point to
2035 // the TSO. This is only necessary because the
2036 // OSThread bound to the TSO has been killed, and
2037 // won't get a chance to exit in the usual way (see
2038 // also scheduleHandleThreadFinished).
2039 t->bound = NULL;
2040 }
2041 }
2042
2043 discardTasksExcept(task);
2044
2045 for (i=0; i < n_capabilities; i++) {
2046 cap = capabilities[i];
2047
2048 // Empty the run queue. It seems tempting to let all the
2049 // killed threads stay on the run queue as zombies to be
2050 // cleaned up later, but some of them may correspond to
2051 // bound threads for which the corresponding Task does not
2052 // exist.
2053 truncateRunQueue(cap);
2054 cap->n_run_queue = 0;
2055
2056             // Any suspended C-calling Tasks are no more; their OS threads
2057 // don't exist now:
2058 cap->suspended_ccalls = NULL;
2059 cap->n_suspended_ccalls = 0;
2060
2061 #if defined(THREADED_RTS)
2062 // Wipe our spare workers list, they no longer exist. New
2063 // workers will be created if necessary.
2064 cap->spare_workers = NULL;
2065 cap->n_spare_workers = 0;
2066 cap->returning_tasks_hd = NULL;
2067 cap->returning_tasks_tl = NULL;
2068 cap->n_returning_tasks = 0;
2069 #endif
2070
2071             // Release all caps except 0; we'll use that one for starting
2072 // the IO manager and running the client action below.
2073 if (cap->no != 0) {
2074 task->cap = cap;
2075 releaseCapability(cap);
2076 }
2077 }
2078 cap = capabilities[0];
2079 task->cap = cap;
2080
2081 // Empty the threads lists. Otherwise, the garbage
2082 // collector may attempt to resurrect some of these threads.
2083 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2084 generations[g].threads = END_TSO_QUEUE;
2085 }
2086
2087 // On Unix, all timers are reset in the child, so we need to start
2088 // the timer again.
2089 initTimer();
2090 startTimer();
2091
2092 // TODO: need to trace various other things in the child
2093 // like startup event, capabilities, process info etc
2094 traceTaskCreate(task, cap);
2095
2096 #if defined(THREADED_RTS)
2097 ioManagerStartCap(&cap);
2098 #endif
2099
2100 rts_evalStableIO(&cap, entry, NULL); // run the action
2101 rts_checkSchedStatus("forkProcess",cap);
2102
2103 rts_unlock(cap);
2104 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2105 }
2106 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2107 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2108 #endif
2109 }
2110
2111 /* ---------------------------------------------------------------------------
2112 * Changing the number of Capabilities
2113 *
2114 * Changing the number of Capabilities is very tricky! We can only do
2115 * it with the system fully stopped, so we do a full sync with
2116 * requestSync(SYNC_OTHER) and grab all the capabilities.
2117 *
2118 * Then we resize the appropriate data structures, and update all
2119 * references to the old data structures which have now moved.
2120 * Finally we release the Capabilities we are holding, and start
2121 * worker Tasks on the new Capabilities we created.
2122 *
2123 * ------------------------------------------------------------------------- */
2124
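/*
 * Usage note (an editorial sketch, not from the original source): this is
 * the function that GHC.Conc.setNumCapabilities reaches through a foreign
 * import of "setNumCapabilities".  From C, assuming the RTS has already been
 * initialised with hs_init(), a call might simply look like:
 *
 *     setNumCapabilities(4);   // grow (or shrink) to 4 enabled capabilities
 */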
2125 void
2126 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2127 {
2128 #if !defined(THREADED_RTS)
2129 if (new_n_capabilities != 1) {
2130 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2131 }
2132 return;
2133 #elif defined(NOSMP)
2134 if (new_n_capabilities != 1) {
2135 errorBelch("setNumCapabilities: not supported on this platform");
2136 }
2137 return;
2138 #else
2139 Task *task;
2140 Capability *cap;
2141 uint32_t n;
2142 Capability *old_capabilities = NULL;
2143 uint32_t old_n_capabilities = n_capabilities;
2144
2145 if (new_n_capabilities == enabled_capabilities) return;
2146
2147 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2148 enabled_capabilities, new_n_capabilities);
2149
2150 cap = rts_lock();
2151 task = cap->running_task;
2152
2153 stopAllCapabilities(&cap, task);
2154
2155 if (new_n_capabilities < enabled_capabilities)
2156 {
2157 // Reducing the number of capabilities: we do not actually
2158 // remove the extra capabilities, we just mark them as
2159 // "disabled". This has the following effects:
2160 //
2161 // - threads on a disabled capability are migrated away by the
2162 // scheduler loop
2163 //
2164 // - disabled capabilities do not participate in GC
2165 // (see scheduleDoGC())
2166 //
2167 // - No spark threads are created on this capability
2168 // (see scheduleActivateSpark())
2169 //
2170 // - We do not attempt to migrate threads *to* a disabled
2171 // capability (see schedulePushWork()).
2172 //
2173 // but in other respects, a disabled capability remains
2174 // alive. Threads may be woken up on a disabled capability,
2175 // but they will be immediately migrated away.
2176 //
2177 // This approach is much easier than trying to actually remove
2178 // the capability; we don't have to worry about GC data
2179 // structures, the nursery, etc.
2180 //
2181 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2182 capabilities[n]->disabled = rtsTrue;
2183 traceCapDisable(capabilities[n]);
2184 }
2185 enabled_capabilities = new_n_capabilities;
2186 }
2187 else
2188 {
2189 // Increasing the number of enabled capabilities.
2190 //
2191 // enable any disabled capabilities, up to the required number
2192 for (n = enabled_capabilities;
2193 n < new_n_capabilities && n < n_capabilities; n++) {
2194 capabilities[n]->disabled = rtsFalse;
2195 traceCapEnable(capabilities[n]);
2196 }
2197 enabled_capabilities = n;
2198
2199 if (new_n_capabilities > n_capabilities) {
2200 #if defined(TRACING)
2201 // Allocate eventlog buffers for the new capabilities. Note this
2202 // must be done before calling moreCapabilities(), because that
2203 // will emit events about creating the new capabilities and adding
2204 // them to existing capsets.
2205 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2206 #endif
2207
2208 // Resize the capabilities array
2209 // NB. after this, capabilities points somewhere new. Any pointers
2210 // of type (Capability *) are now invalid.
2211 moreCapabilities(n_capabilities, new_n_capabilities);
2212
2213 // Resize and update storage manager data structures
2214 storageAddCapabilities(n_capabilities, new_n_capabilities);
2215 }
2216 }
2217
2218 // update n_capabilities before things start running
2219 if (new_n_capabilities > n_capabilities) {
2220 n_capabilities = enabled_capabilities = new_n_capabilities;
2221 }
2222
2223 // We're done: release the original Capabilities
2224 releaseAllCapabilities(old_n_capabilities, cap,task);
2225
2226 // We can't free the old array until now, because we access it
2227 // while updating pointers in updateCapabilityRefs().
2228 if (old_capabilities) {
2229 stgFree(old_capabilities);
2230 }
2231
2232 // Notify IO manager that the number of capabilities has changed.
2233 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2234
2235 rts_unlock(cap);
2236
2237 #endif // THREADED_RTS
2238 }
2239
2240
2241
2242 /* ---------------------------------------------------------------------------
2243 * Delete all the threads in the system
2244 * ------------------------------------------------------------------------- */
2245
2246 static void
2247 deleteAllThreads ( Capability *cap )
2248 {
2249 // NOTE: only safe to call if we own all capabilities.
2250
2251 StgTSO* t, *next;
2252 uint32_t g;
2253
2254 debugTrace(DEBUG_sched,"deleting all threads");
2255 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2256 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2257 next = t->global_link;
2258 deleteThread(cap,t);
2259 }
2260 }
2261
2262 // The run queue now contains a bunch of ThreadKilled threads. We
2263 // must not throw these away: the main thread(s) will be in there
2264 // somewhere, and the main scheduler loop has to deal with it.
2265 // Also, the run queue is the only thing keeping these threads from
2266 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2267
2268 #if !defined(THREADED_RTS)
2269 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2270 ASSERT(sleeping_queue == END_TSO_QUEUE);
2271 #endif
2272 }
2273
2274 /* -----------------------------------------------------------------------------
2275 Managing the suspended_ccalls list.
2276 Locks required: sched_mutex
2277 -------------------------------------------------------------------------- */
2278
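// Editorial note: cap->suspended_ccalls is a doubly-linked list of InCalls
// (linked through incall->prev/next), with cap->n_suspended_ccalls tracking
// its length.  suspendTask() pushes onto the front; recoverSuspendedTask()
// unlinks from anywhere in the list.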
2279 STATIC_INLINE void
2280 suspendTask (Capability *cap, Task *task)
2281 {
2282 InCall *incall;
2283
2284 incall = task->incall;
2285 ASSERT(incall->next == NULL && incall->prev == NULL);
2286 incall->next = cap->suspended_ccalls;
2287 incall->prev = NULL;
2288 if (cap->suspended_ccalls) {
2289 cap->suspended_ccalls->prev = incall;
2290 }
2291 cap->suspended_ccalls = incall;
2292 cap->n_suspended_ccalls++;
2293 }
2294
2295 STATIC_INLINE void
2296 recoverSuspendedTask (Capability *cap, Task *task)
2297 {
2298 InCall *incall;
2299
2300 incall = task->incall;
2301 if (incall->prev) {
2302 incall->prev->next = incall->next;
2303 } else {
2304 ASSERT(cap->suspended_ccalls == incall);
2305 cap->suspended_ccalls = incall->next;
2306 }
2307 if (incall->next) {
2308 incall->next->prev = incall->prev;
2309 }
2310 incall->next = incall->prev = NULL;
2311 cap->n_suspended_ccalls--;
2312 }
2313
2314 /* ---------------------------------------------------------------------------
2315 * Suspending & resuming Haskell threads.
2316 *
2317 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2318 * its capability before calling the C function. This allows another
2319 * task to pick up the capability and carry on running Haskell
2320 * threads. It also means that if the C call blocks, it won't lock
2321 * the whole system.
2322 *
2323 * The Haskell thread making the C call is put to sleep for the
2324  * duration of the call, on the capability's suspended_ccalls list. We
2325 * give out a token to the task, which it can use to resume the thread
2326 * on return from the C function.
2327 *
2328  * If this is an interruptible C call, the FFI call may be unceremoniously
2329  * terminated, and so the call should be scheduled on an unbound worker
2330  * thread.
2331 * ------------------------------------------------------------------------- */
2332
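/*
 * Illustration (an editorial sketch of the pattern the code generator emits
 * around a safe foreign call; the real generated code differs in detail):
 *
 *     void *token;
 *     token   = suspendThread(BaseReg, interruptible);
 *     result  = foo(arg1, arg2);           // the foreign call itself
 *     BaseReg = resumeThread(token);
 *     // ... return 'result' to the Haskell caller ...
 *
 * The token is in fact the Task (see 'return task' below), but callers must
 * treat it as opaque.
 */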
2333 void *
2334 suspendThread (StgRegTable *reg, rtsBool interruptible)
2335 {
2336 Capability *cap;
2337 int saved_errno;
2338 StgTSO *tso;
2339 Task *task;
2340 #if mingw32_HOST_OS
2341 StgWord32 saved_winerror;
2342 #endif
2343
2344 saved_errno = errno;
2345 #if mingw32_HOST_OS
2346 saved_winerror = GetLastError();
2347 #endif
2348
2349 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2350 */
2351 cap = regTableToCapability(reg);
2352
2353 task = cap->running_task;
2354 tso = cap->r.rCurrentTSO;
2355
2356 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2357
2358 // XXX this might not be necessary --SDM
2359 tso->what_next = ThreadRunGHC;
2360
2361 threadPaused(cap,tso);
2362
2363 if (interruptible) {
2364 tso->why_blocked = BlockedOnCCall_Interruptible;
2365 } else {
2366 tso->why_blocked = BlockedOnCCall;
2367 }
2368
2369 // Hand back capability
2370 task->incall->suspended_tso = tso;
2371 task->incall->suspended_cap = cap;
2372
2373 // Otherwise allocate() will write to invalid memory.
2374 cap->r.rCurrentTSO = NULL;
2375
2376 ACQUIRE_LOCK(&cap->lock);
2377
2378 suspendTask(cap,task);
2379 cap->in_haskell = rtsFalse;
2380 releaseCapability_(cap,rtsFalse);
2381
2382 RELEASE_LOCK(&cap->lock);
2383
2384 errno = saved_errno;
2385 #if mingw32_HOST_OS
2386 SetLastError(saved_winerror);
2387 #endif
2388 return task;
2389 }
2390
2391 StgRegTable *
2392 resumeThread (void *task_)
2393 {
2394 StgTSO *tso;
2395 InCall *incall;
2396 Capability *cap;
2397 Task *task = task_;
2398 int saved_errno;
2399 #if mingw32_HOST_OS
2400 StgWord32 saved_winerror;
2401 #endif
2402
2403 saved_errno = errno;
2404 #if mingw32_HOST_OS
2405 saved_winerror = GetLastError();
2406 #endif
2407
2408 incall = task->incall;
2409 cap = incall->suspended_cap;
2410 task->cap = cap;
2411
2412 // Wait for permission to re-enter the RTS with the result.
2413 waitForCapability(&cap,task);
2414 // we might be on a different capability now... but if so, our
2415 // entry on the suspended_ccalls list will also have been
2416 // migrated.
2417
2418 // Remove the thread from the suspended list
2419 recoverSuspendedTask(cap,task);
2420
2421 tso = incall->suspended_tso;
2422 incall->suspended_tso = NULL;
2423 incall->suspended_cap = NULL;
2424 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2425
2426 traceEventRunThread(cap, tso);
2427
2428 /* Reset blocking status */
2429 tso->why_blocked = NotBlocked;
2430
2431 if ((tso->flags & TSO_BLOCKEX) == 0) {
2432 // avoid locking the TSO if we don't have to
2433 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2434 maybePerformBlockedException(cap,tso);
2435 }
2436 }
2437
2438 cap->r.rCurrentTSO = tso;
2439 cap->in_haskell = rtsTrue;
2440 errno = saved_errno;
2441 #if mingw32_HOST_OS
2442 SetLastError(saved_winerror);
2443 #endif
2444
2445 /* We might have GC'd, mark the TSO dirty again */
2446 dirty_TSO(cap,tso);
2447 dirty_STACK(cap,tso->stackobj);
2448
2449 IF_DEBUG(sanity, checkTSO(tso));
2450
2451 return &cap->r;
2452 }
2453
2454 /* ---------------------------------------------------------------------------
2455 * scheduleThread()
2456 *
2457 * scheduleThread puts a thread on the end of the runnable queue.
2458 * This will usually be done immediately after a thread is created.
2459 * The caller of scheduleThread must create the thread using e.g.
2460 * createThread and push an appropriate closure
2461 * on this thread's stack before the scheduler is invoked.
2462 * ------------------------------------------------------------------------ */
2463
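/*
 * For example (an editorial sketch following what rts_evalIO() does; names
 * outside this file are quoted from memory): build a thread whose stack will
 * run 'closure', then hand it to the scheduler:
 *
 *     StgTSO *tso = createStrictIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                        closure);
 *     scheduleWaitThread(tso, &ret, &cap);   // bound thread: wait for result
 *
 * An unbound thread would instead just be appended with
 * scheduleThread(cap, tso).
 */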
2464 void
2465 scheduleThread(Capability *cap, StgTSO *tso)
2466 {
2467 // The thread goes at the *end* of the run-queue, to avoid possible
2468 // starvation of any threads already on the queue.
2469 appendToRunQueue(cap,tso);
2470 }
2471
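// As scheduleThread(), but with explicit affinity: the thread is pinned to
// capability (cpu % enabled_capabilities) and will not be migrated from it
// afterwards.  (This is what the forkOn# primop reaches, as far as I know.)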
2472 void
2473 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2474 {
2475 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2476 // move this thread from now on.
2477 #if defined(THREADED_RTS)
2478 cpu %= enabled_capabilities;
2479 if (cpu == cap->no) {
2480 appendToRunQueue(cap,tso);
2481 } else {
2482 migrateThread(cap, tso, capabilities[cpu]);
2483 }
2484 #else
2485 appendToRunQueue(cap,tso);
2486 #endif
2487 }
2488
2489 void
2490 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2491 {
2492 Task *task;
2493 DEBUG_ONLY( StgThreadID id );
2494 Capability *cap;
2495
2496 cap = *pcap;
2497
2498 // We already created/initialised the Task
2499 task = cap->running_task;
2500
2501 // This TSO is now a bound thread; make the Task and TSO
2502 // point to each other.
2503 tso->bound = task->incall;
2504 tso->cap = cap;
2505
2506 task->incall->tso = tso;
2507 task->incall->ret = ret;
2508 task->incall->rstat = NoStatus;
2509
2510 appendToRunQueue(cap,tso);
2511
2512 DEBUG_ONLY( id = tso->id );
2513 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2514
2515 cap = schedule(cap,task);
2516
2517 ASSERT(task->incall->rstat != NoStatus);
2518 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2519
2520 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2521 *pcap = cap;
2522 }
2523
2524 /* ----------------------------------------------------------------------------
2525 * Starting Tasks
2526 * ------------------------------------------------------------------------- */
2527
2528 #if defined(THREADED_RTS)
2529 void scheduleWorker (Capability *cap, Task *task)
2530 {
2531 // schedule() runs without a lock.
2532 cap = schedule(cap,task);
2533
2534 // On exit from schedule(), we have a Capability, but possibly not
2535 // the same one we started with.
2536
2537 // During shutdown, the requirement is that after all the
2538 // Capabilities are shut down, all workers that are shutting down
2539 // have finished workerTaskStop(). This is why we hold on to
2540 // cap->lock until we've finished workerTaskStop() below.
2541 //
2542 // There may be workers still involved in foreign calls; those
2543 // will just block in waitForCapability() because the
2544 // Capability has been shut down.
2545 //
2546 ACQUIRE_LOCK(&cap->lock);
2547 releaseCapability_(cap,rtsFalse);
2548 workerTaskStop(task);
2549 RELEASE_LOCK(&cap->lock);
2550 }
2551 #endif
2552
2553 /* ---------------------------------------------------------------------------
2554 * Start new worker tasks on Capabilities from--to
2555 * -------------------------------------------------------------------------- */
2556
2557 static void
2558 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2559 {
2560 #if defined(THREADED_RTS)
2561 uint32_t i;
2562 Capability *cap;
2563
2564 for (i = from; i < to; i++) {
2565 cap = capabilities[i];
2566 ACQUIRE_LOCK(&cap->lock);
2567 startWorkerTask(cap);
2568 RELEASE_LOCK(&cap->lock);
2569 }
2570 #endif
2571 }
2572
2573 /* ---------------------------------------------------------------------------
2574 * initScheduler()
2575 *
2576 * Initialise the scheduler. This resets all the queues - if the
2577 * queues contained any threads, they'll be garbage collected at the
2578 * next pass.
2579 *
2580 * ------------------------------------------------------------------------ */
2581
2582 void
2583 initScheduler(void)
2584 {
2585 #if !defined(THREADED_RTS)
2586 blocked_queue_hd = END_TSO_QUEUE;
2587 blocked_queue_tl = END_TSO_QUEUE;
2588 sleeping_queue = END_TSO_QUEUE;
2589 #endif
2590
2591 sched_state = SCHED_RUNNING;
2592 recent_activity = ACTIVITY_YES;
2593
2594 #if defined(THREADED_RTS)
2595 /* Initialise the mutex and condition variables used by
2596 * the scheduler. */
2597 initMutex(&sched_mutex);
2598 #endif
2599
2600 ACQUIRE_LOCK(&sched_mutex);
2601
2602 /* A capability holds the state a native thread needs in
2603 * order to execute STG code. At least one capability is
2604 * floating around (only THREADED_RTS builds have more than one).
2605 */
2606 initCapabilities();
2607
2608 initTaskManager();
2609
2610 /*
2611 * Eagerly start one worker to run each Capability, except for
2612 * Capability 0. The idea is that we're probably going to start a
2613 * bound thread on Capability 0 pretty soon, so we don't want a
2614 * worker task hogging it.
2615 */
2616 startWorkerTasks(1, n_capabilities);
2617
2618 RELEASE_LOCK(&sched_mutex);
2619
2620 }
2621
2622 void
2623 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2624 /* see Capability.c, shutdownCapability() */
2625 {
2626 Task *task = NULL;
2627
2628 task = newBoundTask();
2629
2630 // If we haven't killed all the threads yet, do it now.
2631 if (sched_state < SCHED_SHUTTING_DOWN) {
2632 sched_state = SCHED_INTERRUPTING;
2633 Capability *cap = task->cap;
2634 waitForCapability(&cap,task);
2635 scheduleDoGC(&cap,task,rtsTrue);
2636 ASSERT(task->incall->tso == NULL);
2637 releaseCapability(cap);
2638 }
2639 sched_state = SCHED_SHUTTING_DOWN;
2640
2641 shutdownCapabilities(task, wait_foreign);
2642
2643 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2644 // n_failed_trygrab_idles, n_idle_caps);
2645
2646 boundTaskExiting(task);
2647 }
2648
2649 void
2650 freeScheduler( void )
2651 {
2652 uint32_t still_running;
2653
2654 ACQUIRE_LOCK(&sched_mutex);
2655 still_running = freeTaskManager();
2656 // We can only free the Capabilities if there are no Tasks still
2657 // running. We might have a Task about to return from a foreign
2658 // call into waitForCapability(), for example (actually,
2659 // this should be the *only* thing that a still-running Task can
2660 // do at this point, and it will block waiting for the
2661 // Capability).
2662 if (still_running == 0) {
2663 freeCapabilities();
2664 }
2665 RELEASE_LOCK(&sched_mutex);
2666 #if defined(THREADED_RTS)
2667 closeMutex(&sched_mutex);
2668 #endif
2669 }
2670
2671 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2672 void *user USED_IF_NOT_THREADS)
2673 {
2674 #if !defined(THREADED_RTS)
2675 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2676 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2677 evac(user, (StgClosure **)(void *)&sleeping_queue);
2678 #endif
2679 }
2680
2681 /* -----------------------------------------------------------------------------
2682 performGC
2683
2684 This is the interface to the garbage collector from Haskell land.
2685 We provide this so that external C code can allocate and garbage
2686 collect when called from Haskell via _ccall_GC.
2687 -------------------------------------------------------------------------- */
2688
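/*
 * Example (an editorial sketch of external use; hs_init(), hs_exit() and
 * performMajorGC() are part of the public RTS API): a C program embedding
 * Haskell can force a full collection between calls into Haskell code:
 *
 *     hs_init(&argc, &argv);
 *     // ... rts_evalIO() / calls to foreign-exported functions ...
 *     performMajorGC();        // full (major) collection
 *     hs_exit();
 */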
2689 static void
2690 performGC_(rtsBool force_major)
2691 {
2692 Task *task;
2693 Capability *cap = NULL;
2694
2695 // We must grab a new Task here, because the existing Task may be
2696 // associated with a particular Capability, and chained onto the
2697 // suspended_ccalls queue.
2698 task = newBoundTask();
2699
2700 // TODO: do we need to traceTask*() here?
2701
2702 waitForCapability(&cap,task);
2703 scheduleDoGC(&cap,task,force_major);
2704 releaseCapability(cap);
2705 boundTaskExiting(task);
2706 }
2707
2708 void
2709 performGC(void)
2710 {
2711 performGC_(rtsFalse);
2712 }
2713
2714 void
2715 performMajorGC(void)
2716 {
2717 performGC_(rtsTrue);
2718 }
2719
2720 /* ---------------------------------------------------------------------------
2721 Interrupt execution.
2722 Might be called inside a signal handler so it mustn't do anything fancy.
2723 ------------------------------------------------------------------------ */
2724
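/*
 * Descriptive note (editorial): this merely flips sched_state to
 * SCHED_INTERRUPTING and pokes every Capability; in the threaded RTS it also
 * wakes the IO manager so that some scheduler loop notices promptly.  It is
 * the sort of thing the console/^C signal handler calls, hence the
 * restriction above.
 */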
2725 void
2726 interruptStgRts(void)
2727 {
2728 sched_state = SCHED_INTERRUPTING;
2729 interruptAllCapabilities();
2730 #if defined(THREADED_RTS)
2731 wakeUpRts();
2732 #endif
2733 }
2734
2735 /* -----------------------------------------------------------------------------
2736 Wake up the RTS
2737
2738 This function causes at least one OS thread to wake up and run the
2739 scheduler loop. It is invoked when the RTS might be deadlocked, or
2740    an external event has arrived that may need servicing (e.g. a
2741 keyboard interrupt).
2742
2743 In the single-threaded RTS we don't do anything here; we only have
2744 one thread anyway, and the event that caused us to want to wake up
2745 will have interrupted any blocking system call in progress anyway.
2746 -------------------------------------------------------------------------- */
2747
2748 #if defined(THREADED_RTS)
2749 void wakeUpRts(void)
2750 {
2751 // This forces the IO Manager thread to wakeup, which will
2752 // in turn ensure that some OS thread wakes up and runs the
2753 // scheduler loop, which will cause a GC and deadlock check.
2754 ioManagerWakeup();
2755 }
2756 #endif
2757
2758 /* -----------------------------------------------------------------------------
2759 Deleting threads
2760
2761 This is used for interruption (^C) and forking, and corresponds to
2762 raising an exception but without letting the thread catch the
2763 exception.
2764 -------------------------------------------------------------------------- */
2765
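// Editorial note: deleteThread() below leaves threads that are in foreign
// calls alone (they are dealt with when the call returns), whereas
// deleteThread_(), used only by forkProcess(), kills those too by marking
// them ThreadKilled and putting them back on the run queue.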
2766 static void
2767 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2768 {
2769 // NOTE: must only be called on a TSO that we have exclusive
2770 // access to, because we will call throwToSingleThreaded() below.
2771 // The TSO must be on the run queue of the Capability we own, or
2772 // we must own all Capabilities.
2773
2774 if (tso->why_blocked != BlockedOnCCall &&
2775 tso->why_blocked != BlockedOnCCall_Interruptible) {
2776 throwToSingleThreaded(tso->cap,tso,NULL);
2777 }
2778 }
2779
2780 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2781 static void
2782 deleteThread_(Capability *cap, StgTSO *tso)
2783 { // for forkProcess only:
2784 // like deleteThread(), but we delete threads in foreign calls, too.
2785
2786 if (tso->why_blocked == BlockedOnCCall ||
2787 tso->why_blocked == BlockedOnCCall_Interruptible) {
2788 tso->what_next = ThreadKilled;
2789 appendToRunQueue(tso->cap, tso);
2790 } else {
2791 deleteThread(cap,tso);
2792 }
2793 }
2794 #endif
2795
2796 /* -----------------------------------------------------------------------------
2797 raiseExceptionHelper
2798
2799    This function is called by the raise# primitive, just so that we can
2800    move some of the tricky bits of raising an exception from C-- into
2801    C. Who knows, it might be a useful reusable thing here too.
2802 -------------------------------------------------------------------------- */
2803
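/*
 * Descriptive note (editorial): the return value is the type of the frame at
 * which the walk stopped (CATCH_FRAME, ATOMICALLY_FRAME, CATCH_STM_FRAME or
 * STOP_FRAME), with tso->stackobj->sp left pointing at that frame; the C--
 * caller (stg_raisezh, I believe) dispatches on this value to enter the
 * appropriate handler.
 */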
2804 StgWord
2805 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2806 {
2807 Capability *cap = regTableToCapability(reg);
2808 StgThunk *raise_closure = NULL;
2809 StgPtr p, next;
2810 const StgRetInfoTable *info;
2811 //
2812 // This closure represents the expression 'raise# E' where E
2813     // is the exception raised. It is used to overwrite all the
2814     // thunks which are currently under evaluation.
2815 //
2816
2817 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2818 // LDV profiling: stg_raise_info has THUNK as its closure
2819 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2820     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2821 // 1 does not cause any problem unless profiling is performed.
2822 // However, when LDV profiling goes on, we need to linearly scan
2823 // small object pool, where raise_closure is stored, so we should
2824 // use MIN_UPD_SIZE.
2825 //
2826 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2827 // sizeofW(StgClosure)+1);
2828 //
2829
2830 //
2831 // Walk up the stack, looking for the catch frame. On the way,
2832 // we update any closures pointed to from update frames with the
2833 // raise closure that we just built.
2834 //
2835 p = tso->stackobj->sp;
2836 while(1) {
2837 info = get_ret_itbl((StgClosure *)p);
2838 next = p + stack_frame_sizeW((StgClosure *)p);
2839 switch (info->i.type) {
2840
2841 case UPDATE_FRAME:
2842 // Only create raise_closure if we need to.
2843 if (raise_closure == NULL) {
2844 raise_closure =
2845 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2846 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2847 raise_closure->payload[0] = exception;
2848 }
2849 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2850 (StgClosure *)raise_closure);
2851 p = next;
2852 continue;
2853
2854 case ATOMICALLY_FRAME:
2855 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2856 tso->stackobj->sp = p;
2857 return ATOMICALLY_FRAME;
2858
2859 case CATCH_FRAME:
2860 tso->stackobj->sp = p;
2861 return CATCH_FRAME;
2862
2863 case CATCH_STM_FRAME:
2864 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2865 tso->stackobj->sp = p;
2866 return CATCH_STM_FRAME;
2867
2868 case UNDERFLOW_FRAME:
2869 tso->stackobj->sp = p;
2870 threadStackUnderflow(cap,tso);
2871 p = tso->stackobj->sp;
2872 continue;
2873
2874 case STOP_FRAME:
2875 tso->stackobj->sp = p;
2876 return STOP_FRAME;
2877
2878 case CATCH_RETRY_FRAME: {
2879 StgTRecHeader *trec = tso -> trec;
2880 StgTRecHeader *outer = trec -> enclosing_trec;
2881 debugTrace(DEBUG_stm,
2882 "found CATCH_RETRY_FRAME at %p during raise", p);
2883 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2884 stmAbortTransaction(cap, trec);
2885 stmFreeAbortedTRec(cap, trec);
2886 tso -> trec = outer;
2887 p = next;
2888 continue;
2889 }
2890
2891 default:
2892 p = next;
2893 continue;
2894 }
2895 }
2896 }
2897
2898
2899 /* -----------------------------------------------------------------------------
2900 findRetryFrameHelper
2901
2902 This function is called by the retry# primitive. It traverses the stack
2903 leaving tso->sp referring to the frame which should handle the retry.
2904
2905 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2906    or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2907
2908 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2909 create) because retries are not considered to be exceptions, despite the
2910 similar implementation.
2911
2912 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2913 not be created within memory transactions.
2914 -------------------------------------------------------------------------- */
2915
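/*
 * Orientation (an editorial sketch; the primop/frame pairing is my reading of
 * the STM code): 'atomically' pushes an ATOMICALLY_FRAME, 'orElse' pushes a
 * CATCH_RETRY_FRAME around the alternative it is running, and 'catchSTM'
 * pushes a CATCH_STM_FRAME.  So a retry# inside an orElse stops at the
 * CATCH_RETRY_FRAME (try the other alternative), while a bare retry# walks
 * out to the enclosing ATOMICALLY_FRAME (block and re-run the transaction).
 */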
2916 StgWord
2917 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2918 {
2919 const StgRetInfoTable *info;
2920 StgPtr p, next;
2921
2922 p = tso->stackobj->sp;
2923 while (1) {
2924 info = get_ret_itbl((const StgClosure *)p);
2925 next = p + stack_frame_sizeW((StgClosure *)p);
2926 switch (info->i.type) {
2927
2928 case ATOMICALLY_FRAME:
2929 debugTrace(DEBUG_stm,
2930 "found ATOMICALLY_FRAME at %p during retry", p);
2931 tso->stackobj->sp = p;
2932 return ATOMICALLY_FRAME;
2933
2934 case CATCH_RETRY_FRAME:
2935 debugTrace(DEBUG_stm,
2936 "found CATCH_RETRY_FRAME at %p during retry", p);
2937 tso->stackobj->sp = p;
2938 return CATCH_RETRY_FRAME;
2939
2940 case CATCH_STM_FRAME: {
2941 StgTRecHeader *trec = tso -> trec;
2942 StgTRecHeader *outer = trec -> enclosing_trec;
2943 debugTrace(DEBUG_stm,
2944 "found CATCH_STM_FRAME at %p during retry", p);
2945 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2946 stmAbortTransaction(cap, trec);
2947 stmFreeAbortedTRec(cap, trec);
2948 tso -> trec = outer;
2949 p = next;
2950 continue;
2951 }
2952
2953 case UNDERFLOW_FRAME:
2954 tso->stackobj->sp = p;
2955 threadStackUnderflow(cap,tso);
2956 p = tso->stackobj->sp;
2957 continue;
2958
2959 default:
2960 ASSERT(info->i.type != CATCH_FRAME);
2961 ASSERT(info->i.type != STOP_FRAME);
2962 p = next;
2963 continue;
2964 }
2965 }
2966 }
2967
2968 /* -----------------------------------------------------------------------------
2969 resurrectThreads is called after garbage collection on the list of
2970 threads found to be garbage. Each of these threads will be woken
2971    up and sent an asynchronous exception: BlockedIndefinitelyOnMVar if it
2972    was blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked on an
2973    STM transaction, or NonTermination if it was blocked on a black hole.
2974
2975 Locks: assumes we hold *all* the capabilities.
2976 -------------------------------------------------------------------------- */
2977
2978 void
2979 resurrectThreads (StgTSO *threads)
2980 {
2981 StgTSO *tso, *next;
2982 Capability *cap;
2983 generation *gen;
2984
2985 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2986 next = tso->global_link;
2987
2988 gen = Bdescr((P_)tso)->gen;
2989 tso->global_link = gen->threads;
2990 gen->threads = tso;
2991
2992 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2993
2994 // Wake up the thread on the Capability it was last on
2995 cap = tso->cap;
2996
2997 switch (tso->why_blocked) {
2998 case BlockedOnMVar:
2999 case BlockedOnMVarRead:
3000 /* Called by GC - sched_mutex lock is currently held. */
3001 throwToSingleThreaded(cap, tso,
3002 (StgClosure *)blockedIndefinitelyOnMVar_closure);
3003 break;
3004 case BlockedOnBlackHole:
3005 throwToSingleThreaded(cap, tso,
3006 (StgClosure *)nonTermination_closure);
3007 break;
3008 case BlockedOnSTM:
3009 throwToSingleThreaded(cap, tso,
3010 (StgClosure *)blockedIndefinitelyOnSTM_closure);
3011 break;
3012 case NotBlocked:
3013 /* This might happen if the thread was blocked on a black hole
3014 * belonging to a thread that we've just woken up (raiseAsync
3015 * can wake up threads, remember...).
3016 */
3017 continue;
3018 case BlockedOnMsgThrowTo:
3019 // This can happen if the target is masking, blocks on a
3020 // black hole, and then is found to be unreachable. In
3021 // this case, we want to let the target wake up and carry
3022 // on, and do nothing to this thread.
3023 continue;
3024 default:
3025 barf("resurrectThreads: thread blocked in a strange way: %d",
3026 tso->why_blocked);
3027 }
3028 }
3029 }