[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45 #include "TopHandler.h"
46
47 #ifdef HAVE_SYS_TYPES_H
48 #include <sys/types.h>
49 #endif
50 #ifdef HAVE_UNISTD_H
51 #include <unistd.h>
52 #endif
53
54 #include <string.h>
55 #include <stdlib.h>
56 #include <stdarg.h>
57
58 #ifdef HAVE_ERRNO_H
59 #include <errno.h>
60 #endif
61
62 #ifdef TRACING
63 #include "eventlog/EventLog.h"
64 #endif
65 /* -----------------------------------------------------------------------------
66 * Global variables
67 * -------------------------------------------------------------------------- */
68
69 #if !defined(THREADED_RTS)
70 // Blocked/sleeping threads
71 StgTSO *blocked_queue_hd = NULL;
72 StgTSO *blocked_queue_tl = NULL;
73 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
74 #endif
75
76 // Bytes allocated since the last time a HeapOverflow exception was thrown by
77 // the RTS
78 uint64_t allocated_bytes_at_heapoverflow = 0;
79
80 /* Set to true when the latest garbage collection failed to reclaim enough
81 * space, and the runtime should proceed to shut itself down in an orderly
82 * fashion (emitting profiling info etc.), OR throw an exception to the main
83 * thread, if it is still alive.
84 */
85 bool heap_overflow = false;
86
87 /* flag that tracks whether we have done any execution in this time slice.
88 * LOCK: currently none, perhaps we should lock (but needs to be
89 * updated in the fast path of the scheduler).
90 *
91 * NB. must be StgWord, we do xchg() on it.
92 */
93 volatile StgWord recent_activity = ACTIVITY_YES;
94
95 /* if this flag is set as well, give up execution
96 * LOCK: none (changes monotonically)
97 */
98 volatile StgWord sched_state = SCHED_RUNNING;
99
100 /*
101 * This mutex protects most of the global scheduler data in
102 * the THREADED_RTS runtime.
103 */
104 #if defined(THREADED_RTS)
105 Mutex sched_mutex;
106 #endif
107
108 #if !defined(mingw32_HOST_OS)
109 #define FORKPROCESS_PRIMOP_SUPPORTED
110 #endif
111
112 /* -----------------------------------------------------------------------------
113 * static function prototypes
114 * -------------------------------------------------------------------------- */
115
116 static Capability *schedule (Capability *initialCapability, Task *task);
117
118 //
119 // These functions all encapsulate parts of the scheduler loop, and are
120 // abstracted only to make the structure and control flow of the
121 // scheduler clearer.
122 //
123 static void schedulePreLoop (void);
124 static void scheduleFindWork (Capability **pcap);
125 #if defined(THREADED_RTS)
126 static void scheduleYield (Capability **pcap, Task *task);
127 #endif
128 #if defined(THREADED_RTS)
129 static bool requestSync (Capability **pcap, Task *task,
130 PendingSync *sync_type, SyncType *prev_sync_type);
131 static void acquireAllCapabilities(Capability *cap, Task *task);
132 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
133 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
134 uint32_t to USED_IF_THREADS);
135 #endif
136 static void scheduleStartSignalHandlers (Capability *cap);
137 static void scheduleCheckBlockedThreads (Capability *cap);
138 static void scheduleProcessInbox(Capability **cap);
139 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
140 static void schedulePushWork(Capability *cap, Task *task);
141 #if defined(THREADED_RTS)
142 static void scheduleActivateSpark(Capability *cap);
143 #endif
144 static void schedulePostRunThread(Capability *cap, StgTSO *t);
145 static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
146 static bool scheduleHandleYield( Capability *cap, StgTSO *t,
147 uint32_t prev_what_next );
148 static void scheduleHandleThreadBlocked( StgTSO *t );
149 static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
150 StgTSO *t );
151 static bool scheduleNeedHeapProfile(bool ready_to_gc);
152 static void scheduleDoGC(Capability **pcap, Task *task, bool force_major);
153
154 static void deleteThread (Capability *cap, StgTSO *tso);
155 static void deleteAllThreads (Capability *cap);
156
157 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
158 static void deleteThread_(Capability *cap, StgTSO *tso);
159 #endif
160
161 /* ---------------------------------------------------------------------------
162 Main scheduling loop.
163
164 We use round-robin scheduling, each thread returning to the
165 scheduler loop when one of these conditions is detected:
166
167 * out of heap space
168 * timer expires (thread yields)
169 * thread blocks
170 * thread ends
171 * stack overflow
172
173 ------------------------------------------------------------------------ */
174
175 static Capability *
176 schedule (Capability *initialCapability, Task *task)
177 {
178 StgTSO *t;
179 Capability *cap;
180 StgThreadReturnCode ret;
181 uint32_t prev_what_next;
182 bool ready_to_gc;
183 #if defined(THREADED_RTS)
184 bool first = true;
185 #endif
186
187 cap = initialCapability;
188
189 // Pre-condition: this task owns initialCapability.
190 // The sched_mutex is *NOT* held
191 // NB. on return, we still hold a capability.
192
193 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
194
195 schedulePreLoop();
196
197 // -----------------------------------------------------------
198 // Scheduler loop starts here:
199
200 while (1) {
201
202 // Check whether we have re-entered the RTS from Haskell without
203 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
204 // call).
205 if (cap->in_haskell) {
206 errorBelch("schedule: re-entered unsafely.\n"
207 " Perhaps a 'foreign import unsafe' should be 'safe'?");
208 stg_exit(EXIT_FAILURE);
209 }
210
211 // Note [shutdown]: The interruption / shutdown sequence.
212 //
213 // In order to cleanly shut down the runtime, we want to:
214 // * make sure that all main threads return to their callers
215 // with the state 'Interrupted'.
216 //    * clean up all OS threads associated with the runtime
217 // * free all memory etc.
218 //
219 // So the sequence goes like this:
220 //
221 // * The shutdown sequence is initiated by calling hs_exit(),
222 // interruptStgRts(), or running out of memory in the GC.
223 //
224 // * Set sched_state = SCHED_INTERRUPTING
225 //
226 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
227 // scheduleDoGC(), which halts the whole runtime by acquiring all the
228 // capabilities, does a GC and then calls deleteAllThreads() to kill all
229 // the remaining threads. The zombies are left on the run queue for
230 // cleaning up. We can't kill threads involved in foreign calls.
231 //
232 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
233 //
234 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
235 // enforced by the scheduler, which won't run any Haskell code when
236 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
237 // other capabilities by doing the GC earlier.
238 //
239 // * all workers exit when the run queue on their capability
240 // drains. All main threads will also exit when their TSO
241 // reaches the head of the run queue and they can return.
242 //
243 // * eventually all Capabilities will shut down, and the RTS can
244 // exit.
245 //
246 // * We might be left with threads blocked in foreign calls,
247 // we should really attempt to kill these somehow (TODO).
248
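// As a point of reference, a host program embedding Haskell would
// typically trigger this whole sequence via the standard HsFFI entry
// points. A minimal sketch (not part of the RTS itself, shown only to
// illustrate where hs_exit() / interruptStgRts() come from):
//
//     #include "HsFFI.h"
//
//     int main (int argc, char *argv[])
//     {
//         hs_init(&argc, &argv);
//         /* ... call exported Haskell functions ... */
//         hs_exit();    // initiates the shutdown sequence above
//         return 0;
//     }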
249 switch (sched_state) {
250 case SCHED_RUNNING:
251 break;
252 case SCHED_INTERRUPTING:
253 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
254 /* scheduleDoGC() deletes all the threads */
255 scheduleDoGC(&cap,task,true);
256
257 // after scheduleDoGC(), we must be shutting down. Either some
258 // other Capability did the final GC, or we did it above,
259 // either way we can fall through to the SCHED_SHUTTING_DOWN
260 // case now.
261 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
262 // fall through
263
264 case SCHED_SHUTTING_DOWN:
265 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
266 // If we are a worker, just exit. If we're a bound thread
267 // then we will exit below when we've removed our TSO from
268 // the run queue.
269 if (!isBoundTask(task) && emptyRunQueue(cap)) {
270 return cap;
271 }
272 break;
273 default:
274 barf("sched_state: %d", sched_state);
275 }
276
277 scheduleFindWork(&cap);
278
279 /* work pushing, currently relevant only for THREADED_RTS:
280 (pushes threads, wakes up idle capabilities for stealing) */
281 schedulePushWork(cap,task);
282
283 scheduleDetectDeadlock(&cap,task);
284
285 // Normally, the only way we can get here with no threads to
286 // run is if a keyboard interrupt was received during
287 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
288 // Additionally, it is not fatal for the
289 // threaded RTS to reach here with no threads to run.
290 //
291 // win32: might be here due to awaitEvent() being abandoned
292 // as a result of a console event having been delivered.
293
294 #if defined(THREADED_RTS)
295 if (first)
296 {
297 // XXX: ToDo
298 // // don't yield the first time, we want a chance to run this
299 // // thread for a bit, even if there are others banging at the
300 // // door.
301 // first = false;
302 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
303 }
304
305 scheduleYield(&cap,task);
306
307 if (emptyRunQueue(cap)) continue; // look for work again
308 #endif
309
310 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
311 if ( emptyRunQueue(cap) ) {
312 ASSERT(sched_state >= SCHED_INTERRUPTING);
313 }
314 #endif
315
316 //
317 // Get a thread to run
318 //
319 t = popRunQueue(cap);
320
321 // Sanity check the thread we're about to run. This can be
322 // expensive if there is lots of thread switching going on...
323 IF_DEBUG(sanity,checkTSO(t));
324
325 #if defined(THREADED_RTS)
326 // Check whether we can run this thread in the current task.
327 // If not, we have to pass our capability to the right task.
328 {
329 InCall *bound = t->bound;
330
331 if (bound) {
332 if (bound->task == task) {
333 // yes, the Haskell thread is bound to the current native thread
334 } else {
335 debugTrace(DEBUG_sched,
336 "thread %lu bound to another OS thread",
337 (unsigned long)t->id);
338 // no, bound to a different Haskell thread: pass to that thread
339 pushOnRunQueue(cap,t);
340 continue;
341 }
342 } else {
343 // The thread we want to run is unbound.
344 if (task->incall->tso) {
345 debugTrace(DEBUG_sched,
346 "this OS thread cannot run thread %lu",
347 (unsigned long)t->id);
348 // no, the current native thread is bound to a different
349 // Haskell thread, so pass it to any worker thread
350 pushOnRunQueue(cap,t);
351 continue;
352 }
353 }
354 }
355 #endif
356
357 // If we're shutting down, and this thread has not yet been
358 // killed, kill it now. This sometimes happens when a finalizer
359 // thread is created by the final GC, or a thread previously
360 // in a foreign call returns.
361 if (sched_state >= SCHED_INTERRUPTING &&
362 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
363 deleteThread(cap,t);
364 }
365
366 // If this capability is disabled, migrate the thread away rather
367 // than running it. NB. but not if the thread is bound: it is
368 // really hard for a bound thread to migrate itself. Believe me,
369 // I tried several ways and couldn't find a way to do it.
370 // Instead, when everything is stopped for GC, we migrate all the
371 // threads on the run queue then (see scheduleDoGC()).
372 //
373 // ToDo: what about TSO_LOCKED? Currently we're migrating those
374 // when the number of capabilities drops, but we never migrate
375 // them back if it rises again. Presumably we should, but after
376 // the thread has been migrated we no longer know what capability
377 // it was originally on.
378 #ifdef THREADED_RTS
379 if (cap->disabled && !t->bound) {
380 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
381 migrateThread(cap, t, dest_cap);
382 continue;
383 }
384 #endif
385
386 /* context switches are initiated by the timer signal, unless
387 * the user specified "context switch as often as possible", with
388 * +RTS -C0
389 */
390 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
391 && !emptyThreadQueues(cap)) {
392 cap->context_switch = 1;
393 }
394
395 run_thread:
396
397 // CurrentTSO is the thread to run. It might be different if we
398 // loop back to run_thread, so make sure to set CurrentTSO after
399 // that.
400 cap->r.rCurrentTSO = t;
401
402 startHeapProfTimer();
403
404 // ----------------------------------------------------------------------
405 // Run the current thread
406
407 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
408 ASSERT(t->cap == cap);
409 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
410
411 prev_what_next = t->what_next;
412
413 errno = t->saved_errno;
414 #ifdef mingw32_HOST_OS
415 SetLastError(t->saved_winerror);
416 #endif
417
418 // reset the interrupt flag before running Haskell code
419 cap->interrupt = 0;
420
421 cap->in_haskell = true;
422 cap->idle = 0;
423
424 dirty_TSO(cap,t);
425 dirty_STACK(cap,t->stackobj);
426
427 switch (recent_activity)
428 {
429 case ACTIVITY_DONE_GC: {
430 // ACTIVITY_DONE_GC means we turned off the timer signal to
431 // conserve power (see #1623). Re-enable it here.
432 uint32_t prev;
433 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
434 if (prev == ACTIVITY_DONE_GC) {
435 #ifndef PROFILING
436 startTimer();
437 #endif
438 }
439 break;
440 }
441 case ACTIVITY_INACTIVE:
442 // If we reached ACTIVITY_INACTIVE, then don't reset it until
443 // we've done the GC. The thread running here might just be
444 // the IO manager thread that handle_tick() woke up via
445 // wakeUpRts().
446 break;
447 default:
448 recent_activity = ACTIVITY_YES;
449 }
450
451 traceEventRunThread(cap, t);
452
453 switch (prev_what_next) {
454
455 case ThreadKilled:
456 case ThreadComplete:
457 /* Thread already finished, return to scheduler. */
458 ret = ThreadFinished;
459 break;
460
461 case ThreadRunGHC:
462 {
463 StgRegTable *r;
464 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
465 cap = regTableToCapability(r);
466 ret = r->rRet;
467 break;
468 }
469
470 case ThreadInterpret:
471 cap = interpretBCO(cap);
472 ret = cap->r.rRet;
473 break;
474
475 default:
476 barf("schedule: invalid prev_what_next=%u field", prev_what_next);
477 }
478
479 cap->in_haskell = false;
480
481 // The TSO might have moved, eg. if it re-entered the RTS and a GC
482 // happened. So find the new location:
483 t = cap->r.rCurrentTSO;
484
485 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
486 // don't want it set when not running a Haskell thread.
487 cap->r.rCurrentTSO = NULL;
488
489 // And save the current errno in this thread.
490 // XXX: possibly bogus for SMP because this thread might already
491 // be running again, see code below.
492 t->saved_errno = errno;
493 #ifdef mingw32_HOST_OS
494 // Similarly for Windows error code
495 t->saved_winerror = GetLastError();
496 #endif
497
498 if (ret == ThreadBlocked) {
499 if (t->why_blocked == BlockedOnBlackHole) {
500 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
501 traceEventStopThread(cap, t, t->why_blocked + 6,
502 owner != NULL ? owner->id : 0);
503 } else {
504 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
505 }
506 } else {
507 traceEventStopThread(cap, t, ret, 0);
508 }
509
510 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
511 ASSERT(t->cap == cap);
512
513 // ----------------------------------------------------------------------
514
515 // Costs for the scheduler are assigned to CCS_SYSTEM
516 stopHeapProfTimer();
517 #if defined(PROFILING)
518 cap->r.rCCCS = CCS_SYSTEM;
519 #endif
520
521 schedulePostRunThread(cap,t);
522
523 ready_to_gc = false;
524
525 switch (ret) {
526 case HeapOverflow:
527 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
528 break;
529
530 case StackOverflow:
531 // just adjust the stack for this thread, then pop it back
532 // on the run queue.
533 threadStackOverflow(cap, t);
534 pushOnRunQueue(cap,t);
535 break;
536
537 case ThreadYielding:
538 if (scheduleHandleYield(cap, t, prev_what_next)) {
539 // shortcut for switching between compiler/interpreter:
540 goto run_thread;
541 }
542 break;
543
544 case ThreadBlocked:
545 scheduleHandleThreadBlocked(t);
546 break;
547
548 case ThreadFinished:
549 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
550 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
551 break;
552
553 default:
554 barf("schedule: invalid thread return code %d", (int)ret);
555 }
556
557 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
558 scheduleDoGC(&cap,task,false);
559 }
560 } /* end of while() */
561 }
562
563 /* -----------------------------------------------------------------------------
564 * Run queue operations
565 * -------------------------------------------------------------------------- */
566
567 static void
568 removeFromRunQueue (Capability *cap, StgTSO *tso)
569 {
570 if (tso->block_info.prev == END_TSO_QUEUE) {
571 ASSERT(cap->run_queue_hd == tso);
572 cap->run_queue_hd = tso->_link;
573 } else {
574 setTSOLink(cap, tso->block_info.prev, tso->_link);
575 }
576 if (tso->_link == END_TSO_QUEUE) {
577 ASSERT(cap->run_queue_tl == tso);
578 cap->run_queue_tl = tso->block_info.prev;
579 } else {
580 setTSOPrev(cap, tso->_link, tso->block_info.prev);
581 }
582 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
583 cap->n_run_queue--;
584
585 IF_DEBUG(sanity, checkRunQueue(cap));
586 }
587
588 void
589 promoteInRunQueue (Capability *cap, StgTSO *tso)
590 {
591 removeFromRunQueue(cap, tso);
592 pushOnRunQueue(cap, tso);
593 }
594
595 /* ----------------------------------------------------------------------------
596 * Setting up the scheduler loop
597 * ------------------------------------------------------------------------- */
598
599 static void
600 schedulePreLoop(void)
601 {
602 // initialisation for scheduler - what cannot go into initScheduler()
603
604 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
605 win32AllocStack();
606 #endif
607 }
608
609 /* -----------------------------------------------------------------------------
610 * scheduleFindWork()
611 *
612 * Search for work to do, and handle messages from elsewhere.
613 * -------------------------------------------------------------------------- */
614
615 static void
616 scheduleFindWork (Capability **pcap)
617 {
618 scheduleStartSignalHandlers(*pcap);
619
620 scheduleProcessInbox(pcap);
621
622 scheduleCheckBlockedThreads(*pcap);
623
624 #if defined(THREADED_RTS)
625 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
626 #endif
627 }
628
629 #if defined(THREADED_RTS)
630 STATIC_INLINE bool
631 shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
632 {
633 // we need to yield this capability to someone else if..
634 // - another thread is initiating a GC, and we didn't just do a GC
635 // (see Note [GC livelock])
636 // - another Task is returning from a foreign call
637 // - the thread at the head of the run queue cannot be run
638 // by this Task (it is bound to another Task, or it is unbound
639 //    and this Task is bound).
640 //
641 // Note [GC livelock]
642 //
643 // If we are interrupted to do a GC, then we do not immediately do
644 // another one. This avoids a starvation situation where one
645 // Capability keeps forcing a GC and the other Capabilities make no
646 // progress at all.
647
648 return ((pending_sync && !didGcLast) ||
649 cap->n_returning_tasks != 0 ||
650 (!emptyRunQueue(cap) && (task->incall->tso == NULL
651 ? peekRunQueue(cap)->bound != NULL
652 : peekRunQueue(cap)->bound != task->incall)));
653 }
654
655 // This is the single place where a Task goes to sleep. There are
656 // two reasons it might need to sleep:
657 // - there are no threads to run
658 // - we need to yield this Capability to someone else
659 // (see shouldYieldCapability())
660 //
661 // Careful: the scheduler loop is quite delicate. Make sure you run
662 // the tests in testsuite/concurrent (all ways) after modifying this,
663 // and also check the benchmarks in nofib/parallel for regressions.
664
665 static void
666 scheduleYield (Capability **pcap, Task *task)
667 {
668 Capability *cap = *pcap;
669 bool didGcLast = false;
670
671 // if we have work, and we don't need to give up the Capability, continue.
672 //
673 if (!shouldYieldCapability(cap,task,false) &&
674 (!emptyRunQueue(cap) ||
675 !emptyInbox(cap) ||
676 sched_state >= SCHED_INTERRUPTING)) {
677 return;
678 }
679
680 // otherwise yield (sleep), and keep yielding if necessary.
681 do {
682 didGcLast = yieldCapability(&cap,task, !didGcLast);
683 }
684 while (shouldYieldCapability(cap,task,didGcLast));
685
686 // note there may still be no threads on the run queue at this
687 // point, the caller has to check.
688
689 *pcap = cap;
690 return;
691 }
692 #endif
693
694 /* -----------------------------------------------------------------------------
695 * schedulePushWork()
696 *
697 * Push work to other Capabilities if we have some.
698 * -------------------------------------------------------------------------- */
699
700 static void
701 schedulePushWork(Capability *cap USED_IF_THREADS,
702 Task *task USED_IF_THREADS)
703 {
704 /* following code not for PARALLEL_HASKELL. I kept the call general,
705 future GUM versions might use pushing in a distributed setup */
706 #if defined(THREADED_RTS)
707
708 Capability *free_caps[n_capabilities], *cap0;
709 uint32_t i, n_wanted_caps, n_free_caps;
710
711 uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
712
713 // migration can be turned off with +RTS -qm
714 if (!RtsFlags.ParFlags.migrate) {
715 spare_threads = 0;
716 }
717
718 // Figure out how many capabilities we want to wake up. We need at least
719 // sparkPoolSize(cap) plus the number of spare threads we have.
720 n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
721 if (n_wanted_caps == 0) return;
722
723 // First grab as many free Capabilities as we can. ToDo: we should use
724 // capabilities on the same NUMA node preferably, but not exclusively.
725 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
726 n_free_caps < n_wanted_caps && i != cap->no;
727 i = (i + 1) % n_capabilities) {
728 cap0 = capabilities[i];
729 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
730 if (!emptyRunQueue(cap0)
731 || cap0->n_returning_tasks != 0
732 || !emptyInbox(cap0)) {
733 // it already has some work, we just grabbed it at
734 // the wrong moment. Or maybe it's deadlocked!
735 releaseCapability(cap0);
736 } else {
737 free_caps[n_free_caps++] = cap0;
738 }
739 }
740 }
741
742 // We now have n_free_caps free capabilities stashed in
743 // free_caps[]. Attempt to share our run queue equally with them.
744 // This is complicated slightly by the fact that we can't move
745 // some threads:
746 //
747 // - threads that have TSO_LOCKED cannot migrate
748 // - a thread that is bound to the current Task cannot be migrated
749 //
750 // This is about the simplest thing we could do; improvements we
751 // might want to do include:
752 //
753 // - giving high priority to moving relatively new threads, on
754 // the grounds that they haven't had time to build up a
755 // working set in the cache on this CPU/Capability.
756 //
757 // - giving low priority to moving long-lived threads
758
759 if (n_free_caps > 0) {
760 StgTSO *prev, *t, *next;
761
762 debugTrace(DEBUG_sched,
763 "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
764 cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
765 n_free_caps);
766
767 // There are n_free_caps+1 caps in total. We will share the threads
768 // evenly between them, *except* that if the run queue does not divide
769 // evenly by n_free_caps+1 then we bias towards the current capability.
770 // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
771 uint32_t keep_threads =
772 (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
773
774 // This also ensures that we don't give away all our threads, since
775 // (x + y) / (y + 1) >= 1 when x >= 1.
776
777 // The number of threads we have left.
778 uint32_t n = cap->n_run_queue;
779
780 // prev = the previous thread on this cap's run queue
781 prev = END_TSO_QUEUE;
782
783 // We're going to walk through the run queue, migrating threads to other
784 // capabilities until we have only keep_threads left. We might
785 // encounter a thread that cannot be migrated, in which case we add it
786 // to the current run queue and decrement keep_threads.
787 for (t = cap->run_queue_hd, i = 0;
788 t != END_TSO_QUEUE && n > keep_threads;
789 t = next)
790 {
791 next = t->_link;
792 t->_link = END_TSO_QUEUE;
793
794 // Should we keep this thread?
795 if (t->bound == task->incall // don't move my bound thread
796 || tsoLocked(t) // don't move a locked thread
797 ) {
798 if (prev == END_TSO_QUEUE) {
799 cap->run_queue_hd = t;
800 } else {
801 setTSOLink(cap, prev, t);
802 }
803 setTSOPrev(cap, t, prev);
804 prev = t;
805 if (keep_threads > 0) keep_threads--;
806 }
807
808 // Or migrate it?
809 else {
810 appendToRunQueue(free_caps[i],t);
811 traceEventMigrateThread (cap, t, free_caps[i]->no);
812
813 if (t->bound) { t->bound->task->cap = free_caps[i]; }
814 t->cap = free_caps[i];
815 n--; // we have one fewer threads now
816 i++; // move on to the next free_cap
817 if (i == n_free_caps) i = 0;
818 }
819 }
820
821 // Join up the beginning of the queue (prev)
822 // with the rest of the queue (t)
823 if (t == END_TSO_QUEUE) {
824 cap->run_queue_tl = prev;
825 } else {
826 setTSOPrev(cap, t, prev);
827 }
828 if (prev == END_TSO_QUEUE) {
829 cap->run_queue_hd = t;
830 } else {
831 setTSOLink(cap, prev, t);
832 }
833 cap->n_run_queue = n;
834
835 IF_DEBUG(sanity, checkRunQueue(cap));
836
837 // release the capabilities
838 for (i = 0; i < n_free_caps; i++) {
839 task->cap = free_caps[i];
840 if (sparkPoolSizeCap(cap) > 0) {
841 // If we have sparks to steal, wake up a worker on the
842 // capability, even if it has no threads to run.
843 releaseAndWakeupCapability(free_caps[i]);
844 } else {
845 releaseCapability(free_caps[i]);
846 }
847 }
848 }
849 task->cap = cap; // reset to point to our Capability.
850
851 #endif /* THREADED_RTS */
852
853 }
854
855 /* ----------------------------------------------------------------------------
856 * Start any pending signal handlers
857 * ------------------------------------------------------------------------- */
858
859 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
860 static void
861 scheduleStartSignalHandlers(Capability *cap)
862 {
863 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
864 // safe outside the lock
865 startSignalHandlers(cap);
866 }
867 }
868 #else
869 static void
870 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
871 {
872 }
873 #endif
874
875 /* ----------------------------------------------------------------------------
876 * Check for blocked threads that can be woken up.
877 * ------------------------------------------------------------------------- */
878
879 static void
880 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
881 {
882 #if !defined(THREADED_RTS)
883 //
884 // Check whether any waiting threads need to be woken up. If the
885 // run queue is empty, and there are no other tasks running, we
886 // can wait indefinitely for something to happen.
887 //
888 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
889 {
890 awaitEvent (emptyRunQueue(cap));
891 }
892 #endif
893 }
894
895 /* ----------------------------------------------------------------------------
896 * Detect deadlock conditions and attempt to resolve them.
897 * ------------------------------------------------------------------------- */
898
899 static void
900 scheduleDetectDeadlock (Capability **pcap, Task *task)
901 {
902 Capability *cap = *pcap;
903 /*
904 * Detect deadlock: when we have no threads to run, there are no
905 * threads blocked, waiting for I/O, or sleeping, and all the
906 * other tasks are waiting for work, we must have a deadlock of
907 * some description.
908 */
909 if ( emptyThreadQueues(cap) )
910 {
911 #if defined(THREADED_RTS)
912 /*
913 * In the threaded RTS, we only check for deadlock if there
914 * has been no activity in a complete timeslice. This means
915 * we won't eagerly start a full GC just because we don't have
916 * any threads to run currently.
917 */
918 if (recent_activity != ACTIVITY_INACTIVE) return;
919 #endif
920
921 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
922
923 // Garbage collection can release some new threads due to
924 // either (a) finalizers or (b) threads resurrected because
925 // they are unreachable and will therefore be sent an
926 // exception. Any threads thus released will be immediately
927 // runnable.
928 scheduleDoGC (pcap, task, true/*force major GC*/);
929 cap = *pcap;
930 // when force_major == true, scheduleDoGC sets
931 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
932 // signal.
933
934 if ( !emptyRunQueue(cap) ) return;
935
936 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
937 /* If we have user-installed signal handlers, then wait
938 * for signals to arrive rather than bombing out with a
939 * deadlock.
940 */
941 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
942 debugTrace(DEBUG_sched,
943 "still deadlocked, waiting for signals...");
944
945 awaitUserSignals();
946
947 if (signals_pending()) {
948 startSignalHandlers(cap);
949 }
950
951 // either we have threads to run, or we were interrupted:
952 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
953
954 return;
955 }
956 #endif
957
958 #if !defined(THREADED_RTS)
959 /* Probably a real deadlock. Send the current main thread the
960 * Deadlock exception.
961 */
962 if (task->incall->tso) {
963 switch (task->incall->tso->why_blocked) {
964 case BlockedOnSTM:
965 case BlockedOnBlackHole:
966 case BlockedOnMsgThrowTo:
967 case BlockedOnMVar:
968 case BlockedOnMVarRead:
969 throwToSingleThreaded(cap, task->incall->tso,
970 (StgClosure *)nonTermination_closure);
971 return;
972 default:
973 barf("deadlock: main thread blocked in a strange way");
974 }
975 }
976 return;
977 #endif
978 }
979 }
980
981
982 /* ----------------------------------------------------------------------------
983 * Process message in the current Capability's inbox
984 * ------------------------------------------------------------------------- */
985
986 static void
987 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
988 {
989 #if defined(THREADED_RTS)
990 Message *m, *next;
991 PutMVar *p, *pnext;
992 int r;
993 Capability *cap = *pcap;
994
995 while (!emptyInbox(cap)) {
996 if (cap->r.rCurrentNursery->link == NULL ||
997 g0->n_new_large_words >= large_alloc_lim) {
998 scheduleDoGC(pcap, cap->running_task, false);
999 cap = *pcap;
1000 }
1001
1002 // don't use a blocking acquire; if the lock is held by
1003 // another thread then just carry on. This seems to avoid
1004 // getting stuck in a message ping-pong situation with other
1005 // processors. We'll check the inbox again later anyway.
1006 //
1007 // We should really use a more efficient queue data structure
1008 // here. The trickiness is that we must ensure a Capability
1009 // never goes idle if the inbox is non-empty, which is why we
1010 // use cap->lock (cap->lock is released as the last thing
1011 // before going idle; see Capability.c:releaseCapability()).
1012 r = TRY_ACQUIRE_LOCK(&cap->lock);
1013 if (r != 0) return;
1014
1015 m = cap->inbox;
1016 p = cap->putMVars;
1017 cap->inbox = (Message*)END_TSO_QUEUE;
1018 cap->putMVars = NULL;
1019
1020 RELEASE_LOCK(&cap->lock);
1021
1022 while (m != (Message*)END_TSO_QUEUE) {
1023 next = m->link;
1024 executeMessage(cap, m);
1025 m = next;
1026 }
1027
1028 while (p != NULL) {
1029 pnext = p->link;
1030 performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
1031 Unit_closure);
1032 freeStablePtr(p->mvar);
1033 stgFree(p);
1034 p = pnext;
1035 }
1036 }
1037 #endif
1038 }
1039
1040
1041 /* ----------------------------------------------------------------------------
1042 * Activate spark threads (THREADED_RTS)
1043 * ------------------------------------------------------------------------- */
1044
1045 #if defined(THREADED_RTS)
1046 static void
1047 scheduleActivateSpark(Capability *cap)
1048 {
1049 if (anySparks() && !cap->disabled)
1050 {
1051 createSparkThread(cap);
1052 debugTrace(DEBUG_sched, "creating a spark thread");
1053 }
1054 }
1055 #endif // THREADED_RTS
1056
1057 /* ----------------------------------------------------------------------------
1058 * After running a thread...
1059 * ------------------------------------------------------------------------- */
1060
1061 static void
1062 schedulePostRunThread (Capability *cap, StgTSO *t)
1063 {
1064 // We have to be able to catch transactions that are in an
1065 // infinite loop as a result of seeing an inconsistent view of
1066 // memory, e.g.
1067 //
1068 // atomically $ do
1069 // [a,b] <- mapM readTVar [ta,tb]
1070 // when (a == b) loop
1071 //
1072 // and a is never equal to b given a consistent view of memory.
1073 //
1074 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1075 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1076 debugTrace(DEBUG_sched | DEBUG_stm,
1077 "trec %p found wasting its time", t);
1078
1079 // strip the stack back to the
1080 // ATOMICALLY_FRAME, aborting the (nested)
1081 // transaction, and saving the stack of any
1082 // partially-evaluated thunks on the heap.
1083 throwToSingleThreaded_(cap, t, NULL, true);
1084
1085 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1086 }
1087 }
1088
1089 //
1090 // If the current thread's allocation limit has run out, send it
1091 // the AllocationLimitExceeded exception.
1092
1093 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1094 // Use a throwToSelf rather than a throwToSingleThreaded, because
1095 // it correctly handles the case where the thread is currently
1096 // inside mask. Also the thread might be blocked (e.g. on an
1097 // MVar), and throwToSingleThreaded doesn't unblock it
1098 // correctly in that case.
1099 throwToSelf(cap, t, allocationLimitExceeded_closure);
1100 ASSIGN_Int64((W_*)&(t->alloc_limit),
1101 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1102 }
1103
1104 /* some statistics gathering in the parallel case */
1105 }
1106
1107 /* -----------------------------------------------------------------------------
1108 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1109 * -------------------------------------------------------------------------- */
1110
1111 static bool
1112 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1113 {
1114 if (cap->r.rHpLim == NULL || cap->context_switch) {
1115 // Sometimes we miss a context switch, e.g. when calling
1116 // primitives in a tight loop, MAYBE_GC() doesn't check the
1117 // context switch flag, and we end up waiting for a GC.
1118 // See #1984, and concurrent/should_run/1984
1119 cap->context_switch = 0;
1120 appendToRunQueue(cap,t);
1121 } else {
1122 pushOnRunQueue(cap,t);
1123 }
1124
1125 // did the task ask for a large block?
1126 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1127 // if so, get one and push it on the front of the nursery.
1128 bdescr *bd;
1129 W_ blocks;
1130
1131 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1132
1133 if (blocks > BLOCKS_PER_MBLOCK) {
1134 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1135 }
1136
1137 debugTrace(DEBUG_sched,
1138 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1139 (long)t->id, what_next_strs[t->what_next], blocks);
1140
1141 // don't do this if the nursery is (nearly) full, we'll GC first.
1142 if (cap->r.rCurrentNursery->link != NULL ||
1143 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1144 // infinite loop if the
1145 // nursery has only one
1146 // block.
1147
1148 bd = allocGroupOnNode_lock(cap->node,blocks);
1149 cap->r.rNursery->n_blocks += blocks;
1150
1151 // link the new group after CurrentNursery
1152 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1153
1154 // initialise it as a nursery block. We initialise the
1155 // step, gen_no, and flags field of *every* sub-block in
1156 // this large block, because this is easier than making
1157 // sure that we always find the block head of a large
1158 // block whenever we call Bdescr() (eg. evacuate() and
1159 // isAlive() in the GC would both have to do this, at
1160 // least).
1161 {
1162 bdescr *x;
1163 for (x = bd; x < bd + blocks; x++) {
1164 initBdescr(x,g0,g0);
1165 x->free = x->start;
1166 x->flags = 0;
1167 }
1168 }
1169
1170 // This assert can be a killer if the app is doing lots
1171 // of large block allocations.
1172 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1173
1174 // now update the nursery to point to the new block
1175 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1176 cap->r.rCurrentNursery = bd;
1177
1178 // we might be unlucky and have another thread get on the
1179 // run queue before us and steal the large block, but in that
1180 // case the thread will just end up requesting another large
1181 // block.
1182 return false; /* not actually GC'ing */
1183 }
1184 }
1185
1186 // if we got here because we exceeded large_alloc_lim, then
1187 // proceed straight to GC.
1188 if (g0->n_new_large_words >= large_alloc_lim) {
1189 return true;
1190 }
1191
1192 // Otherwise, we just ran out of space in the current nursery.
1193 // Grab another nursery if we can.
1194 if (getNewNursery(cap)) {
1195 debugTrace(DEBUG_sched, "thread %ld got a new nursery", t->id);
1196 return false;
1197 }
1198
1199 return true;
1200 /* actual GC is done at the end of the while loop in schedule() */
1201 }
1202
1203 /* -----------------------------------------------------------------------------
1204 * Handle a thread that returned to the scheduler with ThreadYielding
1205 * -------------------------------------------------------------------------- */
1206
1207 static bool
1208 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1209 {
1210 /* put the thread back on the run queue. Then, if we're ready to
1211 * GC, check whether this is the last task to stop. If so, wake
1212 * up the GC thread. getThread will block during a GC until the
1213 * GC is finished.
1214 */
1215
1216 ASSERT(t->_link == END_TSO_QUEUE);
1217
1218 // Shortcut if we're just switching evaluators: don't bother
1219 // doing stack squeezing (which can be expensive), just run the
1220 // thread.
1221 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1222 debugTrace(DEBUG_sched,
1223 "--<< thread %ld (%s) stopped to switch evaluators",
1224 (long)t->id, what_next_strs[t->what_next]);
1225 return true;
1226 }
1227
1228 // Reset the context switch flag. We don't do this just before
1229 // running the thread, because that would mean we would lose ticks
1230 // during GC, which can lead to unfair scheduling (a thread hogs
1231 // the CPU because the tick always arrives during GC). This way
1232 // penalises threads that do a lot of allocation, but that seems
1233 // better than the alternative.
1234 if (cap->context_switch != 0) {
1235 cap->context_switch = 0;
1236 appendToRunQueue(cap,t);
1237 } else {
1238 pushOnRunQueue(cap,t);
1239 }
1240
1241 IF_DEBUG(sanity,
1242 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1243 checkTSO(t));
1244
1245 return false;
1246 }
1247
1248 /* -----------------------------------------------------------------------------
1249 * Handle a thread that returned to the scheduler with ThreadBlocked
1250 * -------------------------------------------------------------------------- */
1251
1252 static void
1253 scheduleHandleThreadBlocked( StgTSO *t
1254 #if !defined(DEBUG)
1255 STG_UNUSED
1256 #endif
1257 )
1258 {
1259
1260 // We don't need to do anything. The thread is blocked, and it
1261 // has tidied up its stack and placed itself on whatever queue
1262 // it needs to be on.
1263
1264 // ASSERT(t->why_blocked != NotBlocked);
1265 // Not true: for example,
1266 // - the thread may have woken itself up already, because
1267 // threadPaused() might have raised a blocked throwTo
1268 // exception, see maybePerformBlockedException().
1269
1270 #ifdef DEBUG
1271 traceThreadStatus(DEBUG_sched, t);
1272 #endif
1273 }
1274
1275 /* -----------------------------------------------------------------------------
1276 * Handle a thread that returned to the scheduler with ThreadFinished
1277 * -------------------------------------------------------------------------- */
1278
1279 static bool
1280 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1281 {
1282 /* Need to check whether this was a main thread, and if so,
1283 * return with the return value.
1284 *
1285 * We also end up here if the thread kills itself with an
1286 * uncaught exception, see Exception.cmm.
1287 */
1288
1289 // blocked exceptions can now complete, even if the thread was in
1290 // blocked mode (see #2910).
1291 awakenBlockedExceptionQueue (cap, t);
1292
1293 //
1294 // Check whether the thread that just completed was a bound
1295 // thread, and if so return with the result.
1296 //
1297 // There is an assumption here that all thread completion goes
1298 // through this point; we need to make sure that if a thread
1299 // ends up in the ThreadKilled state, that it stays on the run
1300 // queue so it can be dealt with here.
1301 //
1302
1303 if (t->bound) {
1304
1305 if (t->bound != task->incall) {
1306 #if !defined(THREADED_RTS)
1307 // Must be a bound thread that is not the topmost one. Leave
1308 // it on the run queue until the stack has unwound to the
1309 // point where we can deal with this. Leaving it on the run
1310 // queue also ensures that the garbage collector knows about
1311 // this thread and its return value (it gets dropped from the
1312 // step->threads list so there's no other way to find it).
1313 appendToRunQueue(cap,t);
1314 return false;
1315 #else
1316 // this cannot happen in the threaded RTS, because a
1317 // bound thread can only be run by the appropriate Task.
1318 barf("finished bound thread that isn't mine");
1319 #endif
1320 }
1321
1322 ASSERT(task->incall->tso == t);
1323
1324 if (t->what_next == ThreadComplete) {
1325 if (task->incall->ret) {
1326 // NOTE: return val is stack->sp[1] (see StgStartup.cmm)
1327 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1328 }
1329 task->incall->rstat = Success;
1330 } else {
1331 if (task->incall->ret) {
1332 *(task->incall->ret) = NULL;
1333 }
1334 if (sched_state >= SCHED_INTERRUPTING) {
1335 if (heap_overflow) {
1336 task->incall->rstat = HeapExhausted;
1337 } else {
1338 task->incall->rstat = Interrupted;
1339 }
1340 } else {
1341 task->incall->rstat = Killed;
1342 }
1343 }
1344 #ifdef DEBUG
1345 removeThreadLabel((StgWord)task->incall->tso->id);
1346 #endif
1347
1348 // We no longer consider this thread and task to be bound to
1349 // each other. The TSO lives on until it is GC'd, but the
1350 // task is about to be released by the caller, and we don't
1351 // want anyone following the pointer from the TSO to the
1352 // defunct task (which might have already been
1353 // re-used). This was a real bug: the GC updated
1354 // tso->bound->tso which led to a deadlock.
1355 t->bound = NULL;
1356 task->incall->tso = NULL;
1357
1358 return true; // tells schedule() to return
1359 }
1360
1361 return false;
1362 }
1363
1364 /* -----------------------------------------------------------------------------
1365 * Perform a heap census
1366 * -------------------------------------------------------------------------- */
1367
1368 static bool
1369 scheduleNeedHeapProfile( bool ready_to_gc STG_UNUSED )
1370 {
1371 // When we have +RTS -i0 and we're heap profiling, do a census at
1372 // every GC. This lets us get repeatable runs for debugging.
1373 if (performHeapProfile ||
1374 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1375 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1376 return true;
1377 } else {
1378 return false;
1379 }
1380 }
1381
1382 /* -----------------------------------------------------------------------------
1383 * stopAllCapabilities()
1384 *
1385 * Stop all Haskell execution. This is used when we need to make some global
1386 * change to the system, such as altering the number of capabilities, or
1387 * forking.
1388 *
1389 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1390 * -------------------------------------------------------------------------- */
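/* A minimal usage sketch (assuming THREADED_RTS), mirroring what callers
 * such as setNumCapabilities() do: stop all Haskell execution, make the
 * global change, then hand the capabilities back:
 *
 *     stopAllCapabilities(&cap, task);
 *     // ... alter global state (e.g. resize the capabilities array) ...
 *     releaseAllCapabilities(n_capabilities, cap, task);
 */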
1391
1392 #if defined(THREADED_RTS)
1393 static void stopAllCapabilities (Capability **pCap, Task *task)
1394 {
1395 bool was_syncing;
1396 SyncType prev_sync_type;
1397
1398 PendingSync sync = {
1399 .type = SYNC_OTHER,
1400 .idle = NULL,
1401 .task = task
1402 };
1403
1404 do {
1405 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1406 } while (was_syncing);
1407
1408 acquireAllCapabilities(*pCap,task);
1409
1410 pending_sync = 0;
1411 }
1412 #endif
1413
1414 /* -----------------------------------------------------------------------------
1415 * requestSync()
1416 *
1417 * Commence a synchronisation between all capabilities. Normally not called
1418 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1419 * has some special synchronisation requirements.
1420 *
1421 * Returns:
1422 * false if we successfully got a sync
1423 * true if there was another sync request in progress,
1424 *    and we yielded to it. The type of the other sync
1425 *    request is returned in *prev_sync_type.
1426 * -------------------------------------------------------------------------- */
1427
1428 #if defined(THREADED_RTS)
1429 static bool requestSync (
1430 Capability **pcap, Task *task, PendingSync *new_sync,
1431 SyncType *prev_sync_type)
1432 {
1433 PendingSync *sync;
1434
1435 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1436 (StgWord)NULL,
1437 (StgWord)new_sync);
1438
1439 if (sync != NULL)
1440 {
1441 // sync is valid until we have called yieldCapability().
1442 // After the sync is completed, we cannot read that struct any
1443 // more because it has been freed.
1444 *prev_sync_type = sync->type;
1445 do {
1446 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1447 sync->type);
1448 ASSERT(*pcap);
1449 yieldCapability(pcap,task,true);
1450 sync = pending_sync;
1451 } while (sync != NULL);
1452
1453 // NOTE: task->cap might have changed now
1454 return true;
1455 }
1456 else
1457 {
1458 return false;
1459 }
1460 }
1461 #endif
1462
1463 /* -----------------------------------------------------------------------------
1464 * acquireAllCapabilities()
1465 *
1466 * Grab all the capabilities except the one we already hold. Used
1467 * when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1468 * before a fork (SYNC_OTHER).
1469 *
1470 * Only call this after requestSync(), otherwise a deadlock might
1471 * ensue if another thread is trying to synchronise.
1472 * -------------------------------------------------------------------------- */
1473
1474 #ifdef THREADED_RTS
1475 static void acquireAllCapabilities(Capability *cap, Task *task)
1476 {
1477 Capability *tmpcap;
1478 uint32_t i;
1479
1480 ASSERT(pending_sync != NULL);
1481 for (i=0; i < n_capabilities; i++) {
1482 debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)",
1483 i, n_capabilities);
1484 tmpcap = capabilities[i];
1485 if (tmpcap != cap) {
1486 // we better hope this task doesn't get migrated to
1487 // another Capability while we're waiting for this one.
1488 // It won't, because load balancing happens while we have
1489 // all the Capabilities, but even so it's a slightly
1490 // unsavoury invariant.
1491 task->cap = tmpcap;
1492 waitForCapability(&tmpcap, task);
1493 if (tmpcap->no != i) {
1494 barf("acquireAllCapabilities: got the wrong capability");
1495 }
1496 }
1497 }
1498 task->cap = cap;
1499 }
1500 #endif
1501
1502 /* -----------------------------------------------------------------------------
1503 * releaseAllCapabilities()
1504 *
1505 * Assuming this thread holds all the capabilities, release them all except for
1506 * the one passed in as cap.
1507 * -------------------------------------------------------------------------- */
1508
1509 #ifdef THREADED_RTS
1510 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task)
1511 {
1512 uint32_t i;
1513
1514 for (i = 0; i < n; i++) {
1515 if (cap->no != i) {
1516 task->cap = capabilities[i];
1517 releaseCapability(capabilities[i]);
1518 }
1519 }
1520 task->cap = cap;
1521 }
1522 #endif
1523
1524 /* -----------------------------------------------------------------------------
1525 * Perform a garbage collection if necessary
1526 * -------------------------------------------------------------------------- */
1527
1528 static void
1529 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1530 bool force_major)
1531 {
1532 Capability *cap = *pcap;
1533 bool heap_census;
1534 uint32_t collect_gen;
1535 bool major_gc;
1536 #ifdef THREADED_RTS
1537 uint32_t gc_type;
1538 uint32_t i;
1539 uint32_t need_idle;
1540 uint32_t n_gc_threads;
1541 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1542 StgTSO *tso;
1543 bool *idle_cap;
1544 // idle_cap is an array (allocated later) of size n_capabilities, where
1545 // idle_cap[i] is true if capability i will be idle during this GC
1546 // cycle.
1547 #endif
1548
1549 if (sched_state == SCHED_SHUTTING_DOWN) {
1550 // The final GC has already been done, and the system is
1551 // shutting down. We'll probably deadlock if we try to GC
1552 // now.
1553 return;
1554 }
1555
1556 heap_census = scheduleNeedHeapProfile(true);
1557
1558 // Figure out which generation we are collecting, so that we can
1559 // decide whether this is a parallel GC or not.
1560 collect_gen = calcNeeded(force_major || heap_census, NULL);
1561 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1562
1563 #ifdef THREADED_RTS
1564 if (sched_state < SCHED_INTERRUPTING
1565 && RtsFlags.ParFlags.parGcEnabled
1566 && collect_gen >= RtsFlags.ParFlags.parGcGen
1567 && ! oldest_gen->mark)
1568 {
1569 gc_type = SYNC_GC_PAR;
1570 } else {
1571 gc_type = SYNC_GC_SEQ;
1572 }
1573
1574 // In order to GC, there must be no threads running Haskell code.
1575 // Therefore, for single-threaded GC, the GC thread needs to hold *all* the
1576 // capabilities, and release them after the GC has completed. For parallel
1577 // GC, we synchronise all the running threads using requestSync().
1578 //
1579 // Other capabilities are prevented from running yet more Haskell threads if
1580 // pending_sync is set. Tested inside yieldCapability() and
1581 // releaseCapability() in Capability.c
1582
1583 PendingSync sync = {
1584 .type = gc_type,
1585 .idle = NULL,
1586 .task = task
1587 };
1588
1589 {
1590 SyncType prev_sync = 0;
1591 bool was_syncing;
1592 do {
1593 // If -qn is not set and we have more capabilities than cores, set
1594 // the number of GC threads to #cores. We do this here rather than
1595 // in normaliseRtsOpts() because here it will work if the program
1596 // calls setNumCapabilities.
1597 //
1598 n_gc_threads = RtsFlags.ParFlags.parGcThreads;
1599 if (n_gc_threads == 0 &&
1600 enabled_capabilities > getNumberOfProcessors()) {
1601 n_gc_threads = getNumberOfProcessors();
1602 }
1603
1604 // This calculation must be inside the loop because
1605 // enabled_capabilities may change if requestSync() below fails and
1606 // we retry.
1607 if (gc_type == SYNC_GC_PAR && n_gc_threads > 0) {
1608 if (n_gc_threads >= enabled_capabilities) {
1609 need_idle = 0;
1610 } else {
1611 need_idle = enabled_capabilities - n_gc_threads;
1612 }
1613 } else {
1614 need_idle = 0;
1615 }
1616
1617 // We need an array of size n_capabilities, but since this may
1618 // change each time around the loop we must allocate it afresh.
1619 idle_cap = (bool *)stgMallocBytes(n_capabilities *
1620 sizeof(bool),
1621 "scheduleDoGC");
1622 sync.idle = idle_cap;
1623
1624 // When using +RTS -qn, we need some capabilities to be idle during
1625 // GC. The best bet is to choose some inactive ones, so we look for
1626 // those first:
1627 uint32_t n_idle = need_idle;
1628 for (i=0; i < n_capabilities; i++) {
1629 if (capabilities[i]->disabled) {
1630 idle_cap[i] = true;
1631 } else if (n_idle > 0 &&
1632 capabilities[i]->running_task == NULL) {
1633 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1634 n_idle--;
1635 idle_cap[i] = true;
1636 } else {
1637 idle_cap[i] = false;
1638 }
1639 }
1640 // If we didn't find enough inactive capabilities, just pick some
1641 // more to be idle.
1642 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1643 if (!idle_cap[i] && i != cap->no) {
1644 idle_cap[i] = true;
1645 n_idle--;
1646 }
1647 }
1648 ASSERT(n_idle == 0);
1649
1650 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1651 cap = *pcap;
1652 if (was_syncing) {
1653 stgFree(idle_cap);
1654 }
1655 if (was_syncing &&
1656 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1657 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1658 // someone else had a pending sync request for a GC, so
1659 // let's assume GC has been done and we don't need to GC
1660 // again.
1661 // Exception to this: if SCHED_INTERRUPTING, then we still
1662 // need to do the final GC.
1663 return;
1664 }
1665 if (sched_state == SCHED_SHUTTING_DOWN) {
1666 // The scheduler might now be shutting down. We tested
1667 // this above, but it might have become true since then as
1668 // we yielded the capability in requestSync().
1669 return;
1670 }
1671 } while (was_syncing);
1672 }
1673
1674 stat_startGCSync(gc_threads[cap->no]);
1675
1676 #ifdef DEBUG
1677 unsigned int old_n_capabilities = n_capabilities;
1678 #endif
1679
1680 interruptAllCapabilities();
1681
1682 // The final shutdown GC is always single-threaded, because it's
1683 // possible that some of the Capabilities have no worker threads.
1684
1685 if (gc_type == SYNC_GC_SEQ) {
1686 traceEventRequestSeqGc(cap);
1687 } else {
1688 traceEventRequestParGc(cap);
1689 }
1690
1691 if (gc_type == SYNC_GC_SEQ) {
1692 // single-threaded GC: grab all the capabilities
1693 acquireAllCapabilities(cap,task);
1694 }
1695 else
1696 {
1697 // If we are load-balancing collections in this
1698 // generation, then we require all GC threads to participate
1699 // in the collection. Otherwise, we only require active
1700 // threads to participate, and we set gc_threads[i]->idle for
1701 // any idle capabilities. The rationale here is that waking
1702 // up an idle Capability takes much longer than just doing any
1703 // GC work on its behalf.
1704
1705 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1706 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1707 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1708 {
1709 for (i=0; i < n_capabilities; i++) {
1710 if (capabilities[i]->disabled) {
1711 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1712 if (idle_cap[i]) {
1713 n_idle_caps++;
1714 }
1715 } else {
1716 if (i != cap->no && idle_cap[i]) {
1717 Capability *tmpcap = capabilities[i];
1718 task->cap = tmpcap;
1719 waitForCapability(&tmpcap, task);
1720 n_idle_caps++;
1721 }
1722 }
1723 }
1724 }
1725 else
1726 {
1727 for (i=0; i < n_capabilities; i++) {
1728 if (capabilities[i]->disabled) {
1729 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1730 if (idle_cap[i]) {
1731 n_idle_caps++;
1732 }
1733 } else if (i != cap->no &&
1734 capabilities[i]->idle >=
1735 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1736 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1737 if (idle_cap[i]) {
1738 n_idle_caps++;
1739 } else {
1740 n_failed_trygrab_idles++;
1741 }
1742 }
1743 }
1744 }
1745 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1746
1747 for (i=0; i < n_capabilities; i++) {
1748 capabilities[i]->idle++;
1749 }
1750
1751 // For all capabilities participating in this GC, wait until
1752 // they have stopped mutating and are standing by for GC.
1753 waitForGcThreads(cap, idle_cap);
1754
1755 #if defined(THREADED_RTS)
1756 // Stable point where we can do a global check on our spark counters
1757 ASSERT(checkSparkCountInvariant());
1758 #endif
1759 }
1760
1761 #endif
1762
1763 IF_DEBUG(scheduler, printAllThreads());
1764
1765 delete_threads_and_gc:
1766 /*
1767 * We now have all the capabilities; if we're in an interrupting
1768 * state, then we should take the opportunity to delete all the
1769 * threads in the system.
1770 * Checking for major_gc ensures that the last GC is major.
1771 */
1772 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1773 deleteAllThreads(cap);
1774 #if defined(THREADED_RTS)
1775 // Discard all the sparks from every Capability. Why?
1776 // They'll probably be GC'd anyway since we've killed all the
1777 // threads. It just avoids the GC having to do any work to
1778 // figure out that any remaining sparks are garbage.
1779 for (i = 0; i < n_capabilities; i++) {
1780 capabilities[i]->spark_stats.gcd +=
1781 sparkPoolSize(capabilities[i]->sparks);
1782 // No race here since all Caps are stopped.
1783 discardSparksCap(capabilities[i]);
1784 }
1785 #endif
1786 sched_state = SCHED_SHUTTING_DOWN;
1787 }
1788
1789 /*
1790 * When there are disabled capabilities, we want to migrate any
1791 * threads away from them. Normally this happens in the
1792 * scheduler's loop, but only for unbound threads - it's really
1793 * hard for a bound thread to migrate itself. So we have another
1794 * go here.
1795 */
1796 #if defined(THREADED_RTS)
1797 for (i = enabled_capabilities; i < n_capabilities; i++) {
1798 Capability *tmp_cap, *dest_cap;
1799 tmp_cap = capabilities[i];
1800 ASSERT(tmp_cap->disabled);
1801 if (i != cap->no) {
1802 dest_cap = capabilities[i % enabled_capabilities];
1803 while (!emptyRunQueue(tmp_cap)) {
1804 tso = popRunQueue(tmp_cap);
1805 migrateThread(tmp_cap, tso, dest_cap);
1806 if (tso->bound) {
1807 traceTaskMigrate(tso->bound->task,
1808 tso->bound->task->cap,
1809 dest_cap);
1810 tso->bound->task->cap = dest_cap;
1811 }
1812 }
1813 }
1814 }
1815 #endif
1816
1817 #if defined(THREADED_RTS)
1818 // reset pending_sync *before* GC, so that when the GC threads
1819 // emerge they don't immediately re-enter the GC.
1820 pending_sync = 0;
1821 GarbageCollect(collect_gen, heap_census, gc_type, cap, idle_cap);
1822 #else
1823 GarbageCollect(collect_gen, heap_census, 0, cap, NULL);
1824 #endif
1825
1826 traceSparkCounters(cap);
1827
1828 switch (recent_activity) {
1829 case ACTIVITY_INACTIVE:
1830 if (force_major) {
1831 // We are doing a GC because the system has been idle for a
1832 // timeslice and we need to check for deadlock. Record the
1833 // fact that we've done a GC and turn off the timer signal;
1834 // it will get re-enabled if we run any threads after the GC.
1835 recent_activity = ACTIVITY_DONE_GC;
1836 #ifndef PROFILING
1837 stopTimer();
1838 #endif
1839 break;
1840 }
1841 // fall through...
1842
1843 case ACTIVITY_MAYBE_NO:
1844 // the GC might have taken long enough for the timer to set
1845 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1846 // but we aren't necessarily deadlocked:
1847 recent_activity = ACTIVITY_YES;
1848 break;
1849
1850 case ACTIVITY_DONE_GC:
1851 // If we are actually active, the scheduler will reset the
1852 // recent_activity flag and re-enable the timer.
1853 break;
1854 }
1855
1856 #if defined(THREADED_RTS)
1857 // Stable point where we can do a global check on our spark counters
1858 ASSERT(checkSparkCountInvariant());
1859 #endif
1860
1861 // The heap census itself is done during GarbageCollect().
1862 if (heap_census) {
1863 performHeapProfile = false;
1864 }
1865
1866 #if defined(THREADED_RTS)
1867
1868 // If n_capabilities has changed during GC, we're in trouble.
1869 ASSERT(n_capabilities == old_n_capabilities);
1870
1871 if (gc_type == SYNC_GC_PAR)
1872 {
1873 for (i = 0; i < n_capabilities; i++) {
1874 if (i != cap->no) {
1875 if (idle_cap[i]) {
1876 ASSERT(capabilities[i]->running_task == task);
1877 task->cap = capabilities[i];
1878 releaseCapability(capabilities[i]);
1879 } else {
1880 ASSERT(capabilities[i]->running_task != task);
1881 }
1882 }
1883 }
1884 task->cap = cap;
1885
1886 // releaseGCThreads() happens *after* we have released idle
1887 // capabilities. Otherwise what can happen is one of the released
1888 // threads starts a new GC, and finds that it can't acquire some of
1889 // the disabled capabilities, because the previous GC still holds
1890 // them, so those disabled capabilities will not be idle during the
1891 // next GC round. However, if we release the capabilities first,
1892 // then they will be free (because they're disabled) when the next
1893 // GC cycle happens.
1894 releaseGCThreads(cap, idle_cap);
1895 }
1896 #endif
1897 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1898 // GC set the heap_overflow flag. We should throw an exception if we
1899 // can, or shut down otherwise.
1900
1901 // Get the thread to which Ctrl-C is thrown
1902 StgTSO *main_thread = getTopHandlerThread();
1903 if (main_thread == NULL) {
1904 // GC set the heap_overflow flag, and there is no main thread to
1905 // throw an exception to, so we should proceed with an orderly
1906 // shutdown now. Ultimately we want the main thread to return to
1907 // its caller with HeapExhausted, at which point the caller should
1908 // call hs_exit(). The first step is to delete all the threads.
1909 sched_state = SCHED_INTERRUPTING;
1910 goto delete_threads_and_gc;
1911 }
1912
1913 heap_overflow = false;
1914 const uint64_t allocation_count = getAllocations();
1915 if (RtsFlags.GcFlags.heapLimitGrace <
1916 allocation_count - allocated_bytes_at_heapoverflow ||
1917 allocated_bytes_at_heapoverflow == 0) {
1918 allocated_bytes_at_heapoverflow = allocation_count;
1919 // We used to simply exit, but throwing an exception gives the
1920 // program a chance to clean up. It also lets the exception be
1921 // caught.
1922
1923 // FIXME this is not a good way to tell a program to release
1924 // resources. It is neither reliable (the RTS crashes if it fails
1925 // to allocate memory from the OS) nor very usable (it is always
1926 // thrown to the main thread, which might not be able to do anything
1927 // useful with it). We really should have a more general way to
1928 // release resources in low-memory conditions. Nevertheless, this
1929 // is still a big improvement over just exiting.
1930
1931 // FIXME again: perhaps we should throw a synchronous exception
1932 // instead of an asynchronous one, or have a way for the program to
1933 // register a handler to be called when heap overflow happens.
1934 throwToSelf(cap, main_thread, heapOverflow_closure);
1935 }
1936 }
1937 #ifdef SPARKBALANCE
1938 /* JB
1939 Once we are all together... this would be the place to balance all
1940 spark pools. No concurrent stealing or adding of new sparks can
1941 occur. Should be defined in Sparks.c. */
1942 balanceSparkPoolsCaps(n_capabilities, capabilities);
1943 #endif
1944
1945 #if defined(THREADED_RTS)
1946 stgFree(idle_cap);
1947
1948 if (gc_type == SYNC_GC_SEQ) {
1949 // release our stash of capabilities.
1950 releaseAllCapabilities(n_capabilities, cap, task);
1951 }
1952 #endif
1953
1954 return;
1955 }
1956
1957 /* ---------------------------------------------------------------------------
1958 * Singleton fork(). Do not copy any running threads.
1959 * ------------------------------------------------------------------------- */
1960
1961 pid_t
1962 forkProcess(HsStablePtr *entry
1963 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1964 STG_UNUSED
1965 #endif
1966 )
1967 {
1968 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1969 pid_t pid;
1970 StgTSO* t,*next;
1971 Capability *cap;
1972 uint32_t g;
1973 Task *task = NULL;
1974 uint32_t i;
1975
1976 debugTrace(DEBUG_sched, "forking!");
1977
1978 task = newBoundTask();
1979
1980 cap = NULL;
1981 waitForCapability(&cap, task);
1982
1983 #ifdef THREADED_RTS
1984 stopAllCapabilities(&cap, task);
1985 #endif
1986
1987 // no funny business: hold locks while we fork, otherwise if some
1988 // other thread is holding a lock when the fork happens, the data
1989 // structure protected by the lock will forever be in an
1990 // inconsistent state in the child. See also #1391.
1991 ACQUIRE_LOCK(&sched_mutex);
1992 ACQUIRE_LOCK(&sm_mutex);
1993 ACQUIRE_LOCK(&stable_mutex);
1994 ACQUIRE_LOCK(&task->lock);
1995
1996 for (i=0; i < n_capabilities; i++) {
1997 ACQUIRE_LOCK(&capabilities[i]->lock);
1998 }
1999
2000 #ifdef THREADED_RTS
2001 ACQUIRE_LOCK(&all_tasks_mutex);
2002 #endif
2003
2004 stopTimer(); // See #4074
2005
2006 #if defined(TRACING)
2007 flushEventLog(); // so that child won't inherit dirty file buffers
2008 #endif
2009
2010 pid = fork();
2011
2012 if (pid) { // parent
2013
2014 startTimer(); // #4074
2015
2016 RELEASE_LOCK(&sched_mutex);
2017 RELEASE_LOCK(&sm_mutex);
2018 RELEASE_LOCK(&stable_mutex);
2019 RELEASE_LOCK(&task->lock);
2020
2021 for (i=0; i < n_capabilities; i++) {
2022 releaseCapability_(capabilities[i],false);
2023 RELEASE_LOCK(&capabilities[i]->lock);
2024 }
2025
2026 #ifdef THREADED_RTS
2027 RELEASE_LOCK(&all_tasks_mutex);
2028 #endif
2029
2030 boundTaskExiting(task);
2031
2032 // just return the pid
2033 return pid;
2034
2035 } else { // child
2036
2037 #if defined(THREADED_RTS)
2038 initMutex(&sched_mutex);
2039 initMutex(&sm_mutex);
2040 initMutex(&stable_mutex);
2041 initMutex(&task->lock);
2042
2043 for (i=0; i < n_capabilities; i++) {
2044 initMutex(&capabilities[i]->lock);
2045 }
2046
2047 initMutex(&all_tasks_mutex);
2048 #endif
2049
2050 #ifdef TRACING
2051 resetTracing();
2052 #endif
2053
2054 // Now, all OS threads except the thread that forked are
2055 // stopped. We need to stop all Haskell threads, including
2056 // those involved in foreign calls. Also we need to delete
2057 // all Tasks, because they correspond to OS threads that are
2058 // now gone.
2059
2060 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2061 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2062 next = t->global_link;
2063 // don't allow threads to catch the ThreadKilled
2064 // exception, but we do want to raiseAsync() because these
2065 // threads may be evaluating thunks that we need later.
2066 deleteThread_(t->cap,t);
2067
2068 // stop the GC from updating the InCall to point to
2069 // the TSO. This is only necessary because the
2070 // OSThread bound to the TSO has been killed, and
2071 // won't get a chance to exit in the usual way (see
2072 // also scheduleHandleThreadFinished).
2073 t->bound = NULL;
2074 }
2075 }
2076
2077 discardTasksExcept(task);
2078
2079 for (i=0; i < n_capabilities; i++) {
2080 cap = capabilities[i];
2081
2082 // Empty the run queue. It seems tempting to let all the
2083 // killed threads stay on the run queue as zombies to be
2084 // cleaned up later, but some of them may correspond to
2085 // bound threads for which the corresponding Task does not
2086 // exist.
2087 truncateRunQueue(cap);
2088 cap->n_run_queue = 0;
2089
2090 // Any suspended C-calling Tasks are no more, their OS threads
2091 // don't exist now:
2092 cap->suspended_ccalls = NULL;
2093 cap->n_suspended_ccalls = 0;
2094
2095 #if defined(THREADED_RTS)
2096 // Wipe our spare workers list, they no longer exist. New
2097 // workers will be created if necessary.
2098 cap->spare_workers = NULL;
2099 cap->n_spare_workers = 0;
2100 cap->returning_tasks_hd = NULL;
2101 cap->returning_tasks_tl = NULL;
2102 cap->n_returning_tasks = 0;
2103 #endif
2104
2105 // Release all caps except 0, we'll use that for starting
2106 // the IO manager and running the client action below.
2107 if (cap->no != 0) {
2108 task->cap = cap;
2109 releaseCapability(cap);
2110 }
2111 }
2112 cap = capabilities[0];
2113 task->cap = cap;
2114
2115 // Empty the threads lists. Otherwise, the garbage
2116 // collector may attempt to resurrect some of these threads.
2117 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2118 generations[g].threads = END_TSO_QUEUE;
2119 }
2120
2121 // On Unix, all timers are reset in the child, so we need to start
2122 // the timer again.
2123 initTimer();
2124 startTimer();
2125
2126 // TODO: need to trace various other things in the child
2127 // like the startup event, capabilities, process info, etc.
2128 traceTaskCreate(task, cap);
2129
2130 #if defined(THREADED_RTS)
2131 ioManagerStartCap(&cap);
2132 #endif
2133
2134 // Install toplevel exception handlers, so the interruption
2135 // signal will be sent to the main thread.
2136 // See Trac #12903
2137 rts_evalStableIOMain(&cap, entry, NULL); // run the action
2138 rts_checkSchedStatus("forkProcess",cap);
2139
2140 rts_unlock(cap);
2141 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2142 }
2143 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2144 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2145 #endif
2146 }
2147
2148 /* ---------------------------------------------------------------------------
2149 * Changing the number of Capabilities
2150 *
2151 * Changing the number of Capabilities is very tricky! We can only do
2152 * it with the system fully stopped, so we do a full sync with
2153 * requestSync(SYNC_OTHER) and grab all the capabilities.
2154 *
2155 * Then we resize the appropriate data structures, and update all
2156 * references to the old data structures which have now moved.
2157 * Finally we release the Capabilities we are holding, and start
2158 * worker Tasks on the new Capabilities we created.
2159 *
2160 * ------------------------------------------------------------------------- */
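/* A minimal sketch of driving this from an embedding C program (for
 * illustration only: it assumes the threaded RTS has already been started
 * with hs_init(), and it declares the prototype locally rather than relying
 * on a particular installed header; grow_to_four_caps is a made-up name):
 *
 *     #include <stdint.h>
 *
 *     extern void setNumCapabilities (uint32_t new_n_capabilities);
 *
 *     static void grow_to_four_caps (void)
 *     {
 *         setNumCapabilities(4);   // takes the resize path below
 *     }
 *
 * From Haskell the same entry point is reached via
 * Control.Concurrent.setNumCapabilities.
 */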
2161
2162 void
2163 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2164 {
2165 #if !defined(THREADED_RTS)
2166 if (new_n_capabilities != 1) {
2167 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2168 }
2169 return;
2170 #elif defined(NOSMP)
2171 if (new_n_capabilities != 1) {
2172 errorBelch("setNumCapabilities: not supported on this platform");
2173 }
2174 return;
2175 #else
2176 Task *task;
2177 Capability *cap;
2178 uint32_t n;
2179 Capability *old_capabilities = NULL;
2180 uint32_t old_n_capabilities = n_capabilities;
2181
2182 if (new_n_capabilities == enabled_capabilities) return;
2183
2184 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2185 enabled_capabilities, new_n_capabilities);
2186
2187 cap = rts_lock();
2188 task = cap->running_task;
2189
2190 stopAllCapabilities(&cap, task);
2191
2192 if (new_n_capabilities < enabled_capabilities)
2193 {
2194 // Reducing the number of capabilities: we do not actually
2195 // remove the extra capabilities, we just mark them as
2196 // "disabled". This has the following effects:
2197 //
2198 // - threads on a disabled capability are migrated away by the
2199 // scheduler loop
2200 //
2201 // - disabled capabilities do not participate in GC
2202 // (see scheduleDoGC())
2203 //
2204 // - No spark threads are created on this capability
2205 // (see scheduleActivateSpark())
2206 //
2207 // - We do not attempt to migrate threads *to* a disabled
2208 // capability (see schedulePushWork()).
2209 //
2210 // but in other respects, a disabled capability remains
2211 // alive. Threads may be woken up on a disabled capability,
2212 // but they will be immediately migrated away.
2213 //
2214 // This approach is much easier than trying to actually remove
2215 // the capability; we don't have to worry about GC data
2216 // structures, the nursery, etc.
2217 //
2218 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2219 capabilities[n]->disabled = true;
2220 traceCapDisable(capabilities[n]);
2221 }
2222 enabled_capabilities = new_n_capabilities;
2223 }
2224 else
2225 {
2226 // Increasing the number of enabled capabilities.
2227 //
2228 // enable any disabled capabilities, up to the required number
2229 for (n = enabled_capabilities;
2230 n < new_n_capabilities && n < n_capabilities; n++) {
2231 capabilities[n]->disabled = false;
2232 traceCapEnable(capabilities[n]);
2233 }
2234 enabled_capabilities = n;
2235
2236 if (new_n_capabilities > n_capabilities) {
2237 #if defined(TRACING)
2238 // Allocate eventlog buffers for the new capabilities. Note this
2239 // must be done before calling moreCapabilities(), because that
2240 // will emit events about creating the new capabilities and adding
2241 // them to existing capsets.
2242 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2243 #endif
2244
2245 // Resize the capabilities array
2246 // NB. after this, capabilities points somewhere new. Any pointers
2247 // of type (Capability *) are now invalid.
2248 moreCapabilities(n_capabilities, new_n_capabilities);
2249
2250 // Resize and update storage manager data structures
2251 storageAddCapabilities(n_capabilities, new_n_capabilities);
2252 }
2253 }
2254
2255 // update n_capabilities before things start running
2256 if (new_n_capabilities > n_capabilities) {
2257 n_capabilities = enabled_capabilities = new_n_capabilities;
2258 }
2259
2260 // We're done: release the original Capabilities
2261 releaseAllCapabilities(old_n_capabilities, cap,task);
2262
2263 // We can't free the old array until now, because we access it
2264 // while updating pointers in updateCapabilityRefs().
2265 if (old_capabilities) {
2266 stgFree(old_capabilities);
2267 }
2268
2269 // Notify IO manager that the number of capabilities has changed.
2270 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2271
2272 rts_unlock(cap);
2273
2274 #endif // THREADED_RTS
2275 }
2276
2277
2278
2279 /* ---------------------------------------------------------------------------
2280 * Delete all the threads in the system
2281 * ------------------------------------------------------------------------- */
2282
2283 static void
2284 deleteAllThreads ( Capability *cap )
2285 {
2286 // NOTE: only safe to call if we own all capabilities.
2287
2288 StgTSO* t, *next;
2289 uint32_t g;
2290
2291 debugTrace(DEBUG_sched,"deleting all threads");
2292 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2293 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2294 next = t->global_link;
2295 deleteThread(cap,t);
2296 }
2297 }
2298
2299 // The run queue now contains a bunch of ThreadKilled threads. We
2300 // must not throw these away: the main thread(s) will be in there
2301 // somewhere, and the main scheduler loop has to deal with it.
2302 // Also, the run queue is the only thing keeping these threads from
2303 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2304
2305 #if !defined(THREADED_RTS)
2306 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2307 ASSERT(sleeping_queue == END_TSO_QUEUE);
2308 #endif
2309 }
2310
2311 /* -----------------------------------------------------------------------------
2312 Managing the suspended_ccalls list.
2313 Locks required: sched_mutex
2314 -------------------------------------------------------------------------- */
2315
2316 STATIC_INLINE void
2317 suspendTask (Capability *cap, Task *task)
2318 {
2319 InCall *incall;
2320
2321 incall = task->incall;
2322 ASSERT(incall->next == NULL && incall->prev == NULL);
2323 incall->next = cap->suspended_ccalls;
2324 incall->prev = NULL;
2325 if (cap->suspended_ccalls) {
2326 cap->suspended_ccalls->prev = incall;
2327 }
2328 cap->suspended_ccalls = incall;
2329 cap->n_suspended_ccalls++;
2330 }
2331
2332 STATIC_INLINE void
2333 recoverSuspendedTask (Capability *cap, Task *task)
2334 {
2335 InCall *incall;
2336
2337 incall = task->incall;
2338 if (incall->prev) {
2339 incall->prev->next = incall->next;
2340 } else {
2341 ASSERT(cap->suspended_ccalls == incall);
2342 cap->suspended_ccalls = incall->next;
2343 }
2344 if (incall->next) {
2345 incall->next->prev = incall->prev;
2346 }
2347 incall->next = incall->prev = NULL;
2348 cap->n_suspended_ccalls--;
2349 }
2350
2351 /* ---------------------------------------------------------------------------
2352 * Suspending & resuming Haskell threads.
2353 *
2354 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2355 * its capability before calling the C function. This allows another
2356 * task to pick up the capability and carry on running Haskell
2357 * threads. It also means that if the C call blocks, it won't lock
2358 * the whole system.
2359 *
2360 * The Haskell thread making the C call is put to sleep for the
2361 * duration of the call, on the suspended_ccalling_threads queue. We
2362 * give out a token to the task, which it can use to resume the thread
2363 * on return from the C function.
2364 *
2365 * If this is an interruptible C call, this means that the FFI call may be
2366 * unceremoniously terminated and should be scheduled on an
2367 * unbound worker thread.
2368 * ------------------------------------------------------------------------- */
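/* As a rough sketch of how these two functions are used: the code generator
 * wraps each "safe" foreign call in a suspend/resume pair, so the sequence
 * executed by the calling task looks approximately like this (foo, arg and r
 * are placeholders; the real wrapper is emitted as C--, not C):
 *
 *     void *token = suspendThread(BaseReg, interruptible);
 *     r = foo(arg);                     // the C call, made without the cap
 *     BaseReg = resumeThread(token);
 *
 * The token is simply the Task, as returned at the end of suspendThread()
 * below.
 */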
2369
2370 void *
2371 suspendThread (StgRegTable *reg, bool interruptible)
2372 {
2373 Capability *cap;
2374 int saved_errno;
2375 StgTSO *tso;
2376 Task *task;
2377 #ifdef mingw32_HOST_OS
2378 StgWord32 saved_winerror;
2379 #endif
2380
2381 saved_errno = errno;
2382 #ifdef mingw32_HOST_OS
2383 saved_winerror = GetLastError();
2384 #endif
2385
2386 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2387 */
2388 cap = regTableToCapability(reg);
2389
2390 task = cap->running_task;
2391 tso = cap->r.rCurrentTSO;
2392
2393 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2394
2395 // XXX this might not be necessary --SDM
2396 tso->what_next = ThreadRunGHC;
2397
2398 threadPaused(cap,tso);
2399
2400 if (interruptible) {
2401 tso->why_blocked = BlockedOnCCall_Interruptible;
2402 } else {
2403 tso->why_blocked = BlockedOnCCall;
2404 }
2405
2406 // Hand back capability
2407 task->incall->suspended_tso = tso;
2408 task->incall->suspended_cap = cap;
2409
2410 // Otherwise allocate() will write to invalid memory.
2411 cap->r.rCurrentTSO = NULL;
2412
2413 ACQUIRE_LOCK(&cap->lock);
2414
2415 suspendTask(cap,task);
2416 cap->in_haskell = false;
2417 releaseCapability_(cap,false);
2418
2419 RELEASE_LOCK(&cap->lock);
2420
2421 errno = saved_errno;
2422 #ifdef mingw32_HOST_OS
2423 SetLastError(saved_winerror);
2424 #endif
2425 return task;
2426 }
2427
2428 StgRegTable *
2429 resumeThread (void *task_)
2430 {
2431 StgTSO *tso;
2432 InCall *incall;
2433 Capability *cap;
2434 Task *task = task_;
2435 int saved_errno;
2436 #ifdef mingw32_HOST_OS
2437 StgWord32 saved_winerror;
2438 #endif
2439
2440 saved_errno = errno;
2441 #ifdef mingw32_HOST_OS
2442 saved_winerror = GetLastError();
2443 #endif
2444
2445 incall = task->incall;
2446 cap = incall->suspended_cap;
2447 task->cap = cap;
2448
2449 // Wait for permission to re-enter the RTS with the result.
2450 waitForCapability(&cap,task);
2451 // we might be on a different capability now... but if so, our
2452 // entry on the suspended_ccalls list will also have been
2453 // migrated.
2454
2455 // Remove the thread from the suspended list
2456 recoverSuspendedTask(cap,task);
2457
2458 tso = incall->suspended_tso;
2459 incall->suspended_tso = NULL;
2460 incall->suspended_cap = NULL;
2461 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2462
2463 traceEventRunThread(cap, tso);
2464
2465 /* Reset blocking status */
2466 tso->why_blocked = NotBlocked;
2467
2468 if ((tso->flags & TSO_BLOCKEX) == 0) {
2469 // avoid locking the TSO if we don't have to
2470 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2471 maybePerformBlockedException(cap,tso);
2472 }
2473 }
2474
2475 cap->r.rCurrentTSO = tso;
2476 cap->in_haskell = true;
2477 errno = saved_errno;
2478 #ifdef mingw32_HOST_OS
2479 SetLastError(saved_winerror);
2480 #endif
2481
2482 /* We might have GC'd, mark the TSO dirty again */
2483 dirty_TSO(cap,tso);
2484 dirty_STACK(cap,tso->stackobj);
2485
2486 IF_DEBUG(sanity, checkTSO(tso));
2487
2488 return &cap->r;
2489 }
2490
2491 /* ---------------------------------------------------------------------------
2492 * scheduleThread()
2493 *
2494 * scheduleThread puts a thread on the end of the runnable queue.
2495 * This will usually be done immediately after a thread is created.
2496 * The caller of scheduleThread must create the thread using e.g.
2497 * createThread and push an appropriate closure
2498 * on this thread's stack before the scheduler is invoked.
2499 * ------------------------------------------------------------------------ */
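/* A sketch of the typical calling pattern from an embedding program,
 * roughly the path taken by rts_lock()/rts_evalIO()/rts_unlock() in
 * RtsAPI.c, flattened for illustration ("action" stands for an already-built
 * IO () closure; a bound caller ends up in scheduleWaitThread() below rather
 * than scheduleThread()):
 *
 *     Capability *cap = rts_lock();
 *     HaskellObj ret;
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  (StgClosure *)action);
 *     scheduleWaitThread(tso, &ret, &cap);   // runs the scheduler loop
 *     rts_unlock(cap);
 */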
2500
2501 void
2502 scheduleThread(Capability *cap, StgTSO *tso)
2503 {
2504 // The thread goes at the *end* of the run-queue, to avoid possible
2505 // starvation of any threads already on the queue.
2506 appendToRunQueue(cap,tso);
2507 }
2508
2509 void
2510 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2511 {
2512 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2513 // move this thread from now on.
2514 #if defined(THREADED_RTS)
2515 cpu %= enabled_capabilities;
2516 if (cpu == cap->no) {
2517 appendToRunQueue(cap,tso);
2518 } else {
2519 migrateThread(cap, tso, capabilities[cpu]);
2520 }
2521 #else
2522 appendToRunQueue(cap,tso);
2523 #endif
2524 }
2525
2526 void
2527 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2528 {
2529 Task *task;
2530 DEBUG_ONLY( StgThreadID id );
2531 Capability *cap;
2532
2533 cap = *pcap;
2534
2535 // We already created/initialised the Task
2536 task = cap->running_task;
2537
2538 // This TSO is now a bound thread; make the Task and TSO
2539 // point to each other.
2540 tso->bound = task->incall;
2541 tso->cap = cap;
2542
2543 task->incall->tso = tso;
2544 task->incall->ret = ret;
2545 task->incall->rstat = NoStatus;
2546
2547 appendToRunQueue(cap,tso);
2548
2549 DEBUG_ONLY( id = tso->id );
2550 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2551
2552 cap = schedule(cap,task);
2553
2554 ASSERT(task->incall->rstat != NoStatus);
2555 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2556
2557 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2558 *pcap = cap;
2559 }
2560
2561 /* ----------------------------------------------------------------------------
2562 * Starting Tasks
2563 * ------------------------------------------------------------------------- */
2564
2565 #if defined(THREADED_RTS)
2566 void scheduleWorker (Capability *cap, Task *task)
2567 {
2568 // schedule() runs without a lock.
2569 cap = schedule(cap,task);
2570
2571 // On exit from schedule(), we have a Capability, but possibly not
2572 // the same one we started with.
2573
2574 // During shutdown, the requirement is that after all the
2575 // Capabilities are shut down, all workers that are shutting down
2576 // have finished workerTaskStop(). This is why we hold on to
2577 // cap->lock until we've finished workerTaskStop() below.
2578 //
2579 // There may be workers still involved in foreign calls; those
2580 // will just block in waitForCapability() because the
2581 // Capability has been shut down.
2582 //
2583 ACQUIRE_LOCK(&cap->lock);
2584 releaseCapability_(cap,false);
2585 workerTaskStop(task);
2586 RELEASE_LOCK(&cap->lock);
2587 }
2588 #endif
2589
2590 /* ---------------------------------------------------------------------------
2591 * Start new worker tasks on Capabilities from--to
2592 * -------------------------------------------------------------------------- */
2593
2594 static void
2595 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2596 {
2597 #if defined(THREADED_RTS)
2598 uint32_t i;
2599 Capability *cap;
2600
2601 for (i = from; i < to; i++) {
2602 cap = capabilities[i];
2603 ACQUIRE_LOCK(&cap->lock);
2604 startWorkerTask(cap);
2605 RELEASE_LOCK(&cap->lock);
2606 }
2607 #endif
2608 }
2609
2610 /* ---------------------------------------------------------------------------
2611 * initScheduler()
2612 *
2613 * Initialise the scheduler. This resets all the queues - if the
2614 * queues contained any threads, they'll be garbage collected at the
2615 * next pass.
2616 *
2617 * ------------------------------------------------------------------------ */
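/* initScheduler() is not called directly by client code; it runs as part of
 * RTS startup (see hs_init() in RtsStartup.c). A minimal embedding program
 * that exercises the whole lifecycle looks roughly like this (a sketch,
 * assuming the program is linked with "ghc -no-hs-main"):
 *
 *     #include "HsFFI.h"
 *
 *     int main (int argc, char *argv[])
 *     {
 *         hs_init(&argc, &argv);   // startup, including initScheduler()
 *         // ... call exported Haskell functions here ...
 *         hs_exit();               // shutdown, including exitScheduler()
 *         return 0;
 *     }
 */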
2618
2619 void
2620 initScheduler(void)
2621 {
2622 #if !defined(THREADED_RTS)
2623 blocked_queue_hd = END_TSO_QUEUE;
2624 blocked_queue_tl = END_TSO_QUEUE;
2625 sleeping_queue = END_TSO_QUEUE;
2626 #endif
2627
2628 sched_state = SCHED_RUNNING;
2629 recent_activity = ACTIVITY_YES;
2630
2631 #if defined(THREADED_RTS)
2632 /* Initialise the mutex and condition variables used by
2633 * the scheduler. */
2634 initMutex(&sched_mutex);
2635 #endif
2636
2637 ACQUIRE_LOCK(&sched_mutex);
2638
2639 allocated_bytes_at_heapoverflow = 0;
2640
2641 /* A capability holds the state a native thread needs in
2642 * order to execute STG code. At least one capability is
2643 * floating around (only THREADED_RTS builds have more than one).
2644 */
2645 initCapabilities();
2646
2647 initTaskManager();
2648
2649 /*
2650 * Eagerly start one worker to run each Capability, except for
2651 * Capability 0. The idea is that we're probably going to start a
2652 * bound thread on Capability 0 pretty soon, so we don't want a
2653 * worker task hogging it.
2654 */
2655 startWorkerTasks(1, n_capabilities);
2656
2657 RELEASE_LOCK(&sched_mutex);
2658
2659 }
2660
2661 void
2662 exitScheduler (bool wait_foreign USED_IF_THREADS)
2663 /* see Capability.c, shutdownCapability() */
2664 {
2665 Task *task = NULL;
2666
2667 task = newBoundTask();
2668
2669 // If we haven't killed all the threads yet, do it now.
2670 if (sched_state < SCHED_SHUTTING_DOWN) {
2671 sched_state = SCHED_INTERRUPTING;
2672 Capability *cap = task->cap;
2673 waitForCapability(&cap,task);
2674 scheduleDoGC(&cap,task,true);
2675 ASSERT(task->incall->tso == NULL);
2676 releaseCapability(cap);
2677 }
2678 sched_state = SCHED_SHUTTING_DOWN;
2679
2680 shutdownCapabilities(task, wait_foreign);
2681
2682 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2683 // n_failed_trygrab_idles, n_idle_caps);
2684
2685 boundTaskExiting(task);
2686 }
2687
2688 void
2689 freeScheduler( void )
2690 {
2691 uint32_t still_running;
2692
2693 ACQUIRE_LOCK(&sched_mutex);
2694 still_running = freeTaskManager();
2695 // We can only free the Capabilities if there are no Tasks still
2696 // running. We might have a Task about to return from a foreign
2697 // call into waitForCapability(), for example (actually,
2698 // this should be the *only* thing that a still-running Task can
2699 // do at this point, and it will block waiting for the
2700 // Capability).
2701 if (still_running == 0) {
2702 freeCapabilities();
2703 }
2704 RELEASE_LOCK(&sched_mutex);
2705 #if defined(THREADED_RTS)
2706 closeMutex(&sched_mutex);
2707 #endif
2708 }
2709
2710 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2711 void *user USED_IF_NOT_THREADS)
2712 {
2713 #if !defined(THREADED_RTS)
2714 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2715 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2716 evac(user, (StgClosure **)(void *)&sleeping_queue);
2717 #endif
2718 }
2719
2720 /* -----------------------------------------------------------------------------
2721 performGC
2722
2723 This is the interface to the garbage collector from Haskell land.
2724 We provide this so that external C code can allocate and garbage
2725 collect when called from Haskell via _ccall_GC.
2726 -------------------------------------------------------------------------- */
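/* Two usual ways to reach this code (a sketch, for orientation only):
 *
 *   - from Haskell, System.Mem.performGC / performMajorGC call down into
 *     the performGC()/performMajorGC() wrappers below;
 *
 *   - from C, via the public FFI interface:
 *
 *         #include "HsFFI.h"
 *         hs_perform_gc();   // forces a major collection
 */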
2727
2728 static void
2729 performGC_(bool force_major)
2730 {
2731 Task *task;
2732 Capability *cap = NULL;
2733
2734 // We must grab a new Task here, because the existing Task may be
2735 // associated with a particular Capability, and chained onto the
2736 // suspended_ccalls queue.
2737 task = newBoundTask();
2738
2739 // TODO: do we need to traceTask*() here?
2740
2741 waitForCapability(&cap,task);
2742 scheduleDoGC(&cap,task,force_major);
2743 releaseCapability(cap);
2744 boundTaskExiting(task);
2745 }
2746
2747 void
2748 performGC(void)
2749 {
2750 performGC_(false);
2751 }
2752
2753 void
2754 performMajorGC(void)
2755 {
2756 performGC_(true);
2757 }
2758
2759 /* ---------------------------------------------------------------------------
2760 Interrupt execution.
2761 Might be called inside a signal handler so it mustn't do anything fancy.
2762 ------------------------------------------------------------------------ */
2763
2764 void
2765 interruptStgRts(void)
2766 {
2767 sched_state = SCHED_INTERRUPTING;
2768 interruptAllCapabilities();
2769 #if defined(THREADED_RTS)
2770 wakeUpRts();
2771 #endif
2772 }
2773
2774 /* -----------------------------------------------------------------------------
2775 Wake up the RTS
2776
2777 This function causes at least one OS thread to wake up and run the
2778 scheduler loop. It is invoked when the RTS might be deadlocked, or
2779 an external event has arrived that may need servicing (eg. a
2780 keyboard interrupt).
2781
2782 In the single-threaded RTS we don't do anything here; we only have
2783 one thread anyway, and the event that caused us to want to wake up
2784 will have interrupted any blocking system call in progress anyway.
2785 -------------------------------------------------------------------------- */
2786
2787 #if defined(THREADED_RTS)
2788 void wakeUpRts(void)
2789 {
2790 // This forces the IO Manager thread to wakeup, which will
2791 // in turn ensure that some OS thread wakes up and runs the
2792 // scheduler loop, which will cause a GC and deadlock check.
2793 ioManagerWakeup();
2794 }
2795 #endif
2796
2797 /* -----------------------------------------------------------------------------
2798 Deleting threads
2799
2800 This is used for interruption (^C) and forking, and corresponds to
2801 raising an exception but without letting the thread catch the
2802 exception.
2803 -------------------------------------------------------------------------- */
2804
2805 static void
2806 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2807 {
2808 // NOTE: must only be called on a TSO that we have exclusive
2809 // access to, because we will call throwToSingleThreaded() below.
2810 // The TSO must be on the run queue of the Capability we own, or
2811 // we must own all Capabilities.
2812
2813 if (tso->why_blocked != BlockedOnCCall &&
2814 tso->why_blocked != BlockedOnCCall_Interruptible) {
2815 throwToSingleThreaded(tso->cap,tso,NULL);
2816 }
2817 }
2818
2819 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2820 static void
2821 deleteThread_(Capability *cap, StgTSO *tso)
2822 { // for forkProcess only:
2823 // like deleteThread(), but we delete threads in foreign calls, too.
2824
2825 if (tso->why_blocked == BlockedOnCCall ||
2826 tso->why_blocked == BlockedOnCCall_Interruptible) {
2827 tso->what_next = ThreadKilled;
2828 appendToRunQueue(tso->cap, tso);
2829 } else {
2830 deleteThread(cap,tso);
2831 }
2832 }
2833 #endif
2834
2835 /* -----------------------------------------------------------------------------
2836 raiseExceptionHelper
2837
2838 This function is called by the raise# primitive, just so that we can
2839 move some of the tricky bits of raising an exception from C-- into
2840 C. Who knows, it might be a useful reusable thing here too.
2841 -------------------------------------------------------------------------- */
2842
2843 StgWord
2844 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2845 {
2846 Capability *cap = regTableToCapability(reg);
2847 StgThunk *raise_closure = NULL;
2848 StgPtr p, next;
2849 const StgRetInfoTable *info;
2850 //
2851 // This closure represents the expression 'raise# E' where E
2852 // is the exception being raised. It is used to overwrite all the
2853 // thunks which are currently under evaluation.
2854 //
2855
2856 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2857 // LDV profiling: stg_raise_info has THUNK as its closure
2858 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2859 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2860 // 1 does not cause any problem unless profiling is performed.
2861 // However, when LDV profiling goes on, we need to linearly scan
2862 // small object pool, where raise_closure is stored, so we should
2863 // use MIN_UPD_SIZE.
2864 //
2865 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2866 // sizeofW(StgClosure)+1);
2867 //
2868
2869 //
2870 // Walk up the stack, looking for the catch frame. On the way,
2871 // we update any closures pointed to from update frames with the
2872 // raise closure that we just built.
2873 //
2874 p = tso->stackobj->sp;
2875 while(1) {
2876 info = get_ret_itbl((StgClosure *)p);
2877 next = p + stack_frame_sizeW((StgClosure *)p);
2878 switch (info->i.type) {
2879
2880 case UPDATE_FRAME:
2881 // Only create raise_closure if we need to.
2882 if (raise_closure == NULL) {
2883 raise_closure =
2884 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2885 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2886 raise_closure->payload[0] = exception;
2887 }
2888 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2889 (StgClosure *)raise_closure);
2890 p = next;
2891 continue;
2892
2893 case ATOMICALLY_FRAME:
2894 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2895 tso->stackobj->sp = p;
2896 return ATOMICALLY_FRAME;
2897
2898 case CATCH_FRAME:
2899 tso->stackobj->sp = p;
2900 return CATCH_FRAME;
2901
2902 case CATCH_STM_FRAME:
2903 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2904 tso->stackobj->sp = p;
2905 return CATCH_STM_FRAME;
2906
2907 case UNDERFLOW_FRAME:
2908 tso->stackobj->sp = p;
2909 threadStackUnderflow(cap,tso);
2910 p = tso->stackobj->sp;
2911 continue;
2912
2913 case STOP_FRAME:
2914 tso->stackobj->sp = p;
2915 return STOP_FRAME;
2916
2917 case CATCH_RETRY_FRAME: {
2918 StgTRecHeader *trec = tso -> trec;
2919 StgTRecHeader *outer = trec -> enclosing_trec;
2920 debugTrace(DEBUG_stm,
2921 "found CATCH_RETRY_FRAME at %p during raise", p);
2922 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2923 stmAbortTransaction(cap, trec);
2924 stmFreeAbortedTRec(cap, trec);
2925 tso -> trec = outer;
2926 p = next;
2927 continue;
2928 }
2929
2930 default:
2931 p = next;
2932 continue;
2933 }
2934 }
2935 }
2936
2937
2938 /* -----------------------------------------------------------------------------
2939 findRetryFrameHelper
2940
2941 This function is called by the retry# primitive. It traverses the stack
2942 leaving tso->sp referring to the frame which should handle the retry.
2943
2944 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2945 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2946
2947 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested transaction that they
2948 create) because retries are not considered to be exceptions, despite the
2949 similar implementation.
2950
2951 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2952 not be created within memory transactions.
2953 -------------------------------------------------------------------------- */
2954
2955 StgWord
2956 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2957 {
2958 const StgRetInfoTable *info;
2959 StgPtr p, next;
2960
2961 p = tso->stackobj->sp;
2962 while (1) {
2963 info = get_ret_itbl((const StgClosure *)p);
2964 next = p + stack_frame_sizeW((StgClosure *)p);
2965 switch (info->i.type) {
2966
2967 case ATOMICALLY_FRAME:
2968 debugTrace(DEBUG_stm,
2969 "found ATOMICALLY_FRAME at %p during retry", p);
2970 tso->stackobj->sp = p;
2971 return ATOMICALLY_FRAME;
2972
2973 case CATCH_RETRY_FRAME:
2974 debugTrace(DEBUG_stm,
2975 "found CATCH_RETRY_FRAME at %p during retry", p);
2976 tso->stackobj->sp = p;
2977 return CATCH_RETRY_FRAME;
2978
2979 case CATCH_STM_FRAME: {
2980 StgTRecHeader *trec = tso -> trec;
2981 StgTRecHeader *outer = trec -> enclosing_trec;
2982 debugTrace(DEBUG_stm,
2983 "found CATCH_STM_FRAME at %p during retry", p);
2984 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2985 stmAbortTransaction(cap, trec);
2986 stmFreeAbortedTRec(cap, trec);
2987 tso -> trec = outer;
2988 p = next;
2989 continue;
2990 }
2991
2992 case UNDERFLOW_FRAME:
2993 tso->stackobj->sp = p;
2994 threadStackUnderflow(cap,tso);
2995 p = tso->stackobj->sp;
2996 continue;
2997
2998 default:
2999 ASSERT(info->i.type != CATCH_FRAME);
3000 ASSERT(info->i.type != STOP_FRAME);
3001 p = next;
3002 continue;
3003 }
3004 }
3005 }
3006
3007 /* -----------------------------------------------------------------------------
3008 resurrectThreads is called after garbage collection on the list of
3009 threads found to be garbage. Each of these threads will be woken
3010 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3011 on an MVar, or NonTermination if the thread was blocked on a Black
3012 Hole.
3013
3014 Locks: assumes we hold *all* the capabilities.
3015 -------------------------------------------------------------------------- */
3016
3017 void
3018 resurrectThreads (StgTSO *threads)
3019 {
3020 StgTSO *tso, *next;
3021 Capability *cap;
3022 generation *gen;
3023
3024 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3025 next = tso->global_link;
3026
3027 gen = Bdescr((P_)tso)->gen;
3028 tso->global_link = gen->threads;
3029 gen->threads = tso;
3030
3031 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3032
3033 // Wake up the thread on the Capability it was last on
3034 cap = tso->cap;
3035
3036 switch (tso->why_blocked) {
3037 case BlockedOnMVar:
3038 case BlockedOnMVarRead:
3039 /* Called by GC - sched_mutex lock is currently held. */
3040 throwToSingleThreaded(cap, tso,
3041 (StgClosure *)blockedIndefinitelyOnMVar_closure);
3042 break;
3043 case BlockedOnBlackHole:
3044 throwToSingleThreaded(cap, tso,
3045 (StgClosure *)nonTermination_closure);
3046 break;
3047 case BlockedOnSTM:
3048 throwToSingleThreaded(cap, tso,
3049 (StgClosure *)blockedIndefinitelyOnSTM_closure);
3050 break;
3051 case NotBlocked:
3052 /* This might happen if the thread was blocked on a black hole
3053 * belonging to a thread that we've just woken up (raiseAsync
3054 * can wake up threads, remember...).
3055 */
3056 continue;
3057 case BlockedOnMsgThrowTo:
3058 // This can happen if the target is masking, blocks on a
3059 // black hole, and then is found to be unreachable. In
3060 // this case, we want to let the target wake up and carry
3061 // on, and do nothing to this thread.
3062 continue;
3063 default:
3064 barf("resurrectThreads: thread blocked in a strange way: %d",
3065 tso->why_blocked);
3066 }
3067 }
3068 }