Schedule.c: remove some unused parameters
[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45 #include "TopHandler.h"
46
47 #if defined(HAVE_SYS_TYPES_H)
48 #include <sys/types.h>
49 #endif
50 #if defined(HAVE_UNISTD_H)
51 #include <unistd.h>
52 #endif
53
54 #include <string.h>
55 #include <stdlib.h>
56 #include <stdarg.h>
57
58 #if defined(HAVE_ERRNO_H)
59 #include <errno.h>
60 #endif
61
62 #if defined(TRACING)
63 #include "eventlog/EventLog.h"
64 #endif
65 /* -----------------------------------------------------------------------------
66 * Global variables
67 * -------------------------------------------------------------------------- */
68
69 #if !defined(THREADED_RTS)
70 // Blocked/sleeping threads
71 StgTSO *blocked_queue_hd = NULL;
72 StgTSO *blocked_queue_tl = NULL;
73 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
74 #endif
75
76 // Bytes allocated since the last time a HeapOverflow exception was thrown by
77 // the RTS
78 uint64_t allocated_bytes_at_heapoverflow = 0;
79
80 /* Set to true when the latest garbage collection failed to reclaim enough
81 * space, and the runtime should proceed to shut itself down in an orderly
82 * fashion (emitting profiling info etc.), OR throw an exception to the main
83 * thread, if it is still alive.
84 */
85 bool heap_overflow = false;
86
87 /* flag that tracks whether we have done any execution in this time slice.
88 * LOCK: currently none, perhaps we should lock (but needs to be
89 * updated in the fast path of the scheduler).
90 *
91 * NB. must be StgWord, we do xchg() on it.
92 */
93 volatile StgWord recent_activity = ACTIVITY_YES;
94
95 /* if this flag is set as well, give up execution
96 * LOCK: none (changes monotonically)
97 */
98 volatile StgWord sched_state = SCHED_RUNNING;
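// The states form a monotonically increasing sequence:
// SCHED_RUNNING < SCHED_INTERRUPTING < SCHED_SHUTTING_DOWN (see Schedule.h),
// which is why the tests below can be written as
// (sched_state >= SCHED_INTERRUPTING).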
99
100 /*
101 * This mutex protects most of the global scheduler data in
102 * the THREADED_RTS runtime.
103 */
104 #if defined(THREADED_RTS)
105 Mutex sched_mutex;
106 #endif
107
108 #if !defined(mingw32_HOST_OS)
109 #define FORKPROCESS_PRIMOP_SUPPORTED
110 #endif
111
112 /* -----------------------------------------------------------------------------
113 * static function prototypes
114 * -------------------------------------------------------------------------- */
115
116 static Capability *schedule (Capability *initialCapability, Task *task);
117
118 //
119 // These functions all encapsulate parts of the scheduler loop, and are
120 // abstracted only to make the structure and control flow of the
121 // scheduler clearer.
122 //
123 static void schedulePreLoop (void);
124 static void scheduleFindWork (Capability **pcap);
125 #if defined(THREADED_RTS)
126 static void scheduleYield (Capability **pcap, Task *task);
127 #endif
128 #if defined(THREADED_RTS)
129 static bool requestSync (Capability **pcap, Task *task,
130 PendingSync *sync_type, SyncType *prev_sync_type);
131 static void acquireAllCapabilities(Capability *cap, Task *task);
132 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
133 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
134 uint32_t to USED_IF_THREADS);
135 #endif
136 static void scheduleStartSignalHandlers (Capability *cap);
137 static void scheduleCheckBlockedThreads (Capability *cap);
138 static void scheduleProcessInbox(Capability **cap);
139 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
140 static void schedulePushWork(Capability *cap, Task *task);
141 #if defined(THREADED_RTS)
142 static void scheduleActivateSpark(Capability *cap);
143 #endif
144 static void schedulePostRunThread(Capability *cap, StgTSO *t);
145 static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
146 static bool scheduleHandleYield( Capability *cap, StgTSO *t,
147 uint32_t prev_what_next );
148 static void scheduleHandleThreadBlocked( StgTSO *t );
149 static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
150 StgTSO *t );
151 static bool scheduleNeedHeapProfile(bool ready_to_gc);
152 static void scheduleDoGC(Capability **pcap, Task *task, bool force_major);
153
154 static void deleteThread (StgTSO *tso);
155 static void deleteAllThreads (void);
156
157 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
158 static void deleteThread_(StgTSO *tso);
159 #endif
160
161 /* ---------------------------------------------------------------------------
162 Main scheduling loop.
163
164 We use round-robin scheduling, each thread returning to the
165 scheduler loop when one of these conditions is detected:
166
167 * out of heap space
168 * timer expires (thread yields)
169 * thread blocks
170 * thread ends
171 * stack overflow
172
173 ------------------------------------------------------------------------ */
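/* Each of these conditions corresponds to a StgThreadReturnCode handled in
   the switch on 'ret' towards the end of the loop: HeapOverflow,
   ThreadYielding, ThreadBlocked, ThreadFinished and StackOverflow
   respectively. */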
174
175 static Capability *
176 schedule (Capability *initialCapability, Task *task)
177 {
178 StgTSO *t;
179 Capability *cap;
180 StgThreadReturnCode ret;
181 uint32_t prev_what_next;
182 bool ready_to_gc;
183
184 cap = initialCapability;
185
186 // Pre-condition: this task owns initialCapability.
187 // The sched_mutex is *NOT* held
188 // NB. on return, we still hold a capability.
189
190 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
191
192 schedulePreLoop();
193
194 // -----------------------------------------------------------
195 // Scheduler loop starts here:
196
197 while (1) {
198
199 // Check whether we have re-entered the RTS from Haskell without
200 // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
201 // call).
202 if (cap->in_haskell) {
203 errorBelch("schedule: re-entered unsafely.\n"
204 " Perhaps a 'foreign import unsafe' should be 'safe'?");
205 stg_exit(EXIT_FAILURE);
206 }
207
208 // Note [shutdown]: The interruption / shutdown sequence.
209 //
210 // In order to cleanly shut down the runtime, we want to:
211 // * make sure that all main threads return to their callers
212 // with the state 'Interrupted'.
213 // * clean up all OS threads associated with the runtime
214 // * free all memory etc.
215 //
216 // So the sequence goes like this:
217 //
218 // * The shutdown sequence is initiated by calling hs_exit(),
219 // interruptStgRts(), or running out of memory in the GC.
220 //
221 // * Set sched_state = SCHED_INTERRUPTING
222 //
223 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
224 // scheduleDoGC(), which halts the whole runtime by acquiring all the
225 // capabilities, does a GC and then calls deleteAllThreads() to kill all
226 // the remaining threads. The zombies are left on the run queue for
227 // cleaning up. We can't kill threads involved in foreign calls.
228 //
229 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
230 //
231 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
232 // enforced by the scheduler, which won't run any Haskell code when
233 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
234 // other capabilities by doing the GC earlier.
235 //
236 // * all workers exit when the run queue on their capability
237 // drains. All main threads will also exit when their TSO
238 // reaches the head of the run queue and they can return.
239 //
240 // * eventually all Capabilities will shut down, and the RTS can
241 // exit.
242 //
243 // * We might be left with threads blocked in foreign calls,
244 // we should really attempt to kill these somehow (TODO).
245
246 switch (sched_state) {
247 case SCHED_RUNNING:
248 break;
249 case SCHED_INTERRUPTING:
250 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
251 /* scheduleDoGC() deletes all the threads */
252 scheduleDoGC(&cap,task,true);
253
254 // after scheduleDoGC(), we must be shutting down. Either some
255 // other Capability did the final GC, or we did it above,
256 // either way we can fall through to the SCHED_SHUTTING_DOWN
257 // case now.
258 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
259 // fall through
260
261 case SCHED_SHUTTING_DOWN:
262 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
263 // If we are a worker, just exit. If we're a bound thread
264 // then we will exit below when we've removed our TSO from
265 // the run queue.
266 if (!isBoundTask(task) && emptyRunQueue(cap)) {
267 return cap;
268 }
269 break;
270 default:
271 barf("sched_state: %" FMT_Word, sched_state);
272 }
273
274 scheduleFindWork(&cap);
275
276 /* work pushing, currently relevant only for THREADED_RTS:
277 (pushes threads, wakes up idle capabilities for stealing) */
278 schedulePushWork(cap,task);
279
280 scheduleDetectDeadlock(&cap,task);
281
282 // Normally, the only way we can get here with no threads to
283 // run is if a keyboard interrupt was received during
284 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
285 // Additionally, it is not fatal for the
286 // threaded RTS to reach here with no threads to run.
287 //
288 // win32: might be here due to awaitEvent() being abandoned
289 // as a result of a console event having been delivered.
290
291 #if defined(THREADED_RTS)
292 scheduleYield(&cap,task);
293
294 if (emptyRunQueue(cap)) continue; // look for work again
295 #endif
296
297 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
298 if ( emptyRunQueue(cap) ) {
299 ASSERT(sched_state >= SCHED_INTERRUPTING);
300 }
301 #endif
302
303 //
304 // Get a thread to run
305 //
306 t = popRunQueue(cap);
307
308 // Sanity check the thread we're about to run. This can be
309 // expensive if there is lots of thread switching going on...
310 IF_DEBUG(sanity,checkTSO(t));
311
312 #if defined(THREADED_RTS)
313 // Check whether we can run this thread in the current task.
314 // If not, we have to pass our capability to the right task.
315 {
316 InCall *bound = t->bound;
317
318 if (bound) {
319 if (bound->task == task) {
320 // yes, the Haskell thread is bound to the current native thread
321 } else {
322 debugTrace(DEBUG_sched,
323 "thread %lu bound to another OS thread",
324 (unsigned long)t->id);
325 // no, bound to a different Haskell thread: pass to that thread
326 pushOnRunQueue(cap,t);
327 continue;
328 }
329 } else {
330 // The thread we want to run is unbound.
331 if (task->incall->tso) {
332 debugTrace(DEBUG_sched,
333 "this OS thread cannot run thread %lu",
334 (unsigned long)t->id);
335 // no, the current native thread is bound to a different
336 // Haskell thread, so pass it to any worker thread
337 pushOnRunQueue(cap,t);
338 continue;
339 }
340 }
341 }
342 #endif
343
344 // If we're shutting down, and this thread has not yet been
345 // killed, kill it now. This sometimes happens when a finalizer
346 // thread is created by the final GC, or a thread previously
347 // in a foreign call returns.
348 if (sched_state >= SCHED_INTERRUPTING &&
349 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
350 deleteThread(t);
351 }
352
353 // If this capability is disabled, migrate the thread away rather
354 // than running it. NB. but not if the thread is bound: it is
355 // really hard for a bound thread to migrate itself. Believe me,
356 // I tried several ways and couldn't find a way to do it.
357 // Instead, when everything is stopped for GC, we migrate all the
358 // threads on the run queue then (see scheduleDoGC()).
359 //
360 // ToDo: what about TSO_LOCKED? Currently we're migrating those
361 // when the number of capabilities drops, but we never migrate
362 // them back if it rises again. Presumably we should, but after
363 // the thread has been migrated we no longer know what capability
364 // it was originally on.
365 #if defined(THREADED_RTS)
366 if (cap->disabled && !t->bound) {
367 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
368 migrateThread(cap, t, dest_cap);
369 continue;
370 }
371 #endif
372
373 /* context switches are initiated by the timer signal, unless
374 * the user specified "context switch as often as possible", with
375 * +RTS -C0
376 */
377 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
378 && !emptyThreadQueues(cap)) {
379 cap->context_switch = 1;
380 }
381
382 run_thread:
383
384 // CurrentTSO is the thread to run. It might be different if we
385 // loop back to run_thread, so make sure to set CurrentTSO after
386 // that.
387 cap->r.rCurrentTSO = t;
388
389 startHeapProfTimer();
390
391 // ----------------------------------------------------------------------
392 // Run the current thread
393
394 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
395 ASSERT(t->cap == cap);
396 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
397
398 prev_what_next = t->what_next;
399
400 errno = t->saved_errno;
401 #if defined(mingw32_HOST_OS)
402 SetLastError(t->saved_winerror);
403 #endif
404
405 // reset the interrupt flag before running Haskell code
406 cap->interrupt = 0;
407
408 cap->in_haskell = true;
409 cap->idle = 0;
410
411 dirty_TSO(cap,t);
412 dirty_STACK(cap,t->stackobj);
413
414 switch (recent_activity)
415 {
416 case ACTIVITY_DONE_GC: {
417 // ACTIVITY_DONE_GC means we turned off the timer signal to
418 // conserve power (see #1623). Re-enable it here.
419 uint32_t prev;
420 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
421 if (prev == ACTIVITY_DONE_GC) {
422 #if !defined(PROFILING)
423 startTimer();
424 #endif
425 }
426 break;
427 }
428 case ACTIVITY_INACTIVE:
429 // If we reached ACTIVITY_INACTIVE, then don't reset it until
430 // we've done the GC. The thread running here might just be
431 // the IO manager thread that handle_tick() woke up via
432 // wakeUpRts().
433 break;
434 default:
435 recent_activity = ACTIVITY_YES;
436 }
437
438 traceEventRunThread(cap, t);
439
440 switch (prev_what_next) {
441
442 case ThreadKilled:
443 case ThreadComplete:
444 /* Thread already finished, return to scheduler. */
445 ret = ThreadFinished;
446 break;
447
448 case ThreadRunGHC:
449 {
450 StgRegTable *r;
451 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
452 cap = regTableToCapability(r);
453 ret = r->rRet;
454 break;
455 }
456
457 case ThreadInterpret:
458 cap = interpretBCO(cap);
459 ret = cap->r.rRet;
460 break;
461
462 default:
463 barf("schedule: invalid prev_what_next=%u field", prev_what_next);
464 }
465
466 cap->in_haskell = false;
467
468 // The TSO might have moved, eg. if it re-entered the RTS and a GC
469 // happened. So find the new location:
470 t = cap->r.rCurrentTSO;
471
472 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
473 // don't want it set when not running a Haskell thread.
474 cap->r.rCurrentTSO = NULL;
475
476 // And save the current errno in this thread.
477 // XXX: possibly bogus for SMP because this thread might already
478 // be running again, see code below.
479 t->saved_errno = errno;
480 #if defined(mingw32_HOST_OS)
481 // Similarly for Windows error code
482 t->saved_winerror = GetLastError();
483 #endif
484
485 if (ret == ThreadBlocked) {
486 if (t->why_blocked == BlockedOnBlackHole) {
487 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
488 traceEventStopThread(cap, t, t->why_blocked + 6,
489 owner != NULL ? owner->id : 0);
490 } else {
491 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
492 }
493 } else {
494 traceEventStopThread(cap, t, ret, 0);
495 }
496
497 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
498 ASSERT(t->cap == cap);
499
500 // ----------------------------------------------------------------------
501
502 // Costs for the scheduler are assigned to CCS_SYSTEM
503 stopHeapProfTimer();
504 #if defined(PROFILING)
505 cap->r.rCCCS = CCS_SYSTEM;
506 #endif
507
508 schedulePostRunThread(cap,t);
509
510 ready_to_gc = false;
511
512 switch (ret) {
513 case HeapOverflow:
514 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
515 break;
516
517 case StackOverflow:
518 // just adjust the stack for this thread, then pop it back
519 // on the run queue.
520 threadStackOverflow(cap, t);
521 pushOnRunQueue(cap,t);
522 break;
523
524 case ThreadYielding:
525 if (scheduleHandleYield(cap, t, prev_what_next)) {
526 // shortcut for switching between compiler/interpreter:
527 goto run_thread;
528 }
529 break;
530
531 case ThreadBlocked:
532 scheduleHandleThreadBlocked(t);
533 break;
534
535 case ThreadFinished:
536 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
537 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
538 break;
539
540 default:
541 barf("schedule: invalid thread return code %d", (int)ret);
542 }
543
544 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
545 scheduleDoGC(&cap,task,false);
546 }
547 } /* end of while() */
548 }
549
550 /* -----------------------------------------------------------------------------
551 * Run queue operations
552 * -------------------------------------------------------------------------- */
553
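/* The run queue is a doubly-linked deque: tso->_link points forwards,
* tso->block_info.prev points backwards, and END_TSO_QUEUE is the sentinel
* at both ends. Below is a minimal sketch of the invariant that
* removeFromRunQueue() relies on; it is illustrative only (not compiled),
* and the real check is checkRunQueue(), used under IF_DEBUG(sanity).
*/
#if 0
static void checkRunQueueSketch (Capability *cap)
{
StgTSO *t, *prev = END_TSO_QUEUE;
uint32_t n = 0;
for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
ASSERT(t->block_info.prev == prev); // back pointer matches predecessor
prev = t;
n++;
}
ASSERT(cap->run_queue_tl == prev); // tail is the last element
ASSERT(cap->n_run_queue == n); // cached length is accurate
}
#endif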
554 static void
555 removeFromRunQueue (Capability *cap, StgTSO *tso)
556 {
557 if (tso->block_info.prev == END_TSO_QUEUE) {
558 ASSERT(cap->run_queue_hd == tso);
559 cap->run_queue_hd = tso->_link;
560 } else {
561 setTSOLink(cap, tso->block_info.prev, tso->_link);
562 }
563 if (tso->_link == END_TSO_QUEUE) {
564 ASSERT(cap->run_queue_tl == tso);
565 cap->run_queue_tl = tso->block_info.prev;
566 } else {
567 setTSOPrev(cap, tso->_link, tso->block_info.prev);
568 }
569 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
570 cap->n_run_queue--;
571
572 IF_DEBUG(sanity, checkRunQueue(cap));
573 }
574
575 void
576 promoteInRunQueue (Capability *cap, StgTSO *tso)
577 {
578 removeFromRunQueue(cap, tso);
579 pushOnRunQueue(cap, tso);
580 }
581
582 /* ----------------------------------------------------------------------------
583 * Setting up the scheduler loop
584 * ------------------------------------------------------------------------- */
585
586 static void
587 schedulePreLoop(void)
588 {
589 // initialisation for scheduler - what cannot go into initScheduler()
590
591 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
592 win32AllocStack();
593 #endif
594 }
595
596 /* -----------------------------------------------------------------------------
597 * scheduleFindWork()
598 *
599 * Search for work to do, and handle messages from elsewhere.
600 * -------------------------------------------------------------------------- */
601
602 static void
603 scheduleFindWork (Capability **pcap)
604 {
605 scheduleStartSignalHandlers(*pcap);
606
607 scheduleProcessInbox(pcap);
608
609 scheduleCheckBlockedThreads(*pcap);
610
611 #if defined(THREADED_RTS)
612 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
613 #endif
614 }
615
616 #if defined(THREADED_RTS)
617 STATIC_INLINE bool
618 shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
619 {
620 // we need to yield this capability to someone else if..
621 // - another thread is initiating a GC, and we didn't just do a GC
622 // (see Note [GC livelock])
623 // - another Task is returning from a foreign call
624 // - the thread at the head of the run queue cannot be run
625 // by this Task (it is bound to another Task, or it is unbound
626 // and this task is bound).
627 //
628 // Note [GC livelock]
629 //
630 // If we are interrupted to do a GC, then we do not immediately do
631 // another one. This avoids a starvation situation where one
632 // Capability keeps forcing a GC and the other Capabilities make no
633 // progress at all.
634
635 return ((pending_sync && !didGcLast) ||
636 cap->n_returning_tasks != 0 ||
637 (!emptyRunQueue(cap) && (task->incall->tso == NULL
638 ? peekRunQueue(cap)->bound != NULL
639 : peekRunQueue(cap)->bound != task->incall)));
640 }
641
642 // This is the single place where a Task goes to sleep. There are
643 // two reasons it might need to sleep:
644 // - there are no threads to run
645 // - we need to yield this Capability to someone else
646 // (see shouldYieldCapability())
647 //
648 // Careful: the scheduler loop is quite delicate. Make sure you run
649 // the tests in testsuite/concurrent (all ways) after modifying this,
650 // and also check the benchmarks in nofib/parallel for regressions.
651
652 static void
653 scheduleYield (Capability **pcap, Task *task)
654 {
655 Capability *cap = *pcap;
656 bool didGcLast = false;
657
658 // if we have work, and we don't need to give up the Capability, continue.
659 //
660 if (!shouldYieldCapability(cap,task,false) &&
661 (!emptyRunQueue(cap) ||
662 !emptyInbox(cap) ||
663 sched_state >= SCHED_INTERRUPTING)) {
664 return;
665 }
666
667 // otherwise yield (sleep), and keep yielding if necessary.
668 do {
669 if (doIdleGCWork(cap, false)) {
670 didGcLast = false;
671 } else {
672 didGcLast = yieldCapability(&cap,task, !didGcLast);
673 }
674 }
675 while (shouldYieldCapability(cap,task,didGcLast));
676
677 // note there may still be no threads on the run queue at this
678 // point, the caller has to check.
679
680 *pcap = cap;
681 return;
682 }
683 #endif
684
685 /* -----------------------------------------------------------------------------
686 * schedulePushWork()
687 *
688 * Push work to other Capabilities if we have some.
689 * -------------------------------------------------------------------------- */
690
691 static void
692 schedulePushWork(Capability *cap USED_IF_THREADS,
693 Task *task USED_IF_THREADS)
694 {
695 #if defined(THREADED_RTS)
696
697 Capability *free_caps[n_capabilities], *cap0;
698 uint32_t i, n_wanted_caps, n_free_caps;
699
700 uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
701
702 // migration can be turned off with +RTS -qm
703 if (!RtsFlags.ParFlags.migrate) {
704 spare_threads = 0;
705 }
706
707 // Figure out how many capabilities we want to wake up. We need at least
708 // sparkPoolSizeCap(cap) plus the number of spare threads we have.
709 n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
710 if (n_wanted_caps == 0) return;
711
712 // First grab as many free Capabilities as we can. ToDo: we should use
713 // capabilities on the same NUMA node preferably, but not exclusively.
714 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
715 n_free_caps < n_wanted_caps && i != cap->no;
716 i = (i + 1) % n_capabilities) {
717 cap0 = capabilities[i];
718 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
719 if (!emptyRunQueue(cap0)
720 || cap0->n_returning_tasks != 0
721 || !emptyInbox(cap0)) {
722 // it already has some work, we just grabbed it at
723 // the wrong moment. Or maybe it's deadlocked!
724 releaseCapability(cap0);
725 } else {
726 free_caps[n_free_caps++] = cap0;
727 }
728 }
729 }
730
731 // We now have n_free_caps free capabilities stashed in
732 // free_caps[]. Attempt to share our run queue equally with them.
733 // This is complicated slightly by the fact that we can't move
734 // some threads:
735 //
736 // - threads that have TSO_LOCKED cannot migrate
737 // - a thread that is bound to the current Task cannot be migrated
738 //
739 // This is about the simplest thing we could do; improvements we
740 // might want to do include:
741 //
742 // - giving high priority to moving relatively new threads, on
743 // the grounds that they haven't had time to build up a
744 // working set in the cache on this CPU/Capability.
745 //
746 // - giving low priority to moving long-lived threads
747
748 if (n_free_caps > 0) {
749 StgTSO *prev, *t, *next;
750
751 debugTrace(DEBUG_sched,
752 "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
753 cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
754 n_free_caps);
755
756 // There are n_free_caps+1 caps in total. We will share the threads
757 // evenly between them, *except* that if the run queue does not divide
758 // evenly by n_free_caps+1 then we bias towards the current capability.
759 // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
760 uint32_t keep_threads =
761 (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
762
763 // This also ensures that we don't give away all our threads, since
764 // (x + y) / (y + 1) >= 1 when x >= 1.
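// Worked examples (illustrative): n_run_queue=4, n_free_caps=2 gives
// keep_threads = 6/3 = 2; n_run_queue=5, n_free_caps=2 gives 7/3 = 2;
// n_run_queue=1, n_free_caps=3 gives 4/4 = 1, so we never give away our
// only thread.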
765
766 // The number of threads we have left.
767 uint32_t n = cap->n_run_queue;
768
769 // prev = the previous thread on this cap's run queue
770 prev = END_TSO_QUEUE;
771
772 // We're going to walk through the run queue, migrating threads to other
773 // capabilities until we have only keep_threads left. We might
774 // encounter a thread that cannot be migrated, in which case we add it
775 // to the current run queue and decrement keep_threads.
776 for (t = cap->run_queue_hd, i = 0;
777 t != END_TSO_QUEUE && n > keep_threads;
778 t = next)
779 {
780 next = t->_link;
781 t->_link = END_TSO_QUEUE;
782
783 // Should we keep this thread?
784 if (t->bound == task->incall // don't move my bound thread
785 || tsoLocked(t) // don't move a locked thread
786 ) {
787 if (prev == END_TSO_QUEUE) {
788 cap->run_queue_hd = t;
789 } else {
790 setTSOLink(cap, prev, t);
791 }
792 setTSOPrev(cap, t, prev);
793 prev = t;
794 if (keep_threads > 0) keep_threads--;
795 }
796
797 // Or migrate it?
798 else {
799 appendToRunQueue(free_caps[i],t);
800 traceEventMigrateThread (cap, t, free_caps[i]->no);
801
802 if (t->bound) { t->bound->task->cap = free_caps[i]; }
803 t->cap = free_caps[i];
804 n--; // we have one fewer threads now
805 i++; // move on to the next free_cap
806 if (i == n_free_caps) i = 0;
807 }
808 }
809
810 // Join up the beginning of the queue (prev)
811 // with the rest of the queue (t)
812 if (t == END_TSO_QUEUE) {
813 cap->run_queue_tl = prev;
814 } else {
815 setTSOPrev(cap, t, prev);
816 }
817 if (prev == END_TSO_QUEUE) {
818 cap->run_queue_hd = t;
819 } else {
820 setTSOLink(cap, prev, t);
821 }
822 cap->n_run_queue = n;
823
824 IF_DEBUG(sanity, checkRunQueue(cap));
825
826 // release the capabilities
827 for (i = 0; i < n_free_caps; i++) {
828 task->cap = free_caps[i];
829 if (sparkPoolSizeCap(cap) > 0) {
830 // If we have sparks to steal, wake up a worker on the
831 // capability, even if it has no threads to run.
832 releaseAndWakeupCapability(free_caps[i]);
833 } else {
834 releaseCapability(free_caps[i]);
835 }
836 }
837 }
838 task->cap = cap; // reset to point to our Capability.
839
840 #endif /* THREADED_RTS */
841
842 }
843
844 /* ----------------------------------------------------------------------------
845 * Start any pending signal handlers
846 * ------------------------------------------------------------------------- */
847
848 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
849 static void
850 scheduleStartSignalHandlers(Capability *cap)
851 {
852 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
853 // safe outside the lock
854 startSignalHandlers(cap);
855 }
856 }
857 #else
858 static void
859 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
860 {
861 }
862 #endif
863
864 /* ----------------------------------------------------------------------------
865 * Check for blocked threads that can be woken up.
866 * ------------------------------------------------------------------------- */
867
868 static void
869 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
870 {
871 #if !defined(THREADED_RTS)
872 //
873 // Check whether any waiting threads need to be woken up. If the
874 // run queue is empty, and there are no other tasks running, we
875 // can wait indefinitely for something to happen.
876 //
877 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
878 {
879 awaitEvent (emptyRunQueue(cap));
880 }
881 #endif
882 }
883
884 /* ----------------------------------------------------------------------------
885 * Detect deadlock conditions and attempt to resolve them.
886 * ------------------------------------------------------------------------- */
887
888 static void
889 scheduleDetectDeadlock (Capability **pcap, Task *task)
890 {
891 Capability *cap = *pcap;
892 /*
893 * Detect deadlock: when we have no threads to run, there are no
894 * threads blocked, waiting for I/O, or sleeping, and all the
895 * other tasks are waiting for work, we must have a deadlock of
896 * some description.
897 */
898 if ( emptyThreadQueues(cap) )
899 {
900 #if defined(THREADED_RTS)
901 /*
902 * In the threaded RTS, we only check for deadlock if there
903 * has been no activity in a complete timeslice. This means
904 * we won't eagerly start a full GC just because we don't have
905 * any threads to run currently.
906 */
907 if (recent_activity != ACTIVITY_INACTIVE) return;
908 #endif
909
910 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
911
912 // Garbage collection can release some new threads due to
913 // either (a) finalizers or (b) threads resurrected because
914 // they are unreachable and will therefore be sent an
915 // exception. Any threads thus released will be immediately
916 // runnable.
917 scheduleDoGC (pcap, task, true/*force major GC*/);
918 cap = *pcap;
919 // When force_major == true, scheduleDoGC sets
920 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
921 // signal.
922
923 if ( !emptyRunQueue(cap) ) return;
924
925 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
926 /* If we have user-installed signal handlers, then wait
927 * for signals to arrive rather than bombing out with a
928 * deadlock.
929 */
930 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
931 debugTrace(DEBUG_sched,
932 "still deadlocked, waiting for signals...");
933
934 awaitUserSignals();
935
936 if (signals_pending()) {
937 startSignalHandlers(cap);
938 }
939
940 // either we have threads to run, or we were interrupted:
941 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
942
943 return;
944 }
945 #endif
946
947 #if !defined(THREADED_RTS)
948 /* Probably a real deadlock. Send the current main thread the
949 * Deadlock exception.
950 */
951 if (task->incall->tso) {
952 switch (task->incall->tso->why_blocked) {
953 case BlockedOnSTM:
954 case BlockedOnBlackHole:
955 case BlockedOnMsgThrowTo:
956 case BlockedOnMVar:
957 case BlockedOnMVarRead:
958 throwToSingleThreaded(cap, task->incall->tso,
959 (StgClosure *)nonTermination_closure);
960 return;
961 default:
962 barf("deadlock: main thread blocked in a strange way");
963 }
964 }
965 return;
966 #endif
967 }
968 }
969
970
971 /* ----------------------------------------------------------------------------
972 * Process messages in the current Capability's inbox
973 * ------------------------------------------------------------------------- */
974
975 static void
976 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
977 {
978 #if defined(THREADED_RTS)
979 Message *m, *next;
980 PutMVar *p, *pnext;
981 int r;
982 Capability *cap = *pcap;
983
984 while (!emptyInbox(cap)) {
985 // Executing messages might use heap, so we should check for GC.
986 if (doYouWantToGC(cap)) {
987 scheduleDoGC(pcap, cap->running_task, false);
988 cap = *pcap;
989 }
990
991 // don't use a blocking acquire; if the lock is held by
992 // another thread then just carry on. This seems to avoid
993 // getting stuck in a message ping-pong situation with other
994 // processors. We'll check the inbox again later anyway.
995 //
996 // We should really use a more efficient queue data structure
997 // here. The trickiness is that we must ensure a Capability
998 // never goes idle if the inbox is non-empty, which is why we
999 // use cap->lock (cap->lock is released as the last thing
1000 // before going idle; see Capability.c:releaseCapability()).
1001 r = TRY_ACQUIRE_LOCK(&cap->lock);
1002 if (r != 0) return;
1003
1004 m = cap->inbox;
1005 p = cap->putMVars;
1006 cap->inbox = (Message*)END_TSO_QUEUE;
1007 cap->putMVars = NULL;
1008
1009 RELEASE_LOCK(&cap->lock);
1010
1011 while (m != (Message*)END_TSO_QUEUE) {
1012 next = m->link;
1013 executeMessage(cap, m);
1014 m = next;
1015 }
1016
1017 while (p != NULL) {
1018 pnext = p->link;
1019 performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
1020 Unit_closure);
1021 freeStablePtr(p->mvar);
1022 stgFree(p);
1023 p = pnext;
1024 }
1025 }
1026 #endif
1027 }
1028
1029
1030 /* ----------------------------------------------------------------------------
1031 * Activate spark threads (THREADED_RTS)
1032 * ------------------------------------------------------------------------- */
1033
1034 #if defined(THREADED_RTS)
1035 static void
1036 scheduleActivateSpark(Capability *cap)
1037 {
1038 if (anySparks() && !cap->disabled)
1039 {
1040 createSparkThread(cap);
1041 debugTrace(DEBUG_sched, "creating a spark thread");
1042 }
1043 }
1044 #endif // THREADED_RTS
1045
1046 /* ----------------------------------------------------------------------------
1047 * After running a thread...
1048 * ------------------------------------------------------------------------- */
1049
1050 static void
1051 schedulePostRunThread (Capability *cap, StgTSO *t)
1052 {
1053 // We have to be able to catch transactions that are in an
1054 // infinite loop as a result of seeing an inconsistent view of
1055 // memory, e.g.
1056 //
1057 // atomically $ do
1058 // [a,b] <- mapM readTVar [ta,tb]
1059 // when (a == b) loop
1060 //
1061 // and a is never equal to b given a consistent view of memory.
1062 //
1063 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1064 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1065 debugTrace(DEBUG_sched | DEBUG_stm,
1066 "trec %p found wasting its time", t);
1067
1068 // strip the stack back to the
1069 // ATOMICALLY_FRAME, aborting the (nested)
1070 // transaction, and saving the stack of any
1071 // partially-evaluated thunks on the heap.
1072 throwToSingleThreaded_(cap, t, NULL, true);
1073
1074 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1075 }
1076 }
1077
1078 //
1079 // If the current thread's allocation limit has run out, send it
1080 // the AllocationLimitExceeded exception.
1081
1082 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1083 // Use a throwToSelf rather than a throwToSingleThreaded, because
1084 // it correctly handles the case where the thread is currently
1085 // inside mask. Also the thread might be blocked (e.g. on an
1086 // MVar), and throwToSingleThreaded doesn't unblock it
1087 // correctly in that case.
1088 throwToSelf(cap, t, allocationLimitExceeded_closure);
1089 ASSIGN_Int64((W_*)&(t->alloc_limit),
1090 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1091 }
1092
1093 /* some statistics gathering in the parallel case */
1094 }
1095
1096 /* -----------------------------------------------------------------------------
1097 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1098 * -------------------------------------------------------------------------- */
1099
1100 static bool
1101 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1102 {
1103 if (cap->r.rHpLim == NULL || cap->context_switch) {
1104 // Sometimes we miss a context switch, e.g. when calling
1105 // primitives in a tight loop, MAYBE_GC() doesn't check the
1106 // context switch flag, and we end up waiting for a GC.
1107 // See #1984, and concurrent/should_run/1984
1108 cap->context_switch = 0;
1109 appendToRunQueue(cap,t);
1110 } else {
1111 pushOnRunQueue(cap,t);
1112 }
1113
1114 // did the task ask for a large block?
1115 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1116 // if so, get one and push it on the front of the nursery.
1117 bdescr *bd;
1118 W_ blocks;
1119
1120 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1121
1122 if (blocks > BLOCKS_PER_MBLOCK) {
1123 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1124 }
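// For example (illustrative, assuming the default 4k BLOCK_SIZE): a
// request of rHpAlloc = 20000 bytes rounds up to 5 blocks.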
1125
1126 debugTrace(DEBUG_sched,
1127 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1128 (long)t->id, what_next_strs[t->what_next], blocks);
1129
1130 // don't do this if the nursery is (nearly) full, we'll GC first.
1131 if (cap->r.rCurrentNursery->link != NULL ||
1132 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1133 // infinite loop if the
1134 // nursery has only one
1135 // block.
1136
1137 bd = allocGroupOnNode_lock(cap->node,blocks);
1138 cap->r.rNursery->n_blocks += blocks;
1139
1140 // link the new group after CurrentNursery
1141 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1142
1143 // initialise it as a nursery block. We initialise the
1144 // step, gen_no, and flags field of *every* sub-block in
1145 // this large block, because this is easier than making
1146 // sure that we always find the block head of a large
1147 // block whenever we call Bdescr() (eg. evacuate() and
1148 // isAlive() in the GC would both have to do this, at
1149 // least).
1150 {
1151 bdescr *x;
1152 for (x = bd; x < bd + blocks; x++) {
1153 initBdescr(x,g0,g0);
1154 x->free = x->start;
1155 x->flags = 0;
1156 }
1157 }
1158
1159 // This assert can be a killer if the app is doing lots
1160 // of large block allocations.
1161 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1162
1163 // now update the nursery to point to the new block
1164 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1165 cap->r.rCurrentNursery = bd;
1166
1167 // we might be unlucky and have another thread get on the
1168 // run queue before us and steal the large block, but in that
1169 // case the thread will just end up requesting another large
1170 // block.
1171 return false; /* not actually GC'ing */
1172 }
1173 }
1174
1175 return doYouWantToGC(cap);
1176 /* actual GC is done at the end of the while loop in schedule() */
1177 }
1178
1179 /* -----------------------------------------------------------------------------
1180 * Handle a thread that returned to the scheduler with ThreadYielding
1181 * -------------------------------------------------------------------------- */
1182
1183 static bool
1184 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1185 {
1186 /* put the thread back on the run queue. Then, if we're ready to
1187 * GC, check whether this is the last task to stop. If so, wake
1188 * up the GC thread. getThread will block during a GC until the
1189 * GC is finished.
1190 */
1191
1192 ASSERT(t->_link == END_TSO_QUEUE);
1193
1194 // Shortcut if we're just switching evaluators: just run the thread. See
1195 // Note [avoiding threadPaused] in Interpreter.c.
1196 if (t->what_next != prev_what_next) {
1197 debugTrace(DEBUG_sched,
1198 "--<< thread %ld (%s) stopped to switch evaluators",
1199 (long)t->id, what_next_strs[t->what_next]);
1200 return true;
1201 }
1202
1203 // Reset the context switch flag. We don't do this just before
1204 // running the thread, because that would mean we would lose ticks
1205 // during GC, which can lead to unfair scheduling (a thread hogs
1206 // the CPU because the tick always arrives during GC). This way
1207 // penalises threads that do a lot of allocation, but that seems
1208 // better than the alternative.
1209 if (cap->context_switch != 0) {
1210 cap->context_switch = 0;
1211 appendToRunQueue(cap,t);
1212 } else {
1213 pushOnRunQueue(cap,t);
1214 }
1215
1216 IF_DEBUG(sanity,
1217 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1218 checkTSO(t));
1219
1220 return false;
1221 }
1222
1223 /* -----------------------------------------------------------------------------
1224 * Handle a thread that returned to the scheduler with ThreadBlocked
1225 * -------------------------------------------------------------------------- */
1226
1227 static void
1228 scheduleHandleThreadBlocked( StgTSO *t
1229 #if !defined(DEBUG)
1230 STG_UNUSED
1231 #endif
1232 )
1233 {
1234
1235 // We don't need to do anything. The thread is blocked, and it
1236 // has tidied up its stack and placed itself on whatever queue
1237 // it needs to be on.
1238
1239 // ASSERT(t->why_blocked != NotBlocked);
1240 // Not true: for example,
1241 // - the thread may have woken itself up already, because
1242 // threadPaused() might have raised a blocked throwTo
1243 // exception, see maybePerformBlockedException().
1244
1245 #if defined(DEBUG)
1246 traceThreadStatus(DEBUG_sched, t);
1247 #endif
1248 }
1249
1250 /* -----------------------------------------------------------------------------
1251 * Handle a thread that returned to the scheduler with ThreadFinished
1252 * -------------------------------------------------------------------------- */
1253
1254 static bool
1255 scheduleHandleThreadFinished (Capability *cap, Task *task, StgTSO *t)
1256 {
1257 /* Need to check whether this was a main thread, and if so,
1258 * return with the return value.
1259 *
1260 * We also end up here if the thread kills itself with an
1261 * uncaught exception, see Exception.cmm.
1262 */
1263
1264 // blocked exceptions can now complete, even if the thread was in
1265 // blocked mode (see #2910).
1266 awakenBlockedExceptionQueue (cap, t);
1267
1268 //
1269 // Check whether the thread that just completed was a bound
1270 // thread, and if so return with the result.
1271 //
1272 // There is an assumption here that all thread completion goes
1273 // through this point; we need to make sure that if a thread
1274 // ends up in the ThreadKilled state, that it stays on the run
1275 // queue so it can be dealt with here.
1276 //
1277
1278 if (t->bound) {
1279
1280 if (t->bound != task->incall) {
1281 #if !defined(THREADED_RTS)
1282 // Must be a bound thread that is not the topmost one. Leave
1283 // it on the run queue until the stack has unwound to the
1284 // point where we can deal with this. Leaving it on the run
1285 // queue also ensures that the garbage collector knows about
1286 // this thread and its return value (it gets dropped from the
1287 // step->threads list so there's no other way to find it).
1288 appendToRunQueue(cap,t);
1289 return false;
1290 #else
1291 // this cannot happen in the threaded RTS, because a
1292 // bound thread can only be run by the appropriate Task.
1293 barf("finished bound thread that isn't mine");
1294 #endif
1295 }
1296
1297 ASSERT(task->incall->tso == t);
1298
1299 if (t->what_next == ThreadComplete) {
1300 if (task->incall->ret) {
1301 // NOTE: return val is stack->sp[1] (see StgStartup.cmm)
1302 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1303 }
1304 task->incall->rstat = Success;
1305 } else {
1306 if (task->incall->ret) {
1307 *(task->incall->ret) = NULL;
1308 }
1309 if (sched_state >= SCHED_INTERRUPTING) {
1310 if (heap_overflow) {
1311 task->incall->rstat = HeapExhausted;
1312 } else {
1313 task->incall->rstat = Interrupted;
1314 }
1315 } else {
1316 task->incall->rstat = Killed;
1317 }
1318 }
1319 #if defined(DEBUG)
1320 removeThreadLabel((StgWord)task->incall->tso->id);
1321 #endif
1322
1323 // We no longer consider this thread and task to be bound to
1324 // each other. The TSO lives on until it is GC'd, but the
1325 // task is about to be released by the caller, and we don't
1326 // want anyone following the pointer from the TSO to the
1327 // defunct task (which might have already been
1328 // re-used). This was a real bug: the GC updated
1329 // tso->bound->tso which led to a deadlock.
1330 t->bound = NULL;
1331 task->incall->tso = NULL;
1332
1333 return true; // tells schedule() to return
1334 }
1335
1336 return false;
1337 }
1338
1339 /* -----------------------------------------------------------------------------
1340 * Perform a heap census
1341 * -------------------------------------------------------------------------- */
1342
1343 static bool
1344 scheduleNeedHeapProfile( bool ready_to_gc )
1345 {
1346 // When we have +RTS -i0 and we're heap profiling, do a census at
1347 // every GC. This lets us get repeatable runs for debugging.
1348 if (performHeapProfile ||
1349 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1350 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1351 return true;
1352 } else {
1353 return false;
1354 }
1355 }
1356
1357 /* -----------------------------------------------------------------------------
1358 * stopAllCapabilities()
1359 *
1360 * Stop all Haskell execution. This is used when we need to make some global
1361 * change to the system, such as altering the number of capabilities, or
1362 * forking.
1363 *
1364 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1365 * -------------------------------------------------------------------------- */
1366
1367 #if defined(THREADED_RTS)
1368 static void stopAllCapabilities (Capability **pCap, Task *task)
1369 {
1370 bool was_syncing;
1371 SyncType prev_sync_type;
1372
1373 PendingSync sync = {
1374 .type = SYNC_OTHER,
1375 .idle = NULL,
1376 .task = task
1377 };
1378
1379 do {
1380 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1381 } while (was_syncing);
1382
1383 acquireAllCapabilities(*pCap,task);
1384
1385 pending_sync = 0;
1386 }
1387 #endif
1388
1389 /* -----------------------------------------------------------------------------
1390 * requestSync()
1391 *
1392 * Commence a synchronisation between all capabilities. Normally not called
1393 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1394 * has some special synchronisation requirements.
1395 *
1396 * Returns:
1397 * false if we successfully got a sync
1398 * true if there was another sync request in progress,
1399 * and we yielded to it. In that case, the type of the
1400 * other sync request is returned via *prev_sync_type.
1401 * -------------------------------------------------------------------------- */
1402
1403 #if defined(THREADED_RTS)
1404 static bool requestSync (
1405 Capability **pcap, Task *task, PendingSync *new_sync,
1406 SyncType *prev_sync_type)
1407 {
1408 PendingSync *sync;
1409
1410 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1411 (StgWord)NULL,
1412 (StgWord)new_sync);
1413
1414 if (sync != NULL)
1415 {
1416 // sync is valid until we have called yieldCapability().
1417 // After the sync is completed, we cannot read that struct any
1418 // more because it has been freed.
1419 *prev_sync_type = sync->type;
1420 do {
1421 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1422 sync->type);
1423 ASSERT(*pcap);
1424 yieldCapability(pcap,task,true);
1425 sync = pending_sync;
1426 } while (sync != NULL);
1427
1428 // NOTE: task->cap might have changed now
1429 return true;
1430 }
1431 else
1432 {
1433 return false;
1434 }
1435 }
1436 #endif
1437
1438 /* -----------------------------------------------------------------------------
1439 * acquireAllCapabilities()
1440 *
1441 * Grab all the capabilities except the one we already hold. Used
1442 * when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1443 * before a fork (SYNC_OTHER).
1444 *
1445 * Only call this after requestSync(), otherwise a deadlock might
1446 * ensue if another thread is trying to synchronise.
1447 * -------------------------------------------------------------------------- */
1448
1449 #if defined(THREADED_RTS)
1450 static void acquireAllCapabilities(Capability *cap, Task *task)
1451 {
1452 Capability *tmpcap;
1453 uint32_t i;
1454
1455 ASSERT(pending_sync != NULL);
1456 for (i=0; i < n_capabilities; i++) {
1457 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1458 i, n_capabilities);
1459 tmpcap = capabilities[i];
1460 if (tmpcap != cap) {
1461 // we better hope this task doesn't get migrated to
1462 // another Capability while we're waiting for this one.
1463 // It won't, because load balancing happens while we have
1464 // all the Capabilities, but even so it's a slightly
1465 // unsavoury invariant.
1466 task->cap = tmpcap;
1467 waitForCapability(&tmpcap, task);
1468 if (tmpcap->no != i) {
1469 barf("acquireAllCapabilities: got the wrong capability");
1470 }
1471 }
1472 }
1473 task->cap = cap;
1474 }
1475 #endif
1476
1477 /* -----------------------------------------------------------------------------
1478 * releaseAllcapabilities()
1479 *
1480 * Assuming this thread holds all the capabilities, release them all except for
1481 * the one passed in as cap.
1482 * -------------------------------------------------------------------------- */
1483
1484 #if defined(THREADED_RTS)
1485 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task)
1486 {
1487 uint32_t i;
1488
1489 for (i = 0; i < n; i++) {
1490 if (cap->no != i) {
1491 task->cap = capabilities[i];
1492 releaseCapability(capabilities[i]);
1493 }
1494 }
1495 task->cap = cap;
1496 }
1497 #endif
1498
1499 /* -----------------------------------------------------------------------------
1500 * Perform a garbage collection if necessary
1501 * -------------------------------------------------------------------------- */
1502
1503 static void
1504 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1505 bool force_major)
1506 {
1507 Capability *cap = *pcap;
1508 bool heap_census;
1509 uint32_t collect_gen;
1510 bool major_gc;
1511 #if defined(THREADED_RTS)
1512 uint32_t gc_type;
1513 uint32_t i;
1514 uint32_t need_idle;
1515 uint32_t n_gc_threads;
1516 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1517 StgTSO *tso;
1518 bool *idle_cap;
1519 // idle_cap is an array (allocated later) of size n_capabilities, where
1520 // idle_cap[i] is true if capability i will be idle during this GC
1521 // cycle.
1522 #endif
1523
1524 if (sched_state == SCHED_SHUTTING_DOWN) {
1525 // The final GC has already been done, and the system is
1526 // shutting down. We'll probably deadlock if we try to GC
1527 // now.
1528 return;
1529 }
1530
1531 heap_census = scheduleNeedHeapProfile(true);
1532
1533 // Figure out which generation we are collecting, so that we can
1534 // decide whether this is a parallel GC or not.
1535 collect_gen = calcNeeded(force_major || heap_census, NULL);
1536 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1537
1538 #if defined(THREADED_RTS)
1539 if (sched_state < SCHED_INTERRUPTING
1540 && RtsFlags.ParFlags.parGcEnabled
1541 && collect_gen >= RtsFlags.ParFlags.parGcGen
1542 && ! oldest_gen->mark)
1543 {
1544 gc_type = SYNC_GC_PAR;
1545 } else {
1546 gc_type = SYNC_GC_SEQ;
1547 }
1548
1549 // In order to GC, there must be no threads running Haskell code.
1550 // Therefore, for single-threaded GC, the GC thread needs to hold *all* the
1551 // capabilities, and release them after the GC has completed. For parallel
1552 // GC, we synchronise all the running threads using requestSync().
1553 //
1554 // Other capabilities are prevented from running yet more Haskell threads if
1555 // pending_sync is set. Tested inside yieldCapability() and
1556 // releaseCapability() in Capability.c
1557
1558 PendingSync sync = {
1559 .type = gc_type,
1560 .idle = NULL,
1561 .task = task
1562 };
1563
1564 {
1565 SyncType prev_sync = 0;
1566 bool was_syncing;
1567 do {
1568 // If -qn is not set and we have more capabilities than cores, set
1569 // the number of GC threads to #cores. We do this here rather than
1570 // in normaliseRtsOpts() because here it will work if the program
1571 // calls setNumCapabilities.
1572 //
1573 n_gc_threads = RtsFlags.ParFlags.parGcThreads;
1574 if (n_gc_threads == 0 &&
1575 enabled_capabilities > getNumberOfProcessors()) {
1576 n_gc_threads = getNumberOfProcessors();
1577 }
1578
1579 // This calculation must be inside the loop because
1580 // enabled_capabilities may change if requestSync() below fails and
1581 // we retry.
1582 if (gc_type == SYNC_GC_PAR && n_gc_threads > 0) {
1583 if (n_gc_threads >= enabled_capabilities) {
1584 need_idle = 0;
1585 } else {
1586 need_idle = enabled_capabilities - n_gc_threads;
1587 }
1588 } else {
1589 need_idle = 0;
1590 }
1591
1592 // We need an array of size n_capabilities, but since this may
1593 // change each time around the loop we must allocate it afresh.
1594 idle_cap = (bool *)stgMallocBytes(n_capabilities *
1595 sizeof(bool),
1596 "scheduleDoGC");
1597 sync.idle = idle_cap;
1598
1599 // When using +RTS -qn, we need some capabilities to be idle during
1600 // GC. The best bet is to choose some inactive ones, so we look for
1601 // those first:
1602 uint32_t n_idle = need_idle;
1603 for (i=0; i < n_capabilities; i++) {
1604 if (capabilities[i]->disabled) {
1605 idle_cap[i] = true;
1606 } else if (n_idle > 0 &&
1607 capabilities[i]->running_task == NULL) {
1608 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1609 n_idle--;
1610 idle_cap[i] = true;
1611 } else {
1612 idle_cap[i] = false;
1613 }
1614 }
1615 // If we didn't find enough inactive capabilities, just pick some
1616 // more to be idle.
1617 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1618 if (!idle_cap[i] && i != cap->no) {
1619 idle_cap[i] = true;
1620 n_idle--;
1621 }
1622 }
1623 ASSERT(n_idle == 0);
1624
1625 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1626 cap = *pcap;
1627 if (was_syncing) {
1628 stgFree(idle_cap);
1629 }
1630 if (was_syncing &&
1631 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1632 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1633 // someone else had a pending sync request for a GC, so
1634 // let's assume GC has been done and we don't need to GC
1635 // again.
1636 // Exception to this: if SCHED_INTERRUPTING, then we still
1637 // need to do the final GC.
1638 return;
1639 }
1640 if (sched_state == SCHED_SHUTTING_DOWN) {
1641 // The scheduler might now be shutting down. We tested
1642 // this above, but it might have become true since then as
1643 // we yielded the capability in requestSync().
1644 return;
1645 }
1646 } while (was_syncing);
1647 }
1648
1649 stat_startGCSync(gc_threads[cap->no]);
1650
1651 #if defined(DEBUG)
1652 unsigned int old_n_capabilities = n_capabilities;
1653 #endif
1654
1655 interruptAllCapabilities();
1656
1657 // The final shutdown GC is always single-threaded, because it's
1658 // possible that some of the Capabilities have no worker threads.
1659
1660 if (gc_type == SYNC_GC_SEQ) {
1661 traceEventRequestSeqGc(cap);
1662 } else {
1663 traceEventRequestParGc(cap);
1664 }
1665
1666 if (gc_type == SYNC_GC_SEQ) {
1667 // single-threaded GC: grab all the capabilities
1668 acquireAllCapabilities(cap,task);
1669 }
1670 else
1671 {
1672 // If we are load-balancing collections in this
1673 // generation, then we require all GC threads to participate
1674 // in the collection. Otherwise, we only require active
1675 // threads to participate, and we set gc_threads[i]->idle for
1676 // any idle capabilities. The rationale here is that waking
1677 // up an idle Capability takes much longer than just doing any
1678 // GC work on its behalf.
1679
1680 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1681 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1682 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1683 {
1684 for (i=0; i < n_capabilities; i++) {
1685 if (capabilities[i]->disabled) {
1686 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1687 if (idle_cap[i]) {
1688 n_idle_caps++;
1689 }
1690 } else {
1691 if (i != cap->no && idle_cap[i]) {
1692 Capability *tmpcap = capabilities[i];
1693 task->cap = tmpcap;
1694 waitForCapability(&tmpcap, task);
1695 n_idle_caps++;
1696 }
1697 }
1698 }
1699 }
1700 else
1701 {
1702 for (i=0; i < n_capabilities; i++) {
1703 if (capabilities[i]->disabled) {
1704 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1705 if (idle_cap[i]) {
1706 n_idle_caps++;
1707 }
1708 } else if (i != cap->no &&
1709 capabilities[i]->idle >=
1710 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1711 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1712 if (idle_cap[i]) {
1713 n_idle_caps++;
1714 } else {
1715 n_failed_trygrab_idles++;
1716 }
1717 }
1718 }
1719 }
1720 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1721
1722 for (i=0; i < n_capabilities; i++) {
1723 capabilities[i]->idle++;
1724 }
1725
1726 // For all capabilities participating in this GC, wait until
1727 // they have stopped mutating and are standing by for GC.
1728 waitForGcThreads(cap, idle_cap);
1729
1730 // Stable point where we can do a global check on our spark counters
1731 ASSERT(checkSparkCountInvariant());
1732 }
1733
1734 #endif
1735
1736 IF_DEBUG(scheduler, printAllThreads());
1737
1738 delete_threads_and_gc:
1739 /*
1740 * We now have all the capabilities; if we're in an interrupting
1741 * state, then we should take the opportunity to delete all the
1742 * threads in the system.
1743 * Checking for major_gc ensures that the last GC is major.
1744 */
1745 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1746 deleteAllThreads();
1747 #if defined(THREADED_RTS)
1748 // Discard all the sparks from every Capability. Why?
1749 // They'll probably be GC'd anyway since we've killed all the
1750 // threads. It just avoids the GC having to do any work to
1751 // figure out that any remaining sparks are garbage.
1752 for (i = 0; i < n_capabilities; i++) {
1753 capabilities[i]->spark_stats.gcd +=
1754 sparkPoolSize(capabilities[i]->sparks);
1755 // No race here since all Caps are stopped.
1756 discardSparksCap(capabilities[i]);
1757 }
1758 #endif
1759 sched_state = SCHED_SHUTTING_DOWN;
1760 }
1761
1762 /*
1763 * When there are disabled capabilities, we want to migrate any
1764 * threads away from them. Normally this happens in the
1765 * scheduler's loop, but only for unbound threads - it's really
1766 * hard for a bound thread to migrate itself. So we have another
1767 * go here.
1768 */
1769 #if defined(THREADED_RTS)
1770 for (i = enabled_capabilities; i < n_capabilities; i++) {
1771 Capability *tmp_cap, *dest_cap;
1772 tmp_cap = capabilities[i];
1773 ASSERT(tmp_cap->disabled);
1774 if (i != cap->no) {
1775 dest_cap = capabilities[i % enabled_capabilities];
1776 while (!emptyRunQueue(tmp_cap)) {
1777 tso = popRunQueue(tmp_cap);
1778 migrateThread(tmp_cap, tso, dest_cap);
1779 if (tso->bound) {
1780 traceTaskMigrate(tso->bound->task,
1781 tso->bound->task->cap,
1782 dest_cap);
1783 tso->bound->task->cap = dest_cap;
1784 }
1785 }
1786 }
1787 }
1788 #endif
1789
1790 // Do any remaining idle GC work from the previous GC
1791 doIdleGCWork(cap, true /* all of it */);
1792
1793 #if defined(THREADED_RTS)
1794 // reset pending_sync *before* GC, so that when the GC threads
1795 // emerge they don't immediately re-enter the GC.
1796 pending_sync = 0;
1797 GarbageCollect(collect_gen, heap_census, gc_type, cap, idle_cap);
1798 #else
1799 GarbageCollect(collect_gen, heap_census, 0, cap, NULL);
1800 #endif
1801
1802 // If we're shutting down, don't leave any idle GC work to do.
1803 if (sched_state == SCHED_SHUTTING_DOWN) {
1804 doIdleGCWork(cap, true /* all of it */);
1805 }
1806
1807 traceSparkCounters(cap);
1808
1809 switch (recent_activity) {
1810 case ACTIVITY_INACTIVE:
1811 if (force_major) {
1812 // We are doing a GC because the system has been idle for a
1813 // timeslice and we need to check for deadlock. Record the
1814 // fact that we've done a GC and turn off the timer signal;
1815 // it will get re-enabled if we run any threads after the GC.
1816 recent_activity = ACTIVITY_DONE_GC;
1817 #if !defined(PROFILING)
1818 stopTimer();
1819 #endif
1820 break;
1821 }
1822 // fall through...
1823
1824 case ACTIVITY_MAYBE_NO:
1825 // the GC might have taken long enough for the timer to set
1826 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1827 // but we aren't necessarily deadlocked:
1828 recent_activity = ACTIVITY_YES;
1829 break;
1830
1831 case ACTIVITY_DONE_GC:
1832 // If we are actually active, the scheduler will reset the
1833 // recent_activity flag and re-enable the timer.
1834 break;
1835 }
1836
1837 #if defined(THREADED_RTS)
1838 // Stable point where we can do a global check on our spark counters
1839 ASSERT(checkSparkCountInvariant());
1840 #endif
1841
1842 // The heap census itself is done during GarbageCollect().
1843 if (heap_census) {
1844 performHeapProfile = false;
1845 }
1846
1847 #if defined(THREADED_RTS)
1848
1849 // If n_capabilities has changed during GC, we're in trouble.
1850 ASSERT(n_capabilities == old_n_capabilities);
1851
1852 if (gc_type == SYNC_GC_PAR)
1853 {
1854 for (i = 0; i < n_capabilities; i++) {
1855 if (i != cap->no) {
1856 if (idle_cap[i]) {
1857 ASSERT(capabilities[i]->running_task == task);
1858 task->cap = capabilities[i];
1859 releaseCapability(capabilities[i]);
1860 } else {
1861 ASSERT(capabilities[i]->running_task != task);
1862 }
1863 }
1864 }
1865 task->cap = cap;
1866
1867 // releaseGCThreads() happens *after* we have released idle
1868 // capabilities. Otherwise what can happen is one of the released
1869 // threads starts a new GC, and finds that it can't acquire some of
1870 // the disabled capabilities, because the previous GC still holds
1871 // them, so those disabled capabilities will not be idle during the
1872 // next GC round. However, if we release the capabilities first,
1873 // then they will be free (because they're disabled) when the next
1874 // GC cycle happens.
1875 releaseGCThreads(cap, idle_cap);
1876 }
1877 #endif
1878 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1879 // GC set the heap_overflow flag. We should throw an exception if we
1880 // can, or shut down otherwise.
1881
1882 // Get the thread to which Ctrl-C is thrown
1883 StgTSO *main_thread = getTopHandlerThread();
1884 if (main_thread == NULL) {
1885 // GC set the heap_overflow flag, and there is no main thread to
1886 // throw an exception to, so we should proceed with an orderly
1887 // shutdown now. Ultimately we want the main thread to return to
1888 // its caller with HeapExhausted, at which point the caller should
1889 // call hs_exit(). The first step is to delete all the threads.
1890 sched_state = SCHED_INTERRUPTING;
1891 goto delete_threads_and_gc;
1892 }
1893
1894 heap_overflow = false;
1895 const uint64_t allocation_count = getAllocations();
1896 if (RtsFlags.GcFlags.heapLimitGrace <
1897 allocation_count - allocated_bytes_at_heapoverflow ||
1898 allocated_bytes_at_heapoverflow == 0) {
1899 allocated_bytes_at_heapoverflow = allocation_count;
1900 // We used to simply exit, but throwing an exception gives the
1901 // program a chance to clean up. It also lets the exception be
1902 // caught.
1903
1904 // FIXME this is not a good way to tell a program to release
1905 // resources. It is neither reliable (the RTS crashes if it fails
1906 // to allocate memory from the OS) nor very usable (it is always
1907 // thrown to the main thread, which might not be able to do anything
1908 // useful with it). We really should have a more general way to
1909 // release resources in low-memory conditions. Nevertheless, this
1910 // is still a big improvement over just exiting.
1911
1912 // FIXME again: perhaps we should throw a synchronous exception
1913             // instead of an asynchronous one, or have a way for the program to
1914 // register a handler to be called when heap overflow happens.
1915 throwToSelf(cap, main_thread, heapOverflow_closure);
1916 }
1917 }
1918
1919 #if defined(THREADED_RTS)
1920 stgFree(idle_cap);
1921
1922 if (gc_type == SYNC_GC_SEQ) {
1923 // release our stash of capabilities.
1924 releaseAllCapabilities(n_capabilities, cap, task);
1925 }
1926 #endif
1927
1928 return;
1929 }
1930
1931 /* ---------------------------------------------------------------------------
1932 * Singleton fork(). Do not copy any running threads.
1933 * ------------------------------------------------------------------------- */
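/*
 * Illustrative sketch (not RTS code): the lock discipline used below,
 * reduced to a single pthread mutex. The function name fork_holding_lock
 * is hypothetical.
 *
 *   #include <pthread.h>
 *   #include <unistd.h>
 *
 *   static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
 *
 *   pid_t fork_holding_lock (void)
 *   {
 *       pthread_mutex_lock(&m);           // nobody else holds m across fork()
 *       pid_t pid = fork();
 *       if (pid != 0) {
 *           pthread_mutex_unlock(&m);     // parent: carry on as before
 *       } else {
 *           pthread_mutex_init(&m, NULL); // child: re-initialise, as
 *                                         // forkProcess does with initMutex()
 *       }
 *       return pid;
 *   }
 *
 * Taking the lock before fork() guarantees that the child never sees the
 * data protected by m in a half-updated state (see the comment about
 * #1391 below).
 */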
1934
1935 pid_t
1936 forkProcess(HsStablePtr *entry
1937 #if !defined(FORKPROCESS_PRIMOP_SUPPORTED)
1938 STG_UNUSED
1939 #endif
1940 )
1941 {
1942 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
1943 pid_t pid;
1944 StgTSO* t,*next;
1945 Capability *cap;
1946 uint32_t g;
1947 Task *task = NULL;
1948 uint32_t i;
1949
1950 debugTrace(DEBUG_sched, "forking!");
1951
1952 task = newBoundTask();
1953
1954 cap = NULL;
1955 waitForCapability(&cap, task);
1956
1957 #if defined(THREADED_RTS)
1958 stopAllCapabilities(&cap, task);
1959 #endif
1960
1961 // no funny business: hold locks while we fork, otherwise if some
1962 // other thread is holding a lock when the fork happens, the data
1963 // structure protected by the lock will forever be in an
1964 // inconsistent state in the child. See also #1391.
1965 ACQUIRE_LOCK(&sched_mutex);
1966 ACQUIRE_LOCK(&sm_mutex);
1967 ACQUIRE_LOCK(&stable_mutex);
1968 ACQUIRE_LOCK(&task->lock);
1969
1970 for (i=0; i < n_capabilities; i++) {
1971 ACQUIRE_LOCK(&capabilities[i]->lock);
1972 }
1973
1974 #if defined(THREADED_RTS)
1975 ACQUIRE_LOCK(&all_tasks_mutex);
1976 #endif
1977
1978 stopTimer(); // See #4074
1979
1980 #if defined(TRACING)
1981 flushEventLog(); // so that child won't inherit dirty file buffers
1982 #endif
1983
1984 pid = fork();
1985
1986 if (pid) { // parent
1987
1988 startTimer(); // #4074
1989
1990 RELEASE_LOCK(&sched_mutex);
1991 RELEASE_LOCK(&sm_mutex);
1992 RELEASE_LOCK(&stable_mutex);
1993 RELEASE_LOCK(&task->lock);
1994
1995 #if defined(THREADED_RTS)
1996 /* N.B. releaseCapability_ below may need to take all_tasks_mutex */
1997 RELEASE_LOCK(&all_tasks_mutex);
1998 #endif
1999
2000 for (i=0; i < n_capabilities; i++) {
2001 releaseCapability_(capabilities[i],false);
2002 RELEASE_LOCK(&capabilities[i]->lock);
2003 }
2004
2005 boundTaskExiting(task);
2006
2007 // just return the pid
2008 return pid;
2009
2010 } else { // child
2011
2012 #if defined(THREADED_RTS)
2013 initMutex(&sched_mutex);
2014 initMutex(&sm_mutex);
2015 initMutex(&stable_mutex);
2016 initMutex(&task->lock);
2017
2018 for (i=0; i < n_capabilities; i++) {
2019 initMutex(&capabilities[i]->lock);
2020 }
2021
2022 initMutex(&all_tasks_mutex);
2023 #endif
2024
2025 #if defined(TRACING)
2026 resetTracing();
2027 #endif
2028
2029 // Now, all OS threads except the thread that forked are
2030 // stopped. We need to stop all Haskell threads, including
2031 // those involved in foreign calls. Also we need to delete
2032 // all Tasks, because they correspond to OS threads that are
2033 // now gone.
2034
2035 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2036 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2037 next = t->global_link;
2038 // don't allow threads to catch the ThreadKilled
2039 // exception, but we do want to raiseAsync() because these
2040 // threads may be evaluating thunks that we need later.
2041 deleteThread_(t);
2042
2043 // stop the GC from updating the InCall to point to
2044 // the TSO. This is only necessary because the
2045 // OSThread bound to the TSO has been killed, and
2046 // won't get a chance to exit in the usual way (see
2047 // also scheduleHandleThreadFinished).
2048 t->bound = NULL;
2049 }
2050 }
2051
2052 discardTasksExcept(task);
2053
2054 for (i=0; i < n_capabilities; i++) {
2055 cap = capabilities[i];
2056
2057 // Empty the run queue. It seems tempting to let all the
2058 // killed threads stay on the run queue as zombies to be
2059 // cleaned up later, but some of them may correspond to
2060 // bound threads for which the corresponding Task does not
2061 // exist.
2062 truncateRunQueue(cap);
2063 cap->n_run_queue = 0;
2064
2065             // Any suspended C-calling Tasks are no more; their OS threads
2066 // don't exist now:
2067 cap->suspended_ccalls = NULL;
2068 cap->n_suspended_ccalls = 0;
2069
2070 #if defined(THREADED_RTS)
2071             // Wipe our spare workers list; they no longer exist. New
2072 // workers will be created if necessary.
2073 cap->spare_workers = NULL;
2074 cap->n_spare_workers = 0;
2075 cap->returning_tasks_hd = NULL;
2076 cap->returning_tasks_tl = NULL;
2077 cap->n_returning_tasks = 0;
2078 #endif
2079
2080             // Release all caps except 0; we'll use that for starting
2081 // the IO manager and running the client action below.
2082 if (cap->no != 0) {
2083 task->cap = cap;
2084 releaseCapability(cap);
2085 }
2086 }
2087 cap = capabilities[0];
2088 task->cap = cap;
2089
2090 // Empty the threads lists. Otherwise, the garbage
2091 // collector may attempt to resurrect some of these threads.
2092 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2093 generations[g].threads = END_TSO_QUEUE;
2094 }
2095
2096 // On Unix, all timers are reset in the child, so we need to start
2097 // the timer again.
2098 initTimer();
2099 startTimer();
2100
2101 // TODO: need to trace various other things in the child
2102 // like startup event, capabilities, process info etc
2103 traceTaskCreate(task, cap);
2104
2105 #if defined(THREADED_RTS)
2106 ioManagerStartCap(&cap);
2107 #endif
2108
2109         // Install toplevel exception handlers, so that an interruption
2110         // signal will be sent to the main thread.
2111 // See Trac #12903
2112 rts_evalStableIOMain(&cap, entry, NULL); // run the action
2113 rts_checkSchedStatus("forkProcess",cap);
2114
2115 rts_unlock(cap);
2116 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2117 }
2118 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2119 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2120 #endif
2121 }
2122
2123 /* ---------------------------------------------------------------------------
2124 * Changing the number of Capabilities
2125 *
2126 * Changing the number of Capabilities is very tricky! We can only do
2127 * it with the system fully stopped, so we do a full sync with
2128 * requestSync(SYNC_OTHER) and grab all the capabilities.
2129 *
2130 * Then we resize the appropriate data structures, and update all
2131 * references to the old data structures which have now moved.
2132 * Finally we release the Capabilities we are holding, and start
2133 * worker Tasks on the new Capabilities we created.
2134 *
2135 * ------------------------------------------------------------------------- */
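/*
 * Usage sketch (assumptions: the program links the threaded RTS and has
 * already called hs_init(); the prototype is taken from the RTS headers,
 * or declared locally as shown):
 *
 *   extern void setNumCapabilities (uint32_t new_n_capabilities);
 *
 *   setNumCapabilities(8);   // e.g. grow to 8 capabilities at runtime
 *
 * On the Haskell side, Control.Concurrent.setNumCapabilities ends up
 * calling this function.
 */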
2136
2137 void
2138 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2139 {
2140 #if !defined(THREADED_RTS)
2141 if (new_n_capabilities != 1) {
2142 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2143 }
2144 return;
2145 #elif defined(NOSMP)
2146 if (new_n_capabilities != 1) {
2147 errorBelch("setNumCapabilities: not supported on this platform");
2148 }
2149 return;
2150 #else
2151 Task *task;
2152 Capability *cap;
2153 uint32_t n;
2154 Capability *old_capabilities = NULL;
2155 uint32_t old_n_capabilities = n_capabilities;
2156
2157 if (new_n_capabilities == enabled_capabilities) {
2158 return;
2159     } else if (new_n_capabilities == 0) {
2160 errorBelch("setNumCapabilities: Capability count must be positive");
2161 return;
2162 }
2163
2164
2165 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2166 enabled_capabilities, new_n_capabilities);
2167
2168 cap = rts_lock();
2169 task = cap->running_task;
2170
2171 stopAllCapabilities(&cap, task);
2172
2173 if (new_n_capabilities < enabled_capabilities)
2174 {
2175 // Reducing the number of capabilities: we do not actually
2176 // remove the extra capabilities, we just mark them as
2177 // "disabled". This has the following effects:
2178 //
2179 // - threads on a disabled capability are migrated away by the
2180 // scheduler loop
2181 //
2182 // - disabled capabilities do not participate in GC
2183 // (see scheduleDoGC())
2184 //
2185 // - No spark threads are created on this capability
2186 // (see scheduleActivateSpark())
2187 //
2188 // - We do not attempt to migrate threads *to* a disabled
2189 // capability (see schedulePushWork()).
2190 //
2191 // but in other respects, a disabled capability remains
2192 // alive. Threads may be woken up on a disabled capability,
2193 // but they will be immediately migrated away.
2194 //
2195 // This approach is much easier than trying to actually remove
2196 // the capability; we don't have to worry about GC data
2197 // structures, the nursery, etc.
2198 //
2199 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2200 capabilities[n]->disabled = true;
2201 traceCapDisable(capabilities[n]);
2202 }
2203 enabled_capabilities = new_n_capabilities;
2204 }
2205 else
2206 {
2207 // Increasing the number of enabled capabilities.
2208 //
2209 // enable any disabled capabilities, up to the required number
2210 for (n = enabled_capabilities;
2211 n < new_n_capabilities && n < n_capabilities; n++) {
2212 capabilities[n]->disabled = false;
2213 traceCapEnable(capabilities[n]);
2214 }
2215 enabled_capabilities = n;
2216
2217 if (new_n_capabilities > n_capabilities) {
2218 #if defined(TRACING)
2219 // Allocate eventlog buffers for the new capabilities. Note this
2220 // must be done before calling moreCapabilities(), because that
2221 // will emit events about creating the new capabilities and adding
2222 // them to existing capsets.
2223 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2224 #endif
2225
2226 // Resize the capabilities array
2227 // NB. after this, capabilities points somewhere new. Any pointers
2228 // of type (Capability *) are now invalid.
2229 moreCapabilities(n_capabilities, new_n_capabilities);
2230
2231 // Resize and update storage manager data structures
2232 storageAddCapabilities(n_capabilities, new_n_capabilities);
2233 }
2234 }
2235
2236 // update n_capabilities before things start running
2237 if (new_n_capabilities > n_capabilities) {
2238 n_capabilities = enabled_capabilities = new_n_capabilities;
2239 }
2240
2241 // We're done: release the original Capabilities
2242 releaseAllCapabilities(old_n_capabilities, cap,task);
2243
2244 // We can't free the old array until now, because we access it
2245 // while updating pointers in updateCapabilityRefs().
2246 if (old_capabilities) {
2247 stgFree(old_capabilities);
2248 }
2249
2250 // Notify IO manager that the number of capabilities has changed.
2251 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2252
2253 rts_unlock(cap);
2254
2255 #endif // THREADED_RTS
2256 }
2257
2258
2259
2260 /* ---------------------------------------------------------------------------
2261 * Delete all the threads in the system
2262 * ------------------------------------------------------------------------- */
2263
2264 static void
2265 deleteAllThreads ()
2266 {
2267 // NOTE: only safe to call if we own all capabilities.
2268
2269 StgTSO* t, *next;
2270 uint32_t g;
2271
2272 debugTrace(DEBUG_sched,"deleting all threads");
2273 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2274 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2275 next = t->global_link;
2276 deleteThread(t);
2277 }
2278 }
2279
2280 // The run queue now contains a bunch of ThreadKilled threads. We
2281 // must not throw these away: the main thread(s) will be in there
2282 // somewhere, and the main scheduler loop has to deal with it.
2283 // Also, the run queue is the only thing keeping these threads from
2284 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2285
2286 #if !defined(THREADED_RTS)
2287 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2288 ASSERT(sleeping_queue == END_TSO_QUEUE);
2289 #endif
2290 }
2291
2292 /* -----------------------------------------------------------------------------
2293 Managing the suspended_ccalls list.
2294 Locks required: sched_mutex
2295 -------------------------------------------------------------------------- */
2296
2297 STATIC_INLINE void
2298 suspendTask (Capability *cap, Task *task)
2299 {
2300 InCall *incall;
2301
2302 incall = task->incall;
2303 ASSERT(incall->next == NULL && incall->prev == NULL);
2304 incall->next = cap->suspended_ccalls;
2305 incall->prev = NULL;
2306 if (cap->suspended_ccalls) {
2307 cap->suspended_ccalls->prev = incall;
2308 }
2309 cap->suspended_ccalls = incall;
2310 cap->n_suspended_ccalls++;
2311 }
2312
2313 STATIC_INLINE void
2314 recoverSuspendedTask (Capability *cap, Task *task)
2315 {
2316 InCall *incall;
2317
2318 incall = task->incall;
2319 if (incall->prev) {
2320 incall->prev->next = incall->next;
2321 } else {
2322 ASSERT(cap->suspended_ccalls == incall);
2323 cap->suspended_ccalls = incall->next;
2324 }
2325 if (incall->next) {
2326 incall->next->prev = incall->prev;
2327 }
2328 incall->next = incall->prev = NULL;
2329 cap->n_suspended_ccalls--;
2330 }
2331
2332 /* ---------------------------------------------------------------------------
2333 * Suspending & resuming Haskell threads.
2334 *
2335 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2336 * its capability before calling the C function. This allows another
2337 * task to pick up the capability and carry on running Haskell
2338 * threads. It also means that if the C call blocks, it won't lock
2339 * the whole system.
2340 *
2341 * The Haskell thread making the C call is put to sleep for the
2342  * duration of the call, on the capability's suspended_ccalls list. We
2343 * give out a token to the task, which it can use to resume the thread
2344 * on return from the C function.
2345 *
2346 * If this is an interruptible C call, this means that the FFI call may be
2347 * unceremoniously terminated and should be scheduled on an
2348 * unbound worker thread.
2349 * ------------------------------------------------------------------------- */
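/*
 * Usage sketch: roughly what the compiled call site of a "safe" foreign
 * import does with this pair (blocking_c_call and the local names are
 * hypothetical):
 *
 *   StgRegTable *reg;                       // registers of the current Capability
 *   ...
 *   void *tok = suspendThread(reg, false);  // give the Capability away
 *   int r = blocking_c_call();              // may block; other Haskell threads
 *                                           // keep running in the meantime
 *   reg = resumeThread(tok);                // get a Capability back (not
 *                                           // necessarily the same one)
 *   // carry on running Haskell code via reg, returning r to the caller
 */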
2350
2351 void *
2352 suspendThread (StgRegTable *reg, bool interruptible)
2353 {
2354 Capability *cap;
2355 int saved_errno;
2356 StgTSO *tso;
2357 Task *task;
2358 #if defined(mingw32_HOST_OS)
2359 StgWord32 saved_winerror;
2360 #endif
2361
2362 saved_errno = errno;
2363 #if defined(mingw32_HOST_OS)
2364 saved_winerror = GetLastError();
2365 #endif
2366
2367 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2368 */
2369 cap = regTableToCapability(reg);
2370
2371 task = cap->running_task;
2372 tso = cap->r.rCurrentTSO;
2373
2374 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2375
2376 // XXX this might not be necessary --SDM
2377 tso->what_next = ThreadRunGHC;
2378
2379 threadPaused(cap,tso);
2380
2381 if (interruptible) {
2382 tso->why_blocked = BlockedOnCCall_Interruptible;
2383 } else {
2384 tso->why_blocked = BlockedOnCCall;
2385 }
2386
2387 // Hand back capability
2388 task->incall->suspended_tso = tso;
2389 task->incall->suspended_cap = cap;
2390
2391     // Clear rCurrentTSO; otherwise allocate() will write to invalid memory.
2392 cap->r.rCurrentTSO = NULL;
2393
2394 ACQUIRE_LOCK(&cap->lock);
2395
2396 suspendTask(cap,task);
2397 cap->in_haskell = false;
2398 releaseCapability_(cap,false);
2399
2400 RELEASE_LOCK(&cap->lock);
2401
2402 errno = saved_errno;
2403 #if defined(mingw32_HOST_OS)
2404 SetLastError(saved_winerror);
2405 #endif
2406 return task;
2407 }
2408
2409 StgRegTable *
2410 resumeThread (void *task_)
2411 {
2412 StgTSO *tso;
2413 InCall *incall;
2414 Capability *cap;
2415 Task *task = task_;
2416 int saved_errno;
2417 #if defined(mingw32_HOST_OS)
2418 StgWord32 saved_winerror;
2419 #endif
2420
2421 saved_errno = errno;
2422 #if defined(mingw32_HOST_OS)
2423 saved_winerror = GetLastError();
2424 #endif
2425
2426 incall = task->incall;
2427 cap = incall->suspended_cap;
2428 task->cap = cap;
2429
2430 // Wait for permission to re-enter the RTS with the result.
2431 waitForCapability(&cap,task);
2432 // we might be on a different capability now... but if so, our
2433 // entry on the suspended_ccalls list will also have been
2434 // migrated.
2435
2436 // Remove the thread from the suspended list
2437 recoverSuspendedTask(cap,task);
2438
2439 tso = incall->suspended_tso;
2440 incall->suspended_tso = NULL;
2441 incall->suspended_cap = NULL;
2442 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2443
2444 traceEventRunThread(cap, tso);
2445
2446 /* Reset blocking status */
2447 tso->why_blocked = NotBlocked;
2448
2449 if ((tso->flags & TSO_BLOCKEX) == 0) {
2450 // avoid locking the TSO if we don't have to
2451 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2452 maybePerformBlockedException(cap,tso);
2453 }
2454 }
2455
2456 cap->r.rCurrentTSO = tso;
2457 cap->in_haskell = true;
2458 errno = saved_errno;
2459 #if defined(mingw32_HOST_OS)
2460 SetLastError(saved_winerror);
2461 #endif
2462
2463 /* We might have GC'd, mark the TSO dirty again */
2464 dirty_TSO(cap,tso);
2465 dirty_STACK(cap,tso->stackobj);
2466
2467 IF_DEBUG(sanity, checkTSO(tso));
2468
2469 return &cap->r;
2470 }
2471
2472 /* ---------------------------------------------------------------------------
2473 * scheduleThread()
2474 *
2475 * scheduleThread puts a thread on the end of the runnable queue.
2476 * This will usually be done immediately after a thread is created.
2477 * The caller of scheduleThread must create the thread using e.g.
2478 * createThread and push an appropriate closure
2479 * on this thread's stack before the scheduler is invoked.
2480 * ------------------------------------------------------------------------ */
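/*
 * Usage sketch (assumption: 'action' is an IO () closure obtained e.g.
 * via the RTS API; createIOThread is defined in Threads.c):
 *
 *   StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, action);
 *   scheduleThread(cap, tso);   // enqueue it; it runs the next time the
 *                               // scheduler services cap's run queue
 */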
2481
2482 void
2483 scheduleThread(Capability *cap, StgTSO *tso)
2484 {
2485 // The thread goes at the *end* of the run-queue, to avoid possible
2486 // starvation of any threads already on the queue.
2487 appendToRunQueue(cap,tso);
2488 }
2489
2490 void
2491 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2492 {
2493 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2494 // move this thread from now on.
2495 #if defined(THREADED_RTS)
2496 cpu %= enabled_capabilities;
2497 if (cpu == cap->no) {
2498 appendToRunQueue(cap,tso);
2499 } else {
2500 migrateThread(cap, tso, capabilities[cpu]);
2501 }
2502 #else
2503 appendToRunQueue(cap,tso);
2504 #endif
2505 }
2506
2507 void
2508 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2509 {
2510 Task *task;
2511 DEBUG_ONLY( StgThreadID id );
2512 Capability *cap;
2513
2514 cap = *pcap;
2515
2516 // We already created/initialised the Task
2517 task = cap->running_task;
2518
2519 // This TSO is now a bound thread; make the Task and TSO
2520 // point to each other.
2521 tso->bound = task->incall;
2522 tso->cap = cap;
2523
2524 task->incall->tso = tso;
2525 task->incall->ret = ret;
2526 task->incall->rstat = NoStatus;
2527
2528 appendToRunQueue(cap,tso);
2529
2530 DEBUG_ONLY( id = tso->id );
2531 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2532
2533 cap = schedule(cap,task);
2534
2535 ASSERT(task->incall->rstat != NoStatus);
2536 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2537
2538 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2539 *pcap = cap;
2540 }
2541
2542 /* ----------------------------------------------------------------------------
2543 * Starting Tasks
2544 * ------------------------------------------------------------------------- */
2545
2546 #if defined(THREADED_RTS)
2547 void scheduleWorker (Capability *cap, Task *task)
2548 {
2549 // schedule() runs without a lock.
2550 cap = schedule(cap,task);
2551
2552 // On exit from schedule(), we have a Capability, but possibly not
2553 // the same one we started with.
2554
2555 // During shutdown, the requirement is that after all the
2556 // Capabilities are shut down, all workers that are shutting down
2557 // have finished workerTaskStop(). This is why we hold on to
2558 // cap->lock until we've finished workerTaskStop() below.
2559 //
2560 // There may be workers still involved in foreign calls; those
2561 // will just block in waitForCapability() because the
2562 // Capability has been shut down.
2563 //
2564 ACQUIRE_LOCK(&cap->lock);
2565 releaseCapability_(cap,false);
2566 workerTaskStop(task);
2567 RELEASE_LOCK(&cap->lock);
2568 }
2569 #endif
2570
2571 /* ---------------------------------------------------------------------------
2572  * Start new worker tasks on Capabilities in the range [from, to)
2573 * -------------------------------------------------------------------------- */
2574
2575 static void
2576 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2577 {
2578 #if defined(THREADED_RTS)
2579 uint32_t i;
2580 Capability *cap;
2581
2582 for (i = from; i < to; i++) {
2583 cap = capabilities[i];
2584 ACQUIRE_LOCK(&cap->lock);
2585 startWorkerTask(cap);
2586 RELEASE_LOCK(&cap->lock);
2587 }
2588 #endif
2589 }
2590
2591 /* ---------------------------------------------------------------------------
2592 * initScheduler()
2593 *
2594 * Initialise the scheduler. This resets all the queues - if the
2595 * queues contained any threads, they'll be garbage collected at the
2596 * next pass.
2597 *
2598 * ------------------------------------------------------------------------ */
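/*
 * Context sketch: initScheduler() and exitScheduler() run as part of RTS
 * startup and shutdown, so an embedding application reaches them only
 * through hs_init()/hs_exit(), e.g. (example() stands for some
 * foreign-exported Haskell function and is hypothetical):
 *
 *   #include "HsFFI.h"
 *
 *   int main (int argc, char *argv[])
 *   {
 *       hs_init(&argc, &argv);   // scheduler, capabilities, timer start here
 *       example();               // call into Haskell
 *       hs_exit();               // RTS shutdown, including exitScheduler()
 *       return 0;
 *   }
 */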
2599
2600 void
2601 initScheduler(void)
2602 {
2603 #if !defined(THREADED_RTS)
2604 blocked_queue_hd = END_TSO_QUEUE;
2605 blocked_queue_tl = END_TSO_QUEUE;
2606 sleeping_queue = END_TSO_QUEUE;
2607 #endif
2608
2609 sched_state = SCHED_RUNNING;
2610 recent_activity = ACTIVITY_YES;
2611
2612 #if defined(THREADED_RTS)
2613 /* Initialise the mutex and condition variables used by
2614 * the scheduler. */
2615 initMutex(&sched_mutex);
2616 #endif
2617
2618 ACQUIRE_LOCK(&sched_mutex);
2619
2620 allocated_bytes_at_heapoverflow = 0;
2621
2622 /* A capability holds the state a native thread needs in
2623 * order to execute STG code. At least one capability is
2624 * floating around (only THREADED_RTS builds have more than one).
2625 */
2626 initCapabilities();
2627
2628 initTaskManager();
2629
2630 /*
2631 * Eagerly start one worker to run each Capability, except for
2632 * Capability 0. The idea is that we're probably going to start a
2633 * bound thread on Capability 0 pretty soon, so we don't want a
2634 * worker task hogging it.
2635 */
2636 startWorkerTasks(1, n_capabilities);
2637
2638 RELEASE_LOCK(&sched_mutex);
2639
2640 }
2641
2642 void
2643 exitScheduler (bool wait_foreign USED_IF_THREADS)
2644 /* see Capability.c, shutdownCapability() */
2645 {
2646 Task *task = NULL;
2647
2648 task = newBoundTask();
2649
2650 // If we haven't killed all the threads yet, do it now.
2651 if (sched_state < SCHED_SHUTTING_DOWN) {
2652 sched_state = SCHED_INTERRUPTING;
2653 Capability *cap = task->cap;
2654 waitForCapability(&cap,task);
2655 scheduleDoGC(&cap,task,true);
2656 ASSERT(task->incall->tso == NULL);
2657 releaseCapability(cap);
2658 }
2659 sched_state = SCHED_SHUTTING_DOWN;
2660
2661 shutdownCapabilities(task, wait_foreign);
2662
2663 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2664 // n_failed_trygrab_idles, n_idle_caps);
2665
2666 boundTaskExiting(task);
2667 }
2668
2669 void
2670 freeScheduler( void )
2671 {
2672 uint32_t still_running;
2673
2674 ACQUIRE_LOCK(&sched_mutex);
2675 still_running = freeTaskManager();
2676 // We can only free the Capabilities if there are no Tasks still
2677 // running. We might have a Task about to return from a foreign
2678 // call into waitForCapability(), for example (actually,
2679 // this should be the *only* thing that a still-running Task can
2680 // do at this point, and it will block waiting for the
2681 // Capability).
2682 if (still_running == 0) {
2683 freeCapabilities();
2684 }
2685 RELEASE_LOCK(&sched_mutex);
2686 #if defined(THREADED_RTS)
2687 closeMutex(&sched_mutex);
2688 #endif
2689 }
2690
2691 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2692 void *user USED_IF_NOT_THREADS)
2693 {
2694 #if !defined(THREADED_RTS)
2695 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2696 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2697 evac(user, (StgClosure **)(void *)&sleeping_queue);
2698 #endif
2699 }
2700
2701 /* -----------------------------------------------------------------------------
2702 performGC
2703
2704 This is the interface to the garbage collector from Haskell land.
2705 We provide this so that external C code can allocate and garbage
2706 collect when called from Haskell via _ccall_GC.
2707 -------------------------------------------------------------------------- */
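/*
 * Usage sketch: a C function invoked from Haskell through a safe foreign
 * import could force a full collection before inspecting the heap
 * (report_residency is a hypothetical name):
 *
 *   void report_residency (void)
 *   {
 *       performMajorGC();   // collect everything that is currently garbage
 *       // ... read residency / RTS statistics here ...
 *   }
 *
 * System.Mem.performGC and performMajorGC are implemented on top of the
 * functions defined below.
 */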
2708
2709 static void
2710 performGC_(bool force_major)
2711 {
2712 Task *task;
2713 Capability *cap = NULL;
2714
2715 // We must grab a new Task here, because the existing Task may be
2716 // associated with a particular Capability, and chained onto the
2717 // suspended_ccalls queue.
2718 task = newBoundTask();
2719
2720 // TODO: do we need to traceTask*() here?
2721
2722 waitForCapability(&cap,task);
2723 scheduleDoGC(&cap,task,force_major);
2724 releaseCapability(cap);
2725 boundTaskExiting(task);
2726 }
2727
2728 void
2729 performGC(void)
2730 {
2731 performGC_(false);
2732 }
2733
2734 void
2735 performMajorGC(void)
2736 {
2737 performGC_(true);
2738 }
2739
2740 /* ---------------------------------------------------------------------------
2741 Interrupt execution.
2742 Might be called inside a signal handler so it mustn't do anything fancy.
2743 ------------------------------------------------------------------------ */
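/*
 * Sketch: since interruptStgRts() only sets flags and pokes the
 * capabilities / IO manager, an embedding application could call it from
 * its own signal handler, much as the RTS's default Ctrl-C handler does
 * (the handler name is hypothetical):
 *
 *   #include <signal.h>
 *
 *   static void on_sigint (int sig)
 *   {
 *       (void)sig;
 *       interruptStgRts();   // ask the scheduler to shut down cleanly
 *   }
 *
 *   // installed with e.g. sigaction(SIGINT, ...) after hs_init()
 */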
2744
2745 void
2746 interruptStgRts(void)
2747 {
2748 sched_state = SCHED_INTERRUPTING;
2749 interruptAllCapabilities();
2750 #if defined(THREADED_RTS)
2751 wakeUpRts();
2752 #endif
2753 }
2754
2755 /* -----------------------------------------------------------------------------
2756 Wake up the RTS
2757
2758 This function causes at least one OS thread to wake up and run the
2759 scheduler loop. It is invoked when the RTS might be deadlocked, or
2760 an external event has arrived that may need servicing (eg. a
2761 keyboard interrupt).
2762
2763 In the single-threaded RTS we don't do anything here; we only have
2764 one thread anyway, and the event that caused us to want to wake up
2765   will have interrupted any blocking system call in progress.
2766 -------------------------------------------------------------------------- */
2767
2768 #if defined(THREADED_RTS)
2769 void wakeUpRts(void)
2770 {
2771 // This forces the IO Manager thread to wakeup, which will
2772 // in turn ensure that some OS thread wakes up and runs the
2773 // scheduler loop, which will cause a GC and deadlock check.
2774 ioManagerWakeup();
2775 }
2776 #endif
2777
2778 /* -----------------------------------------------------------------------------
2779 Deleting threads
2780
2781 This is used for interruption (^C) and forking, and corresponds to
2782 raising an exception but without letting the thread catch the
2783 exception.
2784 -------------------------------------------------------------------------- */
2785
2786 static void
2787 deleteThread (StgTSO *tso)
2788 {
2789 // NOTE: must only be called on a TSO that we have exclusive
2790 // access to, because we will call throwToSingleThreaded() below.
2791 // The TSO must be on the run queue of the Capability we own, or
2792 // we must own all Capabilities.
2793
2794 if (tso->why_blocked != BlockedOnCCall &&
2795 tso->why_blocked != BlockedOnCCall_Interruptible) {
2796 throwToSingleThreaded(tso->cap,tso,NULL);
2797 }
2798 }
2799
2800 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
2801 static void
2802 deleteThread_(StgTSO *tso)
2803 { // for forkProcess only:
2804 // like deleteThread(), but we delete threads in foreign calls, too.
2805
2806 if (tso->why_blocked == BlockedOnCCall ||
2807 tso->why_blocked == BlockedOnCCall_Interruptible) {
2808 tso->what_next = ThreadKilled;
2809 appendToRunQueue(tso->cap, tso);
2810 } else {
2811 deleteThread(tso);
2812 }
2813 }
2814 #endif
2815
2816 /* -----------------------------------------------------------------------------
2817 raiseExceptionHelper
2818
2819 This function is called by the raise# primitive, just so that we can
2820 move some of the tricky bits of raising an exception from C-- into
2821   C. Who knows, it might be a useful, reusable thing here too.
2822 -------------------------------------------------------------------------- */
2823
2824 StgWord
2825 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2826 {
2827 Capability *cap = regTableToCapability(reg);
2828 StgThunk *raise_closure = NULL;
2829 StgPtr p, next;
2830 const StgRetInfoTable *info;
2831 //
2832 // This closure represents the expression 'raise# E' where E
2833     // is the exception being raised. It is used to overwrite all the
2834 // thunks which are currently under evaluation.
2835 //
2836
2837 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2838 // LDV profiling: stg_raise_info has THUNK as its closure
2839 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2840 // payload, MIN_UPD_SIZE is more approprate than 1. It seems that
2841     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2842 // However, when LDV profiling goes on, we need to linearly scan
2843 // small object pool, where raise_closure is stored, so we should
2844     // the small object pool, where raise_closure is stored, so we should
2845 //
2846 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2847 // sizeofW(StgClosure)+1);
2848 //
2849
2850 //
2851 // Walk up the stack, looking for the catch frame. On the way,
2852 // we update any closures pointed to from update frames with the
2853 // raise closure that we just built.
2854 //
2855 p = tso->stackobj->sp;
2856 while(1) {
2857 info = get_ret_itbl((StgClosure *)p);
2858 next = p + stack_frame_sizeW((StgClosure *)p);
2859 switch (info->i.type) {
2860
2861 case UPDATE_FRAME:
2862 // Only create raise_closure if we need to.
2863 if (raise_closure == NULL) {
2864 raise_closure =
2865 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2866 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2867 raise_closure->payload[0] = exception;
2868 }
2869 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2870 (StgClosure *)raise_closure);
2871 p = next;
2872 continue;
2873
2874 case ATOMICALLY_FRAME:
2875 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2876 tso->stackobj->sp = p;
2877 return ATOMICALLY_FRAME;
2878
2879 case CATCH_FRAME:
2880 tso->stackobj->sp = p;
2881 return CATCH_FRAME;
2882
2883 case CATCH_STM_FRAME:
2884 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2885 tso->stackobj->sp = p;
2886 return CATCH_STM_FRAME;
2887
2888 case UNDERFLOW_FRAME:
2889 tso->stackobj->sp = p;
2890 threadStackUnderflow(cap,tso);
2891 p = tso->stackobj->sp;
2892 continue;
2893
2894 case STOP_FRAME:
2895 tso->stackobj->sp = p;
2896 return STOP_FRAME;
2897
2898 case CATCH_RETRY_FRAME: {
2899 StgTRecHeader *trec = tso -> trec;
2900 StgTRecHeader *outer = trec -> enclosing_trec;
2901 debugTrace(DEBUG_stm,
2902 "found CATCH_RETRY_FRAME at %p during raise", p);
2903 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2904 stmAbortTransaction(cap, trec);
2905 stmFreeAbortedTRec(cap, trec);
2906 tso -> trec = outer;
2907 p = next;
2908 continue;
2909 }
2910
2911 default:
2912 p = next;
2913 continue;
2914 }
2915 }
2916 }
2917
2918
2919 /* -----------------------------------------------------------------------------
2920 findRetryFrameHelper
2921
2922 This function is called by the retry# primitive. It traverses the stack
2923 leaving tso->sp referring to the frame which should handle the retry.
2924
2925 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2926   or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2927
2928 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2929 create) because retries are not considered to be exceptions, despite the
2930 similar implementation.
2931
2932 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2933 not be created within memory transactions.
2934 -------------------------------------------------------------------------- */
2935
2936 StgWord
2937 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2938 {
2939 const StgRetInfoTable *info;
2940 StgPtr p, next;
2941
2942 p = tso->stackobj->sp;
2943 while (1) {
2944 info = get_ret_itbl((const StgClosure *)p);
2945 next = p + stack_frame_sizeW((StgClosure *)p);
2946 switch (info->i.type) {
2947
2948 case ATOMICALLY_FRAME:
2949 debugTrace(DEBUG_stm,
2950 "found ATOMICALLY_FRAME at %p during retry", p);
2951 tso->stackobj->sp = p;
2952 return ATOMICALLY_FRAME;
2953
2954 case CATCH_RETRY_FRAME:
2955 debugTrace(DEBUG_stm,
2956 "found CATCH_RETRY_FRAME at %p during retry", p);
2957 tso->stackobj->sp = p;
2958 return CATCH_RETRY_FRAME;
2959
2960 case CATCH_STM_FRAME: {
2961 StgTRecHeader *trec = tso -> trec;
2962 StgTRecHeader *outer = trec -> enclosing_trec;
2963 debugTrace(DEBUG_stm,
2964 "found CATCH_STM_FRAME at %p during retry", p);
2965 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2966 stmAbortTransaction(cap, trec);
2967 stmFreeAbortedTRec(cap, trec);
2968 tso -> trec = outer;
2969 p = next;
2970 continue;
2971 }
2972
2973 case UNDERFLOW_FRAME:
2974 tso->stackobj->sp = p;
2975 threadStackUnderflow(cap,tso);
2976 p = tso->stackobj->sp;
2977 continue;
2978
2979 default:
2980 ASSERT(info->i.type != CATCH_FRAME);
2981 ASSERT(info->i.type != STOP_FRAME);
2982 p = next;
2983 continue;
2984 }
2985 }
2986 }
2987
2988 /* -----------------------------------------------------------------------------
2989 resurrectThreads is called after garbage collection on the list of
2990 threads found to be garbage. Each of these threads will be woken
2991   up and sent an exception: BlockedIndefinitelyOnMVar if the thread was
2992   blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked on an STM
2993   transaction, or NonTermination if it was blocked on a Black Hole.
2994
2995 Locks: assumes we hold *all* the capabilities.
2996 -------------------------------------------------------------------------- */
2997
2998 void
2999 resurrectThreads (StgTSO *threads)
3000 {
3001 StgTSO *tso, *next;
3002 Capability *cap;
3003 generation *gen;
3004
3005 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3006 next = tso->global_link;
3007
3008 gen = Bdescr((P_)tso)->gen;
3009 tso->global_link = gen->threads;
3010 gen->threads = tso;
3011
3012 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3013
3014 // Wake up the thread on the Capability it was last on
3015 cap = tso->cap;
3016
3017 switch (tso->why_blocked) {
3018 case BlockedOnMVar:
3019 case BlockedOnMVarRead:
3020 /* Called by GC - sched_mutex lock is currently held. */
3021 throwToSingleThreaded(cap, tso,
3022 (StgClosure *)blockedIndefinitelyOnMVar_closure);
3023 break;
3024 case BlockedOnBlackHole:
3025 throwToSingleThreaded(cap, tso,
3026 (StgClosure *)nonTermination_closure);
3027 break;
3028 case BlockedOnSTM:
3029 throwToSingleThreaded(cap, tso,
3030 (StgClosure *)blockedIndefinitelyOnSTM_closure);
3031 break;
3032 case NotBlocked:
3033 /* This might happen if the thread was blocked on a black hole
3034 * belonging to a thread that we've just woken up (raiseAsync
3035 * can wake up threads, remember...).
3036 */
3037 continue;
3038 case BlockedOnMsgThrowTo:
3039 // This can happen if the target is masking, blocks on a
3040 // black hole, and then is found to be unreachable. In
3041 // this case, we want to let the target wake up and carry
3042 // on, and do nothing to this thread.
3043 continue;
3044 default:
3045 barf("resurrectThreads: thread blocked in a strange way: %d",
3046 tso->why_blocked);
3047 }
3048 }
3049 }