Schedule.c: remove unreachable code block
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45 #include "TopHandler.h"
46
47 #if defined(HAVE_SYS_TYPES_H)
48 #include <sys/types.h>
49 #endif
50 #if defined(HAVE_UNISTD_H)
51 #include <unistd.h>
52 #endif
53
54 #include <string.h>
55 #include <stdlib.h>
56 #include <stdarg.h>
57
58 #if defined(HAVE_ERRNO_H)
59 #include <errno.h>
60 #endif
61
62 #if defined(TRACING)
63 #include "eventlog/EventLog.h"
64 #endif
65 /* -----------------------------------------------------------------------------
66 * Global variables
67 * -------------------------------------------------------------------------- */
68
69 #if !defined(THREADED_RTS)
70 // Blocked/sleeping threads
71 StgTSO *blocked_queue_hd = NULL;
72 StgTSO *blocked_queue_tl = NULL;
73 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
74 #endif
75
76 // Bytes allocated since the last time a HeapOverflow exception was thrown by
77 // the RTS
78 uint64_t allocated_bytes_at_heapoverflow = 0;
79
80 /* Set to true when the latest garbage collection failed to reclaim enough
81 * space, and the runtime should proceed to shut itself down in an orderly
82 * fashion (emitting profiling info etc.), OR throw an exception to the main
83 * thread, if it is still alive.
84 */
85 bool heap_overflow = false;
86
87 /* flag that tracks whether we have done any execution in this time slice.
88 * LOCK: currently none, perhaps we should lock (but needs to be
89 * updated in the fast path of the scheduler).
90 *
91 * NB. must be StgWord, we do xchg() on it.
92 */
93 volatile StgWord recent_activity = ACTIVITY_YES;
94
95 /* if this flag is set as well, give up execution
96 * LOCK: none (changes monotonically)
97 */
98 volatile StgWord sched_state = SCHED_RUNNING;
99
100 /*
101 * This mutex protects most of the global scheduler data in
102 * the THREADED_RTS runtime.
103 */
104 #if defined(THREADED_RTS)
105 Mutex sched_mutex;
106 #endif
107
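// The forkProcess# primop relies on POSIX fork(), which is not available
// on Windows, so it is only advertised on non-mingw32 platforms.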
108 #if !defined(mingw32_HOST_OS)
109 #define FORKPROCESS_PRIMOP_SUPPORTED
110 #endif
111
112 /* -----------------------------------------------------------------------------
113 * static function prototypes
114 * -------------------------------------------------------------------------- */
115
116 static Capability *schedule (Capability *initialCapability, Task *task);
117
118 //
119 // These functions all encapsulate parts of the scheduler loop, and are
120 // abstracted only to make the structure and control flow of the
121 // scheduler clearer.
122 //
123 static void schedulePreLoop (void);
124 static void scheduleFindWork (Capability **pcap);
125 #if defined(THREADED_RTS)
126 static void scheduleYield (Capability **pcap, Task *task);
127 #endif
128 #if defined(THREADED_RTS)
129 static bool requestSync (Capability **pcap, Task *task,
130 PendingSync *sync_type, SyncType *prev_sync_type);
131 static void acquireAllCapabilities(Capability *cap, Task *task);
132 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
133 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
134 uint32_t to USED_IF_THREADS);
135 #endif
136 static void scheduleStartSignalHandlers (Capability *cap);
137 static void scheduleCheckBlockedThreads (Capability *cap);
138 static void scheduleProcessInbox(Capability **cap);
139 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
140 static void schedulePushWork(Capability *cap, Task *task);
141 #if defined(THREADED_RTS)
142 static void scheduleActivateSpark(Capability *cap);
143 #endif
144 static void schedulePostRunThread(Capability *cap, StgTSO *t);
145 static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
146 static bool scheduleHandleYield( Capability *cap, StgTSO *t,
147 uint32_t prev_what_next );
148 static void scheduleHandleThreadBlocked( StgTSO *t );
149 static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
150 StgTSO *t );
151 static bool scheduleNeedHeapProfile(bool ready_to_gc);
152 static void scheduleDoGC(Capability **pcap, Task *task, bool force_major);
153
154 static void deleteThread (Capability *cap, StgTSO *tso);
155 static void deleteAllThreads (Capability *cap);
156
157 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
158 static void deleteThread_(Capability *cap, StgTSO *tso);
159 #endif
160
161 /* ---------------------------------------------------------------------------
162 Main scheduling loop.
163
164 We use round-robin scheduling, each thread returning to the
165 scheduler loop when one of these conditions is detected:
166
167 * out of heap space
168 * timer expires (thread yields)
169 * thread blocks
170 * thread ends
171 * stack overflow
172
173 ------------------------------------------------------------------------ */
174
175 static Capability *
176 schedule (Capability *initialCapability, Task *task)
177 {
178 StgTSO *t;
179 Capability *cap;
180 StgThreadReturnCode ret;
181 uint32_t prev_what_next;
182 bool ready_to_gc;
183 #if defined(THREADED_RTS)
184 bool first = true;
185 #endif
186
187 cap = initialCapability;
188
189 // Pre-condition: this task owns initialCapability.
190 // The sched_mutex is *NOT* held
191 // NB. on return, we still hold a capability.
192
193 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
194
195 schedulePreLoop();
196
197 // -----------------------------------------------------------
198 // Scheduler loop starts here:
199
200 while (1) {
201
202 // Check whether we have re-entered the RTS from Haskell without
203 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
204 // call).
205 if (cap->in_haskell) {
206 errorBelch("schedule: re-entered unsafely.\n"
207 " Perhaps a 'foreign import unsafe' should be 'safe'?");
208 stg_exit(EXIT_FAILURE);
209 }
210
211 // Note [shutdown]: The interruption / shutdown sequence.
212 //
213 // In order to cleanly shut down the runtime, we want to:
214 // * make sure that all main threads return to their callers
215 // with the state 'Interrupted'.
216 // * clean up all OS threads associated with the runtime
217 // * free all memory etc.
218 //
219 // So the sequence goes like this:
220 //
221 // * The shutdown sequence is initiated by calling hs_exit(),
222 // interruptStgRts(), or running out of memory in the GC.
223 //
224 // * Set sched_state = SCHED_INTERRUPTING
225 //
226 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
227 // scheduleDoGC(), which halts the whole runtime by acquiring all the
228 // capabilities, does a GC and then calls deleteAllThreads() to kill all
229 // the remaining threads. The zombies are left on the run queue for
230 // cleaning up. We can't kill threads involved in foreign calls.
231 //
232 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
233 //
234 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
235 // enforced by the scheduler, which won't run any Haskell code when
236 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
237 // other capabilities by doing the GC earlier.
238 //
239 // * all workers exit when the run queue on their capability
240 // drains. All main threads will also exit when their TSO
241 // reaches the head of the run queue and they can return.
242 //
243 // * eventually all Capabilities will shut down, and the RTS can
244 // exit.
245 //
246 // * We might be left with threads blocked in foreign calls;
247 // we should really attempt to kill these somehow (TODO).
248
249 switch (sched_state) {
250 case SCHED_RUNNING:
251 break;
252 case SCHED_INTERRUPTING:
253 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
254 /* scheduleDoGC() deletes all the threads */
255 scheduleDoGC(&cap,task,true);
256
257 // after scheduleDoGC(), we must be shutting down. Either some
258 // other Capability did the final GC, or we did it above,
259 // either way we can fall through to the SCHED_SHUTTING_DOWN
260 // case now.
261 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
262 // fall through
263
264 case SCHED_SHUTTING_DOWN:
265 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
266 // If we are a worker, just exit. If we're a bound thread
267 // then we will exit below when we've removed our TSO from
268 // the run queue.
269 if (!isBoundTask(task) && emptyRunQueue(cap)) {
270 return cap;
271 }
272 break;
273 default:
274 barf("sched_state: %" FMT_Word, sched_state);
275 }
276
277 scheduleFindWork(&cap);
278
279 /* work pushing, currently relevant only for THREADED_RTS:
280 (pushes threads, wakes up idle capabilities for stealing) */
281 schedulePushWork(cap,task);
282
283 scheduleDetectDeadlock(&cap,task);
284
285 // Normally, the only way we can get here with no threads to
286 // run is if a keyboard interrupt was received during
287 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
288 // Additionally, it is not fatal for the
289 // threaded RTS to reach here with no threads to run.
290 //
291 // win32: might be here due to awaitEvent() being abandoned
292 // as a result of a console event having been delivered.
293
294 #if defined(THREADED_RTS)
295 if (first)
296 {
297 // XXX: ToDo
298 // // don't yield the first time, we want a chance to run this
299 // // thread for a bit, even if there are others banging at the
300 // // door.
301 // first = false;
302 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
303 }
304
305 scheduleYield(&cap,task);
306
307 if (emptyRunQueue(cap)) continue; // look for work again
308 #endif
309
310 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
311 if ( emptyRunQueue(cap) ) {
312 ASSERT(sched_state >= SCHED_INTERRUPTING);
313 }
314 #endif
315
316 //
317 // Get a thread to run
318 //
319 t = popRunQueue(cap);
320
321 // Sanity check the thread we're about to run. This can be
322 // expensive if there is lots of thread switching going on...
323 IF_DEBUG(sanity,checkTSO(t));
324
325 #if defined(THREADED_RTS)
326 // Check whether we can run this thread in the current task.
327 // If not, we have to pass our capability to the right task.
328 {
329 InCall *bound = t->bound;
330
331 if (bound) {
332 if (bound->task == task) {
333 // yes, the Haskell thread is bound to the current native thread
334 } else {
335 debugTrace(DEBUG_sched,
336 "thread %lu bound to another OS thread",
337 (unsigned long)t->id);
338 // no, bound to a different Haskell thread: pass to that thread
339 pushOnRunQueue(cap,t);
340 continue;
341 }
342 } else {
343 // The thread we want to run is unbound.
344 if (task->incall->tso) {
345 debugTrace(DEBUG_sched,
346 "this OS thread cannot run thread %lu",
347 (unsigned long)t->id);
348 // no, the current native thread is bound to a different
349 // Haskell thread, so pass it to any worker thread
350 pushOnRunQueue(cap,t);
351 continue;
352 }
353 }
354 }
355 #endif
356
357 // If we're shutting down, and this thread has not yet been
358 // killed, kill it now. This sometimes happens when a finalizer
359 // thread is created by the final GC, or a thread previously
360 // in a foreign call returns.
361 if (sched_state >= SCHED_INTERRUPTING &&
362 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
363 deleteThread(cap,t);
364 }
365
366 // If this capability is disabled, migrate the thread away rather
367 // than running it. NB. but not if the thread is bound: it is
368 // really hard for a bound thread to migrate itself. Believe me,
369 // I tried several ways and couldn't find a way to do it.
370 // Instead, when everything is stopped for GC, we migrate all the
371 // threads on the run queue then (see scheduleDoGC()).
372 //
373 // ToDo: what about TSO_LOCKED? Currently we're migrating those
374 // when the number of capabilities drops, but we never migrate
375 // them back if it rises again. Presumably we should, but after
376 // the thread has been migrated we no longer know what capability
377 // it was originally on.
378 #if defined(THREADED_RTS)
379 if (cap->disabled && !t->bound) {
380 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
381 migrateThread(cap, t, dest_cap);
382 continue;
383 }
384 #endif
385
386 /* context switches are initiated by the timer signal, unless
387 * the user specified "context switch as often as possible", with
388 * +RTS -C0
389 */
390 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
391 && !emptyThreadQueues(cap)) {
392 cap->context_switch = 1;
393 }
394
395 run_thread:
396
397 // CurrentTSO is the thread to run. It might be different if we
398 // loop back to run_thread, so make sure to set CurrentTSO after
399 // that.
400 cap->r.rCurrentTSO = t;
401
402 startHeapProfTimer();
403
404 // ----------------------------------------------------------------------
405 // Run the current thread
406
407 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
408 ASSERT(t->cap == cap);
409 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
410
411 prev_what_next = t->what_next;
412
413 errno = t->saved_errno;
414 #if defined(mingw32_HOST_OS)
415 SetLastError(t->saved_winerror);
416 #endif
417
418 // reset the interrupt flag before running Haskell code
419 cap->interrupt = 0;
420
421 cap->in_haskell = true;
422 cap->idle = 0;
423
424 dirty_TSO(cap,t);
425 dirty_STACK(cap,t->stackobj);
426
427 switch (recent_activity)
428 {
429 case ACTIVITY_DONE_GC: {
430 // ACTIVITY_DONE_GC means we turned off the timer signal to
431 // conserve power (see #1623). Re-enable it here.
432 uint32_t prev;
433 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
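// (The atomic xchg() ensures that if several Capabilities race through
// here, only the one that actually observed ACTIVITY_DONE_GC restarts
// the timer below.)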
434 if (prev == ACTIVITY_DONE_GC) {
435 #if !defined(PROFILING)
436 startTimer();
437 #endif
438 }
439 break;
440 }
441 case ACTIVITY_INACTIVE:
442 // If we reached ACTIVITY_INACTIVE, then don't reset it until
443 // we've done the GC. The thread running here might just be
444 // the IO manager thread that handle_tick() woke up via
445 // wakeUpRts().
446 break;
447 default:
448 recent_activity = ACTIVITY_YES;
449 }
450
451 traceEventRunThread(cap, t);
452
453 switch (prev_what_next) {
454
455 case ThreadKilled:
456 case ThreadComplete:
457 /* Thread already finished, return to scheduler. */
458 ret = ThreadFinished;
459 break;
460
461 case ThreadRunGHC:
462 {
463 StgRegTable *r;
464 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
465 cap = regTableToCapability(r);
466 ret = r->rRet;
467 break;
468 }
469
470 case ThreadInterpret:
471 cap = interpretBCO(cap);
472 ret = cap->r.rRet;
473 break;
474
475 default:
476 barf("schedule: invalid prev_what_next=%u field", prev_what_next);
477 }
478
479 cap->in_haskell = false;
480
481 // The TSO might have moved, eg. if it re-entered the RTS and a GC
482 // happened. So find the new location:
483 t = cap->r.rCurrentTSO;
484
485 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
486 // don't want it set when not running a Haskell thread.
487 cap->r.rCurrentTSO = NULL;
488
489 // And save the current errno in this thread.
490 // XXX: possibly bogus for SMP because this thread might already
491 // be running again, see code below.
492 t->saved_errno = errno;
493 #if defined(mingw32_HOST_OS)
494 // Similarly for Windows error code
495 t->saved_winerror = GetLastError();
496 #endif
497
498 if (ret == ThreadBlocked) {
499 if (t->why_blocked == BlockedOnBlackHole) {
500 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
501 traceEventStopThread(cap, t, t->why_blocked + 6,
502 owner != NULL ? owner->id : 0);
503 } else {
504 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
505 }
506 } else {
507 traceEventStopThread(cap, t, ret, 0);
508 }
509
510 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
511 ASSERT(t->cap == cap);
512
513 // ----------------------------------------------------------------------
514
515 // Costs for the scheduler are assigned to CCS_SYSTEM
516 stopHeapProfTimer();
517 #if defined(PROFILING)
518 cap->r.rCCCS = CCS_SYSTEM;
519 #endif
520
521 schedulePostRunThread(cap,t);
522
523 ready_to_gc = false;
524
525 switch (ret) {
526 case HeapOverflow:
527 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
528 break;
529
530 case StackOverflow:
531 // just adjust the stack for this thread, then pop it back
532 // on the run queue.
533 threadStackOverflow(cap, t);
534 pushOnRunQueue(cap,t);
535 break;
536
537 case ThreadYielding:
538 if (scheduleHandleYield(cap, t, prev_what_next)) {
539 // shortcut for switching between compiler/interpreter:
540 goto run_thread;
541 }
542 break;
543
544 case ThreadBlocked:
545 scheduleHandleThreadBlocked(t);
546 break;
547
548 case ThreadFinished:
549 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
550 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
551 break;
552
553 default:
554 barf("schedule: invalid thread return code %d", (int)ret);
555 }
556
557 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
558 scheduleDoGC(&cap,task,false);
559 }
560 } /* end of while() */
561 }
562
563 /* -----------------------------------------------------------------------------
564 * Run queue operations
565 * -------------------------------------------------------------------------- */
566
567 static void
568 removeFromRunQueue (Capability *cap, StgTSO *tso)
569 {
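// NB. while a TSO is on the run queue it is doubly linked: _link is the
// forward pointer and block_info.prev is the backward pointer.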
570 if (tso->block_info.prev == END_TSO_QUEUE) {
571 ASSERT(cap->run_queue_hd == tso);
572 cap->run_queue_hd = tso->_link;
573 } else {
574 setTSOLink(cap, tso->block_info.prev, tso->_link);
575 }
576 if (tso->_link == END_TSO_QUEUE) {
577 ASSERT(cap->run_queue_tl == tso);
578 cap->run_queue_tl = tso->block_info.prev;
579 } else {
580 setTSOPrev(cap, tso->_link, tso->block_info.prev);
581 }
582 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
583 cap->n_run_queue--;
584
585 IF_DEBUG(sanity, checkRunQueue(cap));
586 }
587
588 void
589 promoteInRunQueue (Capability *cap, StgTSO *tso)
590 {
591 removeFromRunQueue(cap, tso);
592 pushOnRunQueue(cap, tso);
593 }
594
595 /* ----------------------------------------------------------------------------
596 * Setting up the scheduler loop
597 * ------------------------------------------------------------------------- */
598
599 static void
600 schedulePreLoop(void)
601 {
602 // initialisation for scheduler - what cannot go into initScheduler()
603
604 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
605 win32AllocStack();
606 #endif
607 }
608
609 /* -----------------------------------------------------------------------------
610 * scheduleFindWork()
611 *
612 * Search for work to do, and handle messages from elsewhere.
613 * -------------------------------------------------------------------------- */
614
615 static void
616 scheduleFindWork (Capability **pcap)
617 {
618 scheduleStartSignalHandlers(*pcap);
619
620 scheduleProcessInbox(pcap);
621
622 scheduleCheckBlockedThreads(*pcap);
623
624 #if defined(THREADED_RTS)
625 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
626 #endif
627 }
628
629 #if defined(THREADED_RTS)
630 STATIC_INLINE bool
631 shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
632 {
633 // we need to yield this capability to someone else if..
634 // - another thread is initiating a GC, and we didn't just do a GC
635 // (see Note [GC livelock])
636 // - another Task is returning from a foreign call
637 // - the thread at the head of the run queue cannot be run
638 // by this Task (it is bound to another Task, or it is unbound
639 // and this task is bound).
640 //
641 // Note [GC livelock]
642 //
643 // If we are interrupted to do a GC, then we do not immediately do
644 // another one. This avoids a starvation situation where one
645 // Capability keeps forcing a GC and the other Capabilities make no
646 // progress at all.
647
648 return ((pending_sync && !didGcLast) ||
649 cap->n_returning_tasks != 0 ||
650 (!emptyRunQueue(cap) && (task->incall->tso == NULL
651 ? peekRunQueue(cap)->bound != NULL
652 : peekRunQueue(cap)->bound != task->incall)));
653 }
654
655 // This is the single place where a Task goes to sleep. There are
656 // two reasons it might need to sleep:
657 // - there are no threads to run
658 // - we need to yield this Capability to someone else
659 // (see shouldYieldCapability())
660 //
661 // Careful: the scheduler loop is quite delicate. Make sure you run
662 // the tests in testsuite/concurrent (all ways) after modifying this,
663 // and also check the benchmarks in nofib/parallel for regressions.
664
665 static void
666 scheduleYield (Capability **pcap, Task *task)
667 {
668 Capability *cap = *pcap;
669 bool didGcLast = false;
670
671 // if we have work, and we don't need to give up the Capability, continue.
672 //
673 if (!shouldYieldCapability(cap,task,false) &&
674 (!emptyRunQueue(cap) ||
675 !emptyInbox(cap) ||
676 sched_state >= SCHED_INTERRUPTING)) {
677 return;
678 }
679
680 // otherwise yield (sleep), and keep yielding if necessary.
681 do {
682 didGcLast = yieldCapability(&cap,task, !didGcLast);
683 }
684 while (shouldYieldCapability(cap,task,didGcLast));
685
686 // note there may still be no threads on the run queue at this
687 // point, the caller has to check.
688
689 *pcap = cap;
690 return;
691 }
692 #endif
693
694 /* -----------------------------------------------------------------------------
695 * schedulePushWork()
696 *
697 * Push work to other Capabilities if we have some.
698 * -------------------------------------------------------------------------- */
699
700 static void
701 schedulePushWork(Capability *cap USED_IF_THREADS,
702 Task *task USED_IF_THREADS)
703 {
704 /* following code is not for PARALLEL_HASKELL. I kept the call general;
705 future GUM versions might use pushing in a distributed setup */
706 #if defined(THREADED_RTS)
707
708 Capability *free_caps[n_capabilities], *cap0;
709 uint32_t i, n_wanted_caps, n_free_caps;
710
711 uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
712
713 // migration can be turned off with +RTS -qm
714 if (!RtsFlags.ParFlags.migrate) {
715 spare_threads = 0;
716 }
717
718 // Figure out how many capabilities we want to wake up. We need at least
719 // sparkPoolSize(cap) plus the number of spare threads we have.
720 n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
721 if (n_wanted_caps == 0) return;
722
723 // First grab as many free Capabilities as we can. ToDo: we should use
724 // capabilities on the same NUMA node preferably, but not exclusively.
725 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
726 n_free_caps < n_wanted_caps && i != cap->no;
727 i = (i + 1) % n_capabilities) {
728 cap0 = capabilities[i];
729 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
730 if (!emptyRunQueue(cap0)
731 || cap0->n_returning_tasks != 0
732 || !emptyInbox(cap0)) {
733 // it already has some work, we just grabbed it at
734 // the wrong moment. Or maybe it's deadlocked!
735 releaseCapability(cap0);
736 } else {
737 free_caps[n_free_caps++] = cap0;
738 }
739 }
740 }
741
742 // We now have n_free_caps free capabilities stashed in
743 // free_caps[]. Attempt to share our run queue equally with them.
744 // This is complicated slightly by the fact that we can't move
745 // some threads:
746 //
747 // - threads that have TSO_LOCKED cannot migrate
748 // - a thread that is bound to the current Task cannot be migrated
749 //
750 // This is about the simplest thing we could do; improvements we
751 // might want to do include:
752 //
753 // - giving high priority to moving relatively new threads, on
754 // the grounds that they haven't had time to build up a
755 // working set in the cache on this CPU/Capability.
756 //
757 // - giving low priority to moving long-lived threads
758
759 if (n_free_caps > 0) {
760 StgTSO *prev, *t, *next;
761
762 debugTrace(DEBUG_sched,
763 "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
764 cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
765 n_free_caps);
766
767 // There are n_free_caps+1 caps in total. We will share the threads
768 // evenly between them, *except* that if the run queue does not divide
769 // evenly by n_free_caps+1 then we bias towards the current capability.
770 // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
771 uint32_t keep_threads =
772 (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
773
774 // This also ensures that we don't give away all our threads, since
775 // (x + y) / (y + 1) >= 1 when x >= 1.
776
777 // The number of threads we have left.
778 uint32_t n = cap->n_run_queue;
779
780 // prev = the previous thread on this cap's run queue
781 prev = END_TSO_QUEUE;
782
783 // We're going to walk through the run queue, migrating threads to other
784 // capabilities until we have only keep_threads left. We might
785 // encounter a thread that cannot be migrated, in which case we add it
786 // to the current run queue and decrement keep_threads.
787 for (t = cap->run_queue_hd, i = 0;
788 t != END_TSO_QUEUE && n > keep_threads;
789 t = next)
790 {
791 next = t->_link;
792 t->_link = END_TSO_QUEUE;
793
794 // Should we keep this thread?
795 if (t->bound == task->incall // don't move my bound thread
796 || tsoLocked(t) // don't move a locked thread
797 ) {
798 if (prev == END_TSO_QUEUE) {
799 cap->run_queue_hd = t;
800 } else {
801 setTSOLink(cap, prev, t);
802 }
803 setTSOPrev(cap, t, prev);
804 prev = t;
805 if (keep_threads > 0) keep_threads--;
806 }
807
808 // Or migrate it?
809 else {
810 appendToRunQueue(free_caps[i],t);
811 traceEventMigrateThread (cap, t, free_caps[i]->no);
812
813 if (t->bound) { t->bound->task->cap = free_caps[i]; }
814 t->cap = free_caps[i];
815 n--; // we have one fewer threads now
816 i++; // move on to the next free_cap
817 if (i == n_free_caps) i = 0;
818 }
819 }
820
821 // Join up the beginning of the queue (prev)
822 // with the rest of the queue (t)
823 if (t == END_TSO_QUEUE) {
824 cap->run_queue_tl = prev;
825 } else {
826 setTSOPrev(cap, t, prev);
827 }
828 if (prev == END_TSO_QUEUE) {
829 cap->run_queue_hd = t;
830 } else {
831 setTSOLink(cap, prev, t);
832 }
833 cap->n_run_queue = n;
834
835 IF_DEBUG(sanity, checkRunQueue(cap));
836
837 // release the capabilities
838 for (i = 0; i < n_free_caps; i++) {
839 task->cap = free_caps[i];
840 if (sparkPoolSizeCap(cap) > 0) {
841 // If we have sparks to steal, wake up a worker on the
842 // capability, even if it has no threads to run.
843 releaseAndWakeupCapability(free_caps[i]);
844 } else {
845 releaseCapability(free_caps[i]);
846 }
847 }
848 }
849 task->cap = cap; // reset to point to our Capability.
850
851 #endif /* THREADED_RTS */
852
853 }
854
855 /* ----------------------------------------------------------------------------
856 * Start any pending signal handlers
857 * ------------------------------------------------------------------------- */
858
859 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
860 static void
861 scheduleStartSignalHandlers(Capability *cap)
862 {
863 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
864 // safe outside the lock
865 startSignalHandlers(cap);
866 }
867 }
868 #else
869 static void
870 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
871 {
872 }
873 #endif
874
875 /* ----------------------------------------------------------------------------
876 * Check for blocked threads that can be woken up.
877 * ------------------------------------------------------------------------- */
878
879 static void
880 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
881 {
882 #if !defined(THREADED_RTS)
883 //
884 // Check whether any waiting threads need to be woken up. If the
885 // run queue is empty, and there are no other tasks running, we
886 // can wait indefinitely for something to happen.
887 //
888 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
889 {
890 awaitEvent (emptyRunQueue(cap));
891 }
892 #endif
893 }
894
895 /* ----------------------------------------------------------------------------
896 * Detect deadlock conditions and attempt to resolve them.
897 * ------------------------------------------------------------------------- */
898
899 static void
900 scheduleDetectDeadlock (Capability **pcap, Task *task)
901 {
902 Capability *cap = *pcap;
903 /*
904 * Detect deadlock: when we have no threads to run, there are no
905 * threads blocked, waiting for I/O, or sleeping, and all the
906 * other tasks are waiting for work, we must have a deadlock of
907 * some description.
908 */
909 if ( emptyThreadQueues(cap) )
910 {
911 #if defined(THREADED_RTS)
912 /*
913 * In the threaded RTS, we only check for deadlock if there
914 * has been no activity in a complete timeslice. This means
915 * we won't eagerly start a full GC just because we don't have
916 * any threads to run currently.
917 */
918 if (recent_activity != ACTIVITY_INACTIVE) return;
919 #endif
920
921 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
922
923 // Garbage collection can release some new threads due to
924 // either (a) finalizers or (b) threads resurrected because
925 // they are unreachable and will therefore be sent an
926 // exception. Any threads thus released will be immediately
927 // runnable.
928 scheduleDoGC (pcap, task, true/*force major GC*/);
929 cap = *pcap;
930 // when force_major == true, scheduleDoGC sets
931 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
932 // signal.
933
934 if ( !emptyRunQueue(cap) ) return;
935
936 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
937 /* If we have user-installed signal handlers, then wait
938 * for signals to arrive rather than bombing out with a
939 * deadlock.
940 */
941 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
942 debugTrace(DEBUG_sched,
943 "still deadlocked, waiting for signals...");
944
945 awaitUserSignals();
946
947 if (signals_pending()) {
948 startSignalHandlers(cap);
949 }
950
951 // either we have threads to run, or we were interrupted:
952 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
953
954 return;
955 }
956 #endif
957
958 #if !defined(THREADED_RTS)
959 /* Probably a real deadlock. Send the current main thread the
960 * Deadlock exception.
961 */
962 if (task->incall->tso) {
963 switch (task->incall->tso->why_blocked) {
964 case BlockedOnSTM:
965 case BlockedOnBlackHole:
966 case BlockedOnMsgThrowTo:
967 case BlockedOnMVar:
968 case BlockedOnMVarRead:
969 throwToSingleThreaded(cap, task->incall->tso,
970 (StgClosure *)nonTermination_closure);
971 return;
972 default:
973 barf("deadlock: main thread blocked in a strange way");
974 }
975 }
976 return;
977 #endif
978 }
979 }
980
981
982 /* ----------------------------------------------------------------------------
983 * Process message in the current Capability's inbox
984 * ------------------------------------------------------------------------- */
985
986 static void
987 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
988 {
989 #if defined(THREADED_RTS)
990 Message *m, *next;
991 PutMVar *p, *pnext;
992 int r;
993 Capability *cap = *pcap;
994
995 while (!emptyInbox(cap)) {
996 // Executing messages might use heap, so we should check for GC.
997 if (doYouWantToGC(cap)) {
998 scheduleDoGC(pcap, cap->running_task, false);
999 cap = *pcap;
1000 }
1001
1002 // don't use a blocking acquire; if the lock is held by
1003 // another thread then just carry on. This seems to avoid
1004 // getting stuck in a message ping-pong situation with other
1005 // processors. We'll check the inbox again later anyway.
1006 //
1007 // We should really use a more efficient queue data structure
1008 // here. The trickiness is that we must ensure a Capability
1009 // never goes idle if the inbox is non-empty, which is why we
1010 // use cap->lock (cap->lock is released as the last thing
1011 // before going idle; see Capability.c:releaseCapability()).
1012 r = TRY_ACQUIRE_LOCK(&cap->lock);
1013 if (r != 0) return;
1014
1015 m = cap->inbox;
1016 p = cap->putMVars;
1017 cap->inbox = (Message*)END_TSO_QUEUE;
1018 cap->putMVars = NULL;
1019
1020 RELEASE_LOCK(&cap->lock);
1021
1022 while (m != (Message*)END_TSO_QUEUE) {
1023 next = m->link;
1024 executeMessage(cap, m);
1025 m = next;
1026 }
1027
1028 while (p != NULL) {
1029 pnext = p->link;
1030 performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
1031 Unit_closure);
1032 freeStablePtr(p->mvar);
1033 stgFree(p);
1034 p = pnext;
1035 }
1036 }
1037 #endif
1038 }
1039
1040
1041 /* ----------------------------------------------------------------------------
1042 * Activate spark threads (THREADED_RTS)
1043 * ------------------------------------------------------------------------- */
1044
1045 #if defined(THREADED_RTS)
1046 static void
1047 scheduleActivateSpark(Capability *cap)
1048 {
1049 if (anySparks() && !cap->disabled)
1050 {
1051 createSparkThread(cap);
1052 debugTrace(DEBUG_sched, "creating a spark thread");
1053 }
1054 }
1055 #endif // THREADED_RTS
1056
1057 /* ----------------------------------------------------------------------------
1058 * After running a thread...
1059 * ------------------------------------------------------------------------- */
1060
1061 static void
1062 schedulePostRunThread (Capability *cap, StgTSO *t)
1063 {
1064 // We have to be able to catch transactions that are in an
1065 // infinite loop as a result of seeing an inconsistent view of
1066 // memory, e.g.
1067 //
1068 // atomically $ do
1069 // [a,b] <- mapM readTVar [ta,tb]
1070 // when (a == b) loop
1071 //
1072 // and a is never equal to b given a consistent view of memory.
1073 //
1074 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1075 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1076 debugTrace(DEBUG_sched | DEBUG_stm,
1077 "trec %p found wasting its time", t);
1078
1079 // strip the stack back to the
1080 // ATOMICALLY_FRAME, aborting the (nested)
1081 // transaction, and saving the stack of any
1082 // partially-evaluated thunks on the heap.
1083 throwToSingleThreaded_(cap, t, NULL, true);
1084
1085 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1086 }
1087 }
1088
1089 //
1090 // If the current thread's allocation limit has run out, send it
1091 // the AllocationLimitExceeded exception.
1092
1093 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1094 // Use a throwToSelf rather than a throwToSingleThreaded, because
1095 // it correctly handles the case where the thread is currently
1096 // inside mask. Also the thread might be blocked (e.g. on an
1097 // MVar), and throwToSingleThreaded doesn't unblock it
1098 // correctly in that case.
1099 throwToSelf(cap, t, allocationLimitExceeded_closure);
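// Reset the limit to the configured grace amount
// (RtsFlags.GcFlags.allocLimitGrace) so that the exception handler
// itself has some allocation headroom in which to run.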
1100 ASSIGN_Int64((W_*)&(t->alloc_limit),
1101 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1102 }
1103
1104 /* some statistics gathering in the parallel case */
1105 }
1106
1107 /* -----------------------------------------------------------------------------
1108 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1109 * -------------------------------------------------------------------------- */
1110
1111 static bool
1112 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1113 {
1114 if (cap->r.rHpLim == NULL || cap->context_switch) {
1115 // Sometimes we miss a context switch, e.g. when calling
1116 // primitives in a tight loop, MAYBE_GC() doesn't check the
1117 // context switch flag, and we end up waiting for a GC.
1118 // See #1984, and concurrent/should_run/1984
1119 cap->context_switch = 0;
1120 appendToRunQueue(cap,t);
1121 } else {
1122 pushOnRunQueue(cap,t);
1123 }
1124
1125 // did the task ask for a large block?
1126 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1127 // if so, get one and push it on the front of the nursery.
1128 bdescr *bd;
1129 W_ blocks;
1130
1131 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1132
1133 if (blocks > BLOCKS_PER_MBLOCK) {
1134 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1135 }
1136
1137 debugTrace(DEBUG_sched,
1138 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1139 (long)t->id, what_next_strs[t->what_next], blocks);
1140
1141 // don't do this if the nursery is (nearly) full, we'll GC first.
1142 if (cap->r.rCurrentNursery->link != NULL ||
1143 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1144 // infinite loop if the
1145 // nursery has only one
1146 // block.
1147
1148 bd = allocGroupOnNode_lock(cap->node,blocks);
1149 cap->r.rNursery->n_blocks += blocks;
1150
1151 // link the new group after CurrentNursery
1152 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1153
1154 // initialise it as a nursery block. We initialise the
1155 // step, gen_no, and flags field of *every* sub-block in
1156 // this large block, because this is easier than making
1157 // sure that we always find the block head of a large
1158 // block whenever we call Bdescr() (eg. evacuate() and
1159 // isAlive() in the GC would both have to do this, at
1160 // least).
1161 {
1162 bdescr *x;
1163 for (x = bd; x < bd + blocks; x++) {
1164 initBdescr(x,g0,g0);
1165 x->free = x->start;
1166 x->flags = 0;
1167 }
1168 }
1169
1170 // This assert can be a killer if the app is doing lots
1171 // of large block allocations.
1172 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1173
1174 // now update the nursery to point to the new block
1175 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1176 cap->r.rCurrentNursery = bd;
1177
1178 // we might be unlucky and have another thread get on the
1179 // run queue before us and steal the large block, but in that
1180 // case the thread will just end up requesting another large
1181 // block.
1182 return false; /* not actually GC'ing */
1183 }
1184 }
1185
1186 return doYouWantToGC(cap);
1187 /* actual GC is done at the end of the while loop in schedule() */
1188 }
1189
1190 /* -----------------------------------------------------------------------------
1191 * Handle a thread that returned to the scheduler with ThreadYielding
1192 * -------------------------------------------------------------------------- */
1193
1194 static bool
1195 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1196 {
1197 /* put the thread back on the run queue. Then, if we're ready to
1198 * GC, check whether this is the last task to stop. If so, wake
1199 * up the GC thread. getThread will block during a GC until the
1200 * GC is finished.
1201 */
1202
1203 ASSERT(t->_link == END_TSO_QUEUE);
1204
1205 // Shortcut if we're just switching evaluators: just run the thread. See
1206 // Note [avoiding threadPaused] in Interpreter.c.
1207 if (t->what_next != prev_what_next) {
1208 debugTrace(DEBUG_sched,
1209 "--<< thread %ld (%s) stopped to switch evaluators",
1210 (long)t->id, what_next_strs[t->what_next]);
1211 return true;
1212 }
1213
1214 // Reset the context switch flag. We don't do this just before
1215 // running the thread, because that would mean we would lose ticks
1216 // during GC, which can lead to unfair scheduling (a thread hogs
1217 // the CPU because the tick always arrives during GC). This way
1218 // penalises threads that do a lot of allocation, but that seems
1219 // better than the alternative.
1220 if (cap->context_switch != 0) {
1221 cap->context_switch = 0;
1222 appendToRunQueue(cap,t);
1223 } else {
1224 pushOnRunQueue(cap,t);
1225 }
1226
1227 IF_DEBUG(sanity,
1228 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1229 checkTSO(t));
1230
1231 return false;
1232 }
1233
1234 /* -----------------------------------------------------------------------------
1235 * Handle a thread that returned to the scheduler with ThreadBlocked
1236 * -------------------------------------------------------------------------- */
1237
1238 static void
1239 scheduleHandleThreadBlocked( StgTSO *t
1240 #if !defined(DEBUG)
1241 STG_UNUSED
1242 #endif
1243 )
1244 {
1245
1246 // We don't need to do anything. The thread is blocked, and it
1247 // has tidied up its stack and placed itself on whatever queue
1248 // it needs to be on.
1249
1250 // ASSERT(t->why_blocked != NotBlocked);
1251 // Not true: for example,
1252 // - the thread may have woken itself up already, because
1253 // threadPaused() might have raised a blocked throwTo
1254 // exception, see maybePerformBlockedException().
1255
1256 #if defined(DEBUG)
1257 traceThreadStatus(DEBUG_sched, t);
1258 #endif
1259 }
1260
1261 /* -----------------------------------------------------------------------------
1262 * Handle a thread that returned to the scheduler with ThreadFinished
1263 * -------------------------------------------------------------------------- */
1264
1265 static bool
1266 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1267 {
1268 /* Need to check whether this was a main thread, and if so,
1269 * return with the return value.
1270 *
1271 * We also end up here if the thread kills itself with an
1272 * uncaught exception, see Exception.cmm.
1273 */
1274
1275 // blocked exceptions can now complete, even if the thread was in
1276 // blocked mode (see #2910).
1277 awakenBlockedExceptionQueue (cap, t);
1278
1279 //
1280 // Check whether the thread that just completed was a bound
1281 // thread, and if so return with the result.
1282 //
1283 // There is an assumption here that all thread completion goes
1284 // through this point; we need to make sure that if a thread
1285 // ends up in the ThreadKilled state, that it stays on the run
1286 // queue so it can be dealt with here.
1287 //
1288
1289 if (t->bound) {
1290
1291 if (t->bound != task->incall) {
1292 #if !defined(THREADED_RTS)
1293 // Must be a bound thread that is not the topmost one. Leave
1294 // it on the run queue until the stack has unwound to the
1295 // point where we can deal with this. Leaving it on the run
1296 // queue also ensures that the garbage collector knows about
1297 // this thread and its return value (it gets dropped from the
1298 // step->threads list so there's no other way to find it).
1299 appendToRunQueue(cap,t);
1300 return false;
1301 #else
1302 // this cannot happen in the threaded RTS, because a
1303 // bound thread can only be run by the appropriate Task.
1304 barf("finished bound thread that isn't mine");
1305 #endif
1306 }
1307
1308 ASSERT(task->incall->tso == t);
1309
1310 if (t->what_next == ThreadComplete) {
1311 if (task->incall->ret) {
1312 // NOTE: return val is stack->sp[1] (see StgStartup.cmm)
1313 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1314 }
1315 task->incall->rstat = Success;
1316 } else {
1317 if (task->incall->ret) {
1318 *(task->incall->ret) = NULL;
1319 }
1320 if (sched_state >= SCHED_INTERRUPTING) {
1321 if (heap_overflow) {
1322 task->incall->rstat = HeapExhausted;
1323 } else {
1324 task->incall->rstat = Interrupted;
1325 }
1326 } else {
1327 task->incall->rstat = Killed;
1328 }
1329 }
1330 #if defined(DEBUG)
1331 removeThreadLabel((StgWord)task->incall->tso->id);
1332 #endif
1333
1334 // We no longer consider this thread and task to be bound to
1335 // each other. The TSO lives on until it is GC'd, but the
1336 // task is about to be released by the caller, and we don't
1337 // want anyone following the pointer from the TSO to the
1338 // defunct task (which might have already been
1339 // re-used). This was a real bug: the GC updated
1340 // tso->bound->tso which led to a deadlock.
1341 t->bound = NULL;
1342 task->incall->tso = NULL;
1343
1344 return true; // tells schedule() to return
1345 }
1346
1347 return false;
1348 }
1349
1350 /* -----------------------------------------------------------------------------
1351 * Perform a heap census
1352 * -------------------------------------------------------------------------- */
1353
1354 static bool
1355 scheduleNeedHeapProfile( bool ready_to_gc STG_UNUSED )
1356 {
1357 // When we have +RTS -i0 and we're heap profiling, do a census at
1358 // every GC. This lets us get repeatable runs for debugging.
1359 if (performHeapProfile ||
1360 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1361 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1362 return true;
1363 } else {
1364 return false;
1365 }
1366 }
1367
1368 /* -----------------------------------------------------------------------------
1369 * stopAllCapabilities()
1370 *
1371 * Stop all Haskell execution. This is used when we need to make some global
1372 * change to the system, such as altering the number of capabilities, or
1373 * forking.
1374 *
1375 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1376 * -------------------------------------------------------------------------- */
1377
1378 #if defined(THREADED_RTS)
1379 static void stopAllCapabilities (Capability **pCap, Task *task)
1380 {
1381 bool was_syncing;
1382 SyncType prev_sync_type;
1383
1384 PendingSync sync = {
1385 .type = SYNC_OTHER,
1386 .idle = NULL,
1387 .task = task
1388 };
1389
1390 do {
1391 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1392 } while (was_syncing);
1393
1394 acquireAllCapabilities(*pCap,task);
1395
1396 pending_sync = 0;
1397 }
1398 #endif
1399
1400 /* -----------------------------------------------------------------------------
1401 * requestSync()
1402 *
1403 * Commence a synchronisation between all capabilities. Normally not called
1404 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1405 * has some special synchronisation requirements.
1406 *
1407 * Returns:
1408 * false if we successfully got a sync
1409 * true if there was another sync request in progress,
1410 * and we yielded to it. The type of the other sync
1411 * request is returned in *prev_sync_type.
1412 * -------------------------------------------------------------------------- */
1413
1414 #if defined(THREADED_RTS)
1415 static bool requestSync (
1416 Capability **pcap, Task *task, PendingSync *new_sync,
1417 SyncType *prev_sync_type)
1418 {
1419 PendingSync *sync;
1420
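// Try to install our sync request atomically: if pending_sync was NULL
// we claimed it and can proceed; otherwise 'sync' is the request that
// beat us to it.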
1421 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1422 (StgWord)NULL,
1423 (StgWord)new_sync);
1424
1425 if (sync != NULL)
1426 {
1427 // sync is valid until we have called yieldCapability().
1428 // After the sync is completed, we cannot read that struct any
1429 // more because it has been freed.
1430 *prev_sync_type = sync->type;
1431 do {
1432 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1433 sync->type);
1434 ASSERT(*pcap);
1435 yieldCapability(pcap,task,true);
1436 sync = pending_sync;
1437 } while (sync != NULL);
1438
1439 // NOTE: task->cap might have changed now
1440 return true;
1441 }
1442 else
1443 {
1444 return false;
1445 }
1446 }
1447 #endif
1448
1449 /* -----------------------------------------------------------------------------
1450 * acquireAllCapabilities()
1451 *
1452 * Grab all the capabilities except the one we already hold. Used
1453 * when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1454 * before a fork (SYNC_OTHER).
1455 *
1456 * Only call this after requestSync(), otherwise a deadlock might
1457 * ensue if another thread is trying to synchronise.
1458 * -------------------------------------------------------------------------- */
1459
1460 #if defined(THREADED_RTS)
1461 static void acquireAllCapabilities(Capability *cap, Task *task)
1462 {
1463 Capability *tmpcap;
1464 uint32_t i;
1465
1466 ASSERT(pending_sync != NULL);
1467 for (i=0; i < n_capabilities; i++) {
1468 debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)",
1469 i, n_capabilities);
1470 tmpcap = capabilities[i];
1471 if (tmpcap != cap) {
1472 // we better hope this task doesn't get migrated to
1473 // another Capability while we're waiting for this one.
1474 // It won't, because load balancing happens while we have
1475 // all the Capabilities, but even so it's a slightly
1476 // unsavoury invariant.
1477 task->cap = tmpcap;
1478 waitForCapability(&tmpcap, task);
1479 if (tmpcap->no != i) {
1480 barf("acquireAllCapabilities: got the wrong capability");
1481 }
1482 }
1483 }
1484 task->cap = cap;
1485 }
1486 #endif
1487
1488 /* -----------------------------------------------------------------------------
1489 * releaseAllCapabilities()
1490 *
1491 * Assuming this thread holds all the capabilities, release them all except for
1492 * the one passed in as cap.
1493 * -------------------------------------------------------------------------- */
1494
1495 #if defined(THREADED_RTS)
1496 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task)
1497 {
1498 uint32_t i;
1499
1500 for (i = 0; i < n; i++) {
1501 if (cap->no != i) {
1502 task->cap = capabilities[i];
1503 releaseCapability(capabilities[i]);
1504 }
1505 }
1506 task->cap = cap;
1507 }
1508 #endif
1509
1510 /* -----------------------------------------------------------------------------
1511 * Perform a garbage collection if necessary
1512 * -------------------------------------------------------------------------- */
1513
1514 static void
1515 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1516 bool force_major)
1517 {
1518 Capability *cap = *pcap;
1519 bool heap_census;
1520 uint32_t collect_gen;
1521 bool major_gc;
1522 #if defined(THREADED_RTS)
1523 uint32_t gc_type;
1524 uint32_t i;
1525 uint32_t need_idle;
1526 uint32_t n_gc_threads;
1527 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1528 StgTSO *tso;
1529 bool *idle_cap;
1530 // idle_cap is an array (allocated later) of size n_capabilities, where
1531 // idle_cap[i] is true if capability i will be idle during this GC
1532 // cycle.
1533 #endif
1534
1535 if (sched_state == SCHED_SHUTTING_DOWN) {
1536 // The final GC has already been done, and the system is
1537 // shutting down. We'll probably deadlock if we try to GC
1538 // now.
1539 return;
1540 }
1541
1542 heap_census = scheduleNeedHeapProfile(true);
1543
1544 // Figure out which generation we are collecting, so that we can
1545 // decide whether this is a parallel GC or not.
1546 collect_gen = calcNeeded(force_major || heap_census, NULL);
1547 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
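// (collect_gen is the oldest generation that will be collected this
// time round; collecting the oldest generation of all is what we call
// a major GC.)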
1548
1549 #if defined(THREADED_RTS)
1550 if (sched_state < SCHED_INTERRUPTING
1551 && RtsFlags.ParFlags.parGcEnabled
1552 && collect_gen >= RtsFlags.ParFlags.parGcGen
1553 && ! oldest_gen->mark)
1554 {
1555 gc_type = SYNC_GC_PAR;
1556 } else {
1557 gc_type = SYNC_GC_SEQ;
1558 }
1559
1560 // In order to GC, there must be no threads running Haskell code.
1561 // Therefore, for single-threaded GC, the GC thread needs to hold *all* the
1562 // capabilities, and release them after the GC has completed. For parallel
1563 // GC, we synchronise all the running threads using requestSync().
1564 //
1565 // Other capabilities are prevented from running yet more Haskell threads if
1566 // pending_sync is set. Tested inside yieldCapability() and
1567 // releaseCapability() in Capability.c
1568
1569 PendingSync sync = {
1570 .type = gc_type,
1571 .idle = NULL,
1572 .task = task
1573 };
1574
1575 {
1576 SyncType prev_sync = 0;
1577 bool was_syncing;
1578 do {
1579 // If -qn is not set and we have more capabilities than cores, set
1580 // the number of GC threads to #cores. We do this here rather than
1581 // in normaliseRtsOpts() because here it will work if the program
1582 // calls setNumCapabilities.
1583 //
1584 n_gc_threads = RtsFlags.ParFlags.parGcThreads;
1585 if (n_gc_threads == 0 &&
1586 enabled_capabilities > getNumberOfProcessors()) {
1587 n_gc_threads = getNumberOfProcessors();
1588 }
1589
1590 // This calculation must be inside the loop because
1591 // enabled_capabilities may change if requestSync() below fails and
1592 // we retry.
1593 if (gc_type == SYNC_GC_PAR && n_gc_threads > 0) {
1594 if (n_gc_threads >= enabled_capabilities) {
1595 need_idle = 0;
1596 } else {
1597 need_idle = enabled_capabilities - n_gc_threads;
1598 }
1599 } else {
1600 need_idle = 0;
1601 }
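// e.g. with +RTS -qn2 -N4 (so enabled_capabilities == 4), need_idle
// comes out as 2 and two capabilities will be asked to sit out this
// parallel GC.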
1602
1603 // We need an array of size n_capabilities, but since this may
1604 // change each time around the loop we must allocate it afresh.
1605 idle_cap = (bool *)stgMallocBytes(n_capabilities *
1606 sizeof(bool),
1607 "scheduleDoGC");
1608 sync.idle = idle_cap;
1609
1610 // When using +RTS -qn, we need some capabilities to be idle during
1611 // GC. The best bet is to choose some inactive ones, so we look for
1612 // those first:
1613 uint32_t n_idle = need_idle;
1614 for (i=0; i < n_capabilities; i++) {
1615 if (capabilities[i]->disabled) {
1616 idle_cap[i] = true;
1617 } else if (n_idle > 0 &&
1618 capabilities[i]->running_task == NULL) {
1619 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1620 n_idle--;
1621 idle_cap[i] = true;
1622 } else {
1623 idle_cap[i] = false;
1624 }
1625 }
1626 // If we didn't find enough inactive capabilities, just pick some
1627 // more to be idle.
1628 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1629 if (!idle_cap[i] && i != cap->no) {
1630 idle_cap[i] = true;
1631 n_idle--;
1632 }
1633 }
1634 ASSERT(n_idle == 0);
1635
1636 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1637 cap = *pcap;
1638 if (was_syncing) {
1639 stgFree(idle_cap);
1640 }
1641 if (was_syncing &&
1642 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1643 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1644 // someone else had a pending sync request for a GC, so
1645 // let's assume GC has been done and we don't need to GC
1646 // again.
1647 // Exception to this: if SCHED_INTERRUPTING, then we still
1648 // need to do the final GC.
1649 return;
1650 }
1651 if (sched_state == SCHED_SHUTTING_DOWN) {
1652 // The scheduler might now be shutting down. We tested
1653 // this above, but it might have become true since then as
1654 // we yielded the capability in requestSync().
1655 return;
1656 }
1657 } while (was_syncing);
1658 }
1659
1660 stat_startGCSync(gc_threads[cap->no]);
1661
1662 #if defined(DEBUG)
1663 unsigned int old_n_capabilities = n_capabilities;
1664 #endif
1665
1666 interruptAllCapabilities();
1667
1668 // The final shutdown GC is always single-threaded, because it's
1669 // possible that some of the Capabilities have no worker threads.
1670
1671 if (gc_type == SYNC_GC_SEQ) {
1672 traceEventRequestSeqGc(cap);
1673 } else {
1674 traceEventRequestParGc(cap);
1675 }
1676
1677 if (gc_type == SYNC_GC_SEQ) {
1678 // single-threaded GC: grab all the capabilities
1679 acquireAllCapabilities(cap,task);
1680 }
1681 else
1682 {
1683 // If we are load-balancing collections in this
1684 // generation, then we require all GC threads to participate
1685 // in the collection. Otherwise, we only require active
1686 // threads to participate, and we set gc_threads[i]->idle for
1687 // any idle capabilities. The rationale here is that waking
1688 // up an idle Capability takes much longer than just doing any
1689 // GC work on its behalf.
1690
1691 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1692 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1693 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1694 {
1695 for (i=0; i < n_capabilities; i++) {
1696 if (capabilities[i]->disabled) {
1697 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1698 if (idle_cap[i]) {
1699 n_idle_caps++;
1700 }
1701 } else {
1702 if (i != cap->no && idle_cap[i]) {
1703 Capability *tmpcap = capabilities[i];
1704 task->cap = tmpcap;
1705 waitForCapability(&tmpcap, task);
1706 n_idle_caps++;
1707 }
1708 }
1709 }
1710 }
1711 else
1712 {
1713 for (i=0; i < n_capabilities; i++) {
1714 if (capabilities[i]->disabled) {
1715 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1716 if (idle_cap[i]) {
1717 n_idle_caps++;
1718 }
1719 } else if (i != cap->no &&
1720 capabilities[i]->idle >=
1721 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1722 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1723 if (idle_cap[i]) {
1724 n_idle_caps++;
1725 } else {
1726 n_failed_trygrab_idles++;
1727 }
1728 }
1729 }
1730 }
1731 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1732
1733 for (i=0; i < n_capabilities; i++) {
1734 capabilities[i]->idle++;
1735 }
1736
1737 // For all capabilities participating in this GC, wait until
1738 // they have stopped mutating and are standing by for GC.
1739 waitForGcThreads(cap, idle_cap);
1740
1741 // Stable point where we can do a global check on our spark counters
1742 ASSERT(checkSparkCountInvariant());
1743 }
1744
1745 #endif
1746
1747 IF_DEBUG(scheduler, printAllThreads());
1748
1749 delete_threads_and_gc:
1750 /*
1751 * We now have all the capabilities; if we're in an interrupting
1752 * state, then we should take the opportunity to delete all the
1753 * threads in the system.
1754 * Checking for major_gc ensures that the last GC is major.
1755 */
1756 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1757 deleteAllThreads(cap);
1758 #if defined(THREADED_RTS)
1759 // Discard all the sparks from every Capability. Why?
1760 // They'll probably be GC'd anyway since we've killed all the
1761 // threads. It just avoids the GC having to do any work to
1762 // figure out that any remaining sparks are garbage.
1763 for (i = 0; i < n_capabilities; i++) {
1764 capabilities[i]->spark_stats.gcd +=
1765 sparkPoolSize(capabilities[i]->sparks);
1766 // No race here since all Caps are stopped.
1767 discardSparksCap(capabilities[i]);
1768 }
1769 #endif
1770 sched_state = SCHED_SHUTTING_DOWN;
1771 }
1772
1773 /*
1774 * When there are disabled capabilities, we want to migrate any
1775 * threads away from them. Normally this happens in the
1776 * scheduler's loop, but only for unbound threads - it's really
1777 * hard for a bound thread to migrate itself. So we have another
1778 * go here.
1779 */
1780 #if defined(THREADED_RTS)
1781 for (i = enabled_capabilities; i < n_capabilities; i++) {
1782 Capability *tmp_cap, *dest_cap;
1783 tmp_cap = capabilities[i];
1784 ASSERT(tmp_cap->disabled);
1785 if (i != cap->no) {
1786 dest_cap = capabilities[i % enabled_capabilities];
1787 while (!emptyRunQueue(tmp_cap)) {
1788 tso = popRunQueue(tmp_cap);
1789 migrateThread(tmp_cap, tso, dest_cap);
1790 if (tso->bound) {
1791 traceTaskMigrate(tso->bound->task,
1792 tso->bound->task->cap,
1793 dest_cap);
1794 tso->bound->task->cap = dest_cap;
1795 }
1796 }
1797 }
1798 }
1799 #endif
1800
1801 #if defined(THREADED_RTS)
1802 // reset pending_sync *before* GC, so that when the GC threads
1803 // emerge they don't immediately re-enter the GC.
1804 pending_sync = 0;
1805 GarbageCollect(collect_gen, heap_census, gc_type, cap, idle_cap);
1806 #else
1807 GarbageCollect(collect_gen, heap_census, 0, cap, NULL);
1808 #endif
1809
1810 traceSparkCounters(cap);
1811
1812 switch (recent_activity) {
1813 case ACTIVITY_INACTIVE:
1814 if (force_major) {
1815 // We are doing a GC because the system has been idle for a
1816 // timeslice and we need to check for deadlock. Record the
1817 // fact that we've done a GC and turn off the timer signal;
1818 // it will get re-enabled if we run any threads after the GC.
1819 recent_activity = ACTIVITY_DONE_GC;
1820 #if !defined(PROFILING)
1821 stopTimer();
1822 #endif
1823 break;
1824 }
1825 // fall through...
1826
1827 case ACTIVITY_MAYBE_NO:
1828 // the GC might have taken long enough for the timer to set
1829 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1830 // but we aren't necessarily deadlocked:
1831 recent_activity = ACTIVITY_YES;
1832 break;
1833
1834 case ACTIVITY_DONE_GC:
1835 // If we are actually active, the scheduler will reset the
1836 // recent_activity flag and re-enable the timer.
1837 break;
1838 }
1839
1840 #if defined(THREADED_RTS)
1841 // Stable point where we can do a global check on our spark counters
1842 ASSERT(checkSparkCountInvariant());
1843 #endif
1844
1845 // The heap census itself is done during GarbageCollect().
1846 if (heap_census) {
1847 performHeapProfile = false;
1848 }
1849
1850 #if defined(THREADED_RTS)
1851
1852 // If n_capabilities has changed during GC, we're in trouble.
1853 ASSERT(n_capabilities == old_n_capabilities);
1854
1855 if (gc_type == SYNC_GC_PAR)
1856 {
1857 for (i = 0; i < n_capabilities; i++) {
1858 if (i != cap->no) {
1859 if (idle_cap[i]) {
1860 ASSERT(capabilities[i]->running_task == task);
1861 task->cap = capabilities[i];
1862 releaseCapability(capabilities[i]);
1863 } else {
1864 ASSERT(capabilities[i]->running_task != task);
1865 }
1866 }
1867 }
1868 task->cap = cap;
1869
1870 // releaseGCThreads() happens *after* we have released idle
1871 // capabilities. Otherwise what can happen is one of the released
1872 // threads starts a new GC, and finds that it can't acquire some of
1873 // the disabled capabilities, because the previous GC still holds
1874 // them, so those disabled capabilities will not be idle during the
1875 // next GC round. However, if we release the capabilities first,
1876 // then they will be free (because they're disabled) when the next
1877 // GC cycle happens.
1878 releaseGCThreads(cap, idle_cap);
1879 }
1880 #endif
1881 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1882 // GC set the heap_overflow flag. We should throw an exception if we
1883 // can, or shut down otherwise.
1884
1885 // Get the thread to which Ctrl-C is thrown
1886 StgTSO *main_thread = getTopHandlerThread();
1887 if (main_thread == NULL) {
1888 // GC set the heap_overflow flag, and there is no main thread to
1889 // throw an exception to, so we should proceed with an orderly
1890 // shutdown now. Ultimately we want the main thread to return to
1891 // its caller with HeapExhausted, at which point the caller should
1892 // call hs_exit(). The first step is to delete all the threads.
1893 sched_state = SCHED_INTERRUPTING;
1894 goto delete_threads_and_gc;
1895 }
1896
1897 heap_overflow = false;
1898 const uint64_t allocation_count = getAllocations();
1899 if (RtsFlags.GcFlags.heapLimitGrace <
1900 allocation_count - allocated_bytes_at_heapoverflow ||
1901 allocated_bytes_at_heapoverflow == 0) {
1902 allocated_bytes_at_heapoverflow = allocation_count;
1903 // We used to simply exit, but throwing an exception gives the
1904 // program a chance to clean up. It also lets the exception be
1905 // caught.
1906
1907 // FIXME this is not a good way to tell a program to release
1908 // resources. It is neither reliable (the RTS crashes if it fails
1909 // to allocate memory from the OS) nor very usable (it is always
1910 // thrown to the main thread, which might not be able to do anything
1911 // useful with it). We really should have a more general way to
1912 // release resources in low-memory conditions. Nevertheless, this
1913 // is still a big improvement over just exiting.
1914
1915 // FIXME again: perhaps we should throw a synchronous exception
1916 // instead an asynchronous one, or have a way for the program to
1917 // register a handler to be called when heap overflow happens.
1918 throwToSelf(cap, main_thread, heapOverflow_closure);
1919 }
1920 }
1921
1922 #if defined(THREADED_RTS)
1923 stgFree(idle_cap);
1924
1925 if (gc_type == SYNC_GC_SEQ) {
1926 // release our stash of capabilities.
1927 releaseAllCapabilities(n_capabilities, cap, task);
1928 }
1929 #endif
1930
1931 return;
1932 }
1933
1934 /* ---------------------------------------------------------------------------
1935 * Singleton fork(). Do not copy any running threads.
1936 * ------------------------------------------------------------------------- */
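/* This is the RTS side of the forkProcess# primop (cf. the barf message
 * at the bottom of this function); System.Posix.Process.forkProcess in
 * the unix package is expected to land here. The parent simply returns
 * the child's pid. The child deletes every Haskell thread, discards all
 * Tasks except its own, keeps only Capability 0, and then runs the
 * supplied action via rts_evalStableIOMain() before exiting.
 */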
1937
1938 pid_t
1939 forkProcess(HsStablePtr *entry
1940 #if !defined(FORKPROCESS_PRIMOP_SUPPORTED)
1941 STG_UNUSED
1942 #endif
1943 )
1944 {
1945 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
1946 pid_t pid;
1947 StgTSO* t,*next;
1948 Capability *cap;
1949 uint32_t g;
1950 Task *task = NULL;
1951 uint32_t i;
1952
1953 debugTrace(DEBUG_sched, "forking!");
1954
1955 task = newBoundTask();
1956
1957 cap = NULL;
1958 waitForCapability(&cap, task);
1959
1960 #if defined(THREADED_RTS)
1961 stopAllCapabilities(&cap, task);
1962 #endif
1963
1964 // no funny business: hold locks while we fork, otherwise if some
1965 // other thread is holding a lock when the fork happens, the data
1966 // structure protected by the lock will forever be in an
1967 // inconsistent state in the child. See also #1391.
1968 ACQUIRE_LOCK(&sched_mutex);
1969 ACQUIRE_LOCK(&sm_mutex);
1970 ACQUIRE_LOCK(&stable_mutex);
1971 ACQUIRE_LOCK(&task->lock);
1972
1973 for (i=0; i < n_capabilities; i++) {
1974 ACQUIRE_LOCK(&capabilities[i]->lock);
1975 }
1976
1977 #if defined(THREADED_RTS)
1978 ACQUIRE_LOCK(&all_tasks_mutex);
1979 #endif
1980
1981 stopTimer(); // See #4074
1982
1983 #if defined(TRACING)
1984 flushEventLog(); // so that child won't inherit dirty file buffers
1985 #endif
1986
1987 pid = fork();
1988
1989 if (pid) { // parent
1990
1991 startTimer(); // #4074
1992
1993 RELEASE_LOCK(&sched_mutex);
1994 RELEASE_LOCK(&sm_mutex);
1995 RELEASE_LOCK(&stable_mutex);
1996 RELEASE_LOCK(&task->lock);
1997
1998 #if defined(THREADED_RTS)
1999 /* N.B. releaseCapability_ below may need to take all_tasks_mutex */
2000 RELEASE_LOCK(&all_tasks_mutex);
2001 #endif
2002
2003 for (i=0; i < n_capabilities; i++) {
2004 releaseCapability_(capabilities[i],false);
2005 RELEASE_LOCK(&capabilities[i]->lock);
2006 }
2007
2008 boundTaskExiting(task);
2009
2010 // just return the pid
2011 return pid;
2012
2013 } else { // child
2014
2015 #if defined(THREADED_RTS)
2016 initMutex(&sched_mutex);
2017 initMutex(&sm_mutex);
2018 initMutex(&stable_mutex);
2019 initMutex(&task->lock);
2020
2021 for (i=0; i < n_capabilities; i++) {
2022 initMutex(&capabilities[i]->lock);
2023 }
2024
2025 initMutex(&all_tasks_mutex);
2026 #endif
2027
2028 #if defined(TRACING)
2029 resetTracing();
2030 #endif
2031
2032 // Now, all OS threads except the thread that forked are
2033 // stopped. We need to stop all Haskell threads, including
2034 // those involved in foreign calls. Also we need to delete
2035 // all Tasks, because they correspond to OS threads that are
2036 // now gone.
2037
2038 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2039 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2040 next = t->global_link;
2041 // don't allow threads to catch the ThreadKilled
2042 // exception, but we do want to raiseAsync() because these
2043 // threads may be evaluating thunks that we need later.
2044 deleteThread_(t->cap,t);
2045
2046 // stop the GC from updating the InCall to point to
2047 // the TSO. This is only necessary because the
2048 // OSThread bound to the TSO has been killed, and
2049 // won't get a chance to exit in the usual way (see
2050 // also scheduleHandleThreadFinished).
2051 t->bound = NULL;
2052 }
2053 }
2054
2055 discardTasksExcept(task);
2056
2057 for (i=0; i < n_capabilities; i++) {
2058 cap = capabilities[i];
2059
2060 // Empty the run queue. It seems tempting to let all the
2061 // killed threads stay on the run queue as zombies to be
2062 // cleaned up later, but some of them may correspond to
2063 // bound threads for which the corresponding Task does not
2064 // exist.
2065 truncateRunQueue(cap);
2066 cap->n_run_queue = 0;
2067
2068 // Any suspended C-calling Tasks are no more; their OS threads
2069 // don't exist now:
2070 cap->suspended_ccalls = NULL;
2071 cap->n_suspended_ccalls = 0;
2072
2073 #if defined(THREADED_RTS)
2074 // Wipe our spare workers list, they no longer exist. New
2075 // workers will be created if necessary.
2076 cap->spare_workers = NULL;
2077 cap->n_spare_workers = 0;
2078 cap->returning_tasks_hd = NULL;
2079 cap->returning_tasks_tl = NULL;
2080 cap->n_returning_tasks = 0;
2081 #endif
2082
2083 // Release all caps except 0; we'll use that for starting
2084 // the IO manager and running the client action below.
2085 if (cap->no != 0) {
2086 task->cap = cap;
2087 releaseCapability(cap);
2088 }
2089 }
2090 cap = capabilities[0];
2091 task->cap = cap;
2092
2093 // Empty the threads lists. Otherwise, the garbage
2094 // collector may attempt to resurrect some of these threads.
2095 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2096 generations[g].threads = END_TSO_QUEUE;
2097 }
2098
2099 // On Unix, all timers are reset in the child, so we need to start
2100 // the timer again.
2101 initTimer();
2102 startTimer();
2103
2104 // TODO: need to trace various other things in the child
2105 // like startup event, capabilities, process info etc
2106 traceTaskCreate(task, cap);
2107
2108 #if defined(THREADED_RTS)
2109 ioManagerStartCap(&cap);
2110 #endif
2111
2112 // Install toplevel exception handlers, so that the interruption
2113 // signal will be sent to the main thread.
2114 // See Trac #12903
2115 rts_evalStableIOMain(&cap, entry, NULL); // run the action
2116 rts_checkSchedStatus("forkProcess",cap);
2117
2118 rts_unlock(cap);
2119 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2120 }
2121 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2122 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2123 #endif
2124 }
2125
2126 /* ---------------------------------------------------------------------------
2127 * Changing the number of Capabilities
2128 *
2129 * Changing the number of Capabilities is very tricky! We can only do
2130 * it with the system fully stopped, so we do a full sync with
2131 * requestSync(SYNC_OTHER) and grab all the capabilities.
2132 *
2133 * Then we resize the appropriate data structures, and update all
2134 * references to the old data structures which have now moved.
2135 * Finally we release the Capabilities we are holding, and start
2136 * worker Tasks on the new Capabilities we created.
2137 *
2138 * ------------------------------------------------------------------------- */
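/* A minimal usage sketch (hypothetical C driver code, assuming the RTS
 * is already initialised); GHC.Conc.setNumCapabilities is believed to
 * call straight through to this function:
 *
 *     setNumCapabilities(4);   // grow (or shrink) to 4 enabled capabilities
 *
 * Note that shrinking never frees anything: capabilities are only marked
 * disabled, so n_capabilities never decreases and the checks below
 * compare against enabled_capabilities.
 */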
2139
2140 void
2141 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2142 {
2143 #if !defined(THREADED_RTS)
2144 if (new_n_capabilities != 1) {
2145 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2146 }
2147 return;
2148 #elif defined(NOSMP)
2149 if (new_n_capabilities != 1) {
2150 errorBelch("setNumCapabilities: not supported on this platform");
2151 }
2152 return;
2153 #else
2154 Task *task;
2155 Capability *cap;
2156 uint32_t n;
2157 Capability *old_capabilities = NULL;
2158 uint32_t old_n_capabilities = n_capabilities;
2159
2160 if (new_n_capabilities == enabled_capabilities) {
2161 return;
2162 } else if (new_n_capabilities <= 0) {
2163 errorBelch("setNumCapabilities: Capability count must be positive");
2164 return;
2165 }
2166
2167
2168 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2169 enabled_capabilities, new_n_capabilities);
2170
2171 cap = rts_lock();
2172 task = cap->running_task;
2173
2174 stopAllCapabilities(&cap, task);
2175
2176 if (new_n_capabilities < enabled_capabilities)
2177 {
2178 // Reducing the number of capabilities: we do not actually
2179 // remove the extra capabilities, we just mark them as
2180 // "disabled". This has the following effects:
2181 //
2182 // - threads on a disabled capability are migrated away by the
2183 // scheduler loop
2184 //
2185 // - disabled capabilities do not participate in GC
2186 // (see scheduleDoGC())
2187 //
2188 // - No spark threads are created on this capability
2189 // (see scheduleActivateSpark())
2190 //
2191 // - We do not attempt to migrate threads *to* a disabled
2192 // capability (see schedulePushWork()).
2193 //
2194 // but in other respects, a disabled capability remains
2195 // alive. Threads may be woken up on a disabled capability,
2196 // but they will be immediately migrated away.
2197 //
2198 // This approach is much easier than trying to actually remove
2199 // the capability; we don't have to worry about GC data
2200 // structures, the nursery, etc.
2201 //
2202 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2203 capabilities[n]->disabled = true;
2204 traceCapDisable(capabilities[n]);
2205 }
2206 enabled_capabilities = new_n_capabilities;
2207 }
2208 else
2209 {
2210 // Increasing the number of enabled capabilities.
2211 //
2212 // enable any disabled capabilities, up to the required number
2213 for (n = enabled_capabilities;
2214 n < new_n_capabilities && n < n_capabilities; n++) {
2215 capabilities[n]->disabled = false;
2216 traceCapEnable(capabilities[n]);
2217 }
2218 enabled_capabilities = n;
2219
2220 if (new_n_capabilities > n_capabilities) {
2221 #if defined(TRACING)
2222 // Allocate eventlog buffers for the new capabilities. Note this
2223 // must be done before calling moreCapabilities(), because that
2224 // will emit events about creating the new capabilities and adding
2225 // them to existing capsets.
2226 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2227 #endif
2228
2229 // Resize the capabilities array
2230 // NB. after this, capabilities points somewhere new. Any pointers
2231 // of type (Capability *) are now invalid.
2232 moreCapabilities(n_capabilities, new_n_capabilities);
2233
2234 // Resize and update storage manager data structures
2235 storageAddCapabilities(n_capabilities, new_n_capabilities);
2236 }
2237 }
2238
2239 // update n_capabilities before things start running
2240 if (new_n_capabilities > n_capabilities) {
2241 n_capabilities = enabled_capabilities = new_n_capabilities;
2242 }
2243
2244 // We're done: release the original Capabilities
2245 releaseAllCapabilities(old_n_capabilities, cap,task);
2246
2247 // We can't free the old array until now, because we access it
2248 // while updating pointers in updateCapabilityRefs().
2249 if (old_capabilities) {
2250 stgFree(old_capabilities);
2251 }
2252
2253 // Notify IO manager that the number of capabilities has changed.
2254 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2255
2256 rts_unlock(cap);
2257
2258 #endif // THREADED_RTS
2259 }
2260
2261
2262
2263 /* ---------------------------------------------------------------------------
2264 * Delete all the threads in the system
2265 * ------------------------------------------------------------------------- */
2266
2267 static void
2268 deleteAllThreads ( Capability *cap )
2269 {
2270 // NOTE: only safe to call if we own all capabilities.
2271
2272 StgTSO* t, *next;
2273 uint32_t g;
2274
2275 debugTrace(DEBUG_sched,"deleting all threads");
2276 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2277 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2278 next = t->global_link;
2279 deleteThread(cap,t);
2280 }
2281 }
2282
2283 // The run queue now contains a bunch of ThreadKilled threads. We
2284 // must not throw these away: the main thread(s) will be in there
2285 // somewhere, and the main scheduler loop has to deal with it.
2286 // Also, the run queue is the only thing keeping these threads from
2287 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2288
2289 #if !defined(THREADED_RTS)
2290 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2291 ASSERT(sleeping_queue == END_TSO_QUEUE);
2292 #endif
2293 }
2294
2295 /* -----------------------------------------------------------------------------
2296 Managing the suspended_ccalls list.
2297 Locks required: sched_mutex
2298 -------------------------------------------------------------------------- */
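/* cap->suspended_ccalls is a doubly linked list of InCall structures,
 * threaded through incall->prev/incall->next. suspendTask() pushes the
 * Task's current InCall onto the front, recoverSuspendedTask() unlinks
 * it again in O(1), and n_suspended_ccalls mirrors the length of the
 * list.
 */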
2299
2300 STATIC_INLINE void
2301 suspendTask (Capability *cap, Task *task)
2302 {
2303 InCall *incall;
2304
2305 incall = task->incall;
2306 ASSERT(incall->next == NULL && incall->prev == NULL);
2307 incall->next = cap->suspended_ccalls;
2308 incall->prev = NULL;
2309 if (cap->suspended_ccalls) {
2310 cap->suspended_ccalls->prev = incall;
2311 }
2312 cap->suspended_ccalls = incall;
2313 cap->n_suspended_ccalls++;
2314 }
2315
2316 STATIC_INLINE void
2317 recoverSuspendedTask (Capability *cap, Task *task)
2318 {
2319 InCall *incall;
2320
2321 incall = task->incall;
2322 if (incall->prev) {
2323 incall->prev->next = incall->next;
2324 } else {
2325 ASSERT(cap->suspended_ccalls == incall);
2326 cap->suspended_ccalls = incall->next;
2327 }
2328 if (incall->next) {
2329 incall->next->prev = incall->prev;
2330 }
2331 incall->next = incall->prev = NULL;
2332 cap->n_suspended_ccalls--;
2333 }
2334
2335 /* ---------------------------------------------------------------------------
2336 * Suspending & resuming Haskell threads.
2337 *
2338 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2339 * its capability before calling the C function. This allows another
2340 * task to pick up the capability and carry on running Haskell
2341 * threads. It also means that if the C call blocks, it won't lock
2342 * the whole system.
2343 *
2344 * The Haskell thread making the C call is put to sleep for the
2345 * duration of the call, on the suspended_ccalling_threads queue. We
2346 * give out a token to the task, which it can use to resume the thread
2347 * on return from the C function.
2348 *
2349 * If this is an interruptible C call, this means that the FFI call may be
2350 * unceremoniously terminated and should be scheduled on an
2351 * unbound worker thread.
2352 * ------------------------------------------------------------------------- */
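/* The code the compiler generates around a "safe" foreign call follows
 * roughly this shape (do_c_call and r are placeholders; reg is the
 * register table passed to suspendThread below):
 *
 *     void *token = suspendThread(reg, false);  // give up the Capability
 *     int r = do_c_call(...);                   // may block; other Haskell
 *                                               // threads keep running
 *     reg = resumeThread(token);                // reacquire a Capability
 *
 * The token is simply the Task pointer that suspendThread() returns.
 */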
2353
2354 void *
2355 suspendThread (StgRegTable *reg, bool interruptible)
2356 {
2357 Capability *cap;
2358 int saved_errno;
2359 StgTSO *tso;
2360 Task *task;
2361 #if defined(mingw32_HOST_OS)
2362 StgWord32 saved_winerror;
2363 #endif
2364
2365 saved_errno = errno;
2366 #if defined(mingw32_HOST_OS)
2367 saved_winerror = GetLastError();
2368 #endif
2369
2370 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2371 */
2372 cap = regTableToCapability(reg);
2373
2374 task = cap->running_task;
2375 tso = cap->r.rCurrentTSO;
2376
2377 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2378
2379 // XXX this might not be necessary --SDM
2380 tso->what_next = ThreadRunGHC;
2381
2382 threadPaused(cap,tso);
2383
2384 if (interruptible) {
2385 tso->why_blocked = BlockedOnCCall_Interruptible;
2386 } else {
2387 tso->why_blocked = BlockedOnCCall;
2388 }
2389
2390 // Hand back capability
2391 task->incall->suspended_tso = tso;
2392 task->incall->suspended_cap = cap;
2393
2394 // Otherwise allocate() will write to invalid memory.
2395 cap->r.rCurrentTSO = NULL;
2396
2397 ACQUIRE_LOCK(&cap->lock);
2398
2399 suspendTask(cap,task);
2400 cap->in_haskell = false;
2401 releaseCapability_(cap,false);
2402
2403 RELEASE_LOCK(&cap->lock);
2404
2405 errno = saved_errno;
2406 #if defined(mingw32_HOST_OS)
2407 SetLastError(saved_winerror);
2408 #endif
2409 return task;
2410 }
2411
2412 StgRegTable *
2413 resumeThread (void *task_)
2414 {
2415 StgTSO *tso;
2416 InCall *incall;
2417 Capability *cap;
2418 Task *task = task_;
2419 int saved_errno;
2420 #if defined(mingw32_HOST_OS)
2421 StgWord32 saved_winerror;
2422 #endif
2423
2424 saved_errno = errno;
2425 #if defined(mingw32_HOST_OS)
2426 saved_winerror = GetLastError();
2427 #endif
2428
2429 incall = task->incall;
2430 cap = incall->suspended_cap;
2431 task->cap = cap;
2432
2433 // Wait for permission to re-enter the RTS with the result.
2434 waitForCapability(&cap,task);
2435 // we might be on a different capability now... but if so, our
2436 // entry on the suspended_ccalls list will also have been
2437 // migrated.
2438
2439 // Remove the thread from the suspended list
2440 recoverSuspendedTask(cap,task);
2441
2442 tso = incall->suspended_tso;
2443 incall->suspended_tso = NULL;
2444 incall->suspended_cap = NULL;
2445 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2446
2447 traceEventRunThread(cap, tso);
2448
2449 /* Reset blocking status */
2450 tso->why_blocked = NotBlocked;
2451
2452 if ((tso->flags & TSO_BLOCKEX) == 0) {
2453 // avoid locking the TSO if we don't have to
2454 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2455 maybePerformBlockedException(cap,tso);
2456 }
2457 }
2458
2459 cap->r.rCurrentTSO = tso;
2460 cap->in_haskell = true;
2461 errno = saved_errno;
2462 #if defined(mingw32_HOST_OS)
2463 SetLastError(saved_winerror);
2464 #endif
2465
2466 /* We might have GC'd, mark the TSO dirty again */
2467 dirty_TSO(cap,tso);
2468 dirty_STACK(cap,tso->stackobj);
2469
2470 IF_DEBUG(sanity, checkTSO(tso));
2471
2472 return &cap->r;
2473 }
2474
2475 /* ---------------------------------------------------------------------------
2476 * scheduleThread()
2477 *
2478 * scheduleThread puts a thread on the end of the runnable queue.
2479 * This will usually be done immediately after a thread is created.
2480 * The caller of scheduleThread must create the thread using e.g.
2481 * createThread and push an appropriate closure
2482 * on this thread's stack before the scheduler is invoked.
2483 * ------------------------------------------------------------------------ */
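/* A sketch of the intended usage, modelled on the helpers in RtsAPI.c
 * (the stack size, closure and ret below are placeholders):
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  closure);
 *     scheduleThread(cap, tso);            // run it, don't wait
 *
 * or, to block the current Task until the thread finishes:
 *
 *     scheduleWaitThread(tso, &ret, &cap);
 */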
2484
2485 void
2486 scheduleThread(Capability *cap, StgTSO *tso)
2487 {
2488 // The thread goes at the *end* of the run-queue, to avoid possible
2489 // starvation of any threads already on the queue.
2490 appendToRunQueue(cap,tso);
2491 }
2492
2493 void
2494 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2495 {
2496 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2497 // move this thread from now on.
2498 #if defined(THREADED_RTS)
2499 cpu %= enabled_capabilities;
2500 if (cpu == cap->no) {
2501 appendToRunQueue(cap,tso);
2502 } else {
2503 migrateThread(cap, tso, capabilities[cpu]);
2504 }
2505 #else
2506 appendToRunQueue(cap,tso);
2507 #endif
2508 }
2509
2510 void
2511 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2512 {
2513 Task *task;
2514 DEBUG_ONLY( StgThreadID id );
2515 Capability *cap;
2516
2517 cap = *pcap;
2518
2519 // We already created/initialised the Task
2520 task = cap->running_task;
2521
2522 // This TSO is now a bound thread; make the Task and TSO
2523 // point to each other.
2524 tso->bound = task->incall;
2525 tso->cap = cap;
2526
2527 task->incall->tso = tso;
2528 task->incall->ret = ret;
2529 task->incall->rstat = NoStatus;
2530
2531 appendToRunQueue(cap,tso);
2532
2533 DEBUG_ONLY( id = tso->id );
2534 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2535
2536 cap = schedule(cap,task);
2537
2538 ASSERT(task->incall->rstat != NoStatus);
2539 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2540
2541 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2542 *pcap = cap;
2543 }
2544
2545 /* ----------------------------------------------------------------------------
2546 * Starting Tasks
2547 * ------------------------------------------------------------------------- */
2548
2549 #if defined(THREADED_RTS)
2550 void scheduleWorker (Capability *cap, Task *task)
2551 {
2552 // schedule() runs without a lock.
2553 cap = schedule(cap,task);
2554
2555 // On exit from schedule(), we have a Capability, but possibly not
2556 // the same one we started with.
2557
2558 // During shutdown, the requirement is that after all the
2559 // Capabilities are shut down, all workers that are shutting down
2560 // have finished workerTaskStop(). This is why we hold on to
2561 // cap->lock until we've finished workerTaskStop() below.
2562 //
2563 // There may be workers still involved in foreign calls; those
2564 // will just block in waitForCapability() because the
2565 // Capability has been shut down.
2566 //
2567 ACQUIRE_LOCK(&cap->lock);
2568 releaseCapability_(cap,false);
2569 workerTaskStop(task);
2570 RELEASE_LOCK(&cap->lock);
2571 }
2572 #endif
2573
2574 /* ---------------------------------------------------------------------------
2575 * Start new worker tasks on Capabilities from--to
2576 * -------------------------------------------------------------------------- */
2577
2578 static void
2579 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2580 {
2581 #if defined(THREADED_RTS)
2582 uint32_t i;
2583 Capability *cap;
2584
2585 for (i = from; i < to; i++) {
2586 cap = capabilities[i];
2587 ACQUIRE_LOCK(&cap->lock);
2588 startWorkerTask(cap);
2589 RELEASE_LOCK(&cap->lock);
2590 }
2591 #endif
2592 }
2593
2594 /* ---------------------------------------------------------------------------
2595 * initScheduler()
2596 *
2597 * Initialise the scheduler. This resets all the queues - if the
2598 * queues contained any threads, they'll be garbage collected at the
2599 * next pass.
2600 *
2601 * ------------------------------------------------------------------------ */
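/* initScheduler() is expected to be called exactly once, during RTS
 * startup (see RtsStartup.c), before any Haskell code runs;
 * exitScheduler() and freeScheduler() below are its counterparts during
 * shutdown.
 */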
2602
2603 void
2604 initScheduler(void)
2605 {
2606 #if !defined(THREADED_RTS)
2607 blocked_queue_hd = END_TSO_QUEUE;
2608 blocked_queue_tl = END_TSO_QUEUE;
2609 sleeping_queue = END_TSO_QUEUE;
2610 #endif
2611
2612 sched_state = SCHED_RUNNING;
2613 recent_activity = ACTIVITY_YES;
2614
2615 #if defined(THREADED_RTS)
2616 /* Initialise the mutex and condition variables used by
2617 * the scheduler. */
2618 initMutex(&sched_mutex);
2619 #endif
2620
2621 ACQUIRE_LOCK(&sched_mutex);
2622
2623 allocated_bytes_at_heapoverflow = 0;
2624
2625 /* A capability holds the state a native thread needs in
2626 * order to execute STG code. At least one capability is
2627 * floating around (only THREADED_RTS builds have more than one).
2628 */
2629 initCapabilities();
2630
2631 initTaskManager();
2632
2633 /*
2634 * Eagerly start one worker to run each Capability, except for
2635 * Capability 0. The idea is that we're probably going to start a
2636 * bound thread on Capability 0 pretty soon, so we don't want a
2637 * worker task hogging it.
2638 */
2639 startWorkerTasks(1, n_capabilities);
2640
2641 RELEASE_LOCK(&sched_mutex);
2642
2643 }
2644
2645 void
2646 exitScheduler (bool wait_foreign USED_IF_THREADS)
2647 /* see Capability.c, shutdownCapability() */
2648 {
2649 Task *task = NULL;
2650
2651 task = newBoundTask();
2652
2653 // If we haven't killed all the threads yet, do it now.
2654 if (sched_state < SCHED_SHUTTING_DOWN) {
2655 sched_state = SCHED_INTERRUPTING;
2656 Capability *cap = task->cap;
2657 waitForCapability(&cap,task);
2658 scheduleDoGC(&cap,task,true);
2659 ASSERT(task->incall->tso == NULL);
2660 releaseCapability(cap);
2661 }
2662 sched_state = SCHED_SHUTTING_DOWN;
2663
2664 shutdownCapabilities(task, wait_foreign);
2665
2666 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2667 // n_failed_trygrab_idles, n_idle_caps);
2668
2669 boundTaskExiting(task);
2670 }
2671
2672 void
2673 freeScheduler( void )
2674 {
2675 uint32_t still_running;
2676
2677 ACQUIRE_LOCK(&sched_mutex);
2678 still_running = freeTaskManager();
2679 // We can only free the Capabilities if there are no Tasks still
2680 // running. We might have a Task about to return from a foreign
2681 // call into waitForCapability(), for example (actually,
2682 // this should be the *only* thing that a still-running Task can
2683 // do at this point, and it will block waiting for the
2684 // Capability).
2685 if (still_running == 0) {
2686 freeCapabilities();
2687 }
2688 RELEASE_LOCK(&sched_mutex);
2689 #if defined(THREADED_RTS)
2690 closeMutex(&sched_mutex);
2691 #endif
2692 }
2693
2694 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2695 void *user USED_IF_NOT_THREADS)
2696 {
2697 #if !defined(THREADED_RTS)
2698 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2699 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2700 evac(user, (StgClosure **)(void *)&sleeping_queue);
2701 #endif
2702 }
2703
2704 /* -----------------------------------------------------------------------------
2705 performGC
2706
2707 This is the interface to the garbage collector from Haskell land.
2708 We provide this so that external C code can allocate and garbage
2709 collect when called from Haskell via _ccall_GC.
2710 -------------------------------------------------------------------------- */
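/* For example, a hypothetical snippet of foreign code that has been
 * called from Haskell could force a full collection before making a
 * large allocation:
 *
 *     performMajorGC();   // full collection; wraps performGC_(true)
 *
 * These are also the entry points that System.Mem.performGC and
 * performMajorGC are believed to call into.
 */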
2711
2712 static void
2713 performGC_(bool force_major)
2714 {
2715 Task *task;
2716 Capability *cap = NULL;
2717
2718 // We must grab a new Task here, because the existing Task may be
2719 // associated with a particular Capability, and chained onto the
2720 // suspended_ccalls queue.
2721 task = newBoundTask();
2722
2723 // TODO: do we need to traceTask*() here?
2724
2725 waitForCapability(&cap,task);
2726 scheduleDoGC(&cap,task,force_major);
2727 releaseCapability(cap);
2728 boundTaskExiting(task);
2729 }
2730
2731 void
2732 performGC(void)
2733 {
2734 performGC_(false);
2735 }
2736
2737 void
2738 performMajorGC(void)
2739 {
2740 performGC_(true);
2741 }
2742
2743 /* ---------------------------------------------------------------------------
2744 Interrupt execution.
2745 Might be called inside a signal handler so it mustn't do anything fancy.
2746 ------------------------------------------------------------------------ */
2747
2748 void
2749 interruptStgRts(void)
2750 {
2751 sched_state = SCHED_INTERRUPTING;
2752 interruptAllCapabilities();
2753 #if defined(THREADED_RTS)
2754 wakeUpRts();
2755 #endif
2756 }
2757
2758 /* -----------------------------------------------------------------------------
2759 Wake up the RTS
2760
2761 This function causes at least one OS thread to wake up and run the
2762 scheduler loop. It is invoked when the RTS might be deadlocked, or
2763 an external event has arrived that may need servicing (e.g. a
2764 keyboard interrupt).
2765
2766 In the single-threaded RTS we don't do anything here; we only have
2767 one thread anyway, and the event that caused us to want to wake up
2768 will have interrupted any blocking system call in progress anyway.
2769 -------------------------------------------------------------------------- */
2770
2771 #if defined(THREADED_RTS)
2772 void wakeUpRts(void)
2773 {
2774 // This forces the IO Manager thread to wakeup, which will
2775 // in turn ensure that some OS thread wakes up and runs the
2776 // scheduler loop, which will cause a GC and deadlock check.
2777 ioManagerWakeup();
2778 }
2779 #endif
2780
2781 /* -----------------------------------------------------------------------------
2782 Deleting threads
2783
2784 This is used for interruption (^C) and forking, and corresponds to
2785 raising an exception but without letting the thread catch the
2786 exception.
2787 -------------------------------------------------------------------------- */
2788
2789 static void
2790 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2791 {
2792 // NOTE: must only be called on a TSO that we have exclusive
2793 // access to, because we will call throwToSingleThreaded() below.
2794 // The TSO must be on the run queue of the Capability we own, or
2795 // we must own all Capabilities.
2796
2797 if (tso->why_blocked != BlockedOnCCall &&
2798 tso->why_blocked != BlockedOnCCall_Interruptible) {
2799 throwToSingleThreaded(tso->cap,tso,NULL);
2800 }
2801 }
2802
2803 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
2804 static void
2805 deleteThread_(Capability *cap, StgTSO *tso)
2806 { // for forkProcess only:
2807 // like deleteThread(), but we delete threads in foreign calls, too.
2808
2809 if (tso->why_blocked == BlockedOnCCall ||
2810 tso->why_blocked == BlockedOnCCall_Interruptible) {
2811 tso->what_next = ThreadKilled;
2812 appendToRunQueue(tso->cap, tso);
2813 } else {
2814 deleteThread(cap,tso);
2815 }
2816 }
2817 #endif
2818
2819 /* -----------------------------------------------------------------------------
2820 raiseExceptionHelper
2821
2822 This function is called by the raise# primitive, just so that we can
2823 move some of the tricky bits of raising an exception from C-- into
2824 C. Who knows, it might be a useful reusable thing here too.
2825 -------------------------------------------------------------------------- */
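/* The return value tells the Cmm caller what kind of frame the stack
 * walk stopped at (CATCH_FRAME, ATOMICALLY_FRAME, CATCH_STM_FRAME or
 * STOP_FRAME), and tso->stackobj->sp is left pointing at that frame so
 * the caller can continue from there.
 */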
2826
2827 StgWord
2828 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2829 {
2830 Capability *cap = regTableToCapability(reg);
2831 StgThunk *raise_closure = NULL;
2832 StgPtr p, next;
2833 const StgRetInfoTable *info;
2834 //
2835 // This closure represents the expression 'raise# E' where E
2836 // is the exception to raise. It is used to overwrite all the
2837 // thunks which are currently under evaluation.
2838 //
2839
2840 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2841 // LDV profiling: stg_raise_info has THUNK as its closure
2842 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2843 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2844 // 1 does not cause any problem unless profiling is performed.
2845 // However, when LDV profiling goes on, we need to linearly scan
2846 // small object pool, where raise_closure is stored, so we should
2847 // use MIN_UPD_SIZE.
2848 //
2849 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2850 // sizeofW(StgClosure)+1);
2851 //
2852
2853 //
2854 // Walk up the stack, looking for the catch frame. On the way,
2855 // we update any closures pointed to from update frames with the
2856 // raise closure that we just built.
2857 //
2858 p = tso->stackobj->sp;
2859 while(1) {
2860 info = get_ret_itbl((StgClosure *)p);
2861 next = p + stack_frame_sizeW((StgClosure *)p);
2862 switch (info->i.type) {
2863
2864 case UPDATE_FRAME:
2865 // Only create raise_closure if we need to.
2866 if (raise_closure == NULL) {
2867 raise_closure =
2868 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2869 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2870 raise_closure->payload[0] = exception;
2871 }
2872 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2873 (StgClosure *)raise_closure);
2874 p = next;
2875 continue;
2876
2877 case ATOMICALLY_FRAME:
2878 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2879 tso->stackobj->sp = p;
2880 return ATOMICALLY_FRAME;
2881
2882 case CATCH_FRAME:
2883 tso->stackobj->sp = p;
2884 return CATCH_FRAME;
2885
2886 case CATCH_STM_FRAME:
2887 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2888 tso->stackobj->sp = p;
2889 return CATCH_STM_FRAME;
2890
2891 case UNDERFLOW_FRAME:
2892 tso->stackobj->sp = p;
2893 threadStackUnderflow(cap,tso);
2894 p = tso->stackobj->sp;
2895 continue;
2896
2897 case STOP_FRAME:
2898 tso->stackobj->sp = p;
2899 return STOP_FRAME;
2900
2901 case CATCH_RETRY_FRAME: {
2902 StgTRecHeader *trec = tso -> trec;
2903 StgTRecHeader *outer = trec -> enclosing_trec;
2904 debugTrace(DEBUG_stm,
2905 "found CATCH_RETRY_FRAME at %p during raise", p);
2906 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2907 stmAbortTransaction(cap, trec);
2908 stmFreeAbortedTRec(cap, trec);
2909 tso -> trec = outer;
2910 p = next;
2911 continue;
2912 }
2913
2914 default:
2915 p = next;
2916 continue;
2917 }
2918 }
2919 }
2920
2921
2922 /* -----------------------------------------------------------------------------
2923 findRetryFrameHelper
2924
2925 This function is called by the retry# primitive. It traverses the stack
2926 leaving tso->sp referring to the frame which should handle the retry.
2927
2928 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2929 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2930
2931 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2932 create) because retries are not considered to be exceptions, despite the
2933 similar implementation.
2934
2935 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2936 not be created within memory transactions.
2937 -------------------------------------------------------------------------- */
2938
2939 StgWord
2940 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2941 {
2942 const StgRetInfoTable *info;
2943 StgPtr p, next;
2944
2945 p = tso->stackobj->sp;
2946 while (1) {
2947 info = get_ret_itbl((const StgClosure *)p);
2948 next = p + stack_frame_sizeW((StgClosure *)p);
2949 switch (info->i.type) {
2950
2951 case ATOMICALLY_FRAME:
2952 debugTrace(DEBUG_stm,
2953 "found ATOMICALLY_FRAME at %p during retry", p);
2954 tso->stackobj->sp = p;
2955 return ATOMICALLY_FRAME;
2956
2957 case CATCH_RETRY_FRAME:
2958 debugTrace(DEBUG_stm,
2959 "found CATCH_RETRY_FRAME at %p during retry", p);
2960 tso->stackobj->sp = p;
2961 return CATCH_RETRY_FRAME;
2962
2963 case CATCH_STM_FRAME: {
2964 StgTRecHeader *trec = tso -> trec;
2965 StgTRecHeader *outer = trec -> enclosing_trec;
2966 debugTrace(DEBUG_stm,
2967 "found CATCH_STM_FRAME at %p during retry", p);
2968 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2969 stmAbortTransaction(cap, trec);
2970 stmFreeAbortedTRec(cap, trec);
2971 tso -> trec = outer;
2972 p = next;
2973 continue;
2974 }
2975
2976 case UNDERFLOW_FRAME:
2977 tso->stackobj->sp = p;
2978 threadStackUnderflow(cap,tso);
2979 p = tso->stackobj->sp;
2980 continue;
2981
2982 default:
2983 ASSERT(info->i.type != CATCH_FRAME);
2984 ASSERT(info->i.type != STOP_FRAME);
2985 p = next;
2986 continue;
2987 }
2988 }
2989 }
2990
2991 /* -----------------------------------------------------------------------------
2992 resurrectThreads is called after garbage collection on the list of
2993 threads found to be garbage. Each of these threads will be woken
2994 up and sent a signal: BlockedIndefinitelyOnMVar if the thread was blocked
2995 on an MVar, BlockedIndefinitelyOnSTM if it was blocked on STM, or
2996 NonTermination if the thread was blocked on a Black Hole.
2997
2998 Locks: assumes we hold *all* the capabilities.
2999 -------------------------------------------------------------------------- */
3000
3001 void
3002 resurrectThreads (StgTSO *threads)
3003 {
3004 StgTSO *tso, *next;
3005 Capability *cap;
3006 generation *gen;
3007
3008 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3009 next = tso->global_link;
3010
3011 gen = Bdescr((P_)tso)->gen;
3012 tso->global_link = gen->threads;
3013 gen->threads = tso;
3014
3015 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3016
3017 // Wake up the thread on the Capability it was last on
3018 cap = tso->cap;
3019
3020 switch (tso->why_blocked) {
3021 case BlockedOnMVar:
3022 case BlockedOnMVarRead:
3023 /* Called by GC - sched_mutex lock is currently held. */
3024 throwToSingleThreaded(cap, tso,
3025 (StgClosure *)blockedIndefinitelyOnMVar_closure);
3026 break;
3027 case BlockedOnBlackHole:
3028 throwToSingleThreaded(cap, tso,
3029 (StgClosure *)nonTermination_closure);
3030 break;
3031 case BlockedOnSTM:
3032 throwToSingleThreaded(cap, tso,
3033 (StgClosure *)blockedIndefinitelyOnSTM_closure);
3034 break;
3035 case NotBlocked:
3036 /* This might happen if the thread was blocked on a black hole
3037 * belonging to a thread that we've just woken up (raiseAsync
3038 * can wake up threads, remember...).
3039 */
3040 continue;
3041 case BlockedOnMsgThrowTo:
3042 // This can happen if the target is masking, blocks on a
3043 // black hole, and then is found to be unreachable. In
3044 // this case, we want to let the target wake up and carry
3045 // on, and do nothing to this thread.
3046 continue;
3047 default:
3048 barf("resurrectThreads: thread blocked in a strange way: %d",
3049 tso->why_blocked);
3050 }
3051 }
3052 }