rts: enable parallel GC scan of large (32M+) allocation area
rts/Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /*
95 * This mutex protects most of the global scheduler data in
96 * the THREADED_RTS runtime.
97 */
98 #if defined(THREADED_RTS)
99 Mutex sched_mutex;
100 #endif
101
102 #if !defined(mingw32_HOST_OS)
103 #define FORKPROCESS_PRIMOP_SUPPORTED
104 #endif
105
106 /* -----------------------------------------------------------------------------
107 * static function prototypes
108 * -------------------------------------------------------------------------- */
109
110 static Capability *schedule (Capability *initialCapability, Task *task);
111
112 //
113 // These functions all encapsulate parts of the scheduler loop, and are
114 // abstracted only to make the structure and control flow of the
115 // scheduler clearer.
116 //
117 static void schedulePreLoop (void);
118 static void scheduleFindWork (Capability **pcap);
119 #if defined(THREADED_RTS)
120 static void scheduleYield (Capability **pcap, Task *task);
121 #endif
122 #if defined(THREADED_RTS)
123 static rtsBool requestSync (Capability **pcap, Task *task,
124 PendingSync *sync_type, SyncType *prev_sync_type);
125 static void acquireAllCapabilities(Capability *cap, Task *task);
126 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
127 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
128 uint32_t to USED_IF_THREADS);
129 #endif
130 static void scheduleStartSignalHandlers (Capability *cap);
131 static void scheduleCheckBlockedThreads (Capability *cap);
132 static void scheduleProcessInbox(Capability **cap);
133 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
134 static void schedulePushWork(Capability *cap, Task *task);
135 #if defined(THREADED_RTS)
136 static void scheduleActivateSpark(Capability *cap);
137 #endif
138 static void schedulePostRunThread(Capability *cap, StgTSO *t);
139 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
140 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
141 uint32_t prev_what_next );
142 static void scheduleHandleThreadBlocked( StgTSO *t );
143 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
144 StgTSO *t );
145 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
146 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
147
148 static void deleteThread (Capability *cap, StgTSO *tso);
149 static void deleteAllThreads (Capability *cap);
150
151 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
152 static void deleteThread_(Capability *cap, StgTSO *tso);
153 #endif
154
155 /* ---------------------------------------------------------------------------
156 Main scheduling loop.
157
158 We use round-robin scheduling, each thread returning to the
159 scheduler loop when one of these conditions is detected:
160
161 * out of heap space
162 * timer expires (thread yields)
163 * thread blocks
164 * thread ends
165 * stack overflow
166
167 ------------------------------------------------------------------------ */
168
169 static Capability *
170 schedule (Capability *initialCapability, Task *task)
171 {
172 StgTSO *t;
173 Capability *cap;
174 StgThreadReturnCode ret;
175 uint32_t prev_what_next;
176 rtsBool ready_to_gc;
177 #if defined(THREADED_RTS)
178 rtsBool first = rtsTrue;
179 #endif
180
181 cap = initialCapability;
182
183 // Pre-condition: this task owns initialCapability.
184 // The sched_mutex is *NOT* held
185 // NB. on return, we still hold a capability.
186
187 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
188
189 schedulePreLoop();
190
191 // -----------------------------------------------------------
192 // Scheduler loop starts here:
193
194 while (1) {
195
196 // Check whether we have re-entered the RTS from Haskell without
197 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
198 // call).
199 if (cap->in_haskell) {
200 errorBelch("schedule: re-entered unsafely.\n"
201 " Perhaps a 'foreign import unsafe' should be 'safe'?");
202 stg_exit(EXIT_FAILURE);
203 }
204
205 // Note [shutdown]: The interruption / shutdown sequence.
206 //
207 // In order to cleanly shut down the runtime, we want to:
208 // * make sure that all main threads return to their callers
209 // with the state 'Interrupted'.
210 //   * clean up all OS threads associated with the runtime
211 // * free all memory etc.
212 //
213 // So the sequence goes like this:
214 //
215 // * The shutdown sequence is initiated by calling hs_exit(),
216 // interruptStgRts(), or running out of memory in the GC.
217 //
218 // * Set sched_state = SCHED_INTERRUPTING
219 //
220 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
221 // scheduleDoGC(), which halts the whole runtime by acquiring all the
222 // capabilities, does a GC and then calls deleteAllThreads() to kill all
223 // the remaining threads. The zombies are left on the run queue for
224 // cleaning up. We can't kill threads involved in foreign calls.
225 //
226 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
227 //
228 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
229 // enforced by the scheduler, which won't run any Haskell code when
230 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
231 // other capabilities by doing the GC earlier.
232 //
233 // * all workers exit when the run queue on their capability
234 // drains. All main threads will also exit when their TSO
235 // reaches the head of the run queue and they can return.
236 //
237 // * eventually all Capabilities will shut down, and the RTS can
238 // exit.
239 //
240 // * We might be left with threads blocked in foreign calls,
241 // we should really attempt to kill these somehow (TODO).
242
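        // The sequence above is normally kicked off from outside the
        // scheduler. A minimal standalone sketch (not part of this
        // file) of an embedding C program that triggers it via the
        // standard HsFFI.h entry points:
#if 0
#include "HsFFI.h"

int main (int argc, char *argv[])
{
    hs_init(&argc, &argv);   /* start the RTS */
    /* ... call exported Haskell functions here ... */
    hs_exit();               /* initiates the shutdown sequence in Note [shutdown] */
    return 0;
}
#endif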
243 switch (sched_state) {
244 case SCHED_RUNNING:
245 break;
246 case SCHED_INTERRUPTING:
247 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
248 /* scheduleDoGC() deletes all the threads */
249 scheduleDoGC(&cap,task,rtsTrue);
250
251 // after scheduleDoGC(), we must be shutting down. Either some
252 // other Capability did the final GC, or we did it above,
253 // either way we can fall through to the SCHED_SHUTTING_DOWN
254 // case now.
255 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
256 // fall through
257
258 case SCHED_SHUTTING_DOWN:
259 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
260 // If we are a worker, just exit. If we're a bound thread
261 // then we will exit below when we've removed our TSO from
262 // the run queue.
263 if (!isBoundTask(task) && emptyRunQueue(cap)) {
264 return cap;
265 }
266 break;
267 default:
268 barf("sched_state: %d", sched_state);
269 }
270
271 scheduleFindWork(&cap);
272
273 /* work pushing, currently relevant only for THREADED_RTS:
274 (pushes threads, wakes up idle capabilities for stealing) */
275 schedulePushWork(cap,task);
276
277 scheduleDetectDeadlock(&cap,task);
278
279 // Normally, the only way we can get here with no threads to
280 // run is if a keyboard interrupt was received during
281 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
282 // Additionally, it is not fatal for the
283 // threaded RTS to reach here with no threads to run.
284 //
285 // win32: might be here due to awaitEvent() being abandoned
286 // as a result of a console event having been delivered.
287
288 #if defined(THREADED_RTS)
289 if (first)
290 {
291 // XXX: ToDo
292 // // don't yield the first time, we want a chance to run this
293 // // thread for a bit, even if there are others banging at the
294 // // door.
295 // first = rtsFalse;
296 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
297 }
298
299 scheduleYield(&cap,task);
300
301 if (emptyRunQueue(cap)) continue; // look for work again
302 #endif
303
304 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
305 if ( emptyRunQueue(cap) ) {
306 ASSERT(sched_state >= SCHED_INTERRUPTING);
307 }
308 #endif
309
310 //
311 // Get a thread to run
312 //
313 t = popRunQueue(cap);
314
315 // Sanity check the thread we're about to run. This can be
316 // expensive if there is lots of thread switching going on...
317 IF_DEBUG(sanity,checkTSO(t));
318
319 #if defined(THREADED_RTS)
320 // Check whether we can run this thread in the current task.
321 // If not, we have to pass our capability to the right task.
322 {
323 InCall *bound = t->bound;
324
325 if (bound) {
326 if (bound->task == task) {
327 // yes, the Haskell thread is bound to the current native thread
328 } else {
329 debugTrace(DEBUG_sched,
330 "thread %lu bound to another OS thread",
331 (unsigned long)t->id);
332 // no, bound to a different Haskell thread: pass to that thread
333 pushOnRunQueue(cap,t);
334 continue;
335 }
336 } else {
337 // The thread we want to run is unbound.
338 if (task->incall->tso) {
339 debugTrace(DEBUG_sched,
340 "this OS thread cannot run thread %lu",
341 (unsigned long)t->id);
342 // no, the current native thread is bound to a different
343 // Haskell thread, so pass it to any worker thread
344 pushOnRunQueue(cap,t);
345 continue;
346 }
347 }
348 }
349 #endif
350
351 // If we're shutting down, and this thread has not yet been
352 // killed, kill it now. This sometimes happens when a finalizer
353 // thread is created by the final GC, or a thread previously
354 // in a foreign call returns.
355 if (sched_state >= SCHED_INTERRUPTING &&
356 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
357 deleteThread(cap,t);
358 }
359
360 // If this capability is disabled, migrate the thread away rather
361 // than running it. NB. but not if the thread is bound: it is
362 // really hard for a bound thread to migrate itself. Believe me,
363 // I tried several ways and couldn't find a way to do it.
364 // Instead, when everything is stopped for GC, we migrate all the
365 // threads on the run queue then (see scheduleDoGC()).
366 //
367 // ToDo: what about TSO_LOCKED? Currently we're migrating those
368 // when the number of capabilities drops, but we never migrate
369 // them back if it rises again. Presumably we should, but after
370 // the thread has been migrated we no longer know what capability
371 // it was originally on.
372 #ifdef THREADED_RTS
373 if (cap->disabled && !t->bound) {
374 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
375 migrateThread(cap, t, dest_cap);
376 continue;
377 }
378 #endif
379
380 /* context switches are initiated by the timer signal, unless
381 * the user specified "context switch as often as possible", with
382 * +RTS -C0
383 */
384 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
385 && !emptyThreadQueues(cap)) {
386 cap->context_switch = 1;
387 }
388
389 run_thread:
390
391 // CurrentTSO is the thread to run. It might be different if we
392 // loop back to run_thread, so make sure to set CurrentTSO after
393 // that.
394 cap->r.rCurrentTSO = t;
395
396 startHeapProfTimer();
397
398 // ----------------------------------------------------------------------
399 // Run the current thread
400
401 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
402 ASSERT(t->cap == cap);
403 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
404
405 prev_what_next = t->what_next;
406
407 errno = t->saved_errno;
408 #if mingw32_HOST_OS
409 SetLastError(t->saved_winerror);
410 #endif
411
412 // reset the interrupt flag before running Haskell code
413 cap->interrupt = 0;
414
415 cap->in_haskell = rtsTrue;
416 cap->idle = 0;
417
418 dirty_TSO(cap,t);
419 dirty_STACK(cap,t->stackobj);
420
421 switch (recent_activity)
422 {
423 case ACTIVITY_DONE_GC: {
424 // ACTIVITY_DONE_GC means we turned off the timer signal to
425 // conserve power (see #1623). Re-enable it here.
426 uint32_t prev;
427 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
428 if (prev == ACTIVITY_DONE_GC) {
429 #ifndef PROFILING
430 startTimer();
431 #endif
432 }
433 break;
434 }
435 case ACTIVITY_INACTIVE:
436 // If we reached ACTIVITY_INACTIVE, then don't reset it until
437 // we've done the GC. The thread running here might just be
438 // the IO manager thread that handle_tick() woke up via
439 // wakeUpRts().
440 break;
441 default:
442 recent_activity = ACTIVITY_YES;
443 }
444
445 traceEventRunThread(cap, t);
446
447 switch (prev_what_next) {
448
449 case ThreadKilled:
450 case ThreadComplete:
451 /* Thread already finished, return to scheduler. */
452 ret = ThreadFinished;
453 break;
454
455 case ThreadRunGHC:
456 {
457 StgRegTable *r;
458 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
459 cap = regTableToCapability(r);
460 ret = r->rRet;
461 break;
462 }
463
464 case ThreadInterpret:
465 cap = interpretBCO(cap);
466 ret = cap->r.rRet;
467 break;
468
469 default:
470 barf("schedule: invalid what_next field");
471 }
472
473 cap->in_haskell = rtsFalse;
474
475 // The TSO might have moved, eg. if it re-entered the RTS and a GC
476 // happened. So find the new location:
477 t = cap->r.rCurrentTSO;
478
479 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
480 // don't want it set when not running a Haskell thread.
481 cap->r.rCurrentTSO = NULL;
482
483 // And save the current errno in this thread.
484 // XXX: possibly bogus for SMP because this thread might already
485 // be running again, see code below.
486 t->saved_errno = errno;
487 #if mingw32_HOST_OS
488 // Similarly for Windows error code
489 t->saved_winerror = GetLastError();
490 #endif
491
492 if (ret == ThreadBlocked) {
493 if (t->why_blocked == BlockedOnBlackHole) {
494 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
495 traceEventStopThread(cap, t, t->why_blocked + 6,
496 owner != NULL ? owner->id : 0);
497 } else {
498 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
499 }
500 } else {
501 traceEventStopThread(cap, t, ret, 0);
502 }
503
504 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
505 ASSERT(t->cap == cap);
506
507 // ----------------------------------------------------------------------
508
509 // Costs for the scheduler are assigned to CCS_SYSTEM
510 stopHeapProfTimer();
511 #if defined(PROFILING)
512 cap->r.rCCCS = CCS_SYSTEM;
513 #endif
514
515 schedulePostRunThread(cap,t);
516
517 ready_to_gc = rtsFalse;
518
519 switch (ret) {
520 case HeapOverflow:
521 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
522 break;
523
524 case StackOverflow:
525 // just adjust the stack for this thread, then pop it back
526 // on the run queue.
527 threadStackOverflow(cap, t);
528 pushOnRunQueue(cap,t);
529 break;
530
531 case ThreadYielding:
532 if (scheduleHandleYield(cap, t, prev_what_next)) {
533 // shortcut for switching between compiler/interpreter:
534 goto run_thread;
535 }
536 break;
537
538 case ThreadBlocked:
539 scheduleHandleThreadBlocked(t);
540 break;
541
542 case ThreadFinished:
543 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
544 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
545 break;
546
547 default:
548 barf("schedule: invalid thread return code %d", (int)ret);
549 }
550
551 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
552 scheduleDoGC(&cap,task,rtsFalse);
553 }
554 } /* end of while() */
555 }
556
557 /* -----------------------------------------------------------------------------
558 * Run queue operations
559 * -------------------------------------------------------------------------- */
560
561 static void
562 removeFromRunQueue (Capability *cap, StgTSO *tso)
563 {
564 if (tso->block_info.prev == END_TSO_QUEUE) {
565 ASSERT(cap->run_queue_hd == tso);
566 cap->run_queue_hd = tso->_link;
567 } else {
568 setTSOLink(cap, tso->block_info.prev, tso->_link);
569 }
570 if (tso->_link == END_TSO_QUEUE) {
571 ASSERT(cap->run_queue_tl == tso);
572 cap->run_queue_tl = tso->block_info.prev;
573 } else {
574 setTSOPrev(cap, tso->_link, tso->block_info.prev);
575 }
576 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
577 cap->n_run_queue--;
578
579 IF_DEBUG(sanity, checkRunQueue(cap));
580 }
581
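// Move a thread to the front of its Capability's run queue (remove it
// from wherever it currently sits and push it on the front) so that it
// runs next.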
582 void
583 promoteInRunQueue (Capability *cap, StgTSO *tso)
584 {
585 removeFromRunQueue(cap, tso);
586 pushOnRunQueue(cap, tso);
587 }
588
589 /* ----------------------------------------------------------------------------
590 * Setting up the scheduler loop
591 * ------------------------------------------------------------------------- */
592
593 static void
594 schedulePreLoop(void)
595 {
596 // initialisation for scheduler - what cannot go into initScheduler()
597
598 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
599 win32AllocStack();
600 #endif
601 }
602
603 /* -----------------------------------------------------------------------------
604 * scheduleFindWork()
605 *
606 * Search for work to do, and handle messages from elsewhere.
607 * -------------------------------------------------------------------------- */
608
609 static void
610 scheduleFindWork (Capability **pcap)
611 {
612 scheduleStartSignalHandlers(*pcap);
613
614 scheduleProcessInbox(pcap);
615
616 scheduleCheckBlockedThreads(*pcap);
617
618 #if defined(THREADED_RTS)
619 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
620 #endif
621 }
622
623 #if defined(THREADED_RTS)
624 STATIC_INLINE rtsBool
625 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
626 {
627 // we need to yield this capability to someone else if..
628 // - another thread is initiating a GC, and we didn't just do a GC
629 // (see Note [GC livelock])
630 // - another Task is returning from a foreign call
631 // - the thread at the head of the run queue cannot be run
632 // by this Task (it is bound to another Task, or it is unbound
633 //    and this Task is bound).
634 //
635 // Note [GC livelock]
636 //
637 // If we are interrupted to do a GC, then we do not immediately do
638 // another one. This avoids a starvation situation where one
639 // Capability keeps forcing a GC and the other Capabilities make no
640 // progress at all.
641
642 return ((pending_sync && !didGcLast) ||
643 cap->n_returning_tasks != 0 ||
644 (!emptyRunQueue(cap) && (task->incall->tso == NULL
645 ? peekRunQueue(cap)->bound != NULL
646 : peekRunQueue(cap)->bound != task->incall)));
647 }
648
649 // This is the single place where a Task goes to sleep. There are
650 // two reasons it might need to sleep:
651 // - there are no threads to run
652 // - we need to yield this Capability to someone else
653 // (see shouldYieldCapability())
654 //
655 // Careful: the scheduler loop is quite delicate. Make sure you run
656 // the tests in testsuite/concurrent (all ways) after modifying this,
657 // and also check the benchmarks in nofib/parallel for regressions.
658
659 static void
660 scheduleYield (Capability **pcap, Task *task)
661 {
662 Capability *cap = *pcap;
663 int didGcLast = rtsFalse;
664
665 // if we have work, and we don't need to give up the Capability, continue.
666 //
667 if (!shouldYieldCapability(cap,task,rtsFalse) &&
668 (!emptyRunQueue(cap) ||
669 !emptyInbox(cap) ||
670 sched_state >= SCHED_INTERRUPTING)) {
671 return;
672 }
673
674 // otherwise yield (sleep), and keep yielding if necessary.
675 do {
676 didGcLast = yieldCapability(&cap,task, !didGcLast);
677 }
678 while (shouldYieldCapability(cap,task,didGcLast));
679
680 // note there may still be no threads on the run queue at this
681 // point, the caller has to check.
682
683 *pcap = cap;
684 return;
685 }
686 #endif
687
688 /* -----------------------------------------------------------------------------
689 * schedulePushWork()
690 *
691 * Push work to other Capabilities if we have some.
692 * -------------------------------------------------------------------------- */
693
694 static void
695 schedulePushWork(Capability *cap USED_IF_THREADS,
696 Task *task USED_IF_THREADS)
697 {
698 /* The following code is not for PARALLEL_HASKELL. I kept the call general;
699 future GUM versions might use pushing in a distributed setup. */
700 #if defined(THREADED_RTS)
701
702 Capability *free_caps[n_capabilities], *cap0;
703 uint32_t i, n_wanted_caps, n_free_caps;
704
705 uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
706
707 // migration can be turned off with +RTS -qm
708 if (!RtsFlags.ParFlags.migrate) {
709 spare_threads = 0;
710 }
711
712 // Figure out how many capabilities we want to wake up. We need at least
713 // sparkPoolSizeCap(cap) plus the number of spare threads we have.
714 n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
715 if (n_wanted_caps == 0) return;
716
717 // First grab as many free Capabilities as we can. ToDo: we should use
718 // capabilities on the same NUMA node preferably, but not exclusively.
719 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
720 n_free_caps < n_wanted_caps && i != cap->no;
721 i = (i + 1) % n_capabilities) {
722 cap0 = capabilities[i];
723 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
724 if (!emptyRunQueue(cap0)
725 || cap0->n_returning_tasks != 0
726 || cap0->inbox != (Message*)END_TSO_QUEUE) {
727 // it already has some work, we just grabbed it at
728 // the wrong moment. Or maybe it's deadlocked!
729 releaseCapability(cap0);
730 } else {
731 free_caps[n_free_caps++] = cap0;
732 }
733 }
734 }
735
736 // We now have n_free_caps free capabilities stashed in
737 // free_caps[]. Attempt to share our run queue equally with them.
738 // This is complicated slightly by the fact that we can't move
739 // some threads:
740 //
741 // - threads that have TSO_LOCKED cannot migrate
742 // - a thread that is bound to the current Task cannot be migrated
743 //
744 // This is about the simplest thing we could do; improvements we
745 // might want to make include:
746 //
747 // - giving high priority to moving relatively new threads, on
748 // the grounds that they haven't had time to build up a
749 // working set in the cache on this CPU/Capability.
750 //
751 // - giving low priority to moving long-lived threads
752
753 if (n_free_caps > 0) {
754 StgTSO *prev, *t, *next;
755
756 debugTrace(DEBUG_sched,
757 "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
758 cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
759 n_free_caps);
760
761 // There are n_free_caps+1 caps in total. We will share the threads
762 // evenly between them, *except* that if the run queue does not divide
763 // evenly by n_free_caps+1, then we bias towards the current capability.
764 // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
765 uint32_t keep_threads =
766 (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
767
768 // This also ensures that we don't give away all our threads, since
769 // (x + y) / (y + 1) >= 1 when x >= 1.
770
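        // A minimal standalone sketch (not RTS code) of the sharing
        // arithmetic above; q stands for n_run_queue and f for
        // n_free_caps:
#if 0
        //   keep = (q + f) / (f + 1)
        //   q=4, f=2  ->  6/3 = 2   (keep 2, give away 2)
        //   q=5, f=2  ->  7/3 = 2   (keep 2, give away 3)
        //   q=1, f=7  ->  8/8 = 1   (the last thread is never given away)
        static uint32_t keep_count (uint32_t q, uint32_t f)
        {
            return (q + f) / (f + 1);
        }
#endif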
771 // The number of threads we have left.
772 uint32_t n = cap->n_run_queue;
773
774 // prev = the previous thread on this cap's run queue
775 prev = END_TSO_QUEUE;
776
777 // We're going to walk through the run queue, migrating threads to other
778 // capabilities until we have only keep_threads left. We might
779 // encounter a thread that cannot be migrated, in which case we add it
780 // to the current run queue and decrement keep_threads.
781 for (t = cap->run_queue_hd, i = 0;
782 t != END_TSO_QUEUE && n > keep_threads;
783 t = next)
784 {
785 next = t->_link;
786 t->_link = END_TSO_QUEUE;
787
788 // Should we keep this thread?
789 if (t->bound == task->incall // don't move my bound thread
790 || tsoLocked(t) // don't move a locked thread
791 ) {
792 if (prev == END_TSO_QUEUE) {
793 cap->run_queue_hd = t;
794 } else {
795 setTSOLink(cap, prev, t);
796 }
797 setTSOPrev(cap, t, prev);
798 prev = t;
799 if (keep_threads > 0) keep_threads--;
800 }
801
802 // Or migrate it?
803 else {
804 appendToRunQueue(free_caps[i],t);
805 traceEventMigrateThread (cap, t, free_caps[i]->no);
806
807 if (t->bound) { t->bound->task->cap = free_caps[i]; }
808 t->cap = free_caps[i];
809 n--; // we have one fewer thread now
810 i++; // move on to the next free_cap
811 if (i == n_free_caps) i = 0;
812 }
813 }
814
815 // Join up the beginning of the queue (prev)
816 // with the rest of the queue (t)
817 if (t == END_TSO_QUEUE) {
818 cap->run_queue_tl = prev;
819 } else {
820 setTSOPrev(cap, t, prev);
821 }
822 if (prev == END_TSO_QUEUE) {
823 cap->run_queue_hd = t;
824 } else {
825 setTSOLink(cap, prev, t);
826 }
827 cap->n_run_queue = n;
828
829 IF_DEBUG(sanity, checkRunQueue(cap));
830
831 // release the capabilities
832 for (i = 0; i < n_free_caps; i++) {
833 task->cap = free_caps[i];
834 if (sparkPoolSizeCap(cap) > 0) {
835 // If we have sparks to steal, wake up a worker on the
836 // capability, even if it has no threads to run.
837 releaseAndWakeupCapability(free_caps[i]);
838 } else {
839 releaseCapability(free_caps[i]);
840 }
841 }
842 }
843 task->cap = cap; // reset to point to our Capability.
844
845 #endif /* THREADED_RTS */
846
847 }
848
849 /* ----------------------------------------------------------------------------
850 * Start any pending signal handlers
851 * ------------------------------------------------------------------------- */
852
853 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
854 static void
855 scheduleStartSignalHandlers(Capability *cap)
856 {
857 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
858 // safe outside the lock
859 startSignalHandlers(cap);
860 }
861 }
862 #else
863 static void
864 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
865 {
866 }
867 #endif
868
869 /* ----------------------------------------------------------------------------
870 * Check for blocked threads that can be woken up.
871 * ------------------------------------------------------------------------- */
872
873 static void
874 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
875 {
876 #if !defined(THREADED_RTS)
877 //
878 // Check whether any waiting threads need to be woken up. If the
879 // run queue is empty, and there are no other tasks running, we
880 // can wait indefinitely for something to happen.
881 //
882 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
883 {
884 awaitEvent (emptyRunQueue(cap));
885 }
886 #endif
887 }
888
889 /* ----------------------------------------------------------------------------
890 * Detect deadlock conditions and attempt to resolve them.
891 * ------------------------------------------------------------------------- */
892
893 static void
894 scheduleDetectDeadlock (Capability **pcap, Task *task)
895 {
896 Capability *cap = *pcap;
897 /*
898 * Detect deadlock: when we have no threads to run, there are no
899 * threads blocked, waiting for I/O, or sleeping, and all the
900 * other tasks are waiting for work, we must have a deadlock of
901 * some description.
902 */
903 if ( emptyThreadQueues(cap) )
904 {
905 #if defined(THREADED_RTS)
906 /*
907 * In the threaded RTS, we only check for deadlock if there
908 * has been no activity in a complete timeslice. This means
909 * we won't eagerly start a full GC just because we don't have
910 * any threads to run currently.
911 */
912 if (recent_activity != ACTIVITY_INACTIVE) return;
913 #endif
914
915 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
916
917 // Garbage collection can release some new threads due to
918 // either (a) finalizers or (b) threads resurrected because
919 // they are unreachable and will therefore be sent an
920 // exception. Any threads thus released will be immediately
921 // runnable.
922 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
923 cap = *pcap;
924 // when force_major == rtsTrue, scheduleDoGC() sets
925 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
926 // signal.
927
928 if ( !emptyRunQueue(cap) ) return;
929
930 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
931 /* If we have user-installed signal handlers, then wait
932 * for signals to arrive rather than bombing out with a
933 * deadlock.
934 */
935 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
936 debugTrace(DEBUG_sched,
937 "still deadlocked, waiting for signals...");
938
939 awaitUserSignals();
940
941 if (signals_pending()) {
942 startSignalHandlers(cap);
943 }
944
945 // either we have threads to run, or we were interrupted:
946 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
947
948 return;
949 }
950 #endif
951
952 #if !defined(THREADED_RTS)
953 /* Probably a real deadlock. Send the current main thread the
954 * Deadlock exception.
955 */
956 if (task->incall->tso) {
957 switch (task->incall->tso->why_blocked) {
958 case BlockedOnSTM:
959 case BlockedOnBlackHole:
960 case BlockedOnMsgThrowTo:
961 case BlockedOnMVar:
962 case BlockedOnMVarRead:
963 throwToSingleThreaded(cap, task->incall->tso,
964 (StgClosure *)nonTermination_closure);
965 return;
966 default:
967 barf("deadlock: main thread blocked in a strange way");
968 }
969 }
970 return;
971 #endif
972 }
973 }
974
975
976 /* ----------------------------------------------------------------------------
977 * Process message in the current Capability's inbox
978 * ------------------------------------------------------------------------- */
979
980 static void
981 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
982 {
983 #if defined(THREADED_RTS)
984 Message *m, *next;
985 int r;
986 Capability *cap = *pcap;
987
988 while (!emptyInbox(cap)) {
989 if (cap->r.rCurrentNursery->link == NULL ||
990 g0->n_new_large_words >= large_alloc_lim) {
991 scheduleDoGC(pcap, cap->running_task, rtsFalse);
992 cap = *pcap;
993 }
994
995 // don't use a blocking acquire; if the lock is held by
996 // another thread then just carry on. This seems to avoid
997 // getting stuck in a message ping-pong situation with other
998 // processors. We'll check the inbox again later anyway.
999 //
1000 // We should really use a more efficient queue data structure
1001 // here. The trickiness is that we must ensure a Capability
1002 // never goes idle if the inbox is non-empty, which is why we
1003 // use cap->lock (cap->lock is released as the last thing
1004 // before going idle; see Capability.c:releaseCapability()).
1005 r = TRY_ACQUIRE_LOCK(&cap->lock);
1006 if (r != 0) return;
1007
1008 m = cap->inbox;
1009 cap->inbox = (Message*)END_TSO_QUEUE;
1010
1011 RELEASE_LOCK(&cap->lock);
1012
1013 while (m != (Message*)END_TSO_QUEUE) {
1014 next = m->link;
1015 executeMessage(cap, m);
1016 m = next;
1017 }
1018 }
1019 #endif
1020 }
1021
1022 /* ----------------------------------------------------------------------------
1023 * Activate spark threads (THREADED_RTS)
1024 * ------------------------------------------------------------------------- */
1025
1026 #if defined(THREADED_RTS)
1027 static void
1028 scheduleActivateSpark(Capability *cap)
1029 {
1030 if (anySparks() && !cap->disabled)
1031 {
1032 createSparkThread(cap);
1033 debugTrace(DEBUG_sched, "creating a spark thread");
1034 }
1035 }
1036 #endif // THREADED_RTS
1037
1038 /* ----------------------------------------------------------------------------
1039 * After running a thread...
1040 * ------------------------------------------------------------------------- */
1041
1042 static void
1043 schedulePostRunThread (Capability *cap, StgTSO *t)
1044 {
1045 // We have to be able to catch transactions that are in an
1046 // infinite loop as a result of seeing an inconsistent view of
1047 // memory, e.g.
1048 //
1049 // atomically $ do
1050 // [a,b] <- mapM readTVar [ta,tb]
1051 // when (a == b) loop
1052 //
1053 // and a is never equal to b given a consistent view of memory.
1054 //
1055 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1056 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1057 debugTrace(DEBUG_sched | DEBUG_stm,
1058 "trec %p found wasting its time", t);
1059
1060 // strip the stack back to the
1061 // ATOMICALLY_FRAME, aborting the (nested)
1062 // transaction, and saving the stack of any
1063 // partially-evaluated thunks on the heap.
1064 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1065
1066 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1067 }
1068 }
1069
1070 //
1071 // If the current thread's allocation limit has run out, send it
1072 // the AllocationLimitExceeded exception.
1073
1074 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1075 // Use a throwToSelf rather than a throwToSingleThreaded, because
1076 // it correctly handles the case where the thread is currently
1077 // inside mask. Also the thread might be blocked (e.g. on an
1078 // MVar), and throwToSingleThreaded doesn't unblock it
1079 // correctly in that case.
1080 throwToSelf(cap, t, allocationLimitExceeded_closure);
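        // Reset the limit to the grace amount (+RTS -xq) so that the
        // thread's exception handler has some allocation to run in
        // before the limit can trip again.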
1081 ASSIGN_Int64((W_*)&(t->alloc_limit),
1082 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1083 }
1084
1085 /* some statistics gathering in the parallel case */
1086 }
1087
1088 /* -----------------------------------------------------------------------------
1089 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1090 * -------------------------------------------------------------------------- */
1091
1092 static rtsBool
1093 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1094 {
1095 if (cap->r.rHpLim == NULL || cap->context_switch) {
1096 // Sometimes we miss a context switch, e.g. when calling
1097 // primitives in a tight loop, MAYBE_GC() doesn't check the
1098 // context switch flag, and we end up waiting for a GC.
1099 // See #1984, and concurrent/should_run/1984
1100 cap->context_switch = 0;
1101 appendToRunQueue(cap,t);
1102 } else {
1103 pushOnRunQueue(cap,t);
1104 }
1105
1106 // did the task ask for a large block?
1107 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1108 // if so, get one and push it on the front of the nursery.
1109 bdescr *bd;
1110 W_ blocks;
1111
1112 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
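        // i.e. the request rounded up to a whole number of blocks; for
        // example, a request slightly larger than one block needs two.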
1113
1114 if (blocks > BLOCKS_PER_MBLOCK) {
1115 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1116 }
1117
1118 debugTrace(DEBUG_sched,
1119 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1120 (long)t->id, what_next_strs[t->what_next], blocks);
1121
1122 // don't do this if the nursery is (nearly) full; we'll GC first.
1123 if (cap->r.rCurrentNursery->link != NULL ||
1124 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1125 // infinite loop if the
1126 // nursery has only one
1127 // block.
1128
1129 bd = allocGroupOnNode_lock(cap->node,blocks);
1130 cap->r.rNursery->n_blocks += blocks;
1131
1132 // link the new group after CurrentNursery
1133 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1134
1135 // initialise it as a nursery block. We initialise the
1136 // step, gen_no, and flags field of *every* sub-block in
1137 // this large block, because this is easier than making
1138 // sure that we always find the block head of a large
1139 // block whenever we call Bdescr() (eg. evacuate() and
1140 // isAlive() in the GC would both have to do this, at
1141 // least).
1142 {
1143 bdescr *x;
1144 for (x = bd; x < bd + blocks; x++) {
1145 initBdescr(x,g0,g0);
1146 x->free = x->start;
1147 x->flags = 0;
1148 }
1149 }
1150
1151 // This assert can be a killer if the app is doing lots
1152 // of large block allocations.
1153 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1154
1155 // now update the nursery to point to the new block
1156 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1157 cap->r.rCurrentNursery = bd;
1158
1159 // we might be unlucky and have another thread get on the
1160 // run queue before us and steal the large block, but in that
1161 // case the thread will just end up requesting another large
1162 // block.
1163 return rtsFalse; /* not actually GC'ing */
1164 }
1165 }
1166
1167 // if we got here because we exceeded large_alloc_lim, then
1168 // proceed straight to GC.
1169 if (g0->n_new_large_words >= large_alloc_lim) {
1170 return rtsTrue;
1171 }
1172
1173 // Otherwise, we just ran out of space in the current nursery.
1174 // Grab another nursery if we can.
1175 if (getNewNursery(cap)) {
1176 debugTrace(DEBUG_sched, "thread %ld got a new nursery", t->id);
1177 return rtsFalse;
1178 }
1179
1180 return rtsTrue;
1181 /* actual GC is done at the end of the while loop in schedule() */
1182 }
1183
1184 /* -----------------------------------------------------------------------------
1185 * Handle a thread that returned to the scheduler with ThreadYielding
1186 * -------------------------------------------------------------------------- */
1187
1188 static rtsBool
1189 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1190 {
1191 /* put the thread back on the run queue. Then, if we're ready to
1192 * GC, check whether this is the last task to stop. If so, wake
1193 * up the GC thread. getThread will block during a GC until the
1194 * GC is finished.
1195 */
1196
1197 ASSERT(t->_link == END_TSO_QUEUE);
1198
1199 // Shortcut if we're just switching evaluators: don't bother
1200 // doing stack squeezing (which can be expensive), just run the
1201 // thread.
1202 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1203 debugTrace(DEBUG_sched,
1204 "--<< thread %ld (%s) stopped to switch evaluators",
1205 (long)t->id, what_next_strs[t->what_next]);
1206 return rtsTrue;
1207 }
1208
1209 // Reset the context switch flag. We don't do this just before
1210 // running the thread, because that would mean we would lose ticks
1211 // during GC, which can lead to unfair scheduling (a thread hogs
1212 // the CPU because the tick always arrives during GC). This way
1213 // penalises threads that do a lot of allocation, but that seems
1214 // better than the alternative.
1215 if (cap->context_switch != 0) {
1216 cap->context_switch = 0;
1217 appendToRunQueue(cap,t);
1218 } else {
1219 pushOnRunQueue(cap,t);
1220 }
1221
1222 IF_DEBUG(sanity,
1223 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1224 checkTSO(t));
1225
1226 return rtsFalse;
1227 }
1228
1229 /* -----------------------------------------------------------------------------
1230 * Handle a thread that returned to the scheduler with ThreadBlocked
1231 * -------------------------------------------------------------------------- */
1232
1233 static void
1234 scheduleHandleThreadBlocked( StgTSO *t
1235 #if !defined(DEBUG)
1236 STG_UNUSED
1237 #endif
1238 )
1239 {
1240
1241 // We don't need to do anything. The thread is blocked, and it
1242 // has tidied up its stack and placed itself on whatever queue
1243 // it needs to be on.
1244
1245 // ASSERT(t->why_blocked != NotBlocked);
1246 // Not true: for example,
1247 // - the thread may have woken itself up already, because
1248 // threadPaused() might have raised a blocked throwTo
1249 // exception, see maybePerformBlockedException().
1250
1251 #ifdef DEBUG
1252 traceThreadStatus(DEBUG_sched, t);
1253 #endif
1254 }
1255
1256 /* -----------------------------------------------------------------------------
1257 * Handle a thread that returned to the scheduler with ThreadFinished
1258 * -------------------------------------------------------------------------- */
1259
1260 static rtsBool
1261 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1262 {
1263 /* Need to check whether this was a main thread, and if so,
1264 * return with the return value.
1265 *
1266 * We also end up here if the thread kills itself with an
1267 * uncaught exception, see Exception.cmm.
1268 */
1269
1270 // blocked exceptions can now complete, even if the thread was in
1271 // blocked mode (see #2910).
1272 awakenBlockedExceptionQueue (cap, t);
1273
1274 //
1275 // Check whether the thread that just completed was a bound
1276 // thread, and if so return with the result.
1277 //
1278 // There is an assumption here that all thread completion goes
1279 // through this point; we need to make sure that if a thread
1280 // ends up in the ThreadKilled state, that it stays on the run
1281 // queue so it can be dealt with here.
1282 //
1283
1284 if (t->bound) {
1285
1286 if (t->bound != task->incall) {
1287 #if !defined(THREADED_RTS)
1288 // Must be a bound thread that is not the topmost one. Leave
1289 // it on the run queue until the stack has unwound to the
1290 // point where we can deal with this. Leaving it on the run
1291 // queue also ensures that the garbage collector knows about
1292 // this thread and its return value (it gets dropped from the
1293 // step->threads list so there's no other way to find it).
1294 appendToRunQueue(cap,t);
1295 return rtsFalse;
1296 #else
1297 // this cannot happen in the threaded RTS, because a
1298 // bound thread can only be run by the appropriate Task.
1299 barf("finished bound thread that isn't mine");
1300 #endif
1301 }
1302
1303 ASSERT(task->incall->tso == t);
1304
1305 if (t->what_next == ThreadComplete) {
1306 if (task->incall->ret) {
1307 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1308 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1309 }
1310 task->incall->rstat = Success;
1311 } else {
1312 if (task->incall->ret) {
1313 *(task->incall->ret) = NULL;
1314 }
1315 if (sched_state >= SCHED_INTERRUPTING) {
1316 if (heap_overflow) {
1317 task->incall->rstat = HeapExhausted;
1318 } else {
1319 task->incall->rstat = Interrupted;
1320 }
1321 } else {
1322 task->incall->rstat = Killed;
1323 }
1324 }
1325 #ifdef DEBUG
1326 removeThreadLabel((StgWord)task->incall->tso->id);
1327 #endif
1328
1329 // We no longer consider this thread and task to be bound to
1330 // each other. The TSO lives on until it is GC'd, but the
1331 // task is about to be released by the caller, and we don't
1332 // want anyone following the pointer from the TSO to the
1333 // defunct task (which might have already been
1334 // re-used). This was a real bug: the GC updated
1335 // tso->bound->tso which led to a deadlock.
1336 t->bound = NULL;
1337 task->incall->tso = NULL;
1338
1339 return rtsTrue; // tells schedule() to return
1340 }
1341
1342 return rtsFalse;
1343 }
1344
1345 /* -----------------------------------------------------------------------------
1346 * Perform a heap census
1347 * -------------------------------------------------------------------------- */
1348
1349 static rtsBool
1350 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1351 {
1352 // When we have +RTS -i0 and we're heap profiling, do a census at
1353 // every GC. This lets us get repeatable runs for debugging.
1354 if (performHeapProfile ||
1355 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1356 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1357 return rtsTrue;
1358 } else {
1359 return rtsFalse;
1360 }
1361 }
1362
1363 /* -----------------------------------------------------------------------------
1364 * stopAllCapabilities()
1365 *
1366 * Stop all Haskell execution. This is used when we need to make some global
1367 * change to the system, such as altering the number of capabilities, or
1368 * forking.
1369 *
1370 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1371 * -------------------------------------------------------------------------- */
1372
1373 #if defined(THREADED_RTS)
1374 static void stopAllCapabilities (Capability **pCap, Task *task)
1375 {
1376 rtsBool was_syncing;
1377 SyncType prev_sync_type;
1378
1379 PendingSync sync = {
1380 .type = SYNC_OTHER,
1381 .idle = NULL,
1382 .task = task
1383 };
1384
1385 do {
1386 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1387 } while (was_syncing);
1388
1389 acquireAllCapabilities(*pCap,task);
1390
1391 pending_sync = 0;
1392 }
1393 #endif
1394
1395 /* -----------------------------------------------------------------------------
1396 * requestSync()
1397 *
1398 * Commence a synchronisation between all capabilities. Normally not called
1399 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1400 * has some special synchronisation requirements.
1401 *
1402 * Returns:
1403 * rtsFalse if we successfully got a sync
1404 * rtsTrue if there was another sync request in progress,
1405 * and we yielded to it. The value returned is the
1406 * type of the other sync request.
1407 * -------------------------------------------------------------------------- */
1408
1409 #if defined(THREADED_RTS)
1410 static rtsBool requestSync (
1411 Capability **pcap, Task *task, PendingSync *new_sync,
1412 SyncType *prev_sync_type)
1413 {
1414 PendingSync *sync;
1415
1416 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1417 (StgWord)NULL,
1418 (StgWord)new_sync);
1419
1420 if (sync != NULL)
1421 {
1422 // sync is valid until we have called yieldCapability().
1423 // After the sync is completed, we cannot read that struct any
1424 // more because it has been freed.
1425 *prev_sync_type = sync->type;
1426 do {
1427 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1428 sync->type);
1429 ASSERT(*pcap);
1430 yieldCapability(pcap,task,rtsTrue);
1431 sync = pending_sync;
1432 } while (sync != NULL);
1433
1434 // NOTE: task->cap might have changed now
1435 return rtsTrue;
1436 }
1437 else
1438 {
1439 return rtsFalse;
1440 }
1441 }
1442 #endif
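/* A minimal standalone sketch (not RTS code) of the compare-and-swap
 * handshake that requestSync() performs on pending_sync, written with
 * C11 atomics purely for illustration; the yield stands in for
 * yieldCapability().
 */
#if 0
#include <stdatomic.h>
#include <stdbool.h>
#include <sched.h>

static _Atomic(void *) pending;      /* NULL when no sync is in progress */

static bool request_sync_sketch (void *my_sync)
{
    void *expected = NULL;
    if (atomic_compare_exchange_strong(&pending, &expected, my_sync)) {
        return false;                /* we installed our sync request */
    }
    /* someone else got there first: wait until their sync has finished */
    while (atomic_load(&pending) != NULL) {
        sched_yield();
    }
    return true;                     /* we yielded to another sync */
}
#endif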
1443
1444 /* -----------------------------------------------------------------------------
1445 * acquireAllCapabilities()
1446 *
1447 * Grab all the capabilities except the one we already hold. Used
1448 * when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1449 * before a fork (SYNC_OTHER).
1450 *
1451 * Only call this after requestSync(), otherwise a deadlock might
1452 * ensue if another thread is trying to synchronise.
1453 * -------------------------------------------------------------------------- */
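// See stopAllCapabilities() above for the canonical pattern: loop on
// requestSync(), then acquireAllCapabilities(); callers later resume
// with releaseAllCapabilities().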
1454
1455 #ifdef THREADED_RTS
1456 static void acquireAllCapabilities(Capability *cap, Task *task)
1457 {
1458 Capability *tmpcap;
1459 uint32_t i;
1460
1461 ASSERT(pending_sync != NULL);
1462 for (i=0; i < n_capabilities; i++) {
1463 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1464 i, n_capabilities);
1465 tmpcap = capabilities[i];
1466 if (tmpcap != cap) {
1467 // we better hope this task doesn't get migrated to
1468 // another Capability while we're waiting for this one.
1469 // It won't, because load balancing happens while we have
1470 // all the Capabilities, but even so it's a slightly
1471 // unsavoury invariant.
1472 task->cap = tmpcap;
1473 waitForCapability(&tmpcap, task);
1474 if (tmpcap->no != i) {
1475 barf("acquireAllCapabilities: got the wrong capability");
1476 }
1477 }
1478 }
1479 task->cap = cap;
1480 }
1481 #endif
1482
1483 /* -----------------------------------------------------------------------------
1484 * releaseAllCapabilities()
1485 *
1486 * Assuming this thread holds all the capabilities, release them all except for
1487 * the one passed in as cap.
1488 * -------------------------------------------------------------------------- */
1489
1490 #ifdef THREADED_RTS
1491 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task)
1492 {
1493 uint32_t i;
1494
1495 for (i = 0; i < n; i++) {
1496 if (cap->no != i) {
1497 task->cap = capabilities[i];
1498 releaseCapability(capabilities[i]);
1499 }
1500 }
1501 task->cap = cap;
1502 }
1503 #endif
1504
1505 /* -----------------------------------------------------------------------------
1506 * Perform a garbage collection if necessary
1507 * -------------------------------------------------------------------------- */
1508
1509 static void
1510 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1511 rtsBool force_major)
1512 {
1513 Capability *cap = *pcap;
1514 rtsBool heap_census;
1515 uint32_t collect_gen;
1516 rtsBool major_gc;
1517 #ifdef THREADED_RTS
1518 uint32_t gc_type;
1519 uint32_t i;
1520 uint32_t need_idle;
1521 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1522 StgTSO *tso;
1523 rtsBool *idle_cap;
1524 #endif
1525
1526 if (sched_state == SCHED_SHUTTING_DOWN) {
1527 // The final GC has already been done, and the system is
1528 // shutting down. We'll probably deadlock if we try to GC
1529 // now.
1530 return;
1531 }
1532
1533 heap_census = scheduleNeedHeapProfile(rtsTrue);
1534
1535 // Figure out which generation we are collecting, so that we can
1536 // decide whether this is a parallel GC or not.
1537 collect_gen = calcNeeded(force_major || heap_census, NULL);
1538 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1539
1540 #ifdef THREADED_RTS
1541 if (sched_state < SCHED_INTERRUPTING
1542 && RtsFlags.ParFlags.parGcEnabled
1543 && collect_gen >= RtsFlags.ParFlags.parGcGen
1544 && ! oldest_gen->mark)
1545 {
1546 gc_type = SYNC_GC_PAR;
1547 } else {
1548 gc_type = SYNC_GC_SEQ;
1549 }
1550
1551 if (gc_type == SYNC_GC_PAR && RtsFlags.ParFlags.parGcThreads > 0) {
1552 need_idle = stg_max(0, enabled_capabilities -
1553 RtsFlags.ParFlags.parGcThreads);
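        // e.g. with 8 enabled capabilities and +RTS -qn6, two
        // capabilities are asked to stay idle during this GC.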
1554 } else {
1555 need_idle = 0;
1556 }
1557
1558 // In order to GC, there must be no threads running Haskell code.
1559 // Therefore, the GC thread needs to hold *all* the capabilities,
1560 // and release them after the GC has completed.
1561 //
1562 // This seems to be the simplest way: previous attempts involved
1563 // making all the threads with capabilities give up their
1564 // capabilities and sleep except for the *last* one, which
1565 // actually did the GC. But it's quite hard to arrange for all
1566 // the other tasks to sleep and stay asleep.
1567 //
1568
1569 /* Other capabilities are prevented from running yet more Haskell
1570 threads if pending_sync is set. Tested inside
1571 yieldCapability() and releaseCapability() in Capability.c */
1572
1573 PendingSync sync = {
1574 .type = gc_type,
1575 .idle = NULL,
1576 .task = task
1577 };
1578
1579 {
1580 SyncType prev_sync = 0;
1581 rtsBool was_syncing;
1582 do {
1583 // We need an array of size n_capabilities, but since this may
1584 // change each time around the loop we must allocate it afresh.
1585 idle_cap = (rtsBool *)stgMallocBytes(n_capabilities *
1586 sizeof(rtsBool),
1587 "scheduleDoGC");
1588 sync.idle = idle_cap;
1589
1590 // When using +RTS -qn, we need some capabilities to be idle during
1591 // GC. The best bet is to choose some inactive ones, so we look for
1592 // those first:
1593 uint32_t n_idle = need_idle;
1594 for (i=0; i < n_capabilities; i++) {
1595 if (capabilities[i]->disabled) {
1596 idle_cap[i] = rtsTrue;
1597 } else if (n_idle > 0 &&
1598 capabilities[i]->running_task == NULL) {
1599 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1600 n_idle--;
1601 idle_cap[i] = rtsTrue;
1602 } else {
1603 idle_cap[i] = rtsFalse;
1604 }
1605 }
1606 // If we didn't find enough inactive capabilities, just pick some
1607 // more to be idle.
1608 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1609 if (!idle_cap[i] && i != cap->no) {
1610 idle_cap[i] = rtsTrue;
1611 n_idle--;
1612 }
1613 }
1614 ASSERT(n_idle == 0);
1615
1616 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1617 cap = *pcap;
1618 if (was_syncing) {
1619 stgFree(idle_cap);
1620 }
1621 if (was_syncing &&
1622 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1623 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1624 // someone else had a pending sync request for a GC, so
1625 // let's assume GC has been done and we don't need to GC
1626 // again.
1627 // Exception to this: if SCHED_INTERRUPTING, then we still
1628 // need to do the final GC.
1629 return;
1630 }
1631 if (sched_state == SCHED_SHUTTING_DOWN) {
1632 // The scheduler might now be shutting down. We tested
1633 // this above, but it might have become true since then as
1634 // we yielded the capability in requestSync().
1635 return;
1636 }
1637 } while (was_syncing);
1638 }
1639
1640 stat_startGCSync(gc_threads[cap->no]);
1641
1642 #ifdef DEBUG
1643 unsigned int old_n_capabilities = n_capabilities;
1644 #endif
1645
1646 interruptAllCapabilities();
1647
1648 // The final shutdown GC is always single-threaded, because it's
1649 // possible that some of the Capabilities have no worker threads.
1650
1651 if (gc_type == SYNC_GC_SEQ) {
1652 traceEventRequestSeqGc(cap);
1653 } else {
1654 traceEventRequestParGc(cap);
1655 }
1656
1657 if (gc_type == SYNC_GC_SEQ) {
1658 // single-threaded GC: grab all the capabilities
1659 acquireAllCapabilities(cap,task);
1660 }
1661 else
1662 {
1663 // If we are load-balancing collections in this
1664 // generation, then we require all GC threads to participate
1665 // in the collection. Otherwise, we only require active
1666 // threads to participate, and we set gc_threads[i]->idle for
1667 // any idle capabilities. The rationale here is that waking
1668 // up an idle Capability takes much longer than just doing any
1669 // GC work on its behalf.
1670
1671 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1672 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1673 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1674 {
1675 for (i=0; i < n_capabilities; i++) {
1676 if (capabilities[i]->disabled) {
1677 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1678 if (idle_cap[i]) {
1679 n_idle_caps++;
1680 }
1681 } else {
1682 if (i != cap->no && idle_cap[i]) {
1683 Capability *tmpcap = capabilities[i];
1684 task->cap = tmpcap;
1685 waitForCapability(&tmpcap, task);
1686 n_idle_caps++;
1687 }
1688 }
1689 }
1690 }
1691 else
1692 {
1693 for (i=0; i < n_capabilities; i++) {
1694 if (capabilities[i]->disabled) {
1695 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1696 if (idle_cap[i]) {
1697 n_idle_caps++;
1698 }
1699 } else if (i != cap->no &&
1700 capabilities[i]->idle >=
1701 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1702 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1703 if (idle_cap[i]) {
1704 n_idle_caps++;
1705 } else {
1706 n_failed_trygrab_idles++;
1707 }
1708 }
1709 }
1710 }
1711 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1712
1713 // We set the gc_thread[i]->idle flag if that
1714 // capability/thread is not participating in this collection.
1715 // We also keep a local record of which capabilities are idle
1716 // in idle_cap[], because scheduleDoGC() is re-entrant:
1717 // another thread might start a GC as soon as we've finished
1718 // this one, and thus the gc_thread[]->idle flags are invalid
1719 // as soon as we release any threads after GC. Getting this
1720 // wrong leads to a rare and hard to debug deadlock!
1721
1722 for (i=0; i < n_capabilities; i++) {
1723 gc_threads[i]->idle = idle_cap[i];
1724 capabilities[i]->idle++;
1725 }
1726
1727 // For all capabilities participating in this GC, wait until
1728 // they have stopped mutating and are standing by for GC.
1729 waitForGcThreads(cap);
1730
1731 #if defined(THREADED_RTS)
1732 // Stable point where we can do a global check on our spark counters
1733 ASSERT(checkSparkCountInvariant());
1734 #endif
1735 }
1736
1737 #endif
1738
1739 IF_DEBUG(scheduler, printAllThreads());
1740
1741 delete_threads_and_gc:
1742 /*
1743 * We now have all the capabilities; if we're in an interrupting
1744 * state, then we should take the opportunity to delete all the
1745 * threads in the system.
1746 * Checking for major_gc ensures that the last GC is major.
1747 */
1748 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1749 deleteAllThreads(cap);
1750 #if defined(THREADED_RTS)
1751 // Discard all the sparks from every Capability. Why?
1752 // They'll probably be GC'd anyway since we've killed all the
1753 // threads. It just avoids the GC having to do any work to
1754 // figure out that any remaining sparks are garbage.
1755 for (i = 0; i < n_capabilities; i++) {
1756 capabilities[i]->spark_stats.gcd +=
1757 sparkPoolSize(capabilities[i]->sparks);
1758 // No race here since all Caps are stopped.
1759 discardSparksCap(capabilities[i]);
1760 }
1761 #endif
1762 sched_state = SCHED_SHUTTING_DOWN;
1763 }
1764
1765 /*
1766 * When there are disabled capabilities, we want to migrate any
1767 * threads away from them. Normally this happens in the
1768 * scheduler's loop, but only for unbound threads - it's really
1769 * hard for a bound thread to migrate itself. So we have another
1770 * go here.
1771 */
1772 #if defined(THREADED_RTS)
1773 for (i = enabled_capabilities; i < n_capabilities; i++) {
1774 Capability *tmp_cap, *dest_cap;
1775 tmp_cap = capabilities[i];
1776 ASSERT(tmp_cap->disabled);
1777 if (i != cap->no) {
1778 dest_cap = capabilities[i % enabled_capabilities];
1779 while (!emptyRunQueue(tmp_cap)) {
1780 tso = popRunQueue(tmp_cap);
1781 migrateThread(tmp_cap, tso, dest_cap);
1782 if (tso->bound) {
1783 traceTaskMigrate(tso->bound->task,
1784 tso->bound->task->cap,
1785 dest_cap);
1786 tso->bound->task->cap = dest_cap;
1787 }
1788 }
1789 }
1790 }
1791 #endif
1792
1793 #if defined(THREADED_RTS)
1794 // reset pending_sync *before* GC, so that when the GC threads
1795 // emerge they don't immediately re-enter the GC.
1796 pending_sync = 0;
1797 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1798 #else
1799 GarbageCollect(collect_gen, heap_census, 0, cap);
1800 #endif
1801
1802 traceSparkCounters(cap);
1803
1804 switch (recent_activity) {
1805 case ACTIVITY_INACTIVE:
1806 if (force_major) {
1807 // We are doing a GC because the system has been idle for a
1808 // timeslice and we need to check for deadlock. Record the
1809 // fact that we've done a GC and turn off the timer signal;
1810 // it will get re-enabled if we run any threads after the GC.
1811 recent_activity = ACTIVITY_DONE_GC;
1812 #ifndef PROFILING
1813 stopTimer();
1814 #endif
1815 break;
1816 }
1817 // fall through...
1818
1819 case ACTIVITY_MAYBE_NO:
1820 // the GC might have taken long enough for the timer to set
1821 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1822 // but we aren't necessarily deadlocked:
1823 recent_activity = ACTIVITY_YES;
1824 break;
1825
1826 case ACTIVITY_DONE_GC:
1827 // If we are actually active, the scheduler will reset the
1828 // recent_activity flag and re-enable the timer.
1829 break;
1830 }
1831
1832 #if defined(THREADED_RTS)
1833 // Stable point where we can do a global check on our spark counters
1834 ASSERT(checkSparkCountInvariant());
1835 #endif
1836
1837 // The heap census itself is done during GarbageCollect().
1838 if (heap_census) {
1839 performHeapProfile = rtsFalse;
1840 }
1841
1842 #if defined(THREADED_RTS)
1843
1844 // If n_capabilities has changed during GC, we're in trouble.
1845 ASSERT(n_capabilities == old_n_capabilities);
1846
1847 if (gc_type == SYNC_GC_PAR)
1848 {
1849 releaseGCThreads(cap);
1850 for (i = 0; i < n_capabilities; i++) {
1851 if (i != cap->no) {
1852 if (idle_cap[i]) {
1853 ASSERT(capabilities[i]->running_task == task);
1854 task->cap = capabilities[i];
1855 releaseCapability(capabilities[i]);
1856 } else {
1857 ASSERT(capabilities[i]->running_task != task);
1858 }
1859 }
1860 }
1861 task->cap = cap;
1862 }
1863 #endif
1864
1865 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1866 // GC set the heap_overflow flag, so we should proceed with
1867 // an orderly shutdown now. Ultimately we want the main
1868 // thread to return to its caller with HeapExhausted, at which
1869 // point the caller should call hs_exit(). The first step is
1870 // to delete all the threads.
1871 //
1872 // Another way to do this would be to raise an exception in
1873 // the main thread, which we really should do because it gives
1874 // the program a chance to clean up. But how do we find the
1875 // main thread? It should presumably be the same one that
1876 // gets ^C exceptions, but that's all done on the Haskell side
1877 // (GHC.TopHandler).
1878 sched_state = SCHED_INTERRUPTING;
1879 goto delete_threads_and_gc;
1880 }
1881
1882 #ifdef SPARKBALANCE
1883 /* JB
1884 Once we are all together... this would be the place to balance all
1885 spark pools. No concurrent stealing or adding of new sparks can
1886 occur. Should be defined in Sparks.c. */
1887 balanceSparkPoolsCaps(n_capabilities, capabilities);
1888 #endif
1889
1890 #if defined(THREADED_RTS)
1891 stgFree(idle_cap);
1892
1893 if (gc_type == SYNC_GC_SEQ) {
1894 // release our stash of capabilities.
1895 releaseAllCapabilities(n_capabilities, cap, task);
1896 }
1897 #endif
1898
1899 return;
1900 }
1901
1902 /* ---------------------------------------------------------------------------
1903 * Singleton fork(). Do not copy any running threads.
1904 * ------------------------------------------------------------------------- */
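
/* Illustrative sketch (not part of the original source): forkProcess() is
 * the RTS half of the forkProcess# primop.  A caller hands it a stable
 * pointer to the IO action the child should run, roughly:
 *
 *     HsStablePtr entry = ...;          // stable pointer to the child's IO action
 *     pid_t pid = forkProcess(&entry);
 *     // parent: pid is the child's process id and execution continues here
 *     // child:  forkProcess() does not return; it runs the action via
 *     //         rts_evalStableIO() and then exits
 *
 * Only the parameter name `entry` comes from the code below; the rest is an
 * assumed usage pattern, not a definitive description of the primop.
 */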
1905
1906 pid_t
1907 forkProcess(HsStablePtr *entry
1908 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1909 STG_UNUSED
1910 #endif
1911 )
1912 {
1913 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1914 pid_t pid;
1915 StgTSO* t,*next;
1916 Capability *cap;
1917 uint32_t g;
1918 Task *task = NULL;
1919 uint32_t i;
1920
1921 debugTrace(DEBUG_sched, "forking!");
1922
1923 task = newBoundTask();
1924
1925 cap = NULL;
1926 waitForCapability(&cap, task);
1927
1928 #ifdef THREADED_RTS
1929 stopAllCapabilities(&cap, task);
1930 #endif
1931
1932 // no funny business: hold locks while we fork, otherwise if some
1933 // other thread is holding a lock when the fork happens, the data
1934 // structure protected by the lock will forever be in an
1935 // inconsistent state in the child. See also #1391.
1936 ACQUIRE_LOCK(&sched_mutex);
1937 ACQUIRE_LOCK(&sm_mutex);
1938 ACQUIRE_LOCK(&stable_mutex);
1939 ACQUIRE_LOCK(&task->lock);
1940
1941 for (i=0; i < n_capabilities; i++) {
1942 ACQUIRE_LOCK(&capabilities[i]->lock);
1943 }
1944
1945 #ifdef THREADED_RTS
1946 ACQUIRE_LOCK(&all_tasks_mutex);
1947 #endif
1948
1949 stopTimer(); // See #4074
1950
1951 #if defined(TRACING)
1952 flushEventLog(); // so that child won't inherit dirty file buffers
1953 #endif
1954
1955 pid = fork();
1956
1957 if (pid) { // parent
1958
1959 startTimer(); // #4074
1960
1961 RELEASE_LOCK(&sched_mutex);
1962 RELEASE_LOCK(&sm_mutex);
1963 RELEASE_LOCK(&stable_mutex);
1964 RELEASE_LOCK(&task->lock);
1965
1966 for (i=0; i < n_capabilities; i++) {
1967 releaseCapability_(capabilities[i],rtsFalse);
1968 RELEASE_LOCK(&capabilities[i]->lock);
1969 }
1970
1971 #ifdef THREADED_RTS
1972 RELEASE_LOCK(&all_tasks_mutex);
1973 #endif
1974
1975 boundTaskExiting(task);
1976
1977 // just return the pid
1978 return pid;
1979
1980 } else { // child
1981
1982 #if defined(THREADED_RTS)
1983 initMutex(&sched_mutex);
1984 initMutex(&sm_mutex);
1985 initMutex(&stable_mutex);
1986 initMutex(&task->lock);
1987
1988 for (i=0; i < n_capabilities; i++) {
1989 initMutex(&capabilities[i]->lock);
1990 }
1991
1992 initMutex(&all_tasks_mutex);
1993 #endif
1994
1995 #ifdef TRACING
1996 resetTracing();
1997 #endif
1998
1999 // Now, all OS threads except the thread that forked are
2000 // stopped. We need to stop all Haskell threads, including
2001 // those involved in foreign calls. Also we need to delete
2002 // all Tasks, because they correspond to OS threads that are
2003 // now gone.
2004
2005 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2006 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2007 next = t->global_link;
2008 // don't allow threads to catch the ThreadKilled
2009 // exception, but we do want to raiseAsync() because these
2010 // threads may be evaluating thunks that we need later.
2011 deleteThread_(t->cap,t);
2012
2013 // stop the GC from updating the InCall to point to
2014 // the TSO. This is only necessary because the
2015 // OSThread bound to the TSO has been killed, and
2016 // won't get a chance to exit in the usual way (see
2017 // also scheduleHandleThreadFinished).
2018 t->bound = NULL;
2019 }
2020 }
2021
2022 discardTasksExcept(task);
2023
2024 for (i=0; i < n_capabilities; i++) {
2025 cap = capabilities[i];
2026
2027 // Empty the run queue. It seems tempting to let all the
2028 // killed threads stay on the run queue as zombies to be
2029 // cleaned up later, but some of them may correspond to
2030 // bound threads for which the corresponding Task does not
2031 // exist.
2032 truncateRunQueue(cap);
2033 cap->n_run_queue = 0;
2034
2035 // Any suspended C-calling Tasks are no more, their OS threads
2036 // don't exist now:
2037 cap->suspended_ccalls = NULL;
2038 cap->n_suspended_ccalls = 0;
2039
2040 #if defined(THREADED_RTS)
2041 // Wipe our spare workers list, they no longer exist. New
2042 // workers will be created if necessary.
2043 cap->spare_workers = NULL;
2044 cap->n_spare_workers = 0;
2045 cap->returning_tasks_hd = NULL;
2046 cap->returning_tasks_tl = NULL;
2047 cap->n_returning_tasks = 0;
2048 #endif
2049
2050             // Release all caps except 0; we'll use that for starting
2051 // the IO manager and running the client action below.
2052 if (cap->no != 0) {
2053 task->cap = cap;
2054 releaseCapability(cap);
2055 }
2056 }
2057 cap = capabilities[0];
2058 task->cap = cap;
2059
2060 // Empty the threads lists. Otherwise, the garbage
2061 // collector may attempt to resurrect some of these threads.
2062 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2063 generations[g].threads = END_TSO_QUEUE;
2064 }
2065
2066 // On Unix, all timers are reset in the child, so we need to start
2067 // the timer again.
2068 initTimer();
2069 startTimer();
2070
2071 // TODO: need to trace various other things in the child
2072 // like startup event, capabilities, process info etc
2073 traceTaskCreate(task, cap);
2074
2075 #if defined(THREADED_RTS)
2076 ioManagerStartCap(&cap);
2077 #endif
2078
2079 rts_evalStableIO(&cap, entry, NULL); // run the action
2080 rts_checkSchedStatus("forkProcess",cap);
2081
2082 rts_unlock(cap);
2083 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2084 }
2085 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2086 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2087 #endif
2088 }
2089
2090 /* ---------------------------------------------------------------------------
2091 * Changing the number of Capabilities
2092 *
2093 * Changing the number of Capabilities is very tricky! We can only do
2094 * it with the system fully stopped, so we do a full sync with
2095 * requestSync(SYNC_OTHER) and grab all the capabilities.
2096 *
2097 * Then we resize the appropriate data structures, and update all
2098 * references to the old data structures which have now moved.
2099 * Finally we release the Capabilities we are holding, and start
2100 * worker Tasks on the new Capabilities we created.
2101 *
2102 * ------------------------------------------------------------------------- */
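
/* Illustrative sketch (assumption, not part of the original source): the
 * capability count can be changed at run time either from C code embedding
 * the RTS or via the Haskell wrapper GHC.Conc.setNumCapabilities, e.g.
 *
 *     setNumCapabilities(4);   // grow (or shrink) the RTS to 4 capabilities
 *
 * Growing past n_capabilities allocates new Capability and storage-manager
 * structures; shrinking only marks the surplus capabilities as disabled, as
 * the code below explains.
 */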
2103
2104 void
2105 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2106 {
2107 #if !defined(THREADED_RTS)
2108 if (new_n_capabilities != 1) {
2109 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2110 }
2111 return;
2112 #elif defined(NOSMP)
2113 if (new_n_capabilities != 1) {
2114 errorBelch("setNumCapabilities: not supported on this platform");
2115 }
2116 return;
2117 #else
2118 Task *task;
2119 Capability *cap;
2120 uint32_t n;
2121 Capability *old_capabilities = NULL;
2122 uint32_t old_n_capabilities = n_capabilities;
2123
2124 if (new_n_capabilities == enabled_capabilities) return;
2125
2126 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2127 enabled_capabilities, new_n_capabilities);
2128
2129 cap = rts_lock();
2130 task = cap->running_task;
2131
2132 stopAllCapabilities(&cap, task);
2133
2134 if (new_n_capabilities < enabled_capabilities)
2135 {
2136 // Reducing the number of capabilities: we do not actually
2137 // remove the extra capabilities, we just mark them as
2138 // "disabled". This has the following effects:
2139 //
2140 // - threads on a disabled capability are migrated away by the
2141 // scheduler loop
2142 //
2143 // - disabled capabilities do not participate in GC
2144 // (see scheduleDoGC())
2145 //
2146 // - No spark threads are created on this capability
2147 // (see scheduleActivateSpark())
2148 //
2149 // - We do not attempt to migrate threads *to* a disabled
2150 // capability (see schedulePushWork()).
2151 //
2152 // but in other respects, a disabled capability remains
2153 // alive. Threads may be woken up on a disabled capability,
2154 // but they will be immediately migrated away.
2155 //
2156 // This approach is much easier than trying to actually remove
2157 // the capability; we don't have to worry about GC data
2158 // structures, the nursery, etc.
2159 //
2160 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2161 capabilities[n]->disabled = rtsTrue;
2162 traceCapDisable(capabilities[n]);
2163 }
2164 enabled_capabilities = new_n_capabilities;
2165 }
2166 else
2167 {
2168 // Increasing the number of enabled capabilities.
2169 //
2170 // enable any disabled capabilities, up to the required number
2171 for (n = enabled_capabilities;
2172 n < new_n_capabilities && n < n_capabilities; n++) {
2173 capabilities[n]->disabled = rtsFalse;
2174 traceCapEnable(capabilities[n]);
2175 }
2176 enabled_capabilities = n;
2177
2178 if (new_n_capabilities > n_capabilities) {
2179 #if defined(TRACING)
2180 // Allocate eventlog buffers for the new capabilities. Note this
2181 // must be done before calling moreCapabilities(), because that
2182 // will emit events about creating the new capabilities and adding
2183 // them to existing capsets.
2184 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2185 #endif
2186
2187 // Resize the capabilities array
2188 // NB. after this, capabilities points somewhere new. Any pointers
2189 // of type (Capability *) are now invalid.
2190 moreCapabilities(n_capabilities, new_n_capabilities);
2191
2192 // Resize and update storage manager data structures
2193 storageAddCapabilities(n_capabilities, new_n_capabilities);
2194 }
2195 }
2196
2197 // update n_capabilities before things start running
2198 if (new_n_capabilities > n_capabilities) {
2199 n_capabilities = enabled_capabilities = new_n_capabilities;
2200 }
2201
2202 // We're done: release the original Capabilities
2203 releaseAllCapabilities(old_n_capabilities, cap,task);
2204
2205 // We can't free the old array until now, because we access it
2206 // while updating pointers in updateCapabilityRefs().
2207 if (old_capabilities) {
2208 stgFree(old_capabilities);
2209 }
2210
2211 // Notify IO manager that the number of capabilities has changed.
2212 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2213
2214 rts_unlock(cap);
2215
2216 #endif // THREADED_RTS
2217 }
2218
2219
2220
2221 /* ---------------------------------------------------------------------------
2222 * Delete all the threads in the system
2223 * ------------------------------------------------------------------------- */
2224
2225 static void
2226 deleteAllThreads ( Capability *cap )
2227 {
2228 // NOTE: only safe to call if we own all capabilities.
2229
2230 StgTSO* t, *next;
2231 uint32_t g;
2232
2233 debugTrace(DEBUG_sched,"deleting all threads");
2234 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2235 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2236 next = t->global_link;
2237 deleteThread(cap,t);
2238 }
2239 }
2240
2241 // The run queue now contains a bunch of ThreadKilled threads. We
2242 // must not throw these away: the main thread(s) will be in there
2243 // somewhere, and the main scheduler loop has to deal with it.
2244 // Also, the run queue is the only thing keeping these threads from
2245 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2246
2247 #if !defined(THREADED_RTS)
2248 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2249 ASSERT(sleeping_queue == END_TSO_QUEUE);
2250 #endif
2251 }
2252
2253 /* -----------------------------------------------------------------------------
2254 Managing the suspended_ccalls list.
2255 Locks required: sched_mutex
2256 -------------------------------------------------------------------------- */
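
/* Illustrative sketch (not part of the original source): suspended_ccalls is
 * a doubly-linked list of InCalls threaded through incall->prev/next, newest
 * entry at the head:
 *
 *     cap->suspended_ccalls -> incall_N <-> ... <-> incall_1 -> NULL
 *
 * suspendTask() pushes at the head; recoverSuspendedTask() may unlink from
 * anywhere in the list, which is why both prev and next links are kept.
 */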
2257
2258 STATIC_INLINE void
2259 suspendTask (Capability *cap, Task *task)
2260 {
2261 InCall *incall;
2262
2263 incall = task->incall;
2264 ASSERT(incall->next == NULL && incall->prev == NULL);
2265 incall->next = cap->suspended_ccalls;
2266 incall->prev = NULL;
2267 if (cap->suspended_ccalls) {
2268 cap->suspended_ccalls->prev = incall;
2269 }
2270 cap->suspended_ccalls = incall;
2271 cap->n_suspended_ccalls++;
2272 }
2273
2274 STATIC_INLINE void
2275 recoverSuspendedTask (Capability *cap, Task *task)
2276 {
2277 InCall *incall;
2278
2279 incall = task->incall;
2280 if (incall->prev) {
2281 incall->prev->next = incall->next;
2282 } else {
2283 ASSERT(cap->suspended_ccalls == incall);
2284 cap->suspended_ccalls = incall->next;
2285 }
2286 if (incall->next) {
2287 incall->next->prev = incall->prev;
2288 }
2289 incall->next = incall->prev = NULL;
2290 cap->n_suspended_ccalls--;
2291 }
2292
2293 /* ---------------------------------------------------------------------------
2294 * Suspending & resuming Haskell threads.
2295 *
2296 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2297 * its capability before calling the C function. This allows another
2298 * task to pick up the capability and carry on running Haskell
2299 * threads. It also means that if the C call blocks, it won't lock
2300 * the whole system.
2301 *
2302 * The Haskell thread making the C call is put to sleep for the
2303  * duration of the call, on the capability's suspended_ccalls list.  We
2304 * give out a token to the task, which it can use to resume the thread
2305 * on return from the C function.
2306 *
2307 * If this is an interruptible C call, this means that the FFI call may be
2308 * unceremoniously terminated and should be scheduled on an
2309 * unbound worker thread.
2310 * ------------------------------------------------------------------------- */
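
/* Illustrative sketch (assumption, not part of the original source): the code
 * generated for a "safe" foreign call brackets the call with this pair,
 * roughly:
 *
 *     void *tok = suspendThread(BaseReg, rtsFalse); // give up the Capability
 *     r = blocking_c_function(args);                // may block freely
 *     BaseReg = resumeThread(tok);                  // re-acquire a Capability
 *
 * blocking_c_function() is a placeholder.  The token is really the Task, as
 * the `return task;` at the end of suspendThread() below shows.
 */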
2311
2312 void *
2313 suspendThread (StgRegTable *reg, rtsBool interruptible)
2314 {
2315 Capability *cap;
2316 int saved_errno;
2317 StgTSO *tso;
2318 Task *task;
2319 #if mingw32_HOST_OS
2320 StgWord32 saved_winerror;
2321 #endif
2322
2323 saved_errno = errno;
2324 #if mingw32_HOST_OS
2325 saved_winerror = GetLastError();
2326 #endif
2327
2328 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2329 */
2330 cap = regTableToCapability(reg);
2331
2332 task = cap->running_task;
2333 tso = cap->r.rCurrentTSO;
2334
2335 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2336
2337 // XXX this might not be necessary --SDM
2338 tso->what_next = ThreadRunGHC;
2339
2340 threadPaused(cap,tso);
2341
2342 if (interruptible) {
2343 tso->why_blocked = BlockedOnCCall_Interruptible;
2344 } else {
2345 tso->why_blocked = BlockedOnCCall;
2346 }
2347
2348 // Hand back capability
2349 task->incall->suspended_tso = tso;
2350 task->incall->suspended_cap = cap;
2351
2352 // Otherwise allocate() will write to invalid memory.
2353 cap->r.rCurrentTSO = NULL;
2354
2355 ACQUIRE_LOCK(&cap->lock);
2356
2357 suspendTask(cap,task);
2358 cap->in_haskell = rtsFalse;
2359 releaseCapability_(cap,rtsFalse);
2360
2361 RELEASE_LOCK(&cap->lock);
2362
2363 errno = saved_errno;
2364 #if mingw32_HOST_OS
2365 SetLastError(saved_winerror);
2366 #endif
2367 return task;
2368 }
2369
2370 StgRegTable *
2371 resumeThread (void *task_)
2372 {
2373 StgTSO *tso;
2374 InCall *incall;
2375 Capability *cap;
2376 Task *task = task_;
2377 int saved_errno;
2378 #if mingw32_HOST_OS
2379 StgWord32 saved_winerror;
2380 #endif
2381
2382 saved_errno = errno;
2383 #if mingw32_HOST_OS
2384 saved_winerror = GetLastError();
2385 #endif
2386
2387 incall = task->incall;
2388 cap = incall->suspended_cap;
2389 task->cap = cap;
2390
2391 // Wait for permission to re-enter the RTS with the result.
2392 waitForCapability(&cap,task);
2393 // we might be on a different capability now... but if so, our
2394 // entry on the suspended_ccalls list will also have been
2395 // migrated.
2396
2397 // Remove the thread from the suspended list
2398 recoverSuspendedTask(cap,task);
2399
2400 tso = incall->suspended_tso;
2401 incall->suspended_tso = NULL;
2402 incall->suspended_cap = NULL;
2403 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2404
2405 traceEventRunThread(cap, tso);
2406
2407 /* Reset blocking status */
2408 tso->why_blocked = NotBlocked;
2409
2410 if ((tso->flags & TSO_BLOCKEX) == 0) {
2411 // avoid locking the TSO if we don't have to
2412 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2413 maybePerformBlockedException(cap,tso);
2414 }
2415 }
2416
2417 cap->r.rCurrentTSO = tso;
2418 cap->in_haskell = rtsTrue;
2419 errno = saved_errno;
2420 #if mingw32_HOST_OS
2421 SetLastError(saved_winerror);
2422 #endif
2423
2424 /* We might have GC'd, mark the TSO dirty again */
2425 dirty_TSO(cap,tso);
2426 dirty_STACK(cap,tso->stackobj);
2427
2428 IF_DEBUG(sanity, checkTSO(tso));
2429
2430 return &cap->r;
2431 }
2432
2433 /* ---------------------------------------------------------------------------
2434 * scheduleThread()
2435 *
2436 * scheduleThread puts a thread on the end of the runnable queue.
2437 * This will usually be done immediately after a thread is created.
2438 * The caller of scheduleThread must create the thread using e.g.
2439 * createThread and push an appropriate closure
2440 * on this thread's stack before the scheduler is invoked.
2441 * ------------------------------------------------------------------------ */
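
/* Illustrative sketch (assumption, not part of the original source): a typical
 * caller builds the thread first and only then schedules it, e.g.
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  closure);   // closure :: IO ()
 *     scheduleThread(cap, tso);
 *
 * createIOThread() is the helper the rts_eval*() entry points use; any other
 * way of creating a TSO with the right closure pushed works just as well.
 */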
2442
2443 void
2444 scheduleThread(Capability *cap, StgTSO *tso)
2445 {
2446 // The thread goes at the *end* of the run-queue, to avoid possible
2447 // starvation of any threads already on the queue.
2448 appendToRunQueue(cap,tso);
2449 }
2450
2451 void
2452 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2453 {
2454 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2455 // move this thread from now on.
2456 #if defined(THREADED_RTS)
2457 cpu %= enabled_capabilities;
2458 if (cpu == cap->no) {
2459 appendToRunQueue(cap,tso);
2460 } else {
2461 migrateThread(cap, tso, capabilities[cpu]);
2462 }
2463 #else
2464 appendToRunQueue(cap,tso);
2465 #endif
2466 }
2467
2468 void
2469 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2470 {
2471 Task *task;
2472 DEBUG_ONLY( StgThreadID id );
2473 Capability *cap;
2474
2475 cap = *pcap;
2476
2477 // We already created/initialised the Task
2478 task = cap->running_task;
2479
2480 // This TSO is now a bound thread; make the Task and TSO
2481 // point to each other.
2482 tso->bound = task->incall;
2483 tso->cap = cap;
2484
2485 task->incall->tso = tso;
2486 task->incall->ret = ret;
2487 task->incall->rstat = NoStatus;
2488
2489 appendToRunQueue(cap,tso);
2490
2491 DEBUG_ONLY( id = tso->id );
2492 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2493
2494 cap = schedule(cap,task);
2495
2496 ASSERT(task->incall->rstat != NoStatus);
2497 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2498
2499 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2500 *pcap = cap;
2501 }
2502
2503 /* ----------------------------------------------------------------------------
2504 * Starting Tasks
2505 * ------------------------------------------------------------------------- */
2506
2507 #if defined(THREADED_RTS)
2508 void scheduleWorker (Capability *cap, Task *task)
2509 {
2510 // schedule() runs without a lock.
2511 cap = schedule(cap,task);
2512
2513 // On exit from schedule(), we have a Capability, but possibly not
2514 // the same one we started with.
2515
2516 // During shutdown, the requirement is that after all the
2517 // Capabilities are shut down, all workers that are shutting down
2518 // have finished workerTaskStop(). This is why we hold on to
2519 // cap->lock until we've finished workerTaskStop() below.
2520 //
2521 // There may be workers still involved in foreign calls; those
2522 // will just block in waitForCapability() because the
2523 // Capability has been shut down.
2524 //
2525 ACQUIRE_LOCK(&cap->lock);
2526 releaseCapability_(cap,rtsFalse);
2527 workerTaskStop(task);
2528 RELEASE_LOCK(&cap->lock);
2529 }
2530 #endif
2531
2532 /* ---------------------------------------------------------------------------
2533 * Start new worker tasks on Capabilities from--to
2534 * -------------------------------------------------------------------------- */
2535
2536 static void
2537 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2538 {
2539 #if defined(THREADED_RTS)
2540 uint32_t i;
2541 Capability *cap;
2542
2543 for (i = from; i < to; i++) {
2544 cap = capabilities[i];
2545 ACQUIRE_LOCK(&cap->lock);
2546 startWorkerTask(cap);
2547 RELEASE_LOCK(&cap->lock);
2548 }
2549 #endif
2550 }
2551
2552 /* ---------------------------------------------------------------------------
2553 * initScheduler()
2554 *
2555 * Initialise the scheduler. This resets all the queues - if the
2556 * queues contained any threads, they'll be garbage collected at the
2557 * next pass.
2558 *
2559 * ------------------------------------------------------------------------ */
2560
2561 void
2562 initScheduler(void)
2563 {
2564 #if !defined(THREADED_RTS)
2565 blocked_queue_hd = END_TSO_QUEUE;
2566 blocked_queue_tl = END_TSO_QUEUE;
2567 sleeping_queue = END_TSO_QUEUE;
2568 #endif
2569
2570 sched_state = SCHED_RUNNING;
2571 recent_activity = ACTIVITY_YES;
2572
2573 #if defined(THREADED_RTS)
2574 /* Initialise the mutex and condition variables used by
2575 * the scheduler. */
2576 initMutex(&sched_mutex);
2577 #endif
2578
2579 ACQUIRE_LOCK(&sched_mutex);
2580
2581 /* A capability holds the state a native thread needs in
2582 * order to execute STG code. At least one capability is
2583 * floating around (only THREADED_RTS builds have more than one).
2584 */
2585 initCapabilities();
2586
2587 initTaskManager();
2588
2589 /*
2590 * Eagerly start one worker to run each Capability, except for
2591 * Capability 0. The idea is that we're probably going to start a
2592 * bound thread on Capability 0 pretty soon, so we don't want a
2593 * worker task hogging it.
2594 */
2595 startWorkerTasks(1, n_capabilities);
2596
2597 RELEASE_LOCK(&sched_mutex);
2598
2599 }
2600
2601 void
2602 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2603 /* see Capability.c, shutdownCapability() */
2604 {
2605 Task *task = NULL;
2606
2607 task = newBoundTask();
2608
2609 // If we haven't killed all the threads yet, do it now.
2610 if (sched_state < SCHED_SHUTTING_DOWN) {
2611 sched_state = SCHED_INTERRUPTING;
2612 Capability *cap = task->cap;
2613 waitForCapability(&cap,task);
2614 scheduleDoGC(&cap,task,rtsTrue);
2615 ASSERT(task->incall->tso == NULL);
2616 releaseCapability(cap);
2617 }
2618 sched_state = SCHED_SHUTTING_DOWN;
2619
2620 shutdownCapabilities(task, wait_foreign);
2621
2622 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2623 // n_failed_trygrab_idles, n_idle_caps);
2624
2625 boundTaskExiting(task);
2626 }
2627
2628 void
2629 freeScheduler( void )
2630 {
2631 uint32_t still_running;
2632
2633 ACQUIRE_LOCK(&sched_mutex);
2634 still_running = freeTaskManager();
2635 // We can only free the Capabilities if there are no Tasks still
2636 // running. We might have a Task about to return from a foreign
2637 // call into waitForCapability(), for example (actually,
2638 // this should be the *only* thing that a still-running Task can
2639 // do at this point, and it will block waiting for the
2640 // Capability).
2641 if (still_running == 0) {
2642 freeCapabilities();
2643 }
2644 RELEASE_LOCK(&sched_mutex);
2645 #if defined(THREADED_RTS)
2646 closeMutex(&sched_mutex);
2647 #endif
2648 }
2649
2650 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2651 void *user USED_IF_NOT_THREADS)
2652 {
2653 #if !defined(THREADED_RTS)
2654 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2655 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2656 evac(user, (StgClosure **)(void *)&sleeping_queue);
2657 #endif
2658 }
2659
2660 /* -----------------------------------------------------------------------------
2661 performGC
2662
2663 This is the interface to the garbage collector from Haskell land.
2664 We provide this so that external C code can allocate and garbage
2665 collect when called from Haskell via _ccall_GC.
2666 -------------------------------------------------------------------------- */
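
/* Illustrative sketch (assumption, not part of the original source): C code
 * running during a safe foreign call can request a collection directly:
 *
 *     performGC();        // ordinary collection
 *     performMajorGC();   // force a major collection
 *
 * System.Mem.performGC is the usual Haskell-level route to these entry
 * points.
 */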
2667
2668 static void
2669 performGC_(rtsBool force_major)
2670 {
2671 Task *task;
2672 Capability *cap = NULL;
2673
2674 // We must grab a new Task here, because the existing Task may be
2675 // associated with a particular Capability, and chained onto the
2676 // suspended_ccalls queue.
2677 task = newBoundTask();
2678
2679 // TODO: do we need to traceTask*() here?
2680
2681 waitForCapability(&cap,task);
2682 scheduleDoGC(&cap,task,force_major);
2683 releaseCapability(cap);
2684 boundTaskExiting(task);
2685 }
2686
2687 void
2688 performGC(void)
2689 {
2690 performGC_(rtsFalse);
2691 }
2692
2693 void
2694 performMajorGC(void)
2695 {
2696 performGC_(rtsTrue);
2697 }
2698
2699 /* ---------------------------------------------------------------------------
2700 Interrupt execution.
2701 Might be called inside a signal handler so it mustn't do anything fancy.
2702 ------------------------------------------------------------------------ */
2703
2704 void
2705 interruptStgRts(void)
2706 {
2707 sched_state = SCHED_INTERRUPTING;
2708 interruptAllCapabilities();
2709 #if defined(THREADED_RTS)
2710 wakeUpRts();
2711 #endif
2712 }
2713
2714 /* -----------------------------------------------------------------------------
2715 Wake up the RTS
2716
2717 This function causes at least one OS thread to wake up and run the
2718 scheduler loop. It is invoked when the RTS might be deadlocked, or
2719    an external event has arrived that may need servicing (e.g. a
2720 keyboard interrupt).
2721
2722 In the single-threaded RTS we don't do anything here; we only have
2723 one thread anyway, and the event that caused us to want to wake up
2724 will have interrupted any blocking system call in progress anyway.
2725 -------------------------------------------------------------------------- */
2726
2727 #if defined(THREADED_RTS)
2728 void wakeUpRts(void)
2729 {
2730 // This forces the IO Manager thread to wakeup, which will
2731 // in turn ensure that some OS thread wakes up and runs the
2732 // scheduler loop, which will cause a GC and deadlock check.
2733 ioManagerWakeup();
2734 }
2735 #endif
2736
2737 /* -----------------------------------------------------------------------------
2738 Deleting threads
2739
2740 This is used for interruption (^C) and forking, and corresponds to
2741 raising an exception but without letting the thread catch the
2742 exception.
2743 -------------------------------------------------------------------------- */
2744
2745 static void
2746 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2747 {
2748 // NOTE: must only be called on a TSO that we have exclusive
2749 // access to, because we will call throwToSingleThreaded() below.
2750 // The TSO must be on the run queue of the Capability we own, or
2751 // we must own all Capabilities.
2752
2753 if (tso->why_blocked != BlockedOnCCall &&
2754 tso->why_blocked != BlockedOnCCall_Interruptible) {
2755 throwToSingleThreaded(tso->cap,tso,NULL);
2756 }
2757 }
2758
2759 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2760 static void
2761 deleteThread_(Capability *cap, StgTSO *tso)
2762 { // for forkProcess only:
2763 // like deleteThread(), but we delete threads in foreign calls, too.
2764
2765 if (tso->why_blocked == BlockedOnCCall ||
2766 tso->why_blocked == BlockedOnCCall_Interruptible) {
2767 tso->what_next = ThreadKilled;
2768 appendToRunQueue(tso->cap, tso);
2769 } else {
2770 deleteThread(cap,tso);
2771 }
2772 }
2773 #endif
2774
2775 /* -----------------------------------------------------------------------------
2776 raiseExceptionHelper
2777
2778    This function is called by the raise# primitive, just so that we can
2779 move some of the tricky bits of raising an exception from C-- into
2780    C.  Who knows, it might be a useful reusable thing here too.
2781 -------------------------------------------------------------------------- */
2782
2783 StgWord
2784 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2785 {
2786 Capability *cap = regTableToCapability(reg);
2787 StgThunk *raise_closure = NULL;
2788 StgPtr p, next;
2789 const StgRetInfoTable *info;
2790 //
2791 // This closure represents the expression 'raise# E' where E
2792     // is the exception raised.  It is used to overwrite all the
2793     // thunks which are currently under evaluation.
2794 //
2795
2796 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2797 // LDV profiling: stg_raise_info has THUNK as its closure
2798 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2799     // payload, MIN_UPD_SIZE is more appropriate than 1.  It seems that
2800 // 1 does not cause any problem unless profiling is performed.
2801 // However, when LDV profiling goes on, we need to linearly scan
2802 // small object pool, where raise_closure is stored, so we should
2803 // use MIN_UPD_SIZE.
2804 //
2805 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2806 // sizeofW(StgClosure)+1);
2807 //
2808
2809 //
2810 // Walk up the stack, looking for the catch frame. On the way,
2811 // we update any closures pointed to from update frames with the
2812 // raise closure that we just built.
2813 //
2814 p = tso->stackobj->sp;
2815 while(1) {
2816 info = get_ret_itbl((StgClosure *)p);
2817 next = p + stack_frame_sizeW((StgClosure *)p);
2818 switch (info->i.type) {
2819
2820 case UPDATE_FRAME:
2821 // Only create raise_closure if we need to.
2822 if (raise_closure == NULL) {
2823 raise_closure =
2824 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2825 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2826 raise_closure->payload[0] = exception;
2827 }
2828 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2829 (StgClosure *)raise_closure);
2830 p = next;
2831 continue;
2832
2833 case ATOMICALLY_FRAME:
2834 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2835 tso->stackobj->sp = p;
2836 return ATOMICALLY_FRAME;
2837
2838 case CATCH_FRAME:
2839 tso->stackobj->sp = p;
2840 return CATCH_FRAME;
2841
2842 case CATCH_STM_FRAME:
2843 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2844 tso->stackobj->sp = p;
2845 return CATCH_STM_FRAME;
2846
2847 case UNDERFLOW_FRAME:
2848 tso->stackobj->sp = p;
2849 threadStackUnderflow(cap,tso);
2850 p = tso->stackobj->sp;
2851 continue;
2852
2853 case STOP_FRAME:
2854 tso->stackobj->sp = p;
2855 return STOP_FRAME;
2856
2857 case CATCH_RETRY_FRAME: {
2858 StgTRecHeader *trec = tso -> trec;
2859 StgTRecHeader *outer = trec -> enclosing_trec;
2860 debugTrace(DEBUG_stm,
2861 "found CATCH_RETRY_FRAME at %p during raise", p);
2862 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2863 stmAbortTransaction(cap, trec);
2864 stmFreeAbortedTRec(cap, trec);
2865 tso -> trec = outer;
2866 p = next;
2867 continue;
2868 }
2869
2870 default:
2871 p = next;
2872 continue;
2873 }
2874 }
2875 }
2876
2877
2878 /* -----------------------------------------------------------------------------
2879 findRetryFrameHelper
2880
2881 This function is called by the retry# primitive. It traverses the stack
2882    leaving tso->stackobj->sp referring to the frame which should handle the retry.
2883
2884 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2885    or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2886
2887 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2888 create) because retries are not considered to be exceptions, despite the
2889 similar implementation.
2890
2891 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2892 not be created within memory transactions.
2893 -------------------------------------------------------------------------- */
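
/* Illustrative sketch (assumption, not part of the original source): for
 * `atomically (e1 `orElse` e2)` the stack while e1 runs looks roughly like
 *
 *     ... ATOMICALLY_FRAME ... CATCH_RETRY_FRAME ... <frames of e1> <- sp
 *
 * so a retry# inside e1 stops at the CATCH_RETRY_FRAME (and e2 is tried),
 * while a retry# with no enclosing orElse# walks up to the ATOMICALLY_FRAME.
 */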
2894
2895 StgWord
2896 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2897 {
2898 const StgRetInfoTable *info;
2899 StgPtr p, next;
2900
2901 p = tso->stackobj->sp;
2902 while (1) {
2903 info = get_ret_itbl((const StgClosure *)p);
2904 next = p + stack_frame_sizeW((StgClosure *)p);
2905 switch (info->i.type) {
2906
2907 case ATOMICALLY_FRAME:
2908 debugTrace(DEBUG_stm,
2909 "found ATOMICALLY_FRAME at %p during retry", p);
2910 tso->stackobj->sp = p;
2911 return ATOMICALLY_FRAME;
2912
2913 case CATCH_RETRY_FRAME:
2914 debugTrace(DEBUG_stm,
2915 "found CATCH_RETRY_FRAME at %p during retry", p);
2916 tso->stackobj->sp = p;
2917 return CATCH_RETRY_FRAME;
2918
2919 case CATCH_STM_FRAME: {
2920 StgTRecHeader *trec = tso -> trec;
2921 StgTRecHeader *outer = trec -> enclosing_trec;
2922 debugTrace(DEBUG_stm,
2923 "found CATCH_STM_FRAME at %p during retry", p);
2924 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2925 stmAbortTransaction(cap, trec);
2926 stmFreeAbortedTRec(cap, trec);
2927 tso -> trec = outer;
2928 p = next;
2929 continue;
2930 }
2931
2932 case UNDERFLOW_FRAME:
2933 tso->stackobj->sp = p;
2934 threadStackUnderflow(cap,tso);
2935 p = tso->stackobj->sp;
2936 continue;
2937
2938 default:
2939 ASSERT(info->i.type != CATCH_FRAME);
2940 ASSERT(info->i.type != STOP_FRAME);
2941 p = next;
2942 continue;
2943 }
2944 }
2945 }
2946
2947 /* -----------------------------------------------------------------------------
2948 resurrectThreads is called after garbage collection on the list of
2949 threads found to be garbage. Each of these threads will be woken
2950    up and sent an exception: BlockedIndefinitelyOnMVar if the thread was
2951    blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked on an STM
2952    transaction, or NonTermination if the thread was blocked on a Black Hole.
2953
2954 Locks: assumes we hold *all* the capabilities.
2955 -------------------------------------------------------------------------- */
2956
2957 void
2958 resurrectThreads (StgTSO *threads)
2959 {
2960 StgTSO *tso, *next;
2961 Capability *cap;
2962 generation *gen;
2963
2964 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2965 next = tso->global_link;
2966
2967 gen = Bdescr((P_)tso)->gen;
2968 tso->global_link = gen->threads;
2969 gen->threads = tso;
2970
2971 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2972
2973 // Wake up the thread on the Capability it was last on
2974 cap = tso->cap;
2975
2976 switch (tso->why_blocked) {
2977 case BlockedOnMVar:
2978 case BlockedOnMVarRead:
2979 /* Called by GC - sched_mutex lock is currently held. */
2980 throwToSingleThreaded(cap, tso,
2981 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2982 break;
2983 case BlockedOnBlackHole:
2984 throwToSingleThreaded(cap, tso,
2985 (StgClosure *)nonTermination_closure);
2986 break;
2987 case BlockedOnSTM:
2988 throwToSingleThreaded(cap, tso,
2989 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2990 break;
2991 case NotBlocked:
2992 /* This might happen if the thread was blocked on a black hole
2993 * belonging to a thread that we've just woken up (raiseAsync
2994 * can wake up threads, remember...).
2995 */
2996 continue;
2997 case BlockedOnMsgThrowTo:
2998 // This can happen if the target is masking, blocks on a
2999 // black hole, and then is found to be unreachable. In
3000 // this case, we want to let the target wake up and carry
3001 // on, and do nothing to this thread.
3002 continue;
3003 default:
3004 barf("resurrectThreads: thread blocked in a strange way: %d",
3005 tso->why_blocked);
3006 }
3007 }
3008 }