1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
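// Possible values (see their uses below in schedule() and
// scheduleDetectDeadlock()):
//   ACTIVITY_YES      - some Haskell code has run in this timeslice
//   ACTIVITY_INACTIVE - the timer saw a complete timeslice with no
//                       activity, so deadlock detection may kick in
//   ACTIVITY_DONE_GC  - the forced GC has happened and the timer signal
//                       was switched off to conserve power (see #1623)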
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
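// Possible values, in the (monotone) order in which they are reached:
//   SCHED_RUNNING -> SCHED_INTERRUPTING -> SCHED_SHUTTING_DOWN
// (see Note [shutdown] in schedule() below)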
93
94 /*
95 * This mutex protects most of the global scheduler data in
96 * the THREADED_RTS runtime.
97 */
98 #if defined(THREADED_RTS)
99 Mutex sched_mutex;
100 #endif
101
102 #if !defined(mingw32_HOST_OS)
103 #define FORKPROCESS_PRIMOP_SUPPORTED
104 #endif
105
106 /* -----------------------------------------------------------------------------
107 * static function prototypes
108 * -------------------------------------------------------------------------- */
109
110 static Capability *schedule (Capability *initialCapability, Task *task);
111
112 //
113 // These functions all encapsulate parts of the scheduler loop, and are
114 // abstracted only to make the structure and control flow of the
115 // scheduler clearer.
116 //
117 static void schedulePreLoop (void);
118 static void scheduleFindWork (Capability **pcap);
119 #if defined(THREADED_RTS)
120 static void scheduleYield (Capability **pcap, Task *task);
121 #endif
122 #if defined(THREADED_RTS)
123 static rtsBool requestSync (Capability **pcap, Task *task,
124 PendingSync *sync_type, SyncType *prev_sync_type);
125 static void acquireAllCapabilities(Capability *cap, Task *task);
126 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
127 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
128 uint32_t to USED_IF_THREADS);
129 #endif
130 static void scheduleStartSignalHandlers (Capability *cap);
131 static void scheduleCheckBlockedThreads (Capability *cap);
132 static void scheduleProcessInbox(Capability **cap);
133 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
134 static void schedulePushWork(Capability *cap, Task *task);
135 #if defined(THREADED_RTS)
136 static void scheduleActivateSpark(Capability *cap);
137 #endif
138 static void schedulePostRunThread(Capability *cap, StgTSO *t);
139 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
140 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
141 uint32_t prev_what_next );
142 static void scheduleHandleThreadBlocked( StgTSO *t );
143 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
144 StgTSO *t );
145 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
146 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
147
148 static void deleteThread (Capability *cap, StgTSO *tso);
149 static void deleteAllThreads (Capability *cap);
150
151 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
152 static void deleteThread_(Capability *cap, StgTSO *tso);
153 #endif
154
155 /* ---------------------------------------------------------------------------
156 Main scheduling loop.
157
158 We use round-robin scheduling, each thread returning to the
159 scheduler loop when one of these conditions is detected:
160
161 * out of heap space
162 * timer expires (thread yields)
163 * thread blocks
164 * thread ends
165 * stack overflow
166
167 ------------------------------------------------------------------------ */
168
169 static Capability *
170 schedule (Capability *initialCapability, Task *task)
171 {
172 StgTSO *t;
173 Capability *cap;
174 StgThreadReturnCode ret;
175 uint32_t prev_what_next;
176 rtsBool ready_to_gc;
177 #if defined(THREADED_RTS)
178 rtsBool first = rtsTrue;
179 #endif
180
181 cap = initialCapability;
182
183 // Pre-condition: this task owns initialCapability.
184 // The sched_mutex is *NOT* held
185 // NB. on return, we still hold a capability.
186
187 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
188
189 schedulePreLoop();
190
191 // -----------------------------------------------------------
192 // Scheduler loop starts here:
193
194 while (1) {
195
196 // Check whether we have re-entered the RTS from Haskell without
197 // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
198 // call).
199 if (cap->in_haskell) {
200 errorBelch("schedule: re-entered unsafely.\n"
201 " Perhaps a 'foreign import unsafe' should be 'safe'?");
202 stg_exit(EXIT_FAILURE);
203 }
204
205 // Note [shutdown]: The interruption / shutdown sequence.
206 //
207 // In order to cleanly shut down the runtime, we want to:
208 // * make sure that all main threads return to their callers
209 // with the state 'Interrupted'.
210 // * clean up all OS threads associated with the runtime
211 // * free all memory etc.
212 //
213 // So the sequence goes like this:
214 //
215 // * The shutdown sequence is initiated by calling hs_exit(),
216 // interruptStgRts(), or running out of memory in the GC.
217 //
218 // * Set sched_state = SCHED_INTERRUPTING
219 //
220 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
221 // scheduleDoGC(), which halts the whole runtime by acquiring all the
222 // capabilities, does a GC and then calls deleteAllThreads() to kill all
223 // the remaining threads. The zombies are left on the run queue for
224 // cleaning up. We can't kill threads involved in foreign calls.
225 //
226 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
227 //
228 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
229 // enforced by the scheduler, which won't run any Haskell code when
230 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
231 // other capabilities by doing the GC earlier.
232 //
233 // * all workers exit when the run queue on their capability
234 // drains. All main threads will also exit when their TSO
235 // reaches the head of the run queue and they can return.
236 //
237 // * eventually all Capabilities will shut down, and the RTS can
238 // exit.
239 //
240 // * We might be left with threads blocked in foreign calls,
241 // we should really attempt to kill these somehow (TODO).
242
243 switch (sched_state) {
244 case SCHED_RUNNING:
245 break;
246 case SCHED_INTERRUPTING:
247 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
248 /* scheduleDoGC() deletes all the threads */
249 scheduleDoGC(&cap,task,rtsTrue);
250
251 // after scheduleDoGC(), we must be shutting down. Either some
252 // other Capability did the final GC, or we did it above,
253 // either way we can fall through to the SCHED_SHUTTING_DOWN
254 // case now.
255 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
256 // fall through
257
258 case SCHED_SHUTTING_DOWN:
259 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
260 // If we are a worker, just exit. If we're a bound thread
261 // then we will exit below when we've removed our TSO from
262 // the run queue.
263 if (!isBoundTask(task) && emptyRunQueue(cap)) {
264 return cap;
265 }
266 break;
267 default:
268 barf("sched_state: %d", sched_state);
269 }
270
271 scheduleFindWork(&cap);
272
273 /* work pushing, currently relevant only for THREADED_RTS:
274 (pushes threads, wakes up idle capabilities for stealing) */
275 schedulePushWork(cap,task);
276
277 scheduleDetectDeadlock(&cap,task);
278
279 // Normally, the only way we can get here with no threads to
280 // run is if a keyboard interrupt was received during
281 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
282 // Additionally, it is not fatal for the
283 // threaded RTS to reach here with no threads to run.
284 //
285 // win32: might be here due to awaitEvent() being abandoned
286 // as a result of a console event having been delivered.
287
288 #if defined(THREADED_RTS)
289 if (first)
290 {
291 // XXX: ToDo
292 // // don't yield the first time, we want a chance to run this
293 // // thread for a bit, even if there are others banging at the
294 // // door.
295 // first = rtsFalse;
296 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
297 }
298
299 scheduleYield(&cap,task);
300
301 if (emptyRunQueue(cap)) continue; // look for work again
302 #endif
303
304 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
305 if ( emptyRunQueue(cap) ) {
306 ASSERT(sched_state >= SCHED_INTERRUPTING);
307 }
308 #endif
309
310 //
311 // Get a thread to run
312 //
313 t = popRunQueue(cap);
314
315 // Sanity check the thread we're about to run. This can be
316 // expensive if there is lots of thread switching going on...
317 IF_DEBUG(sanity,checkTSO(t));
318
319 #if defined(THREADED_RTS)
320 // Check whether we can run this thread in the current task.
321 // If not, we have to pass our capability to the right task.
322 {
323 InCall *bound = t->bound;
324
325 if (bound) {
326 if (bound->task == task) {
327 // yes, the Haskell thread is bound to the current native thread
328 } else {
329 debugTrace(DEBUG_sched,
330 "thread %lu bound to another OS thread",
331 (unsigned long)t->id);
332 // no, bound to a different Haskell thread: pass to that thread
333 pushOnRunQueue(cap,t);
334 continue;
335 }
336 } else {
337 // The thread we want to run is unbound.
338 if (task->incall->tso) {
339 debugTrace(DEBUG_sched,
340 "this OS thread cannot run thread %lu",
341 (unsigned long)t->id);
342 // no, the current native thread is bound to a different
343 // Haskell thread, so pass it to any worker thread
344 pushOnRunQueue(cap,t);
345 continue;
346 }
347 }
348 }
349 #endif
350
351 // If we're shutting down, and this thread has not yet been
352 // killed, kill it now. This sometimes happens when a finalizer
353 // thread is created by the final GC, or a thread previously
354 // in a foreign call returns.
355 if (sched_state >= SCHED_INTERRUPTING &&
356 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
357 deleteThread(cap,t);
358 }
359
360 // If this capability is disabled, migrate the thread away rather
361 // than running it. NB. but not if the thread is bound: it is
362 // really hard for a bound thread to migrate itself. Believe me,
363 // I tried several ways and couldn't find a way to do it.
364 // Instead, when everything is stopped for GC, we migrate all the
365 // threads on the run queue then (see scheduleDoGC()).
366 //
367 // ToDo: what about TSO_LOCKED? Currently we're migrating those
368 // when the number of capabilities drops, but we never migrate
369 // them back if it rises again. Presumably we should, but after
370 // the thread has been migrated we no longer know what capability
371 // it was originally on.
372 #ifdef THREADED_RTS
373 if (cap->disabled && !t->bound) {
374 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
375 migrateThread(cap, t, dest_cap);
376 continue;
377 }
378 #endif
379
380 /* context switches are initiated by the timer signal, unless
381 * the user specified "context switch as often as possible", with
382 * +RTS -C0
383 */
384 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
385 && !emptyThreadQueues(cap)) {
386 cap->context_switch = 1;
387 }
388
389 run_thread:
390
391 // CurrentTSO is the thread to run. It might be different if we
392 // loop back to run_thread, so make sure to set CurrentTSO after
393 // that.
394 cap->r.rCurrentTSO = t;
395
396 startHeapProfTimer();
397
398 // ----------------------------------------------------------------------
399 // Run the current thread
400
401 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
402 ASSERT(t->cap == cap);
403 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
404
405 prev_what_next = t->what_next;
406
407 errno = t->saved_errno;
408 #if mingw32_HOST_OS
409 SetLastError(t->saved_winerror);
410 #endif
411
412 // reset the interrupt flag before running Haskell code
413 cap->interrupt = 0;
414
415 cap->in_haskell = rtsTrue;
416 cap->idle = 0;
417
418 dirty_TSO(cap,t);
419 dirty_STACK(cap,t->stackobj);
420
421 switch (recent_activity)
422 {
423 case ACTIVITY_DONE_GC: {
424 // ACTIVITY_DONE_GC means we turned off the timer signal to
425 // conserve power (see #1623). Re-enable it here.
426 uint32_t prev;
427 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
428 if (prev == ACTIVITY_DONE_GC) {
429 #ifndef PROFILING
430 startTimer();
431 #endif
432 }
433 break;
434 }
435 case ACTIVITY_INACTIVE:
436 // If we reached ACTIVITY_INACTIVE, then don't reset it until
437 // we've done the GC. The thread running here might just be
438 // the IO manager thread that handle_tick() woke up via
439 // wakeUpRts().
440 break;
441 default:
442 recent_activity = ACTIVITY_YES;
443 }
444
445 traceEventRunThread(cap, t);
446
447 switch (prev_what_next) {
448
449 case ThreadKilled:
450 case ThreadComplete:
451 /* Thread already finished, return to scheduler. */
452 ret = ThreadFinished;
453 break;
454
455 case ThreadRunGHC:
456 {
457 StgRegTable *r;
458 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
459 cap = regTableToCapability(r);
460 ret = r->rRet;
461 break;
462 }
463
464 case ThreadInterpret:
465 cap = interpretBCO(cap);
466 ret = cap->r.rRet;
467 break;
468
469 default:
470 barf("schedule: invalid what_next field");
471 }
472
473 cap->in_haskell = rtsFalse;
474
475 // The TSO might have moved, eg. if it re-entered the RTS and a GC
476 // happened. So find the new location:
477 t = cap->r.rCurrentTSO;
478
479 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
480 // don't want it set when not running a Haskell thread.
481 cap->r.rCurrentTSO = NULL;
482
483 // And save the current errno in this thread.
484 // XXX: possibly bogus for SMP because this thread might already
485 // be running again, see code below.
486 t->saved_errno = errno;
487 #if mingw32_HOST_OS
488 // Similarly for Windows error code
489 t->saved_winerror = GetLastError();
490 #endif
491
492 if (ret == ThreadBlocked) {
493 if (t->why_blocked == BlockedOnBlackHole) {
494 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
495 traceEventStopThread(cap, t, t->why_blocked + 6,
496 owner != NULL ? owner->id : 0);
497 } else {
498 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
499 }
500 } else {
501 traceEventStopThread(cap, t, ret, 0);
502 }
503
504 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
505 ASSERT(t->cap == cap);
506
507 // ----------------------------------------------------------------------
508
509 // Costs for the scheduler are assigned to CCS_SYSTEM
510 stopHeapProfTimer();
511 #if defined(PROFILING)
512 cap->r.rCCCS = CCS_SYSTEM;
513 #endif
514
515 schedulePostRunThread(cap,t);
516
517 ready_to_gc = rtsFalse;
518
519 switch (ret) {
520 case HeapOverflow:
521 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
522 break;
523
524 case StackOverflow:
525 // just adjust the stack for this thread, then pop it back
526 // on the run queue.
527 threadStackOverflow(cap, t);
528 pushOnRunQueue(cap,t);
529 break;
530
531 case ThreadYielding:
532 if (scheduleHandleYield(cap, t, prev_what_next)) {
533 // shortcut for switching between compiler/interpreter:
534 goto run_thread;
535 }
536 break;
537
538 case ThreadBlocked:
539 scheduleHandleThreadBlocked(t);
540 break;
541
542 case ThreadFinished:
543 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
544 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
545 break;
546
547 default:
548 barf("schedule: invalid thread return code %d", (int)ret);
549 }
550
551 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
552 scheduleDoGC(&cap,task,rtsFalse);
553 }
554 } /* end of while() */
555 }
556
557 /* -----------------------------------------------------------------------------
558 * Run queue operations
559 * -------------------------------------------------------------------------- */
560
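// The run queue is a doubly-linked list: tso->_link is the next pointer
// and tso->block_info.prev the previous one, with cap->run_queue_hd and
// cap->run_queue_tl marking the two ends and cap->n_run_queue counting
// the elements. removeFromRunQueue() unlinks a thread from anywhere in
// that list.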
561 static void
562 removeFromRunQueue (Capability *cap, StgTSO *tso)
563 {
564 if (tso->block_info.prev == END_TSO_QUEUE) {
565 ASSERT(cap->run_queue_hd == tso);
566 cap->run_queue_hd = tso->_link;
567 } else {
568 setTSOLink(cap, tso->block_info.prev, tso->_link);
569 }
570 if (tso->_link == END_TSO_QUEUE) {
571 ASSERT(cap->run_queue_tl == tso);
572 cap->run_queue_tl = tso->block_info.prev;
573 } else {
574 setTSOPrev(cap, tso->_link, tso->block_info.prev);
575 }
576 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
577 cap->n_run_queue--;
578
579 IF_DEBUG(sanity, checkRunQueue(cap));
580 }
581
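// Move a thread to the front of its capability's run queue (unlink it,
// then push it back on the head) so that it gets to run sooner.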
582 void
583 promoteInRunQueue (Capability *cap, StgTSO *tso)
584 {
585 removeFromRunQueue(cap, tso);
586 pushOnRunQueue(cap, tso);
587 }
588
589 /* ----------------------------------------------------------------------------
590 * Setting up the scheduler loop
591 * ------------------------------------------------------------------------- */
592
593 static void
594 schedulePreLoop(void)
595 {
596 // initialisation for scheduler - what cannot go into initScheduler()
597
598 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
599 win32AllocStack();
600 #endif
601 }
602
603 /* -----------------------------------------------------------------------------
604 * scheduleFindWork()
605 *
606 * Search for work to do, and handle messages from elsewhere.
607 * -------------------------------------------------------------------------- */
608
609 static void
610 scheduleFindWork (Capability **pcap)
611 {
612 scheduleStartSignalHandlers(*pcap);
613
614 scheduleProcessInbox(pcap);
615
616 scheduleCheckBlockedThreads(*pcap);
617
618 #if defined(THREADED_RTS)
619 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
620 #endif
621 }
622
623 #if defined(THREADED_RTS)
624 STATIC_INLINE rtsBool
625 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
626 {
627 // we need to yield this capability to someone else if..
628 // - another thread is initiating a GC, and we didn't just do a GC
629 // (see Note [GC livelock])
630 // - another Task is returning from a foreign call
631 // - the thread at the head of the run queue cannot be run
632 // by this Task (it is bound to another Task, or it is unbound
633 // and this task is bound).
634 //
635 // Note [GC livelock]
636 //
637 // If we are interrupted to do a GC, then we do not immediately do
638 // another one. This avoids a starvation situation where one
639 // Capability keeps forcing a GC and the other Capabilities make no
640 // progress at all.
641
642 return ((pending_sync && !didGcLast) ||
643 cap->n_returning_tasks != 0 ||
644 (!emptyRunQueue(cap) && (task->incall->tso == NULL
645 ? peekRunQueue(cap)->bound != NULL
646 : peekRunQueue(cap)->bound != task->incall)));
647 }
648
649 // This is the single place where a Task goes to sleep. There are
650 // two reasons it might need to sleep:
651 // - there are no threads to run
652 // - we need to yield this Capability to someone else
653 // (see shouldYieldCapability())
654 //
655 // Careful: the scheduler loop is quite delicate. Make sure you run
656 // the tests in testsuite/concurrent (all ways) after modifying this,
657 // and also check the benchmarks in nofib/parallel for regressions.
658
659 static void
660 scheduleYield (Capability **pcap, Task *task)
661 {
662 Capability *cap = *pcap;
663 int didGcLast = rtsFalse;
664
665 // if we have work, and we don't need to give up the Capability, continue.
666 //
667 if (!shouldYieldCapability(cap,task,rtsFalse) &&
668 (!emptyRunQueue(cap) ||
669 !emptyInbox(cap) ||
670 sched_state >= SCHED_INTERRUPTING)) {
671 return;
672 }
673
674 // otherwise yield (sleep), and keep yielding if necessary.
675 do {
676 didGcLast = yieldCapability(&cap,task, !didGcLast);
677 }
678 while (shouldYieldCapability(cap,task,didGcLast));
679
680 // note there may still be no threads on the run queue at this
681 // point, the caller has to check.
682
683 *pcap = cap;
684 return;
685 }
686 #endif
687
688 /* -----------------------------------------------------------------------------
689 * schedulePushWork()
690 *
691 * Push work to other Capabilities if we have some.
692 * -------------------------------------------------------------------------- */
693
694 static void
695 schedulePushWork(Capability *cap USED_IF_THREADS,
696 Task *task USED_IF_THREADS)
697 {
698 /* The following code is not for PARALLEL_HASKELL. I kept the call general;
699 future GUM versions might use pushing in a distributed setup. */
700 #if defined(THREADED_RTS)
701
702 Capability *free_caps[n_capabilities], *cap0;
703 uint32_t i, n_wanted_caps, n_free_caps;
704
705 uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
706
707 // migration can be turned off with +RTS -qm
708 if (!RtsFlags.ParFlags.migrate) {
709 spare_threads = 0;
710 }
711
712 // Figure out how many capabilities we want to wake up. We need at least
713 // sparkPoolSizeCap(cap) plus the number of spare threads we have.
714 n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
715 if (n_wanted_caps == 0) return;
716
717 // First grab as many free Capabilities as we can. ToDo: we should use
718 // capabilities on the same NUMA node preferably, but not exclusively.
719 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
720 n_free_caps < n_wanted_caps && i != cap->no;
721 i = (i + 1) % n_capabilities) {
722 cap0 = capabilities[i];
723 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
724 if (!emptyRunQueue(cap0)
725 || cap0->n_returning_tasks != 0
726 || cap0->inbox != (Message*)END_TSO_QUEUE) {
727 // it already has some work, we just grabbed it at
728 // the wrong moment. Or maybe it's deadlocked!
729 releaseCapability(cap0);
730 } else {
731 free_caps[n_free_caps++] = cap0;
732 }
733 }
734 }
735
736 // We now have n_free_caps free capabilities stashed in
737 // free_caps[]. Attempt to share our run queue equally with them.
738 // This is complicated slightly by the fact that we can't move
739 // some threads:
740 //
741 // - threads that have TSO_LOCKED cannot migrate
742 // - a thread that is bound to the current Task cannot be migrated
743 //
744 // So we walk through the run queue, migrating threads to
745 // free_caps[] round-robin, skipping over immovable threads. Each
746 // time through free_caps[] we keep one thread for ourselves,
747 // provided we haven't encountered one or more immovable threads
748 // in this pass.
749 //
750 // This is about the simplest thing we could do; improvements we
751 // might want to do include:
752 //
753 // - giving high priority to moving relatively new threads, on
754 // the grounds that they haven't had time to build up a
755 // working set in the cache on this CPU/Capability.
756 //
757 // - giving low priority to moving long-lived threads
758
759 if (n_free_caps > 0) {
760 StgTSO *prev, *t, *next;
761 #ifdef SPARK_PUSHING
762 rtsBool pushed_to_all;
763 #endif
764
765 debugTrace(DEBUG_sched,
766 "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
767 cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
768 n_free_caps);
769
770 i = 0;
771 #ifdef SPARK_PUSHING
772 pushed_to_all = rtsFalse;
773 #endif
774
775 // We want to share threads equally amongst free_caps[] and the
776 // current capability, but sometimes we encounter immovable
777 // threads. This counter tracks the number of threads we have kept
778 // for the current capability minus the number of passes over
779 // free_caps[]. If it is greater than zero (due to immovable
780 // threads), we should try to bring it back to zero again by not
781 // keeping any threads for the current capability.
782 uint32_t imbalance = 0;
783
784 // n_free_caps may be larger than the number of spare threads we have,
785 // if there were sparks in the spark pool. To avoid giving away all our
786 // threads in this case, we limit the number of caps that we give
787 // threads to, to the number of spare threads (n_run_queue-1).
788 uint32_t thread_recipients = stg_min(spare_threads, n_free_caps);
789
790 if (thread_recipients > 0) {
791 prev = END_TSO_QUEUE;
792 t = cap->run_queue_hd;
793 for (; t != END_TSO_QUEUE; t = next) {
794 next = t->_link;
795 t->_link = END_TSO_QUEUE;
796 if (t->bound == task->incall // don't move my bound thread
797 || tsoLocked(t)) { // don't move a locked thread
798 if (prev == END_TSO_QUEUE) {
799 cap->run_queue_hd = t;
800 } else {
801 setTSOLink(cap, prev, t);
802 }
803 setTSOPrev(cap, t, prev);
804 prev = t;
805 imbalance++;
806 } else if (i == thread_recipients) {
807 #ifdef SPARK_PUSHING
808 pushed_to_all = rtsTrue;
809 #endif
810 // If we have not already kept any threads for this
811 // capability during the current pass over free_caps[],
812 // keep one now.
813 if (imbalance == 0) {
814 if (prev == END_TSO_QUEUE) {
815 cap->run_queue_hd = t;
816 } else {
817 setTSOLink(cap, prev, t);
818 }
819 setTSOPrev(cap, t, prev);
820 prev = t;
821 } else {
822 imbalance--;
823 }
824 i = 0;
825 } else {
826 appendToRunQueue(free_caps[i],t);
827 cap->n_run_queue--;
828
829 traceEventMigrateThread (cap, t, free_caps[i]->no);
830
831 if (t->bound) { t->bound->task->cap = free_caps[i]; }
832 t->cap = free_caps[i];
833 i++;
834 }
835 }
836 cap->run_queue_tl = prev;
837
838 IF_DEBUG(sanity, checkRunQueue(cap));
839 }
840
841 #ifdef SPARK_PUSHING
842 /* JB I left this code in place, it would work but is not necessary */
843
844 // If there are some free capabilities that we didn't push any
845 // threads to, then try to push a spark to each one.
846 if (!pushed_to_all) {
847 StgClosure *spark;
848 // i is the next free capability to push to
849 for (; i < n_free_caps; i++) {
850 if (emptySparkPoolCap(free_caps[i])) {
851 spark = tryStealSpark(cap->sparks);
852 if (spark != NULL) {
853 /* TODO: if anyone wants to re-enable this code then
854 * they must consider the fizzledSpark(spark) case
855 * and update the per-cap spark statistics.
856 */
857 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
858
859 traceEventStealSpark(free_caps[i], t, cap->no);
860
861 newSpark(&(free_caps[i]->r), spark);
862 }
863 }
864 }
865 }
866 #endif /* SPARK_PUSHING */
867
868 // release the capabilities
869 for (i = 0; i < n_free_caps; i++) {
870 task->cap = free_caps[i];
871 if (sparkPoolSizeCap(cap) > 0) {
872 // If we have sparks to steal, wake up a worker on the
873 // capability, even if it has no threads to run.
874 releaseAndWakeupCapability(free_caps[i]);
875 } else {
876 releaseCapability(free_caps[i]);
877 }
878 }
879 }
880 task->cap = cap; // reset to point to our Capability.
881
882 #endif /* THREADED_RTS */
883
884 }
885
886 /* ----------------------------------------------------------------------------
887 * Start any pending signal handlers
888 * ------------------------------------------------------------------------- */
889
890 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
891 static void
892 scheduleStartSignalHandlers(Capability *cap)
893 {
894 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
895 // safe outside the lock
896 startSignalHandlers(cap);
897 }
898 }
899 #else
900 static void
901 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
902 {
903 }
904 #endif
905
906 /* ----------------------------------------------------------------------------
907 * Check for blocked threads that can be woken up.
908 * ------------------------------------------------------------------------- */
909
910 static void
911 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
912 {
913 #if !defined(THREADED_RTS)
914 //
915 // Check whether any waiting threads need to be woken up. If the
916 // run queue is empty, and there are no other tasks running, we
917 // can wait indefinitely for something to happen.
918 //
919 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
920 {
921 awaitEvent (emptyRunQueue(cap));
922 }
923 #endif
924 }
925
926 /* ----------------------------------------------------------------------------
927 * Detect deadlock conditions and attempt to resolve them.
928 * ------------------------------------------------------------------------- */
929
930 static void
931 scheduleDetectDeadlock (Capability **pcap, Task *task)
932 {
933 Capability *cap = *pcap;
934 /*
935 * Detect deadlock: when we have no threads to run, there are no
936 * threads blocked, waiting for I/O, or sleeping, and all the
937 * other tasks are waiting for work, we must have a deadlock of
938 * some description.
939 */
940 if ( emptyThreadQueues(cap) )
941 {
942 #if defined(THREADED_RTS)
943 /*
944 * In the threaded RTS, we only check for deadlock if there
945 * has been no activity in a complete timeslice. This means
946 * we won't eagerly start a full GC just because we don't have
947 * any threads to run currently.
948 */
949 if (recent_activity != ACTIVITY_INACTIVE) return;
950 #endif
951
952 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
953
954 // Garbage collection can release some new threads due to
955 // either (a) finalizers or (b) threads resurrected because
956 // they are unreachable and will therefore be sent an
957 // exception. Any threads thus released will be immediately
958 // runnable.
959 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
960 cap = *pcap;
961 // when force_major == rtsTrue, scheduleDoGC sets
962 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
963 // signal.
964
965 if ( !emptyRunQueue(cap) ) return;
966
967 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
968 /* If we have user-installed signal handlers, then wait
969 * for signals to arrive rather than bombing out with a
970 * deadlock.
971 */
972 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
973 debugTrace(DEBUG_sched,
974 "still deadlocked, waiting for signals...");
975
976 awaitUserSignals();
977
978 if (signals_pending()) {
979 startSignalHandlers(cap);
980 }
981
982 // either we have threads to run, or we were interrupted:
983 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
984
985 return;
986 }
987 #endif
988
989 #if !defined(THREADED_RTS)
990 /* Probably a real deadlock. Send the current main thread the
991 * NonTermination exception (nonTermination_closure below).
992 */
993 if (task->incall->tso) {
994 switch (task->incall->tso->why_blocked) {
995 case BlockedOnSTM:
996 case BlockedOnBlackHole:
997 case BlockedOnMsgThrowTo:
998 case BlockedOnMVar:
999 case BlockedOnMVarRead:
1000 throwToSingleThreaded(cap, task->incall->tso,
1001 (StgClosure *)nonTermination_closure);
1002 return;
1003 default:
1004 barf("deadlock: main thread blocked in a strange way");
1005 }
1006 }
1007 return;
1008 #endif
1009 }
1010 }
1011
1012
1013 /* ----------------------------------------------------------------------------
1014 * Process messages in the current Capability's inbox
1015 * ------------------------------------------------------------------------- */
1016
1017 static void
1018 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
1019 {
1020 #if defined(THREADED_RTS)
1021 Message *m, *next;
1022 int r;
1023 Capability *cap = *pcap;
1024
1025 while (!emptyInbox(cap)) {
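// We need to GC first if the current nursery block is the last one
// (the nursery is nearly exhausted) or if we have allocated more than
// large_alloc_lim words of large objects since the last GC.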
1026 if (cap->r.rCurrentNursery->link == NULL ||
1027 g0->n_new_large_words >= large_alloc_lim) {
1028 scheduleDoGC(pcap, cap->running_task, rtsFalse);
1029 cap = *pcap;
1030 }
1031
1032 // don't use a blocking acquire; if the lock is held by
1033 // another thread then just carry on. This seems to avoid
1034 // getting stuck in a message ping-pong situation with other
1035 // processors. We'll check the inbox again later anyway.
1036 //
1037 // We should really use a more efficient queue data structure
1038 // here. The trickiness is that we must ensure a Capability
1039 // never goes idle if the inbox is non-empty, which is why we
1040 // use cap->lock (cap->lock is released as the last thing
1041 // before going idle; see Capability.c:releaseCapability()).
1042 r = TRY_ACQUIRE_LOCK(&cap->lock);
1043 if (r != 0) return;
1044
1045 m = cap->inbox;
1046 cap->inbox = (Message*)END_TSO_QUEUE;
1047
1048 RELEASE_LOCK(&cap->lock);
1049
1050 while (m != (Message*)END_TSO_QUEUE) {
1051 next = m->link;
1052 executeMessage(cap, m);
1053 m = next;
1054 }
1055 }
1056 #endif
1057 }
1058
1059 /* ----------------------------------------------------------------------------
1060 * Activate spark threads (THREADED_RTS)
1061 * ------------------------------------------------------------------------- */
1062
1063 #if defined(THREADED_RTS)
1064 static void
1065 scheduleActivateSpark(Capability *cap)
1066 {
1067 if (anySparks() && !cap->disabled)
1068 {
1069 createSparkThread(cap);
1070 debugTrace(DEBUG_sched, "creating a spark thread");
1071 }
1072 }
1073 #endif // THREADED_RTS
1074
1075 /* ----------------------------------------------------------------------------
1076 * After running a thread...
1077 * ------------------------------------------------------------------------- */
1078
1079 static void
1080 schedulePostRunThread (Capability *cap, StgTSO *t)
1081 {
1082 // We have to be able to catch transactions that are in an
1083 // infinite loop as a result of seeing an inconsistent view of
1084 // memory, e.g.
1085 //
1086 // atomically $ do
1087 // [a,b] <- mapM readTVar [ta,tb]
1088 // when (a == b) loop
1089 //
1090 // and a is never equal to b given a consistent view of memory.
1091 //
1092 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1093 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1094 debugTrace(DEBUG_sched | DEBUG_stm,
1095 "trec %p found wasting its time", t);
1096
1097 // strip the stack back to the
1098 // ATOMICALLY_FRAME, aborting the (nested)
1099 // transaction, and saving the stack of any
1100 // partially-evaluated thunks on the heap.
1101 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1102
1103 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1104 }
1105 }
1106
1107 //
1108 // If the current thread's allocation limit has run out, send it
1109 // the AllocationLimitExceeded exception.
1110
1111 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1112 // Use a throwToSelf rather than a throwToSingleThreaded, because
1113 // it correctly handles the case where the thread is currently
1114 // inside mask. Also the thread might be blocked (e.g. on an
1115 // MVar), and throwToSingleThreaded doesn't unblock it
1116 // correctly in that case.
1117 throwToSelf(cap, t, allocationLimitExceeded_closure);
1118 ASSIGN_Int64((W_*)&(t->alloc_limit),
1119 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1120 }
1121
1122 /* some statistics gathering in the parallel case */
1123 }
1124
1125 /* -----------------------------------------------------------------------------
1126 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1127 * -------------------------------------------------------------------------- */
1128
1129 static rtsBool
1130 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1131 {
1132 if (cap->r.rHpLim == NULL || cap->context_switch) {
1133 // Sometimes we miss a context switch, e.g. when calling
1134 // primitives in a tight loop, MAYBE_GC() doesn't check the
1135 // context switch flag, and we end up waiting for a GC.
1136 // See #1984, and concurrent/should_run/1984
1137 cap->context_switch = 0;
1138 appendToRunQueue(cap,t);
1139 } else {
1140 pushOnRunQueue(cap,t);
1141 }
1142
1143 // did the task ask for a large block?
1144 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1145 // if so, get one and push it on the front of the nursery.
1146 bdescr *bd;
1147 W_ blocks;
1148
1149 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
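// For example, with the default 4Kbyte block size a request of
// rHpAlloc = 10000 bytes rounds up to blocks = 3.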
1150
1151 if (blocks > BLOCKS_PER_MBLOCK) {
1152 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1153 }
1154
1155 debugTrace(DEBUG_sched,
1156 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1157 (long)t->id, what_next_strs[t->what_next], blocks);
1158
1159 // don't do this if the nursery is (nearly) full, we'll GC first.
1160 if (cap->r.rCurrentNursery->link != NULL ||
1161 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1162 // infinite loop if the
1163 // nursery has only one
1164 // block.
1165
1166 bd = allocGroupOnNode_lock(cap->node,blocks);
1167 cap->r.rNursery->n_blocks += blocks;
1168
1169 // link the new group after CurrentNursery
1170 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1171
1172 // initialise it as a nursery block. We initialise the
1173 // step, gen_no, and flags field of *every* sub-block in
1174 // this large block, because this is easier than making
1175 // sure that we always find the block head of a large
1176 // block whenever we call Bdescr() (eg. evacuate() and
1177 // isAlive() in the GC would both have to do this, at
1178 // least).
1179 {
1180 bdescr *x;
1181 for (x = bd; x < bd + blocks; x++) {
1182 initBdescr(x,g0,g0);
1183 x->free = x->start;
1184 x->flags = 0;
1185 }
1186 }
1187
1188 // This assert can be a killer if the app is doing lots
1189 // of large block allocations.
1190 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1191
1192 // now update the nursery to point to the new block
1193 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1194 cap->r.rCurrentNursery = bd;
1195
1196 // we might be unlucky and have another thread get on the
1197 // run queue before us and steal the large block, but in that
1198 // case the thread will just end up requesting another large
1199 // block.
1200 return rtsFalse; /* not actually GC'ing */
1201 }
1202 }
1203
1204 // if we got here because we exceeded large_alloc_lim, then
1205 // proceed straight to GC.
1206 if (g0->n_new_large_words >= large_alloc_lim) {
1207 return rtsTrue;
1208 }
1209
1210 // Otherwise, we just ran out of space in the current nursery.
1211 // Grab another nursery if we can.
1212 if (getNewNursery(cap)) {
1213 debugTrace(DEBUG_sched, "thread %ld got a new nursery", t->id);
1214 return rtsFalse;
1215 }
1216
1217 return rtsTrue;
1218 /* actual GC is done at the end of the while loop in schedule() */
1219 }
1220
1221 /* -----------------------------------------------------------------------------
1222 * Handle a thread that returned to the scheduler with ThreadYielding
1223 * -------------------------------------------------------------------------- */
1224
1225 static rtsBool
1226 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1227 {
1228 /* put the thread back on the run queue. Then, if we're ready to
1229 * GC, check whether this is the last task to stop. If so, wake
1230 * up the GC thread. getThread will block during a GC until the
1231 * GC is finished.
1232 */
1233
1234 ASSERT(t->_link == END_TSO_QUEUE);
1235
1236 // Shortcut if we're just switching evaluators: don't bother
1237 // doing stack squeezing (which can be expensive), just run the
1238 // thread.
1239 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1240 debugTrace(DEBUG_sched,
1241 "--<< thread %ld (%s) stopped to switch evaluators",
1242 (long)t->id, what_next_strs[t->what_next]);
1243 return rtsTrue;
1244 }
1245
1246 // Reset the context switch flag. We don't do this just before
1247 // running the thread, because that would mean we would lose ticks
1248 // during GC, which can lead to unfair scheduling (a thread hogs
1249 // the CPU because the tick always arrives during GC). This way
1250 // penalises threads that do a lot of allocation, but that seems
1251 // better than the alternative.
1252 if (cap->context_switch != 0) {
1253 cap->context_switch = 0;
1254 appendToRunQueue(cap,t);
1255 } else {
1256 pushOnRunQueue(cap,t);
1257 }
1258
1259 IF_DEBUG(sanity,
1260 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1261 checkTSO(t));
1262
1263 return rtsFalse;
1264 }
1265
1266 /* -----------------------------------------------------------------------------
1267 * Handle a thread that returned to the scheduler with ThreadBlocked
1268 * -------------------------------------------------------------------------- */
1269
1270 static void
1271 scheduleHandleThreadBlocked( StgTSO *t
1272 #if !defined(DEBUG)
1273 STG_UNUSED
1274 #endif
1275 )
1276 {
1277
1278 // We don't need to do anything. The thread is blocked, and it
1279 // has tidied up its stack and placed itself on whatever queue
1280 // it needs to be on.
1281
1282 // ASSERT(t->why_blocked != NotBlocked);
1283 // Not true: for example,
1284 // - the thread may have woken itself up already, because
1285 // threadPaused() might have raised a blocked throwTo
1286 // exception, see maybePerformBlockedException().
1287
1288 #ifdef DEBUG
1289 traceThreadStatus(DEBUG_sched, t);
1290 #endif
1291 }
1292
1293 /* -----------------------------------------------------------------------------
1294 * Handle a thread that returned to the scheduler with ThreadFinished
1295 * -------------------------------------------------------------------------- */
1296
1297 static rtsBool
1298 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1299 {
1300 /* Need to check whether this was a main thread, and if so,
1301 * return with the return value.
1302 *
1303 * We also end up here if the thread kills itself with an
1304 * uncaught exception, see Exception.cmm.
1305 */
1306
1307 // blocked exceptions can now complete, even if the thread was in
1308 // blocked mode (see #2910).
1309 awakenBlockedExceptionQueue (cap, t);
1310
1311 //
1312 // Check whether the thread that just completed was a bound
1313 // thread, and if so return with the result.
1314 //
1315 // There is an assumption here that all thread completion goes
1316 // through this point; we need to make sure that if a thread
1317 // ends up in the ThreadKilled state, that it stays on the run
1318 // queue so it can be dealt with here.
1319 //
1320
1321 if (t->bound) {
1322
1323 if (t->bound != task->incall) {
1324 #if !defined(THREADED_RTS)
1325 // Must be a bound thread that is not the topmost one. Leave
1326 // it on the run queue until the stack has unwound to the
1327 // point where we can deal with this. Leaving it on the run
1328 // queue also ensures that the garbage collector knows about
1329 // this thread and its return value (it gets dropped from the
1330 // step->threads list so there's no other way to find it).
1331 appendToRunQueue(cap,t);
1332 return rtsFalse;
1333 #else
1334 // this cannot happen in the threaded RTS, because a
1335 // bound thread can only be run by the appropriate Task.
1336 barf("finished bound thread that isn't mine");
1337 #endif
1338 }
1339
1340 ASSERT(task->incall->tso == t);
1341
1342 if (t->what_next == ThreadComplete) {
1343 if (task->incall->ret) {
1344 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1345 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1346 }
1347 task->incall->rstat = Success;
1348 } else {
1349 if (task->incall->ret) {
1350 *(task->incall->ret) = NULL;
1351 }
1352 if (sched_state >= SCHED_INTERRUPTING) {
1353 if (heap_overflow) {
1354 task->incall->rstat = HeapExhausted;
1355 } else {
1356 task->incall->rstat = Interrupted;
1357 }
1358 } else {
1359 task->incall->rstat = Killed;
1360 }
1361 }
1362 #ifdef DEBUG
1363 removeThreadLabel((StgWord)task->incall->tso->id);
1364 #endif
1365
1366 // We no longer consider this thread and task to be bound to
1367 // each other. The TSO lives on until it is GC'd, but the
1368 // task is about to be released by the caller, and we don't
1369 // want anyone following the pointer from the TSO to the
1370 // defunct task (which might have already been
1371 // re-used). This was a real bug: the GC updated
1372 // tso->bound->tso which led to a deadlock.
1373 t->bound = NULL;
1374 task->incall->tso = NULL;
1375
1376 return rtsTrue; // tells schedule() to return
1377 }
1378
1379 return rtsFalse;
1380 }
1381
1382 /* -----------------------------------------------------------------------------
1383 * Perform a heap census
1384 * -------------------------------------------------------------------------- */
1385
1386 static rtsBool
1387 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1388 {
1389 // When we have +RTS -i0 and we're heap profiling, do a census at
1390 // every GC. This lets us get repeatable runs for debugging.
1391 if (performHeapProfile ||
1392 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1393 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1394 return rtsTrue;
1395 } else {
1396 return rtsFalse;
1397 }
1398 }
1399
1400 /* -----------------------------------------------------------------------------
1401 * stopAllCapabilities()
1402 *
1403 * Stop all Haskell execution. This is used when we need to make some global
1404 * change to the system, such as altering the number of capabilities, or
1405 * forking.
1406 *
1407 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1408 * -------------------------------------------------------------------------- */
1409
1410 #if defined(THREADED_RTS)
1411 static void stopAllCapabilities (Capability **pCap, Task *task)
1412 {
1413 rtsBool was_syncing;
1414 SyncType prev_sync_type;
1415
1416 PendingSync sync = {
1417 .type = SYNC_OTHER,
1418 .idle = NULL,
1419 .task = task
1420 };
1421
1422 do {
1423 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1424 } while (was_syncing);
1425
1426 acquireAllCapabilities(*pCap,task);
1427
1428 pending_sync = 0;
1429 }
1430 #endif
1431
1432 /* -----------------------------------------------------------------------------
1433 * requestSync()
1434 *
1435 * Commence a synchronisation between all capabilities. Normally not called
1436 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1437 * has some special synchronisation requirements.
1438 *
1439 * Returns:
1440 * rtsFalse if we successfully got a sync
1441 * rtsTrue if there was another sync request in progress,
1442 * and we yielded to it. In that case the type of the
1443 * other sync request is written to *prev_sync_type.
1444 * -------------------------------------------------------------------------- */
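/* A typical call site retries until no other sync is in progress, as in
 * stopAllCapabilities() above and scheduleDoGC() below:
 *
 *     do {
 *         was_syncing = requestSync(pcap, task, &sync, &prev_sync_type);
 *     } while (was_syncing);
 */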
1445
1446 #if defined(THREADED_RTS)
1447 static rtsBool requestSync (
1448 Capability **pcap, Task *task, PendingSync *new_sync,
1449 SyncType *prev_sync_type)
1450 {
1451 PendingSync *sync;
1452
1453 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1454 (StgWord)NULL,
1455 (StgWord)new_sync);
1456
1457 if (sync != NULL)
1458 {
1459 // sync is valid until we have called yieldCapability().
1460 // After the sync is completed, we cannot read that struct any
1461 // more because it has been freed.
1462 *prev_sync_type = sync->type;
1463 do {
1464 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1465 sync->type);
1466 ASSERT(*pcap);
1467 yieldCapability(pcap,task,rtsTrue);
1468 sync = pending_sync;
1469 } while (sync != NULL);
1470
1471 // NOTE: task->cap might have changed now
1472 return rtsTrue;
1473 }
1474 else
1475 {
1476 return rtsFalse;
1477 }
1478 }
1479 #endif
1480
1481 /* -----------------------------------------------------------------------------
1482 * acquireAllCapabilities()
1483 *
1484 * Grab all the capabilities except the one we already hold. Used
1485 * when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1486 * before a fork (SYNC_OTHER).
1487 *
1488 * Only call this after requestSync(), otherwise a deadlock might
1489 * ensue if another thread is trying to synchronise.
1490 * -------------------------------------------------------------------------- */
1491
1492 #ifdef THREADED_RTS
1493 static void acquireAllCapabilities(Capability *cap, Task *task)
1494 {
1495 Capability *tmpcap;
1496 uint32_t i;
1497
1498 ASSERT(pending_sync != NULL);
1499 for (i=0; i < n_capabilities; i++) {
1500 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1501 i, n_capabilities);
1502 tmpcap = capabilities[i];
1503 if (tmpcap != cap) {
1504 // we better hope this task doesn't get migrated to
1505 // another Capability while we're waiting for this one.
1506 // It won't, because load balancing happens while we have
1507 // all the Capabilities, but even so it's a slightly
1508 // unsavoury invariant.
1509 task->cap = tmpcap;
1510 waitForCapability(&tmpcap, task);
1511 if (tmpcap->no != i) {
1512 barf("acquireAllCapabilities: got the wrong capability");
1513 }
1514 }
1515 }
1516 task->cap = cap;
1517 }
1518 #endif
1519
1520 /* -----------------------------------------------------------------------------
1521 * releaseAllCapabilities()
1522 *
1523 * Assuming this thread holds all the capabilities, release them all except for
1524 * the one passed in as cap.
1525 * -------------------------------------------------------------------------- */
1526
1527 #ifdef THREADED_RTS
1528 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task)
1529 {
1530 uint32_t i;
1531
1532 for (i = 0; i < n; i++) {
1533 if (cap->no != i) {
1534 task->cap = capabilities[i];
1535 releaseCapability(capabilities[i]);
1536 }
1537 }
1538 task->cap = cap;
1539 }
1540 #endif
1541
1542 /* -----------------------------------------------------------------------------
1543 * Perform a garbage collection if necessary
1544 * -------------------------------------------------------------------------- */
1545
1546 static void
1547 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1548 rtsBool force_major)
1549 {
1550 Capability *cap = *pcap;
1551 rtsBool heap_census;
1552 uint32_t collect_gen;
1553 rtsBool major_gc;
1554 #ifdef THREADED_RTS
1555 uint32_t gc_type;
1556 uint32_t i;
1557 uint32_t need_idle;
1558 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1559 StgTSO *tso;
1560 rtsBool *idle_cap;
1561 #endif
1562
1563 if (sched_state == SCHED_SHUTTING_DOWN) {
1564 // The final GC has already been done, and the system is
1565 // shutting down. We'll probably deadlock if we try to GC
1566 // now.
1567 return;
1568 }
1569
1570 heap_census = scheduleNeedHeapProfile(rtsTrue);
1571
1572 // Figure out which generation we are collecting, so that we can
1573 // decide whether this is a parallel GC or not.
1574 collect_gen = calcNeeded(force_major || heap_census, NULL);
1575 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
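// With the default of two generations, collect_gen is 0 for a minor
// collection and 1 for a major one.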
1576
1577 #ifdef THREADED_RTS
1578 if (sched_state < SCHED_INTERRUPTING
1579 && RtsFlags.ParFlags.parGcEnabled
1580 && collect_gen >= RtsFlags.ParFlags.parGcGen
1581 && ! oldest_gen->mark)
1582 {
1583 gc_type = SYNC_GC_PAR;
1584 } else {
1585 gc_type = SYNC_GC_SEQ;
1586 }
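// That is, use the parallel GC only when we are not interrupting or
// shutting down, parallel GC is enabled and this generation is at or
// above the +RTS -qg threshold, and the oldest generation is not
// flagged for mark/compact (or mark/sweep) collection.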
1587
1588 if (gc_type == SYNC_GC_PAR && RtsFlags.ParFlags.parGcThreads > 0) {
1589 need_idle = stg_max(0, enabled_capabilities -
1590 RtsFlags.ParFlags.parGcThreads);
1591 } else {
1592 need_idle = 0;
1593 }
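// For example, with 8 enabled capabilities and +RTS -qn2 (which sets
// parGcThreads to 2), need_idle is 6: six capabilities will sit out
// the parallel GC.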
1594
1595 // In order to GC, there must be no threads running Haskell code.
1596 // Therefore, the GC thread needs to hold *all* the capabilities,
1597 // and release them after the GC has completed.
1598 //
1599 // This seems to be the simplest way: previous attempts involved
1600 // making all the threads with capabilities give up their
1601 // capabilities and sleep except for the *last* one, which
1602 // actually did the GC. But it's quite hard to arrange for all
1603 // the other tasks to sleep and stay asleep.
1604 //
1605
1606 /* Other capabilities are prevented from running yet more Haskell
1607 threads if pending_sync is set. Tested inside
1608 yieldCapability() and releaseCapability() in Capability.c */
1609
1610 PendingSync sync = {
1611 .type = gc_type,
1612 .idle = NULL,
1613 .task = task
1614 };
1615
1616 {
1617 SyncType prev_sync = 0;
1618 rtsBool was_syncing;
1619 do {
1620 // We need an array of size n_capabilities, but since this may
1621 // change each time around the loop we must allocate it afresh.
1622 idle_cap = (rtsBool *)stgMallocBytes(n_capabilities *
1623 sizeof(rtsBool),
1624 "scheduleDoGC");
1625 sync.idle = idle_cap;
1626
1627 // When using +RTS -qn, we need some capabilities to be idle during
1628 // GC. The best bet is to choose some inactive ones, so we look for
1629 // those first:
1630 uint32_t n_idle = need_idle;
1631 for (i=0; i < n_capabilities; i++) {
1632 if (capabilities[i]->disabled) {
1633 idle_cap[i] = rtsTrue;
1634 } else if (n_idle > 0 &&
1635 capabilities[i]->running_task == NULL) {
1636 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1637 n_idle--;
1638 idle_cap[i] = rtsTrue;
1639 } else {
1640 idle_cap[i] = rtsFalse;
1641 }
1642 }
1643 // If we didn't find enough inactive capabilities, just pick some
1644 // more to be idle.
1645 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1646 if (!idle_cap[i] && i != cap->no) {
1647 idle_cap[i] = rtsTrue;
1648 n_idle--;
1649 }
1650 }
1651 ASSERT(n_idle == 0);
1652
1653 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1654 cap = *pcap;
1655 if (was_syncing) {
1656 stgFree(idle_cap);
1657 }
1658 if (was_syncing &&
1659 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1660 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1661 // someone else had a pending sync request for a GC, so
1662 // let's assume GC has been done and we don't need to GC
1663 // again.
1664 // Exception to this: if SCHED_INTERRUPTING, then we still
1665 // need to do the final GC.
1666 return;
1667 }
1668 if (sched_state == SCHED_SHUTTING_DOWN) {
1669 // The scheduler might now be shutting down. We tested
1670 // this above, but it might have become true since then as
1671 // we yielded the capability in requestSync().
1672 return;
1673 }
1674 } while (was_syncing);
1675 }
1676
1677 stat_startGCSync(gc_threads[cap->no]);
1678
1679 #ifdef DEBUG
1680 unsigned int old_n_capabilities = n_capabilities;
1681 #endif
1682
1683 interruptAllCapabilities();
1684
1685 // The final shutdown GC is always single-threaded, because it's
1686 // possible that some of the Capabilities have no worker threads.
1687
1688 if (gc_type == SYNC_GC_SEQ) {
1689 traceEventRequestSeqGc(cap);
1690 } else {
1691 traceEventRequestParGc(cap);
1692 }
1693
1694 if (gc_type == SYNC_GC_SEQ) {
1695 // single-threaded GC: grab all the capabilities
1696 acquireAllCapabilities(cap,task);
1697 }
1698 else
1699 {
1700 // If we are load-balancing collections in this
1701 // generation, then we require all GC threads to participate
1702 // in the collection. Otherwise, we only require active
1703 // threads to participate, and we set gc_threads[i]->idle for
1704 // any idle capabilities. The rationale here is that waking
1705 // up an idle Capability takes much longer than just doing any
1706 // GC work on its behalf.
1707
1708 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1709 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1710 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1711 {
1712 for (i=0; i < n_capabilities; i++) {
1713 if (capabilities[i]->disabled) {
1714 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1715 if (idle_cap[i]) {
1716 n_idle_caps++;
1717 }
1718 } else {
1719 if (i != cap->no && idle_cap[i]) {
1720 Capability *tmpcap = capabilities[i];
1721 task->cap = tmpcap;
1722 waitForCapability(&tmpcap, task);
1723 n_idle_caps++;
1724 }
1725 }
1726 }
1727 }
1728 else
1729 {
1730 for (i=0; i < n_capabilities; i++) {
1731 if (capabilities[i]->disabled) {
1732 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1733 if (idle_cap[i]) {
1734 n_idle_caps++;
1735 }
1736 } else if (i != cap->no &&
1737 capabilities[i]->idle >=
1738 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1739 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1740 if (idle_cap[i]) {
1741 n_idle_caps++;
1742 } else {
1743 n_failed_trygrab_idles++;
1744 }
1745 }
1746 }
1747 }
1748 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1749
1750 // We set the gc_thread[i]->idle flag if that
1751 // capability/thread is not participating in this collection.
1752 // We also keep a local record of which capabilities are idle
1753 // in idle_cap[], because scheduleDoGC() is re-entrant:
1754 // another thread might start a GC as soon as we've finished
1755 // this one, and thus the gc_thread[]->idle flags are invalid
1756 // as soon as we release any threads after GC. Getting this
1757 // wrong leads to a rare and hard to debug deadlock!
1758
1759 for (i=0; i < n_capabilities; i++) {
1760 gc_threads[i]->idle = idle_cap[i];
1761 capabilities[i]->idle++;
1762 }
1763
1764 // For all capabilities participating in this GC, wait until
1765 // they have stopped mutating and are standing by for GC.
1766 waitForGcThreads(cap);
1767
1768 #if defined(THREADED_RTS)
1769 // Stable point where we can do a global check on our spark counters
1770 ASSERT(checkSparkCountInvariant());
1771 #endif
1772 }
1773
1774 #endif
1775
1776 IF_DEBUG(scheduler, printAllThreads());
1777
1778 delete_threads_and_gc:
1779 /*
1780 * We now have all the capabilities; if we're in an interrupting
1781 * state, then we should take the opportunity to delete all the
1782 * threads in the system.
1783 * Checking for major_gc ensures that the last GC is major.
1784 */
1785 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1786 deleteAllThreads(cap);
1787 #if defined(THREADED_RTS)
1788 // Discard all the sparks from every Capability. Why?
1789 // They'll probably be GC'd anyway since we've killed all the
1790 // threads. It just avoids the GC having to do any work to
1791 // figure out that any remaining sparks are garbage.
1792 for (i = 0; i < n_capabilities; i++) {
1793 capabilities[i]->spark_stats.gcd +=
1794 sparkPoolSize(capabilities[i]->sparks);
1795 // No race here since all Caps are stopped.
1796 discardSparksCap(capabilities[i]);
1797 }
1798 #endif
1799 sched_state = SCHED_SHUTTING_DOWN;
1800 }
1801
1802 /*
1803 * When there are disabled capabilities, we want to migrate any
1804 * threads away from them. Normally this happens in the
1805 * scheduler's loop, but only for unbound threads - it's really
1806 * hard for a bound thread to migrate itself. So we have another
1807 * go here.
1808 */
1809 #if defined(THREADED_RTS)
1810 for (i = enabled_capabilities; i < n_capabilities; i++) {
1811 Capability *tmp_cap, *dest_cap;
1812 tmp_cap = capabilities[i];
1813 ASSERT(tmp_cap->disabled);
1814 if (i != cap->no) {
1815 dest_cap = capabilities[i % enabled_capabilities];
1816 while (!emptyRunQueue(tmp_cap)) {
1817 tso = popRunQueue(tmp_cap);
1818 migrateThread(tmp_cap, tso, dest_cap);
1819 if (tso->bound) {
1820 traceTaskMigrate(tso->bound->task,
1821 tso->bound->task->cap,
1822 dest_cap);
1823 tso->bound->task->cap = dest_cap;
1824 }
1825 }
1826 }
1827 }
1828 #endif
1829
1830 #if defined(THREADED_RTS)
1831 // reset pending_sync *before* GC, so that when the GC threads
1832 // emerge they don't immediately re-enter the GC.
1833 pending_sync = 0;
1834 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1835 #else
1836 GarbageCollect(collect_gen, heap_census, 0, cap);
1837 #endif
1838
1839 traceSparkCounters(cap);
1840
1841 switch (recent_activity) {
1842 case ACTIVITY_INACTIVE:
1843 if (force_major) {
1844 // We are doing a GC because the system has been idle for a
1845 // timeslice and we need to check for deadlock. Record the
1846 // fact that we've done a GC and turn off the timer signal;
1847 // it will get re-enabled if we run any threads after the GC.
1848 recent_activity = ACTIVITY_DONE_GC;
1849 #ifndef PROFILING
1850 stopTimer();
1851 #endif
1852 break;
1853 }
1854 // fall through...
1855
1856 case ACTIVITY_MAYBE_NO:
1857 // the GC might have taken long enough for the timer to set
1858 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1859 // but we aren't necessarily deadlocked:
1860 recent_activity = ACTIVITY_YES;
1861 break;
1862
1863 case ACTIVITY_DONE_GC:
1864 // If we are actually active, the scheduler will reset the
1865 // recent_activity flag and re-enable the timer.
1866 break;
1867 }
1868
1869 #if defined(THREADED_RTS)
1870 // Stable point where we can do a global check on our spark counters
1871 ASSERT(checkSparkCountInvariant());
1872 #endif
1873
1874 // The heap census itself is done during GarbageCollect().
1875 if (heap_census) {
1876 performHeapProfile = rtsFalse;
1877 }
1878
1879 #if defined(THREADED_RTS)
1880
1881 // If n_capabilities has changed during GC, we're in trouble.
1882 ASSERT(n_capabilities == old_n_capabilities);
1883
1884 if (gc_type == SYNC_GC_PAR)
1885 {
1886 releaseGCThreads(cap);
1887 for (i = 0; i < n_capabilities; i++) {
1888 if (i != cap->no) {
1889 if (idle_cap[i]) {
1890 ASSERT(capabilities[i]->running_task == task);
1891 task->cap = capabilities[i];
1892 releaseCapability(capabilities[i]);
1893 } else {
1894 ASSERT(capabilities[i]->running_task != task);
1895 }
1896 }
1897 }
1898 task->cap = cap;
1899 }
1900 #endif
1901
1902 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1903 // GC set the heap_overflow flag, so we should proceed with
1904 // an orderly shutdown now. Ultimately we want the main
1905 // thread to return to its caller with HeapExhausted, at which
1906 // point the caller should call hs_exit(). The first step is
1907 // to delete all the threads.
1908 //
1909 // Another way to do this would be to raise an exception in
1910 // the main thread, which we really should do because it gives
1911 // the program a chance to clean up. But how do we find the
1912 // main thread? It should presumably be the same one that
1913 // gets ^C exceptions, but that's all done on the Haskell side
1914 // (GHC.TopHandler).
1915 sched_state = SCHED_INTERRUPTING;
1916 goto delete_threads_and_gc;
1917 }
1918
1919 #ifdef SPARKBALANCE
1920 /* JB
1921 Once we are all together... this would be the place to balance all
1922 spark pools. No concurrent stealing or adding of new sparks can
1923 occur. Should be defined in Sparks.c. */
1924 balanceSparkPoolsCaps(n_capabilities, capabilities);
1925 #endif
1926
1927 #if defined(THREADED_RTS)
1928 stgFree(idle_cap);
1929
1930 if (gc_type == SYNC_GC_SEQ) {
1931 // release our stash of capabilities.
1932 releaseAllCapabilities(n_capabilities, cap, task);
1933 }
1934 #endif
1935
1936 return;
1937 }
1938
1939 /* ---------------------------------------------------------------------------
1940 * Singleton fork(). Do not copy any running threads.
1941 * ------------------------------------------------------------------------- */
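/* Editor's sketch (not part of the original source): how a caller is
 * expected to use forkProcess().  The argument is a pointer to a StablePtr
 * for an IO () action; in the child the action is run via rts_evalStableIO()
 * and the process then exits, so only the parent ever returns.  How the
 * StablePtr was obtained is assumed here, not shown in this file:
 *
 *     HsStablePtr action = ...;            // StablePtr to an IO () closure
 *     pid_t pid = forkProcess(&action);
 *     // parent: pid of the child; child: never reaches this point
 */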
1942
1943 pid_t
1944 forkProcess(HsStablePtr *entry
1945 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1946 STG_UNUSED
1947 #endif
1948 )
1949 {
1950 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1951 pid_t pid;
1952 StgTSO* t,*next;
1953 Capability *cap;
1954 uint32_t g;
1955 Task *task = NULL;
1956 uint32_t i;
1957
1958 debugTrace(DEBUG_sched, "forking!");
1959
1960 task = newBoundTask();
1961
1962 cap = NULL;
1963 waitForCapability(&cap, task);
1964
1965 #ifdef THREADED_RTS
1966 stopAllCapabilities(&cap, task);
1967 #endif
1968
1969 // no funny business: hold locks while we fork, otherwise if some
1970 // other thread is holding a lock when the fork happens, the data
1971 // structure protected by the lock will forever be in an
1972 // inconsistent state in the child. See also #1391.
1973 ACQUIRE_LOCK(&sched_mutex);
1974 ACQUIRE_LOCK(&sm_mutex);
1975 ACQUIRE_LOCK(&stable_mutex);
1976 ACQUIRE_LOCK(&task->lock);
1977
1978 for (i=0; i < n_capabilities; i++) {
1979 ACQUIRE_LOCK(&capabilities[i]->lock);
1980 }
1981
1982 #ifdef THREADED_RTS
1983 ACQUIRE_LOCK(&all_tasks_mutex);
1984 #endif
1985
1986 stopTimer(); // See #4074
1987
1988 #if defined(TRACING)
1989 flushEventLog(); // so that child won't inherit dirty file buffers
1990 #endif
1991
1992 pid = fork();
1993
1994 if (pid) { // parent
1995
1996 startTimer(); // #4074
1997
1998 RELEASE_LOCK(&sched_mutex);
1999 RELEASE_LOCK(&sm_mutex);
2000 RELEASE_LOCK(&stable_mutex);
2001 RELEASE_LOCK(&task->lock);
2002
2003 for (i=0; i < n_capabilities; i++) {
2004 releaseCapability_(capabilities[i],rtsFalse);
2005 RELEASE_LOCK(&capabilities[i]->lock);
2006 }
2007
2008 #ifdef THREADED_RTS
2009 RELEASE_LOCK(&all_tasks_mutex);
2010 #endif
2011
2012 boundTaskExiting(task);
2013
2014 // just return the pid
2015 return pid;
2016
2017 } else { // child
2018
2019 #if defined(THREADED_RTS)
2020 initMutex(&sched_mutex);
2021 initMutex(&sm_mutex);
2022 initMutex(&stable_mutex);
2023 initMutex(&task->lock);
2024
2025 for (i=0; i < n_capabilities; i++) {
2026 initMutex(&capabilities[i]->lock);
2027 }
2028
2029 initMutex(&all_tasks_mutex);
2030 #endif
2031
2032 #ifdef TRACING
2033 resetTracing();
2034 #endif
2035
2036 // Now, all OS threads except the thread that forked are
2037 // stopped. We need to stop all Haskell threads, including
2038 // those involved in foreign calls. Also we need to delete
2039 // all Tasks, because they correspond to OS threads that are
2040 // now gone.
2041
2042 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2043 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2044 next = t->global_link;
2045 // don't allow threads to catch the ThreadKilled
2046 // exception, but we do want to raiseAsync() because these
2047 // threads may be evaluating thunks that we need later.
2048 deleteThread_(t->cap,t);
2049
2050 // stop the GC from updating the InCall to point to
2051 // the TSO. This is only necessary because the
2052 // OSThread bound to the TSO has been killed, and
2053 // won't get a chance to exit in the usual way (see
2054 // also scheduleHandleThreadFinished).
2055 t->bound = NULL;
2056 }
2057 }
2058
2059 discardTasksExcept(task);
2060
2061 for (i=0; i < n_capabilities; i++) {
2062 cap = capabilities[i];
2063
2064 // Empty the run queue. It seems tempting to let all the
2065 // killed threads stay on the run queue as zombies to be
2066 // cleaned up later, but some of them may correspond to
2067 // bound threads for which the corresponding Task does not
2068 // exist.
2069 truncateRunQueue(cap);
2070 cap->n_run_queue = 0;
2071
2072 // Any suspended C-calling Tasks are no more; their OS threads
2073 // no longer exist:
2074 cap->suspended_ccalls = NULL;
2075 cap->n_suspended_ccalls = 0;
2076
2077 #if defined(THREADED_RTS)
2078 // Wipe our spare workers list, they no longer exist. New
2079 // workers will be created if necessary.
2080 cap->spare_workers = NULL;
2081 cap->n_spare_workers = 0;
2082 cap->returning_tasks_hd = NULL;
2083 cap->returning_tasks_tl = NULL;
2084 cap->n_returning_tasks = 0;
2085 #endif
2086
2087 // Release all caps except 0; we'll use that for starting
2088 // the IO manager and running the client action below.
2089 if (cap->no != 0) {
2090 task->cap = cap;
2091 releaseCapability(cap);
2092 }
2093 }
2094 cap = capabilities[0];
2095 task->cap = cap;
2096
2097 // Empty the threads lists. Otherwise, the garbage
2098 // collector may attempt to resurrect some of these threads.
2099 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2100 generations[g].threads = END_TSO_QUEUE;
2101 }
2102
2103 // On Unix, all timers are reset in the child, so we need to start
2104 // the timer again.
2105 initTimer();
2106 startTimer();
2107
2108 // TODO: need to trace various other things in the child
2109 // like startup event, capabilities, process info etc
2110 traceTaskCreate(task, cap);
2111
2112 #if defined(THREADED_RTS)
2113 ioManagerStartCap(&cap);
2114 #endif
2115
2116 rts_evalStableIO(&cap, entry, NULL); // run the action
2117 rts_checkSchedStatus("forkProcess",cap);
2118
2119 rts_unlock(cap);
2120 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2121 }
2122 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2123 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2124 #endif
2125 }
2126
2127 /* ---------------------------------------------------------------------------
2128 * Changing the number of Capabilities
2129 *
2130 * Changing the number of Capabilities is very tricky! We can only do
2131 * it with the system fully stopped, so we do a full sync with
2132 * requestSync(SYNC_OTHER) and grab all the capabilities.
2133 *
2134 * Then we resize the appropriate data structures, and update all
2135 * references to the old data structures which have now moved.
2136 * Finally we release the Capabilities we are holding, and start
2137 * worker Tasks on the new Capabilities we created.
2138 *
2139 * ------------------------------------------------------------------------- */
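/* Editor's sketch (not part of the original source): Haskell programs
 * normally reach this code via GHC.Conc.setNumCapabilities, but C code
 * embedding the RTS can call it directly, assuming the declaration is in
 * scope via the RTS headers:
 *
 *     hs_init(&argc, &argv);     // the RTS must be up first
 *     setNumCapabilities(4);     // resize the set of enabled Capabilities
 *     ...
 *     hs_exit();
 */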
2140
2141 void
2142 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2143 {
2144 #if !defined(THREADED_RTS)
2145 if (new_n_capabilities != 1) {
2146 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2147 }
2148 return;
2149 #elif defined(NOSMP)
2150 if (new_n_capabilities != 1) {
2151 errorBelch("setNumCapabilities: not supported on this platform");
2152 }
2153 return;
2154 #else
2155 Task *task;
2156 Capability *cap;
2157 uint32_t n;
2158 Capability *old_capabilities = NULL;
2159 uint32_t old_n_capabilities = n_capabilities;
2160
2161 if (new_n_capabilities == enabled_capabilities) return;
2162
2163 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2164 enabled_capabilities, new_n_capabilities);
2165
2166 cap = rts_lock();
2167 task = cap->running_task;
2168
2169 stopAllCapabilities(&cap, task);
2170
2171 if (new_n_capabilities < enabled_capabilities)
2172 {
2173 // Reducing the number of capabilities: we do not actually
2174 // remove the extra capabilities, we just mark them as
2175 // "disabled". This has the following effects:
2176 //
2177 // - threads on a disabled capability are migrated away by the
2178 // scheduler loop
2179 //
2180 // - disabled capabilities do not participate in GC
2181 // (see scheduleDoGC())
2182 //
2183 // - No spark threads are created on this capability
2184 // (see scheduleActivateSpark())
2185 //
2186 // - We do not attempt to migrate threads *to* a disabled
2187 // capability (see schedulePushWork()).
2188 //
2189 // but in other respects, a disabled capability remains
2190 // alive. Threads may be woken up on a disabled capability,
2191 // but they will be immediately migrated away.
2192 //
2193 // This approach is much easier than trying to actually remove
2194 // the capability; we don't have to worry about GC data
2195 // structures, the nursery, etc.
2196 //
2197 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2198 capabilities[n]->disabled = rtsTrue;
2199 traceCapDisable(capabilities[n]);
2200 }
2201 enabled_capabilities = new_n_capabilities;
2202 }
2203 else
2204 {
2205 // Increasing the number of enabled capabilities.
2206 //
2207 // enable any disabled capabilities, up to the required number
2208 for (n = enabled_capabilities;
2209 n < new_n_capabilities && n < n_capabilities; n++) {
2210 capabilities[n]->disabled = rtsFalse;
2211 traceCapEnable(capabilities[n]);
2212 }
2213 enabled_capabilities = n;
2214
2215 if (new_n_capabilities > n_capabilities) {
2216 #if defined(TRACING)
2217 // Allocate eventlog buffers for the new capabilities. Note this
2218 // must be done before calling moreCapabilities(), because that
2219 // will emit events about creating the new capabilities and adding
2220 // them to existing capsets.
2221 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2222 #endif
2223
2224 // Resize the capabilities array
2225 // NB. after this, capabilities points somewhere new. Any pointers
2226 // of type (Capability *) are now invalid.
2227 moreCapabilities(n_capabilities, new_n_capabilities);
2228
2229 // Resize and update storage manager data structures
2230 storageAddCapabilities(n_capabilities, new_n_capabilities);
2231 }
2232 }
2233
2234 // update n_capabilities before things start running
2235 if (new_n_capabilities > n_capabilities) {
2236 n_capabilities = enabled_capabilities = new_n_capabilities;
2237 }
2238
2239 // We're done: release the original Capabilities
2240 releaseAllCapabilities(old_n_capabilities, cap,task);
2241
2242 // We can't free the old array until now, because we access it
2243 // while updating pointers in updateCapabilityRefs().
2244 if (old_capabilities) {
2245 stgFree(old_capabilities);
2246 }
2247
2248 // Notify IO manager that the number of capabilities has changed.
2249 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2250
2251 rts_unlock(cap);
2252
2253 #endif // THREADED_RTS
2254 }
2255
2256
2257
2258 /* ---------------------------------------------------------------------------
2259 * Delete all the threads in the system
2260 * ------------------------------------------------------------------------- */
2261
2262 static void
2263 deleteAllThreads ( Capability *cap )
2264 {
2265 // NOTE: only safe to call if we own all capabilities.
2266
2267 StgTSO* t, *next;
2268 uint32_t g;
2269
2270 debugTrace(DEBUG_sched,"deleting all threads");
2271 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2272 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2273 next = t->global_link;
2274 deleteThread(cap,t);
2275 }
2276 }
2277
2278 // The run queue now contains a bunch of ThreadKilled threads. We
2279 // must not throw these away: the main thread(s) will be in there
2280 // somewhere, and the main scheduler loop has to deal with it.
2281 // Also, the run queue is the only thing keeping these threads from
2282 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2283
2284 #if !defined(THREADED_RTS)
2285 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2286 ASSERT(sleeping_queue == END_TSO_QUEUE);
2287 #endif
2288 }
2289
2290 /* -----------------------------------------------------------------------------
2291 Managing the suspended_ccalls list.
2292 Locks required: sched_mutex
2293 -------------------------------------------------------------------------- */
2294
2295 STATIC_INLINE void
2296 suspendTask (Capability *cap, Task *task)
2297 {
2298 InCall *incall;
2299
2300 incall = task->incall;
2301 ASSERT(incall->next == NULL && incall->prev == NULL);
2302 incall->next = cap->suspended_ccalls;
2303 incall->prev = NULL;
2304 if (cap->suspended_ccalls) {
2305 cap->suspended_ccalls->prev = incall;
2306 }
2307 cap->suspended_ccalls = incall;
2308 cap->n_suspended_ccalls++;
2309 }
2310
2311 STATIC_INLINE void
2312 recoverSuspendedTask (Capability *cap, Task *task)
2313 {
2314 InCall *incall;
2315
2316 incall = task->incall;
2317 if (incall->prev) {
2318 incall->prev->next = incall->next;
2319 } else {
2320 ASSERT(cap->suspended_ccalls == incall);
2321 cap->suspended_ccalls = incall->next;
2322 }
2323 if (incall->next) {
2324 incall->next->prev = incall->prev;
2325 }
2326 incall->next = incall->prev = NULL;
2327 cap->n_suspended_ccalls--;
2328 }
2329
2330 /* ---------------------------------------------------------------------------
2331 * Suspending & resuming Haskell threads.
2332 *
2333 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2334 * its capability before calling the C function. This allows another
2335 * task to pick up the capability and carry on running Haskell
2336 * threads. It also means that if the C call blocks, it won't lock
2337 * the whole system.
2338 *
2339 * The Haskell thread making the C call is put to sleep for the
2340 * duration of the call, on the suspended_ccalling_threads queue. We
2341 * give out a token to the task, which it can use to resume the thread
2342 * on return from the C function.
2343 *
2344 * If this is an interruptible C call, this means that the FFI call may be
2345 * unceremoniously terminated and should be scheduled on an
2346 * unbound worker thread.
2347 * ------------------------------------------------------------------------- */
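/* Editor's sketch (not part of the original source): the stub generated for
 * a "safe" foreign call uses the token protocol described above roughly as
 * follows; `blocking_call' is a placeholder, not a function defined in the
 * RTS:
 *
 *     void *tok = suspendThread(baseReg, rtsFalse);  // give up the Capability
 *     int r     = blocking_call(arg);                // may block; other threads keep running
 *     baseReg   = resumeThread(tok);                 // re-acquire a Capability
 */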
2348
2349 void *
2350 suspendThread (StgRegTable *reg, rtsBool interruptible)
2351 {
2352 Capability *cap;
2353 int saved_errno;
2354 StgTSO *tso;
2355 Task *task;
2356 #if mingw32_HOST_OS
2357 StgWord32 saved_winerror;
2358 #endif
2359
2360 saved_errno = errno;
2361 #if mingw32_HOST_OS
2362 saved_winerror = GetLastError();
2363 #endif
2364
2365 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2366 */
2367 cap = regTableToCapability(reg);
2368
2369 task = cap->running_task;
2370 tso = cap->r.rCurrentTSO;
2371
2372 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2373
2374 // XXX this might not be necessary --SDM
2375 tso->what_next = ThreadRunGHC;
2376
2377 threadPaused(cap,tso);
2378
2379 if (interruptible) {
2380 tso->why_blocked = BlockedOnCCall_Interruptible;
2381 } else {
2382 tso->why_blocked = BlockedOnCCall;
2383 }
2384
2385 // Hand back capability
2386 task->incall->suspended_tso = tso;
2387 task->incall->suspended_cap = cap;
2388
2389 // Otherwise allocate() will write to invalid memory.
2390 cap->r.rCurrentTSO = NULL;
2391
2392 ACQUIRE_LOCK(&cap->lock);
2393
2394 suspendTask(cap,task);
2395 cap->in_haskell = rtsFalse;
2396 releaseCapability_(cap,rtsFalse);
2397
2398 RELEASE_LOCK(&cap->lock);
2399
2400 errno = saved_errno;
2401 #if mingw32_HOST_OS
2402 SetLastError(saved_winerror);
2403 #endif
2404 return task;
2405 }
2406
2407 StgRegTable *
2408 resumeThread (void *task_)
2409 {
2410 StgTSO *tso;
2411 InCall *incall;
2412 Capability *cap;
2413 Task *task = task_;
2414 int saved_errno;
2415 #if mingw32_HOST_OS
2416 StgWord32 saved_winerror;
2417 #endif
2418
2419 saved_errno = errno;
2420 #if mingw32_HOST_OS
2421 saved_winerror = GetLastError();
2422 #endif
2423
2424 incall = task->incall;
2425 cap = incall->suspended_cap;
2426 task->cap = cap;
2427
2428 // Wait for permission to re-enter the RTS with the result.
2429 waitForCapability(&cap,task);
2430 // we might be on a different capability now... but if so, our
2431 // entry on the suspended_ccalls list will also have been
2432 // migrated.
2433
2434 // Remove the thread from the suspended list
2435 recoverSuspendedTask(cap,task);
2436
2437 tso = incall->suspended_tso;
2438 incall->suspended_tso = NULL;
2439 incall->suspended_cap = NULL;
2440 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2441
2442 traceEventRunThread(cap, tso);
2443
2444 /* Reset blocking status */
2445 tso->why_blocked = NotBlocked;
2446
2447 if ((tso->flags & TSO_BLOCKEX) == 0) {
2448 // avoid locking the TSO if we don't have to
2449 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2450 maybePerformBlockedException(cap,tso);
2451 }
2452 }
2453
2454 cap->r.rCurrentTSO = tso;
2455 cap->in_haskell = rtsTrue;
2456 errno = saved_errno;
2457 #if mingw32_HOST_OS
2458 SetLastError(saved_winerror);
2459 #endif
2460
2461 /* We might have GC'd, mark the TSO dirty again */
2462 dirty_TSO(cap,tso);
2463 dirty_STACK(cap,tso->stackobj);
2464
2465 IF_DEBUG(sanity, checkTSO(tso));
2466
2467 return &cap->r;
2468 }
2469
2470 /* ---------------------------------------------------------------------------
2471 * scheduleThread()
2472 *
2473 * scheduleThread puts a thread on the end of the runnable queue.
2474 * This will usually be done immediately after a thread is created.
2475 * The caller of scheduleThread must create the thread using e.g.
2476 * createThread and push an appropriate closure
2477 * on this thread's stack before the scheduler is invoked.
2478 * ------------------------------------------------------------------------ */
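/* Editor's sketch (not part of the original source): the create-then-schedule
 * pattern described above, assuming one of the thread-creation helpers from
 * RtsAPI.c (createIOThread() here) is used to build the TSO and push the
 * closure:
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
 *     scheduleThread(cap, tso);      // append it to cap's run queue
 */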
2479
2480 void
2481 scheduleThread(Capability *cap, StgTSO *tso)
2482 {
2483 // The thread goes at the *end* of the run-queue, to avoid possible
2484 // starvation of any threads already on the queue.
2485 appendToRunQueue(cap,tso);
2486 }
2487
2488 void
2489 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2490 {
2491 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2492 // move this thread from now on.
2493 #if defined(THREADED_RTS)
2494 cpu %= enabled_capabilities;
2495 if (cpu == cap->no) {
2496 appendToRunQueue(cap,tso);
2497 } else {
2498 migrateThread(cap, tso, capabilities[cpu]);
2499 }
2500 #else
2501 appendToRunQueue(cap,tso);
2502 #endif
2503 }
2504
2505 void
2506 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2507 {
2508 Task *task;
2509 DEBUG_ONLY( StgThreadID id );
2510 Capability *cap;
2511
2512 cap = *pcap;
2513
2514 // We already created/initialised the Task
2515 task = cap->running_task;
2516
2517 // This TSO is now a bound thread; make the Task and TSO
2518 // point to each other.
2519 tso->bound = task->incall;
2520 tso->cap = cap;
2521
2522 task->incall->tso = tso;
2523 task->incall->ret = ret;
2524 task->incall->rstat = NoStatus;
2525
2526 appendToRunQueue(cap,tso);
2527
2528 DEBUG_ONLY( id = tso->id );
2529 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2530
2531 cap = schedule(cap,task);
2532
2533 ASSERT(task->incall->rstat != NoStatus);
2534 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2535
2536 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2537 *pcap = cap;
2538 }
2539
2540 /* ----------------------------------------------------------------------------
2541 * Starting Tasks
2542 * ------------------------------------------------------------------------- */
2543
2544 #if defined(THREADED_RTS)
2545 void scheduleWorker (Capability *cap, Task *task)
2546 {
2547 // schedule() runs without a lock.
2548 cap = schedule(cap,task);
2549
2550 // On exit from schedule(), we have a Capability, but possibly not
2551 // the same one we started with.
2552
2553 // During shutdown, the requirement is that after all the
2554 // Capabilities are shut down, all workers that are shutting down
2555 // have finished workerTaskStop(). This is why we hold on to
2556 // cap->lock until we've finished workerTaskStop() below.
2557 //
2558 // There may be workers still involved in foreign calls; those
2559 // will just block in waitForCapability() because the
2560 // Capability has been shut down.
2561 //
2562 ACQUIRE_LOCK(&cap->lock);
2563 releaseCapability_(cap,rtsFalse);
2564 workerTaskStop(task);
2565 RELEASE_LOCK(&cap->lock);
2566 }
2567 #endif
2568
2569 /* ---------------------------------------------------------------------------
2570 * Start new worker tasks on Capabilities from--to
2571 * -------------------------------------------------------------------------- */
2572
2573 static void
2574 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2575 {
2576 #if defined(THREADED_RTS)
2577 uint32_t i;
2578 Capability *cap;
2579
2580 for (i = from; i < to; i++) {
2581 cap = capabilities[i];
2582 ACQUIRE_LOCK(&cap->lock);
2583 startWorkerTask(cap);
2584 RELEASE_LOCK(&cap->lock);
2585 }
2586 #endif
2587 }
2588
2589 /* ---------------------------------------------------------------------------
2590 * initScheduler()
2591 *
2592 * Initialise the scheduler. This resets all the queues - if the
2593 * queues contained any threads, they'll be garbage collected at the
2594 * next pass.
2595 *
2596 * ------------------------------------------------------------------------ */
2597
2598 void
2599 initScheduler(void)
2600 {
2601 #if !defined(THREADED_RTS)
2602 blocked_queue_hd = END_TSO_QUEUE;
2603 blocked_queue_tl = END_TSO_QUEUE;
2604 sleeping_queue = END_TSO_QUEUE;
2605 #endif
2606
2607 sched_state = SCHED_RUNNING;
2608 recent_activity = ACTIVITY_YES;
2609
2610 #if defined(THREADED_RTS)
2611 /* Initialise the mutex and condition variables used by
2612 * the scheduler. */
2613 initMutex(&sched_mutex);
2614 #endif
2615
2616 ACQUIRE_LOCK(&sched_mutex);
2617
2618 /* A capability holds the state a native thread needs in
2619 * order to execute STG code. At least one capability is
2620 * floating around (only THREADED_RTS builds have more than one).
2621 */
2622 initCapabilities();
2623
2624 initTaskManager();
2625
2626 /*
2627 * Eagerly start one worker to run each Capability, except for
2628 * Capability 0. The idea is that we're probably going to start a
2629 * bound thread on Capability 0 pretty soon, so we don't want a
2630 * worker task hogging it.
2631 */
2632 startWorkerTasks(1, n_capabilities);
2633
2634 RELEASE_LOCK(&sched_mutex);
2635
2636 }
2637
2638 void
2639 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2640 /* see Capability.c, shutdownCapability() */
2641 {
2642 Task *task = NULL;
2643
2644 task = newBoundTask();
2645
2646 // If we haven't killed all the threads yet, do it now.
2647 if (sched_state < SCHED_SHUTTING_DOWN) {
2648 sched_state = SCHED_INTERRUPTING;
2649 Capability *cap = task->cap;
2650 waitForCapability(&cap,task);
2651 scheduleDoGC(&cap,task,rtsTrue);
2652 ASSERT(task->incall->tso == NULL);
2653 releaseCapability(cap);
2654 }
2655 sched_state = SCHED_SHUTTING_DOWN;
2656
2657 shutdownCapabilities(task, wait_foreign);
2658
2659 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2660 // n_failed_trygrab_idles, n_idle_caps);
2661
2662 boundTaskExiting(task);
2663 }
2664
2665 void
2666 freeScheduler( void )
2667 {
2668 uint32_t still_running;
2669
2670 ACQUIRE_LOCK(&sched_mutex);
2671 still_running = freeTaskManager();
2672 // We can only free the Capabilities if there are no Tasks still
2673 // running. We might have a Task about to return from a foreign
2674 // call into waitForCapability(), for example (actually,
2675 // this should be the *only* thing that a still-running Task can
2676 // do at this point, and it will block waiting for the
2677 // Capability).
2678 if (still_running == 0) {
2679 freeCapabilities();
2680 }
2681 RELEASE_LOCK(&sched_mutex);
2682 #if defined(THREADED_RTS)
2683 closeMutex(&sched_mutex);
2684 #endif
2685 }
2686
2687 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2688 void *user USED_IF_NOT_THREADS)
2689 {
2690 #if !defined(THREADED_RTS)
2691 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2692 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2693 evac(user, (StgClosure **)(void *)&sleeping_queue);
2694 #endif
2695 }
2696
2697 /* -----------------------------------------------------------------------------
2698 performGC
2699
2700 This is the interface to the garbage collector from Haskell land.
2701 We provide this so that external C code can allocate and garbage
2702 collect when called from Haskell via _ccall_GC.
2703 -------------------------------------------------------------------------- */
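/* Editor's sketch (not part of the original source): a foreign C function
 * called from Haskell might force a collection before doing something
 * memory-sensitive; every name here other than performMajorGC() is a
 * placeholder:
 *
 *     void report_live_data (void)
 *     {
 *         performMajorGC();          // collect before measuring
 *         dump_heap_statistics();    // the caller's own code
 *     }
 */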
2704
2705 static void
2706 performGC_(rtsBool force_major)
2707 {
2708 Task *task;
2709 Capability *cap = NULL;
2710
2711 // We must grab a new Task here, because the existing Task may be
2712 // associated with a particular Capability, and chained onto the
2713 // suspended_ccalls queue.
2714 task = newBoundTask();
2715
2716 // TODO: do we need to traceTask*() here?
2717
2718 waitForCapability(&cap,task);
2719 scheduleDoGC(&cap,task,force_major);
2720 releaseCapability(cap);
2721 boundTaskExiting(task);
2722 }
2723
2724 void
2725 performGC(void)
2726 {
2727 performGC_(rtsFalse);
2728 }
2729
2730 void
2731 performMajorGC(void)
2732 {
2733 performGC_(rtsTrue);
2734 }
2735
2736 /* ---------------------------------------------------------------------------
2737 Interrupt execution.
2738 Might be called inside a signal handler so it mustn't do anything fancy.
2739 ------------------------------------------------------------------------ */
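/* Editor's sketch (not part of the original source): since this function
 * must be safe to call from a signal handler, a typical use is along the
 * lines of the RTS's own ^C handling; the handler name is a placeholder:
 *
 *     static void my_sigint_handler (int sig STG_UNUSED)
 *     {
 *         interruptStgRts();     // ask the scheduler to wind threads down
 *     }
 */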
2740
2741 void
2742 interruptStgRts(void)
2743 {
2744 sched_state = SCHED_INTERRUPTING;
2745 interruptAllCapabilities();
2746 #if defined(THREADED_RTS)
2747 wakeUpRts();
2748 #endif
2749 }
2750
2751 /* -----------------------------------------------------------------------------
2752 Wake up the RTS
2753
2754 This function causes at least one OS thread to wake up and run the
2755 scheduler loop. It is invoked when the RTS might be deadlocked, or
2756 an external event has arrived that may need servicing (eg. a
2757 keyboard interrupt).
2758
2759 In the single-threaded RTS we don't do anything here; we only have
2760 one thread anyway, and the event that caused us to want to wake up
2761 will have interrupted any blocking system call in progress.
2762 -------------------------------------------------------------------------- */
2763
2764 #if defined(THREADED_RTS)
2765 void wakeUpRts(void)
2766 {
2767 // This forces the IO Manager thread to wakeup, which will
2768 // in turn ensure that some OS thread wakes up and runs the
2769 // scheduler loop, which will cause a GC and deadlock check.
2770 ioManagerWakeup();
2771 }
2772 #endif
2773
2774 /* -----------------------------------------------------------------------------
2775 Deleting threads
2776
2777 This is used for interruption (^C) and forking, and corresponds to
2778 raising an exception but without letting the thread catch the
2779 exception.
2780 -------------------------------------------------------------------------- */
2781
2782 static void
2783 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2784 {
2785 // NOTE: must only be called on a TSO that we have exclusive
2786 // access to, because we will call throwToSingleThreaded() below.
2787 // The TSO must be on the run queue of the Capability we own, or
2788 // we must own all Capabilities.
2789
2790 if (tso->why_blocked != BlockedOnCCall &&
2791 tso->why_blocked != BlockedOnCCall_Interruptible) {
2792 throwToSingleThreaded(tso->cap,tso,NULL);
2793 }
2794 }
2795
2796 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2797 static void
2798 deleteThread_(Capability *cap, StgTSO *tso)
2799 { // for forkProcess only:
2800 // like deleteThread(), but we delete threads in foreign calls, too.
2801
2802 if (tso->why_blocked == BlockedOnCCall ||
2803 tso->why_blocked == BlockedOnCCall_Interruptible) {
2804 tso->what_next = ThreadKilled;
2805 appendToRunQueue(tso->cap, tso);
2806 } else {
2807 deleteThread(cap,tso);
2808 }
2809 }
2810 #endif
2811
2812 /* -----------------------------------------------------------------------------
2813 raiseExceptionHelper
2814
2815 This function is called by the raise# primitive, just so that we can
2816 move some of the tricky bits of raising an exception from C-- into
2817 C. Who knows, it might be a useful reusable thing here too.
2818 -------------------------------------------------------------------------- */
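/* Editor's sketch (not part of the original source): the raise# code that
 * calls this helper is expected to dispatch on the frame type returned,
 * roughly as below (a sketch, not the actual C-- code):
 *
 *     switch (raiseExceptionHelper(reg, tso, exception)) {
 *     case CATCH_FRAME:      ... enter the handler found on the stack ...
 *     case ATOMICALLY_FRAME: ... the exception escaped an STM transaction ...
 *     case STOP_FRAME:       ... uncaught: the thread is finished ...
 *     }
 */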
2819
2820 StgWord
2821 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2822 {
2823 Capability *cap = regTableToCapability(reg);
2824 StgThunk *raise_closure = NULL;
2825 StgPtr p, next;
2826 const StgRetInfoTable *info;
2827 //
2828 // This closure represents the expression 'raise# E' where E
2829 // is the exception raised. It is used to overwrite all the
2830 // thunks which are currently under evaluation.
2831 //
2832
2833 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2834 // LDV profiling: stg_raise_info has THUNK as its closure
2835 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2836 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2837 // 1 does not cause any problem unless profiling is performed.
2838 // However, when LDV profiling goes on, we need to linearly scan the
2839 // small object pool, where raise_closure is stored, so we should
2840 // use MIN_UPD_SIZE.
2841 //
2842 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2843 // sizeofW(StgClosure)+1);
2844 //
2845
2846 //
2847 // Walk up the stack, looking for the catch frame. On the way,
2848 // we update any closures pointed to from update frames with the
2849 // raise closure that we just built.
2850 //
2851 p = tso->stackobj->sp;
2852 while(1) {
2853 info = get_ret_itbl((StgClosure *)p);
2854 next = p + stack_frame_sizeW((StgClosure *)p);
2855 switch (info->i.type) {
2856
2857 case UPDATE_FRAME:
2858 // Only create raise_closure if we need to.
2859 if (raise_closure == NULL) {
2860 raise_closure =
2861 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2862 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2863 raise_closure->payload[0] = exception;
2864 }
2865 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2866 (StgClosure *)raise_closure);
2867 p = next;
2868 continue;
2869
2870 case ATOMICALLY_FRAME:
2871 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2872 tso->stackobj->sp = p;
2873 return ATOMICALLY_FRAME;
2874
2875 case CATCH_FRAME:
2876 tso->stackobj->sp = p;
2877 return CATCH_FRAME;
2878
2879 case CATCH_STM_FRAME:
2880 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2881 tso->stackobj->sp = p;
2882 return CATCH_STM_FRAME;
2883
2884 case UNDERFLOW_FRAME:
2885 tso->stackobj->sp = p;
2886 threadStackUnderflow(cap,tso);
2887 p = tso->stackobj->sp;
2888 continue;
2889
2890 case STOP_FRAME:
2891 tso->stackobj->sp = p;
2892 return STOP_FRAME;
2893
2894 case CATCH_RETRY_FRAME: {
2895 StgTRecHeader *trec = tso -> trec;
2896 StgTRecHeader *outer = trec -> enclosing_trec;
2897 debugTrace(DEBUG_stm,
2898 "found CATCH_RETRY_FRAME at %p during raise", p);
2899 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2900 stmAbortTransaction(cap, trec);
2901 stmFreeAbortedTRec(cap, trec);
2902 tso -> trec = outer;
2903 p = next;
2904 continue;
2905 }
2906
2907 default:
2908 p = next;
2909 continue;
2910 }
2911 }
2912 }
2913
2914
2915 /* -----------------------------------------------------------------------------
2916 findRetryFrameHelper
2917
2918 This function is called by the retry# primitive. It traverses the stack
2919 leaving tso->sp referring to the frame which should handle the retry.
2920
2921 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2922 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2923
2924 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2925 create) because retries are not considered to be exceptions, despite the
2926 similar implementation.
2927
2928 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2929 not be created within memory transactions.
2930 -------------------------------------------------------------------------- */
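/* Editor's sketch (not part of the original source): as with
 * raiseExceptionHelper above, the retry# code is expected to branch on the
 * frame type returned, roughly:
 *
 *     switch (findRetryFrameHelper(cap, tso)) {
 *     case CATCH_RETRY_FRAME: ... run the other branch of the orElse# ...
 *     case ATOMICALLY_FRAME:  ... block the thread; retry the whole transaction ...
 *     }
 */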
2931
2932 StgWord
2933 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2934 {
2935 const StgRetInfoTable *info;
2936 StgPtr p, next;
2937
2938 p = tso->stackobj->sp;
2939 while (1) {
2940 info = get_ret_itbl((const StgClosure *)p);
2941 next = p + stack_frame_sizeW((StgClosure *)p);
2942 switch (info->i.type) {
2943
2944 case ATOMICALLY_FRAME:
2945 debugTrace(DEBUG_stm,
2946 "found ATOMICALLY_FRAME at %p during retry", p);
2947 tso->stackobj->sp = p;
2948 return ATOMICALLY_FRAME;
2949
2950 case CATCH_RETRY_FRAME:
2951 debugTrace(DEBUG_stm,
2952 "found CATCH_RETRY_FRAME at %p during retry", p);
2953 tso->stackobj->sp = p;
2954 return CATCH_RETRY_FRAME;
2955
2956 case CATCH_STM_FRAME: {
2957 StgTRecHeader *trec = tso -> trec;
2958 StgTRecHeader *outer = trec -> enclosing_trec;
2959 debugTrace(DEBUG_stm,
2960 "found CATCH_STM_FRAME at %p during retry", p);
2961 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2962 stmAbortTransaction(cap, trec);
2963 stmFreeAbortedTRec(cap, trec);
2964 tso -> trec = outer;
2965 p = next;
2966 continue;
2967 }
2968
2969 case UNDERFLOW_FRAME:
2970 tso->stackobj->sp = p;
2971 threadStackUnderflow(cap,tso);
2972 p = tso->stackobj->sp;
2973 continue;
2974
2975 default:
2976 ASSERT(info->i.type != CATCH_FRAME);
2977 ASSERT(info->i.type != STOP_FRAME);
2978 p = next;
2979 continue;
2980 }
2981 }
2982 }
2983
2984 /* -----------------------------------------------------------------------------
2985 resurrectThreads is called after garbage collection on the list of
2986 threads found to be garbage. Each of these threads will be woken
2987 up and sent an exception: BlockedIndefinitelyOnMVar if the thread was
2988 blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked on an
2989 STM transaction, or NonTermination if it was blocked on a Black Hole.
2990
2991 Locks: assumes we hold *all* the capabilities.
2992 -------------------------------------------------------------------------- */
2993
2994 void
2995 resurrectThreads (StgTSO *threads)
2996 {
2997 StgTSO *tso, *next;
2998 Capability *cap;
2999 generation *gen;
3000
3001 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3002 next = tso->global_link;
3003
3004 gen = Bdescr((P_)tso)->gen;
3005 tso->global_link = gen->threads;
3006 gen->threads = tso;
3007
3008 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3009
3010 // Wake up the thread on the Capability it was last on
3011 cap = tso->cap;
3012
3013 switch (tso->why_blocked) {
3014 case BlockedOnMVar:
3015 case BlockedOnMVarRead:
3016 /* Called by GC - sched_mutex lock is currently held. */
3017 throwToSingleThreaded(cap, tso,
3018 (StgClosure *)blockedIndefinitelyOnMVar_closure);
3019 break;
3020 case BlockedOnBlackHole:
3021 throwToSingleThreaded(cap, tso,
3022 (StgClosure *)nonTermination_closure);
3023 break;
3024 case BlockedOnSTM:
3025 throwToSingleThreaded(cap, tso,
3026 (StgClosure *)blockedIndefinitelyOnSTM_closure);
3027 break;
3028 case NotBlocked:
3029 /* This might happen if the thread was blocked on a black hole
3030 * belonging to a thread that we've just woken up (raiseAsync
3031 * can wake up threads, remember...).
3032 */
3033 continue;
3034 case BlockedOnMsgThrowTo:
3035 // This can happen if the target is masking, blocks on a
3036 // black hole, and then is found to be unreachable. In
3037 // this case, we want to let the target wake up and carry
3038 // on, and do nothing to this thread.
3039 continue;
3040 default:
3041 barf("resurrectThreads: thread blocked in a strange way: %d",
3042 tso->why_blocked);
3043 }
3044 }
3045 }