[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 bool heap_overflow = false;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /*
95 * This mutex protects most of the global scheduler data in
96 * the THREADED_RTS runtime.
97 */
98 #if defined(THREADED_RTS)
99 Mutex sched_mutex;
100 #endif
101
102 #if !defined(mingw32_HOST_OS)
103 #define FORKPROCESS_PRIMOP_SUPPORTED
104 #endif
105
106 /* -----------------------------------------------------------------------------
107 * static function prototypes
108 * -------------------------------------------------------------------------- */
109
110 static Capability *schedule (Capability *initialCapability, Task *task);
111
112 //
113 // These functions all encapsulate parts of the scheduler loop, and are
114 // abstracted only to make the structure and control flow of the
115 // scheduler clearer.
116 //
117 static void schedulePreLoop (void);
118 static void scheduleFindWork (Capability **pcap);
119 #if defined(THREADED_RTS)
120 static void scheduleYield (Capability **pcap, Task *task);
121 #endif
122 #if defined(THREADED_RTS)
123 static bool requestSync (Capability **pcap, Task *task,
124 PendingSync *sync_type, SyncType *prev_sync_type);
125 static void acquireAllCapabilities(Capability *cap, Task *task);
126 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
127 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
128 uint32_t to USED_IF_THREADS);
129 #endif
130 static void scheduleStartSignalHandlers (Capability *cap);
131 static void scheduleCheckBlockedThreads (Capability *cap);
132 static void scheduleProcessInbox(Capability **cap);
133 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
134 static void schedulePushWork(Capability *cap, Task *task);
135 #if defined(THREADED_RTS)
136 static void scheduleActivateSpark(Capability *cap);
137 #endif
138 static void schedulePostRunThread(Capability *cap, StgTSO *t);
139 static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
140 static bool scheduleHandleYield( Capability *cap, StgTSO *t,
141 uint32_t prev_what_next );
142 static void scheduleHandleThreadBlocked( StgTSO *t );
143 static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
144 StgTSO *t );
145 static bool scheduleNeedHeapProfile(bool ready_to_gc);
146 static void scheduleDoGC(Capability **pcap, Task *task, bool force_major);
147
148 static void deleteThread (Capability *cap, StgTSO *tso);
149 static void deleteAllThreads (Capability *cap);
150
151 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
152 static void deleteThread_(Capability *cap, StgTSO *tso);
153 #endif
154
155 /* ---------------------------------------------------------------------------
156 Main scheduling loop.
157
158 We use round-robin scheduling, each thread returning to the
159 scheduler loop when one of these conditions is detected:
160
161 * out of heap space
162 * timer expires (thread yields)
163 * thread blocks
164 * thread ends
165 * stack overflow
166
167 ------------------------------------------------------------------------ */
168
169 static Capability *
170 schedule (Capability *initialCapability, Task *task)
171 {
172 StgTSO *t;
173 Capability *cap;
174 StgThreadReturnCode ret;
175 uint32_t prev_what_next;
176 bool ready_to_gc;
177 #if defined(THREADED_RTS)
178 bool first = true;
179 #endif
180
181 cap = initialCapability;
182
183 // Pre-condition: this task owns initialCapability.
184 // The sched_mutex is *NOT* held
185 // NB. on return, we still hold a capability.
186
187 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
188
189 schedulePreLoop();
190
191 // -----------------------------------------------------------
192 // Scheduler loop starts here:
193
194 while (1) {
195
196 // Check whether we have re-entered the RTS from Haskell without
197 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
198 // call).
199 if (cap->in_haskell) {
200 errorBelch("schedule: re-entered unsafely.\n"
201 " Perhaps a 'foreign import unsafe' should be 'safe'?");
202 stg_exit(EXIT_FAILURE);
203 }
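        /* Illustrative sketch (not part of the RTS): one way this error can
         * arise is a C function, imported 'unsafe', that calls back into a
         * 'foreign export'ed Haskell function. An 'unsafe' call skips
         * suspendThread()/resumeThread(), so the callback re-enters the RTS
         * while cap->in_haskell is still true. The names below are made up.
         *
         *   -- Haskell side (hypothetical module)
         *   foreign import ccall unsafe "call_me_back" callMeBack :: IO ()
         *   foreign export ccall haskellHandler :: IO ()
         *
         *   haskellHandler :: IO ()
         *   haskellHandler = putStrLn "called back into Haskell"
         *
         *   -- If call_me_back() (C) invokes haskellHandler, the scheduler
         *   -- reports the error above; importing it 'safe' avoids this.
         */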
204
205 // Note [shutdown]: The interruption / shutdown sequence.
206 //
207 // In order to cleanly shut down the runtime, we want to:
208 // * make sure that all main threads return to their callers
209 // with the state 'Interrupted'.
210 // * clean up all OS threads associated with the runtime
211 // * free all memory etc.
212 //
213 // So the sequence goes like this:
214 //
215 // * The shutdown sequence is initiated by calling hs_exit(),
216 // interruptStgRts(), or running out of memory in the GC.
217 //
218 // * Set sched_state = SCHED_INTERRUPTING
219 //
220 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
221 // scheduleDoGC(), which halts the whole runtime by acquiring all the
222 // capabilities, does a GC and then calls deleteAllThreads() to kill all
223 // the remaining threads. The zombies are left on the run queue for
224 // cleaning up. We can't kill threads involved in foreign calls.
225 //
226 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
227 //
228 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
229 // enforced by the scheduler, which won't run any Haskell code when
230 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
231 // other capabilities by doing the GC earlier.
232 //
233 // * all workers exit when the run queue on their capability
234 // drains. All main threads will also exit when their TSO
235 // reaches the head of the run queue and they can return.
236 //
237 // * eventually all Capabilities will shut down, and the RTS can
238 // exit.
239 //
240 // * We might be left with threads blocked in foreign calls,
241 // we should really attempt to kill these somehow (TODO).
242
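        /* Illustrative sketch (not part of the RTS): the shutdown sequence as
         * seen from Haskell. When main returns, hs_exit() initiates the steps
         * above; forked threads that are still blocked are deleted by the
         * final GC rather than being waited for.
         *
         *   import Control.Concurrent
         *
         *   main :: IO ()
         *   main = do
         *     _ <- forkIO (threadDelay 10000000)  -- still blocked at exit
         *     putStrLn "main returning"
         *     -- main's return triggers hs_exit(); sched_state goes to
         *     -- SCHED_INTERRUPTING and the blocked child is deleted.
         */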
243 switch (sched_state) {
244 case SCHED_RUNNING:
245 break;
246 case SCHED_INTERRUPTING:
247 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
248 /* scheduleDoGC() deletes all the threads */
249 scheduleDoGC(&cap,task,true);
250
251 // after scheduleDoGC(), we must be shutting down. Either some
252 // other Capability did the final GC, or we did it above,
253 // either way we can fall through to the SCHED_SHUTTING_DOWN
254 // case now.
255 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
256 // fall through
257
258 case SCHED_SHUTTING_DOWN:
259 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
260 // If we are a worker, just exit. If we're a bound thread
261 // then we will exit below when we've removed our TSO from
262 // the run queue.
263 if (!isBoundTask(task) && emptyRunQueue(cap)) {
264 return cap;
265 }
266 break;
267 default:
268 barf("sched_state: %d", sched_state);
269 }
270
271 scheduleFindWork(&cap);
272
273 /* work pushing, currently relevant only for THREADED_RTS:
274 (pushes threads, wakes up idle capabilities for stealing) */
275 schedulePushWork(cap,task);
276
277 scheduleDetectDeadlock(&cap,task);
278
279 // Normally, the only way we can get here with no threads to
280 // run is if a keyboard interrupt was received during
281 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
282 // Additionally, it is not fatal for the
283 // threaded RTS to reach here with no threads to run.
284 //
285 // win32: might be here due to awaitEvent() being abandoned
286 // as a result of a console event having been delivered.
287
288 #if defined(THREADED_RTS)
289 if (first)
290 {
291 // XXX: ToDo
292 // // don't yield the first time, we want a chance to run this
293 // // thread for a bit, even if there are others banging at the
294 // // door.
295 // first = false;
296 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
297 }
298
299 scheduleYield(&cap,task);
300
301 if (emptyRunQueue(cap)) continue; // look for work again
302 #endif
303
304 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
305 if ( emptyRunQueue(cap) ) {
306 ASSERT(sched_state >= SCHED_INTERRUPTING);
307 }
308 #endif
309
310 //
311 // Get a thread to run
312 //
313 t = popRunQueue(cap);
314
315 // Sanity check the thread we're about to run. This can be
316 // expensive if there is lots of thread switching going on...
317 IF_DEBUG(sanity,checkTSO(t));
318
319 #if defined(THREADED_RTS)
320 // Check whether we can run this thread in the current task.
321 // If not, we have to pass our capability to the right task.
322 {
323 InCall *bound = t->bound;
324
325 if (bound) {
326 if (bound->task == task) {
327 // yes, the Haskell thread is bound to the current native thread
328 } else {
329 debugTrace(DEBUG_sched,
330 "thread %lu bound to another OS thread",
331 (unsigned long)t->id);
332 // no, bound to a different Haskell thread: pass to that thread
333 pushOnRunQueue(cap,t);
334 continue;
335 }
336 } else {
337 // The thread we want to run is unbound.
338 if (task->incall->tso) {
339 debugTrace(DEBUG_sched,
340 "this OS thread cannot run thread %lu",
341 (unsigned long)t->id);
342 // no, the current native thread is bound to a different
343 // Haskell thread, so pass it to any worker thread
344 pushOnRunQueue(cap,t);
345 continue;
346 }
347 }
348 }
349 #endif
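        /* Illustrative sketch (not part of the RTS): bound threads are created
         * with forkOS (or arise from foreign calls into Haskell); only the
         * Task for that particular OS thread may run them, which is why the
         * capability is handed over above.
         *
         *   import Control.Concurrent (forkOS, isCurrentThreadBound)
         *
         *   main :: IO ()
         *   main = do
         *     _ <- forkOS $ do
         *            b <- isCurrentThreadBound   -- True: this TSO is tied to
         *            print b                     -- one specific OS thread
         *     return ()
         */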
350
351 // If we're shutting down, and this thread has not yet been
352 // killed, kill it now. This sometimes happens when a finalizer
353 // thread is created by the final GC, or a thread previously
354 // in a foreign call returns.
355 if (sched_state >= SCHED_INTERRUPTING &&
356 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
357 deleteThread(cap,t);
358 }
359
360 // If this capability is disabled, migrate the thread away rather
361 // than running it. NB. but not if the thread is bound: it is
362 // really hard for a bound thread to migrate itself. Believe me,
363 // I tried several ways and couldn't find a way to do it.
364 // Instead, when everything is stopped for GC, we migrate all the
365 // threads on the run queue then (see scheduleDoGC()).
366 //
367 // ToDo: what about TSO_LOCKED? Currently we're migrating those
368 // when the number of capabilities drops, but we never migrate
369 // them back if it rises again. Presumably we should, but after
370 // the thread has been migrated we no longer know what capability
371 // it was originally on.
372 #ifdef THREADED_RTS
373 if (cap->disabled && !t->bound) {
374 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
375 migrateThread(cap, t, dest_cap);
376 continue;
377 }
378 #endif
379
380 /* context switches are initiated by the timer signal, unless
381 * the user specified "context switch as often as possible", with
382 * +RTS -C0
383 */
384 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
385 && !emptyThreadQueues(cap)) {
386 cap->context_switch = 1;
387 }
388
389 run_thread:
390
391 // CurrentTSO is the thread to run. It might be different if we
392 // loop back to run_thread, so make sure to set CurrentTSO after
393 // that.
394 cap->r.rCurrentTSO = t;
395
396 startHeapProfTimer();
397
398 // ----------------------------------------------------------------------
399 // Run the current thread
400
401 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
402 ASSERT(t->cap == cap);
403 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
404
405 prev_what_next = t->what_next;
406
407 errno = t->saved_errno;
408 #if mingw32_HOST_OS
409 SetLastError(t->saved_winerror);
410 #endif
411
412 // reset the interrupt flag before running Haskell code
413 cap->interrupt = 0;
414
415 cap->in_haskell = true;
416 cap->idle = 0;
417
418 dirty_TSO(cap,t);
419 dirty_STACK(cap,t->stackobj);
420
421 switch (recent_activity)
422 {
423 case ACTIVITY_DONE_GC: {
424 // ACTIVITY_DONE_GC means we turned off the timer signal to
425 // conserve power (see #1623). Re-enable it here.
426 uint32_t prev;
427 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
428 if (prev == ACTIVITY_DONE_GC) {
429 #ifndef PROFILING
430 startTimer();
431 #endif
432 }
433 break;
434 }
435 case ACTIVITY_INACTIVE:
436 // If we reached ACTIVITY_INACTIVE, then don't reset it until
437 // we've done the GC. The thread running here might just be
438 // the IO manager thread that handle_tick() woke up via
439 // wakeUpRts().
440 break;
441 default:
442 recent_activity = ACTIVITY_YES;
443 }
444
445 traceEventRunThread(cap, t);
446
447 switch (prev_what_next) {
448
449 case ThreadKilled:
450 case ThreadComplete:
451 /* Thread already finished, return to scheduler. */
452 ret = ThreadFinished;
453 break;
454
455 case ThreadRunGHC:
456 {
457 StgRegTable *r;
458 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
459 cap = regTableToCapability(r);
460 ret = r->rRet;
461 break;
462 }
463
464 case ThreadInterpret:
465 cap = interpretBCO(cap);
466 ret = cap->r.rRet;
467 break;
468
469 default:
470 barf("schedule: invalid what_next field");
471 }
472
473 cap->in_haskell = false;
474
475 // The TSO might have moved, eg. if it re-entered the RTS and a GC
476 // happened. So find the new location:
477 t = cap->r.rCurrentTSO;
478
479 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
480 // don't want it set when not running a Haskell thread.
481 cap->r.rCurrentTSO = NULL;
482
483 // And save the current errno in this thread.
484 // XXX: possibly bogus for SMP because this thread might already
485 // be running again, see code below.
486 t->saved_errno = errno;
487 #if mingw32_HOST_OS
488 // Similarly for Windows error code
489 t->saved_winerror = GetLastError();
490 #endif
491
492 if (ret == ThreadBlocked) {
493 if (t->why_blocked == BlockedOnBlackHole) {
494 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
495 traceEventStopThread(cap, t, t->why_blocked + 6,
496 owner != NULL ? owner->id : 0);
497 } else {
498 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
499 }
500 } else {
501 traceEventStopThread(cap, t, ret, 0);
502 }
503
504 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
505 ASSERT(t->cap == cap);
506
507 // ----------------------------------------------------------------------
508
509 // Costs for the scheduler are assigned to CCS_SYSTEM
510 stopHeapProfTimer();
511 #if defined(PROFILING)
512 cap->r.rCCCS = CCS_SYSTEM;
513 #endif
514
515 schedulePostRunThread(cap,t);
516
517 ready_to_gc = false;
518
519 switch (ret) {
520 case HeapOverflow:
521 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
522 break;
523
524 case StackOverflow:
525 // just adjust the stack for this thread, then pop it back
526 // on the run queue.
527 threadStackOverflow(cap, t);
528 pushOnRunQueue(cap,t);
529 break;
530
531 case ThreadYielding:
532 if (scheduleHandleYield(cap, t, prev_what_next)) {
533 // shortcut for switching between compiler/interpreter:
534 goto run_thread;
535 }
536 break;
537
538 case ThreadBlocked:
539 scheduleHandleThreadBlocked(t);
540 break;
541
542 case ThreadFinished:
543 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
544 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
545 break;
546
547 default:
548 barf("schedule: invalid thread return code %d", (int)ret);
549 }
550
551 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
552 scheduleDoGC(&cap,task,false);
553 }
554 } /* end of while() */
555 }
556
557 /* -----------------------------------------------------------------------------
558 * Run queue operations
559 * -------------------------------------------------------------------------- */
560
561 static void
562 removeFromRunQueue (Capability *cap, StgTSO *tso)
563 {
564 if (tso->block_info.prev == END_TSO_QUEUE) {
565 ASSERT(cap->run_queue_hd == tso);
566 cap->run_queue_hd = tso->_link;
567 } else {
568 setTSOLink(cap, tso->block_info.prev, tso->_link);
569 }
570 if (tso->_link == END_TSO_QUEUE) {
571 ASSERT(cap->run_queue_tl == tso);
572 cap->run_queue_tl = tso->block_info.prev;
573 } else {
574 setTSOPrev(cap, tso->_link, tso->block_info.prev);
575 }
576 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
577 cap->n_run_queue--;
578
579 IF_DEBUG(sanity, checkRunQueue(cap));
580 }
581
582 void
583 promoteInRunQueue (Capability *cap, StgTSO *tso)
584 {
585 removeFromRunQueue(cap, tso);
586 pushOnRunQueue(cap, tso);
587 }
588
589 /* ----------------------------------------------------------------------------
590 * Setting up the scheduler loop
591 * ------------------------------------------------------------------------- */
592
593 static void
594 schedulePreLoop(void)
595 {
596 // initialisation for scheduler - what cannot go into initScheduler()
597
598 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
599 win32AllocStack();
600 #endif
601 }
602
603 /* -----------------------------------------------------------------------------
604 * scheduleFindWork()
605 *
606 * Search for work to do, and handle messages from elsewhere.
607 * -------------------------------------------------------------------------- */
608
609 static void
610 scheduleFindWork (Capability **pcap)
611 {
612 scheduleStartSignalHandlers(*pcap);
613
614 scheduleProcessInbox(pcap);
615
616 scheduleCheckBlockedThreads(*pcap);
617
618 #if defined(THREADED_RTS)
619 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
620 #endif
621 }
622
623 #if defined(THREADED_RTS)
624 STATIC_INLINE bool
625 shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
626 {
627 // we need to yield this capability to someone else if..
628 // - another thread is initiating a GC, and we didn't just do a GC
629 // (see Note [GC livelock])
630 // - another Task is returning from a foreign call
631 // - the thread at the head of the run queue cannot be run
632 // by this Task (it is bound to another Task, or it is unbound
633 // and this task is bound).
634 //
635 // Note [GC livelock]
636 //
637 // If we are interrupted to do a GC, then we do not immediately do
638 // another one. This avoids a starvation situation where one
639 // Capability keeps forcing a GC and the other Capabilities make no
640 // progress at all.
641
642 return ((pending_sync && !didGcLast) ||
643 cap->n_returning_tasks != 0 ||
644 (!emptyRunQueue(cap) && (task->incall->tso == NULL
645 ? peekRunQueue(cap)->bound != NULL
646 : peekRunQueue(cap)->bound != task->incall)));
647 }
648
649 // This is the single place where a Task goes to sleep. There are
650 // two reasons it might need to sleep:
651 // - there are no threads to run
652 // - we need to yield this Capability to someone else
653 // (see shouldYieldCapability())
654 //
655 // Careful: the scheduler loop is quite delicate. Make sure you run
656 // the tests in testsuite/concurrent (all ways) after modifying this,
657 // and also check the benchmarks in nofib/parallel for regressions.
658
659 static void
660 scheduleYield (Capability **pcap, Task *task)
661 {
662 Capability *cap = *pcap;
663 bool didGcLast = false;
664
665 // if we have work, and we don't need to give up the Capability, continue.
666 //
667 if (!shouldYieldCapability(cap,task,false) &&
668 (!emptyRunQueue(cap) ||
669 !emptyInbox(cap) ||
670 sched_state >= SCHED_INTERRUPTING)) {
671 return;
672 }
673
674 // otherwise yield (sleep), and keep yielding if necessary.
675 do {
676 didGcLast = yieldCapability(&cap,task, !didGcLast);
677 }
678 while (shouldYieldCapability(cap,task,didGcLast));
679
680 // note there may still be no threads on the run queue at this
681 // point, the caller has to check.
682
683 *pcap = cap;
684 return;
685 }
686 #endif
687
688 /* -----------------------------------------------------------------------------
689 * schedulePushWork()
690 *
691 * Push work to other Capabilities if we have some.
692 * -------------------------------------------------------------------------- */
693
694 static void
695 schedulePushWork(Capability *cap USED_IF_THREADS,
696 Task *task USED_IF_THREADS)
697 {
698 /* following code not for PARALLEL_HASKELL. I kept the call general,
699 future GUM versions might use pushing in a distributed setup */
700 #if defined(THREADED_RTS)
701
702 Capability *free_caps[n_capabilities], *cap0;
703 uint32_t i, n_wanted_caps, n_free_caps;
704
705 uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
706
707 // migration can be turned off with +RTS -qm
708 if (!RtsFlags.ParFlags.migrate) {
709 spare_threads = 0;
710 }
711
712 // Figure out how many capabilities we want to wake up. We need at least
713 // sparkPoolSize(cap) plus the number of spare threads we have.
714 n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
715 if (n_wanted_caps == 0) return;
716
717 // First grab as many free Capabilities as we can. ToDo: we should use
718 // capabilities on the same NUMA node preferably, but not exclusively.
719 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
720 n_free_caps < n_wanted_caps && i != cap->no;
721 i = (i + 1) % n_capabilities) {
722 cap0 = capabilities[i];
723 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
724 if (!emptyRunQueue(cap0)
725 || cap0->n_returning_tasks != 0
726 || !emptyInbox(cap0)) {
727 // it already has some work, we just grabbed it at
728 // the wrong moment. Or maybe it's deadlocked!
729 releaseCapability(cap0);
730 } else {
731 free_caps[n_free_caps++] = cap0;
732 }
733 }
734 }
735
736 // We now have n_free_caps free capabilities stashed in
737 // free_caps[]. Attempt to share our run queue equally with them.
738 // This is complicated slightly by the fact that we can't move
739 // some threads:
740 //
741 // - threads that have TSO_LOCKED cannot migrate
742 // - a thread that is bound to the current Task cannot be migrated
743 //
744 // This is about the simplest thing we could do; improvements we
745 // might want to do include:
746 //
747 // - giving high priority to moving relatively new threads, on
748 // the grounds that they haven't had time to build up a
749 // working set in the cache on this CPU/Capability.
750 //
751 // - giving low priority to moving long-lived threads
752
753 if (n_free_caps > 0) {
754 StgTSO *prev, *t, *next;
755
756 debugTrace(DEBUG_sched,
757 "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
758 cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
759 n_free_caps);
760
761 // There are n_free_caps+1 caps in total. We will share the threads
762 // evenly between them, *except* that if the run queue does not divide
763 // evenly by n_free_caps+1 then we bias towards the current capability.
764 // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
765 uint32_t keep_threads =
766 (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
767
768 // This also ensures that we don't give away all our threads, since
769 // (x + y) / (y + 1) >= 1 when x >= 1.
770
771 // The number of threads we have left.
772 uint32_t n = cap->n_run_queue;
773
774 // prev = the previous thread on this cap's run queue
775 prev = END_TSO_QUEUE;
776
777 // We're going to walk through the run queue, migrating threads to other
778 // capabilities until we have only keep_threads left. We might
779 // encounter a thread that cannot be migrated, in which case we add it
780 // to the current run queue and decrement keep_threads.
781 for (t = cap->run_queue_hd, i = 0;
782 t != END_TSO_QUEUE && n > keep_threads;
783 t = next)
784 {
785 next = t->_link;
786 t->_link = END_TSO_QUEUE;
787
788 // Should we keep this thread?
789 if (t->bound == task->incall // don't move my bound thread
790 || tsoLocked(t) // don't move a locked thread
791 ) {
792 if (prev == END_TSO_QUEUE) {
793 cap->run_queue_hd = t;
794 } else {
795 setTSOLink(cap, prev, t);
796 }
797 setTSOPrev(cap, t, prev);
798 prev = t;
799 if (keep_threads > 0) keep_threads--;
800 }
801
802 // Or migrate it?
803 else {
804 appendToRunQueue(free_caps[i],t);
805 traceEventMigrateThread (cap, t, free_caps[i]->no);
806
807 if (t->bound) { t->bound->task->cap = free_caps[i]; }
808 t->cap = free_caps[i];
809 n--; // we have one fewer threads now
810 i++; // move on to the next free_cap
811 if (i == n_free_caps) i = 0;
812 }
813 }
814
815 // Join up the beginning of the queue (prev)
816 // with the rest of the queue (t)
817 if (t == END_TSO_QUEUE) {
818 cap->run_queue_tl = prev;
819 } else {
820 setTSOPrev(cap, t, prev);
821 }
822 if (prev == END_TSO_QUEUE) {
823 cap->run_queue_hd = t;
824 } else {
825 setTSOLink(cap, prev, t);
826 }
827 cap->n_run_queue = n;
828
829 IF_DEBUG(sanity, checkRunQueue(cap));
830
831 // release the capabilities
832 for (i = 0; i < n_free_caps; i++) {
833 task->cap = free_caps[i];
834 if (sparkPoolSizeCap(cap) > 0) {
835 // If we have sparks to steal, wake up a worker on the
836 // capability, even if it has no threads to run.
837 releaseAndWakeupCapability(free_caps[i]);
838 } else {
839 releaseCapability(free_caps[i]);
840 }
841 }
842 }
843 task->cap = cap; // reset to point to our Capability.
844
845 #endif /* THREADED_RTS */
846
847 }
848
849 /* ----------------------------------------------------------------------------
850 * Start any pending signal handlers
851 * ------------------------------------------------------------------------- */
852
853 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
854 static void
855 scheduleStartSignalHandlers(Capability *cap)
856 {
857 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
858 // safe outside the lock
859 startSignalHandlers(cap);
860 }
861 }
862 #else
863 static void
864 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
865 {
866 }
867 #endif
868
869 /* ----------------------------------------------------------------------------
870 * Check for blocked threads that can be woken up.
871 * ------------------------------------------------------------------------- */
872
873 static void
874 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
875 {
876 #if !defined(THREADED_RTS)
877 //
878 // Check whether any waiting threads need to be woken up. If the
879 // run queue is empty, and there are no other tasks running, we
880 // can wait indefinitely for something to happen.
881 //
882 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
883 {
884 awaitEvent (emptyRunQueue(cap));
885 }
886 #endif
887 }
888
889 /* ----------------------------------------------------------------------------
890 * Detect deadlock conditions and attempt to resolve them.
891 * ------------------------------------------------------------------------- */
892
893 static void
894 scheduleDetectDeadlock (Capability **pcap, Task *task)
895 {
896 Capability *cap = *pcap;
897 /*
898 * Detect deadlock: when we have no threads to run, there are no
899 * threads blocked, waiting for I/O, or sleeping, and all the
900 * other tasks are waiting for work, we must have a deadlock of
901 * some description.
902 */
903 if ( emptyThreadQueues(cap) )
904 {
905 #if defined(THREADED_RTS)
906 /*
907 * In the threaded RTS, we only check for deadlock if there
908 * has been no activity in a complete timeslice. This means
909 * we won't eagerly start a full GC just because we don't have
910 * any threads to run currently.
911 */
912 if (recent_activity != ACTIVITY_INACTIVE) return;
913 #endif
914
915 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
916
917 // Garbage collection can release some new threads due to
918 // either (a) finalizers or (b) threads resurrected because
919 // they are unreachable and will therefore be sent an
920 // exception. Any threads thus released will be immediately
921 // runnable.
922 scheduleDoGC (pcap, task, true/*force major GC*/);
923 cap = *pcap;
924 // When force_major == true, scheduleDoGC sets
925 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
926 // signal.
927
928 if ( !emptyRunQueue(cap) ) return;
929
930 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
931 /* If we have user-installed signal handlers, then wait
932 * for signals to arrive rather than bombing out with a
933 * deadlock.
934 */
935 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
936 debugTrace(DEBUG_sched,
937 "still deadlocked, waiting for signals...");
938
939 awaitUserSignals();
940
941 if (signals_pending()) {
942 startSignalHandlers(cap);
943 }
944
945 // either we have threads to run, or we were interrupted:
946 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
947
948 return;
949 }
950 #endif
951
952 #if !defined(THREADED_RTS)
953 /* Probably a real deadlock. Send the current main thread the
954 * Deadlock exception.
955 */
956 if (task->incall->tso) {
957 switch (task->incall->tso->why_blocked) {
958 case BlockedOnSTM:
959 case BlockedOnBlackHole:
960 case BlockedOnMsgThrowTo:
961 case BlockedOnMVar:
962 case BlockedOnMVarRead:
963 throwToSingleThreaded(cap, task->incall->tso,
964 (StgClosure *)nonTermination_closure);
965 return;
966 default:
967 barf("deadlock: main thread blocked in a strange way");
968 }
969 }
970 return;
971 #endif
972 }
973 }
974
975
976 /* ----------------------------------------------------------------------------
977 * Process messages in the current Capability's inbox
978 * ------------------------------------------------------------------------- */
979
980 static void
981 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
982 {
983 #if defined(THREADED_RTS)
984 Message *m, *next;
985 PutMVar *p, *pnext;
986 int r;
987 Capability *cap = *pcap;
988
989 while (!emptyInbox(cap)) {
990 if (cap->r.rCurrentNursery->link == NULL ||
991 g0->n_new_large_words >= large_alloc_lim) {
992 scheduleDoGC(pcap, cap->running_task, false);
993 cap = *pcap;
994 }
995
996 // don't use a blocking acquire; if the lock is held by
997 // another thread then just carry on. This seems to avoid
998 // getting stuck in a message ping-pong situation with other
999 // processors. We'll check the inbox again later anyway.
1000 //
1001 // We should really use a more efficient queue data structure
1002 // here. The trickiness is that we must ensure a Capability
1003 // never goes idle if the inbox is non-empty, which is why we
1004 // use cap->lock (cap->lock is released as the last thing
1005 // before going idle; see Capability.c:releaseCapability()).
1006 r = TRY_ACQUIRE_LOCK(&cap->lock);
1007 if (r != 0) return;
1008
1009 m = cap->inbox;
1010 p = cap->putMVars;
1011 cap->inbox = (Message*)END_TSO_QUEUE;
1012 cap->putMVars = NULL;
1013
1014 RELEASE_LOCK(&cap->lock);
1015
1016 while (m != (Message*)END_TSO_QUEUE) {
1017 next = m->link;
1018 executeMessage(cap, m);
1019 m = next;
1020 }
1021
1022 while (p != NULL) {
1023 pnext = p->link;
1024 performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
1025 Unit_closure);
1026 freeStablePtr(p->mvar);
1027 stgFree(p);
1028 p = pnext;
1029 }
1030 }
1031 #endif
1032 }
1033
1034
1035 /* ----------------------------------------------------------------------------
1036 * Activate spark threads (THREADED_RTS)
1037 * ------------------------------------------------------------------------- */
1038
1039 #if defined(THREADED_RTS)
1040 static void
1041 scheduleActivateSpark(Capability *cap)
1042 {
1043 if (anySparks() && !cap->disabled)
1044 {
1045 createSparkThread(cap);
1046 debugTrace(DEBUG_sched, "creating a spark thread");
1047 }
1048 }
1049 #endif // THREADED_RTS
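/* Illustrative sketch (not part of the RTS): sparks normally come from
 * 'par' on the Haskell side; when a Capability's run queue is empty and
 * the spark pool is non-empty, scheduleActivateSpark() turns them into work.
 *
 *   import GHC.Conc (par, pseq)
 *
 *   fib :: Int -> Int
 *   fib n | n < 2     = n
 *         | otherwise = fib (n-1) + fib (n-2)
 *
 *   main :: IO ()
 *   main = do
 *     let x = fib 30
 *         y = fib 31
 *     -- 'x `par` ...' records a spark for x; an otherwise idle
 *     -- Capability may pick it up via createSparkThread above.
 *     print (x `par` (y `pseq` (x + y)))
 */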
1050
1051 /* ----------------------------------------------------------------------------
1052 * After running a thread...
1053 * ------------------------------------------------------------------------- */
1054
1055 static void
1056 schedulePostRunThread (Capability *cap, StgTSO *t)
1057 {
1058 // We have to be able to catch transactions that are in an
1059 // infinite loop as a result of seeing an inconsistent view of
1060 // memory, e.g.
1061 //
1062 // atomically $ do
1063 // [a,b] <- mapM readTVar [ta,tb]
1064 // when (a == b) loop
1065 //
1066 // and a is never equal to b given a consistent view of memory.
1067 //
1068 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1069 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1070 debugTrace(DEBUG_sched | DEBUG_stm,
1071 "trec %p found wasting its time", t);
1072
1073 // strip the stack back to the
1074 // ATOMICALLY_FRAME, aborting the (nested)
1075 // transaction, and saving the stack of any
1076 // partially-evaluated thunks on the heap.
1077 throwToSingleThreaded_(cap, t, NULL, true);
1078
1079 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1080 }
1081 }
1082
1083 //
1084 // If the current thread's allocation limit has run out, send it
1085 // the AllocationLimitExceeded exception.
1086
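    /* Illustrative sketch (not part of the RTS): per-thread allocation
     * limits can be set from Haskell via GHC.Conc; the exception delivered
     * when the counter goes negative is the one sent with throwToSelf below.
     *
     *   import GHC.Conc (setAllocationCounter, enableAllocationLimit)
     *   import Control.Exception (try, evaluate, SomeException)
     *
     *   main :: IO ()
     *   main = do
     *     setAllocationCounter (1024 * 1024)   -- roughly 1MB of allowance
     *     enableAllocationLimit
     *     r <- try (evaluate (sum [1 .. 1000000 :: Integer]))
     *            :: IO (Either SomeException Integer)
     *     print r    -- Left AllocationLimitExceeded if the limit is hit
     */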
1087 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1088 // Use a throwToSelf rather than a throwToSingleThreaded, because
1089 // it correctly handles the case where the thread is currently
1090 // inside mask. Also the thread might be blocked (e.g. on an
1091 // MVar), and throwToSingleThreaded doesn't unblock it
1092 // correctly in that case.
1093 throwToSelf(cap, t, allocationLimitExceeded_closure);
1094 ASSIGN_Int64((W_*)&(t->alloc_limit),
1095 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1096 }
1097
1098 /* some statistics gathering in the parallel case */
1099 }
1100
1101 /* -----------------------------------------------------------------------------
1102 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1103 * -------------------------------------------------------------------------- */
1104
1105 static bool
1106 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1107 {
1108 if (cap->r.rHpLim == NULL || cap->context_switch) {
1109 // Sometimes we miss a context switch, e.g. when calling
1110 // primitives in a tight loop, MAYBE_GC() doesn't check the
1111 // context switch flag, and we end up waiting for a GC.
1112 // See #1984, and concurrent/should_run/1984
1113 cap->context_switch = 0;
1114 appendToRunQueue(cap,t);
1115 } else {
1116 pushOnRunQueue(cap,t);
1117 }
1118
1119 // did the task ask for a large block?
1120 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1121 // if so, get one and push it on the front of the nursery.
1122 bdescr *bd;
1123 W_ blocks;
1124
1125 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1126
1127 if (blocks > BLOCKS_PER_MBLOCK) {
1128 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1129 }
1130
1131 debugTrace(DEBUG_sched,
1132 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1133 (long)t->id, what_next_strs[t->what_next], blocks);
1134
1135 // don't do this if the nursery is (nearly) full, we'll GC first.
1136 if (cap->r.rCurrentNursery->link != NULL ||
1137 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1138 // infinite loop if the
1139 // nursery has only one
1140 // block.
1141
1142 bd = allocGroupOnNode_lock(cap->node,blocks);
1143 cap->r.rNursery->n_blocks += blocks;
1144
1145 // link the new group after CurrentNursery
1146 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1147
1148 // initialise it as a nursery block. We initialise the
1149 // step, gen_no, and flags field of *every* sub-block in
1150 // this large block, because this is easier than making
1151 // sure that we always find the block head of a large
1152 // block whenever we call Bdescr() (eg. evacuate() and
1153 // isAlive() in the GC would both have to do this, at
1154 // least).
1155 {
1156 bdescr *x;
1157 for (x = bd; x < bd + blocks; x++) {
1158 initBdescr(x,g0,g0);
1159 x->free = x->start;
1160 x->flags = 0;
1161 }
1162 }
1163
1164 // This assert can be a killer if the app is doing lots
1165 // of large block allocations.
1166 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1167
1168 // now update the nursery to point to the new block
1169 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1170 cap->r.rCurrentNursery = bd;
1171
1172 // we might be unlucky and have another thread get on the
1173 // run queue before us and steal the large block, but in that
1174 // case the thread will just end up requesting another large
1175 // block.
1176 return false; /* not actually GC'ing */
1177 }
1178 }
1179
1180 // if we got here because we exceeded large_alloc_lim, then
1181 // proceed straight to GC.
1182 if (g0->n_new_large_words >= large_alloc_lim) {
1183 return true;
1184 }
1185
1186 // Otherwise, we just ran out of space in the current nursery.
1187 // Grab another nursery if we can.
1188 if (getNewNursery(cap)) {
1189 debugTrace(DEBUG_sched, "thread %ld got a new nursery", t->id);
1190 return false;
1191 }
1192
1193 return true;
1194 /* actual GC is done at the end of the while loop in schedule() */
1195 }
1196
1197 /* -----------------------------------------------------------------------------
1198 * Handle a thread that returned to the scheduler with ThreadYielding
1199 * -------------------------------------------------------------------------- */
1200
1201 static bool
1202 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1203 {
1204 /* put the thread back on the run queue. Then, if we're ready to
1205 * GC, check whether this is the last task to stop. If so, wake
1206 * up the GC thread. getThread will block during a GC until the
1207 * GC is finished.
1208 */
1209
1210 ASSERT(t->_link == END_TSO_QUEUE);
1211
1212 // Shortcut if we're just switching evaluators: don't bother
1213 // doing stack squeezing (which can be expensive), just run the
1214 // thread.
1215 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1216 debugTrace(DEBUG_sched,
1217 "--<< thread %ld (%s) stopped to switch evaluators",
1218 (long)t->id, what_next_strs[t->what_next]);
1219 return true;
1220 }
1221
1222 // Reset the context switch flag. We don't do this just before
1223 // running the thread, because that would mean we would lose ticks
1224 // during GC, which can lead to unfair scheduling (a thread hogs
1225 // the CPU because the tick always arrives during GC). This way
1226 // penalises threads that do a lot of allocation, but that seems
1227 // better than the alternative.
1228 if (cap->context_switch != 0) {
1229 cap->context_switch = 0;
1230 appendToRunQueue(cap,t);
1231 } else {
1232 pushOnRunQueue(cap,t);
1233 }
1234
1235 IF_DEBUG(sanity,
1236 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1237 checkTSO(t));
1238
1239 return false;
1240 }
1241
1242 /* -----------------------------------------------------------------------------
1243 * Handle a thread that returned to the scheduler with ThreadBlocked
1244 * -------------------------------------------------------------------------- */
1245
1246 static void
1247 scheduleHandleThreadBlocked( StgTSO *t
1248 #if !defined(DEBUG)
1249 STG_UNUSED
1250 #endif
1251 )
1252 {
1253
1254 // We don't need to do anything. The thread is blocked, and it
1255 // has tidied up its stack and placed itself on whatever queue
1256 // it needs to be on.
1257
1258 // ASSERT(t->why_blocked != NotBlocked);
1259 // Not true: for example,
1260 // - the thread may have woken itself up already, because
1261 // threadPaused() might have raised a blocked throwTo
1262 // exception, see maybePerformBlockedException().
1263
1264 #ifdef DEBUG
1265 traceThreadStatus(DEBUG_sched, t);
1266 #endif
1267 }
1268
1269 /* -----------------------------------------------------------------------------
1270 * Handle a thread that returned to the scheduler with ThreadFinished
1271 * -------------------------------------------------------------------------- */
1272
1273 static bool
1274 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1275 {
1276 /* Need to check whether this was a main thread, and if so,
1277 * return with the return value.
1278 *
1279 * We also end up here if the thread kills itself with an
1280 * uncaught exception, see Exception.cmm.
1281 */
1282
1283 // blocked exceptions can now complete, even if the thread was in
1284 // blocked mode (see #2910).
1285 awakenBlockedExceptionQueue (cap, t);
1286
1287 //
1288 // Check whether the thread that just completed was a bound
1289 // thread, and if so return with the result.
1290 //
1291 // There is an assumption here that all thread completion goes
1292 // through this point; we need to make sure that if a thread
1293 // ends up in the ThreadKilled state, that it stays on the run
1294 // queue so it can be dealt with here.
1295 //
1296
1297 if (t->bound) {
1298
1299 if (t->bound != task->incall) {
1300 #if !defined(THREADED_RTS)
1301 // Must be a bound thread that is not the topmost one. Leave
1302 // it on the run queue until the stack has unwound to the
1303 // point where we can deal with this. Leaving it on the run
1304 // queue also ensures that the garbage collector knows about
1305 // this thread and its return value (it gets dropped from the
1306 // step->threads list so there's no other way to find it).
1307 appendToRunQueue(cap,t);
1308 return false;
1309 #else
1310 // this cannot happen in the threaded RTS, because a
1311 // bound thread can only be run by the appropriate Task.
1312 barf("finished bound thread that isn't mine");
1313 #endif
1314 }
1315
1316 ASSERT(task->incall->tso == t);
1317
1318 if (t->what_next == ThreadComplete) {
1319 if (task->incall->ret) {
1320 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1321 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1322 }
1323 task->incall->rstat = Success;
1324 } else {
1325 if (task->incall->ret) {
1326 *(task->incall->ret) = NULL;
1327 }
1328 if (sched_state >= SCHED_INTERRUPTING) {
1329 if (heap_overflow) {
1330 task->incall->rstat = HeapExhausted;
1331 } else {
1332 task->incall->rstat = Interrupted;
1333 }
1334 } else {
1335 task->incall->rstat = Killed;
1336 }
1337 }
1338 #ifdef DEBUG
1339 removeThreadLabel((StgWord)task->incall->tso->id);
1340 #endif
1341
1342 // We no longer consider this thread and task to be bound to
1343 // each other. The TSO lives on until it is GC'd, but the
1344 // task is about to be released by the caller, and we don't
1345 // want anyone following the pointer from the TSO to the
1346 // defunct task (which might have already been
1347 // re-used). This was a real bug: the GC updated
1348 // tso->bound->tso which led to a deadlock.
1349 t->bound = NULL;
1350 task->incall->tso = NULL;
1351
1352 return true; // tells schedule() to return
1353 }
1354
1355 return false;
1356 }
1357
1358 /* -----------------------------------------------------------------------------
1359 * Perform a heap census
1360 * -------------------------------------------------------------------------- */
1361
1362 static bool
1363 scheduleNeedHeapProfile( bool ready_to_gc STG_UNUSED )
1364 {
1365 // When we have +RTS -i0 and we're heap profiling, do a census at
1366 // every GC. This lets us get repeatable runs for debugging.
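    // For example (assuming a profiling build of the program):
    //     ./prog +RTS -hc -i0 -RTS
    // performs a heap census at every GC, giving repeatable profiles.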
1367 if (performHeapProfile ||
1368 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1369 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1370 return true;
1371 } else {
1372 return false;
1373 }
1374 }
1375
1376 /* -----------------------------------------------------------------------------
1377 * stopAllCapabilities()
1378 *
1379 * Stop all Haskell execution. This is used when we need to make some global
1380 * change to the system, such as altering the number of capabilities, or
1381 * forking.
1382 *
1383 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1384 * -------------------------------------------------------------------------- */
1385
1386 #if defined(THREADED_RTS)
1387 static void stopAllCapabilities (Capability **pCap, Task *task)
1388 {
1389 bool was_syncing;
1390 SyncType prev_sync_type;
1391
1392 PendingSync sync = {
1393 .type = SYNC_OTHER,
1394 .idle = NULL,
1395 .task = task
1396 };
1397
1398 do {
1399 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1400 } while (was_syncing);
1401
1402 acquireAllCapabilities(*pCap,task);
1403
1404 pending_sync = 0;
1405 }
1406 #endif
1407
1408 /* -----------------------------------------------------------------------------
1409 * requestSync()
1410 *
1411 * Commence a synchronisation between all capabilities. Normally not called
1412 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1413 * has some special synchronisation requirements.
1414 *
1415 * Returns:
1416 * false if we successfully got a sync
1417 * true if there was another sync request in progress,
1418 *             and we yielded to it. The type of the other sync
1419 *             request is returned in *prev_sync_type.
1420 * -------------------------------------------------------------------------- */
1421
1422 #if defined(THREADED_RTS)
1423 static bool requestSync (
1424 Capability **pcap, Task *task, PendingSync *new_sync,
1425 SyncType *prev_sync_type)
1426 {
1427 PendingSync *sync;
1428
1429 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1430 (StgWord)NULL,
1431 (StgWord)new_sync);
1432
1433 if (sync != NULL)
1434 {
1435 // sync is valid until we have called yieldCapability().
1436 // After the sync is completed, we cannot read that struct any
1437 // more because it has been freed.
1438 *prev_sync_type = sync->type;
1439 do {
1440 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1441 sync->type);
1442 ASSERT(*pcap);
1443 yieldCapability(pcap,task,true);
1444 sync = pending_sync;
1445 } while (sync != NULL);
1446
1447 // NOTE: task->cap might have changed now
1448 return true;
1449 }
1450 else
1451 {
1452 return false;
1453 }
1454 }
1455 #endif
1456
1457 /* -----------------------------------------------------------------------------
1458 * acquireAllCapabilities()
1459 *
1460 * Grab all the capabilities except the one we already hold. Used
1461 * when synchronising before a single-threaded GC (SYNC_GC_SEQ), and
1462 * before a fork (SYNC_OTHER).
1463 *
1464 * Only call this after requestSync(), otherwise a deadlock might
1465 * ensue if another thread is trying to synchronise.
1466 * -------------------------------------------------------------------------- */
1467
1468 #ifdef THREADED_RTS
1469 static void acquireAllCapabilities(Capability *cap, Task *task)
1470 {
1471 Capability *tmpcap;
1472 uint32_t i;
1473
1474 ASSERT(pending_sync != NULL);
1475 for (i=0; i < n_capabilities; i++) {
1476 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1477 i, n_capabilities);
1478 tmpcap = capabilities[i];
1479 if (tmpcap != cap) {
1480 // we better hope this task doesn't get migrated to
1481 // another Capability while we're waiting for this one.
1482 // It won't, because load balancing happens while we have
1483 // all the Capabilities, but even so it's a slightly
1484 // unsavoury invariant.
1485 task->cap = tmpcap;
1486 waitForCapability(&tmpcap, task);
1487 if (tmpcap->no != i) {
1488 barf("acquireAllCapabilities: got the wrong capability");
1489 }
1490 }
1491 }
1492 task->cap = cap;
1493 }
1494 #endif
1495
1496 /* -----------------------------------------------------------------------------
1497 * releaseAllCapabilities()
1498 *
1499 * Assuming this thread holds all the capabilities, release them all except for
1500 * the one passed in as cap.
1501 * -------------------------------------------------------------------------- */
1502
1503 #ifdef THREADED_RTS
1504 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task)
1505 {
1506 uint32_t i;
1507
1508 for (i = 0; i < n; i++) {
1509 if (cap->no != i) {
1510 task->cap = capabilities[i];
1511 releaseCapability(capabilities[i]);
1512 }
1513 }
1514 task->cap = cap;
1515 }
1516 #endif
1517
1518 /* -----------------------------------------------------------------------------
1519 * Perform a garbage collection if necessary
1520 * -------------------------------------------------------------------------- */
1521
1522 static void
1523 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1524 bool force_major)
1525 {
1526 Capability *cap = *pcap;
1527 bool heap_census;
1528 uint32_t collect_gen;
1529 bool major_gc;
1530 #ifdef THREADED_RTS
1531 uint32_t gc_type;
1532 uint32_t i;
1533 uint32_t need_idle;
1534 uint32_t n_gc_threads;
1535 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1536 StgTSO *tso;
1537 bool *idle_cap;
1538 // idle_cap is an array (allocated later) of size n_capabilities, where
1539 // idle_cap[i] is true if capability i will be idle during this GC
1540 // cycle.
1541 #endif
1542
1543 if (sched_state == SCHED_SHUTTING_DOWN) {
1544 // The final GC has already been done, and the system is
1545 // shutting down. We'll probably deadlock if we try to GC
1546 // now.
1547 return;
1548 }
1549
1550 heap_census = scheduleNeedHeapProfile(true);
1551
1552 // Figure out which generation we are collecting, so that we can
1553 // decide whether this is a parallel GC or not.
1554 collect_gen = calcNeeded(force_major || heap_census, NULL);
1555 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1556
1557 #ifdef THREADED_RTS
1558 if (sched_state < SCHED_INTERRUPTING
1559 && RtsFlags.ParFlags.parGcEnabled
1560 && collect_gen >= RtsFlags.ParFlags.parGcGen
1561 && ! oldest_gen->mark)
1562 {
1563 gc_type = SYNC_GC_PAR;
1564 } else {
1565 gc_type = SYNC_GC_SEQ;
1566 }
1567
1568 // In order to GC, there must be no threads running Haskell code.
1569 // Therefore, for single-threaded GC, the GC thread needs to hold *all* the
1570 // capabilities, and release them after the GC has completed. For parallel
1571 // GC, we synchronise all the running threads using requestSync().
1572 //
1573 // Other capabilities are prevented from running yet more Haskell threads if
1574 // pending_sync is set. Tested inside yieldCapability() and
1575 // releaseCapability() in Capability.c
1576
1577 PendingSync sync = {
1578 .type = gc_type,
1579 .idle = NULL,
1580 .task = task
1581 };
1582
1583 {
1584 SyncType prev_sync = 0;
1585 bool was_syncing;
1586 do {
1587 // If -qn is not set and we have more capabilities than cores, set
1588 // the number of GC threads to #cores. We do this here rather than
1589 // in normaliseRtsOpts() because here it will work if the program
1590 // calls setNumCapabilities.
1591 //
1592 n_gc_threads = RtsFlags.ParFlags.parGcThreads;
1593 if (n_gc_threads == 0 &&
1594 enabled_capabilities > getNumberOfProcessors()) {
1595 n_gc_threads = getNumberOfProcessors();
1596 }
1597
1598 // This calculation must be inside the loop because
1599 // enabled_capabilities may change if requestSync() below fails and
1600 // we retry.
1601 if (gc_type == SYNC_GC_PAR && n_gc_threads > 0) {
1602 if (n_gc_threads >= enabled_capabilities) {
1603 need_idle = 0;
1604 } else {
1605 need_idle = enabled_capabilities - n_gc_threads;
1606 }
1607 } else {
1608 need_idle = 0;
1609 }
1610
1611 // We need an array of size n_capabilities, but since this may
1612 // change each time around the loop we must allocate it afresh.
1613 idle_cap = (bool *)stgMallocBytes(n_capabilities *
1614 sizeof(bool),
1615 "scheduleDoGC");
1616 sync.idle = idle_cap;
1617
1618 // When using +RTS -qn, we need some capabilities to be idle during
1619 // GC. The best bet is to choose some inactive ones, so we look for
1620 // those first:
1621 uint32_t n_idle = need_idle;
1622 for (i=0; i < n_capabilities; i++) {
1623 if (capabilities[i]->disabled) {
1624 idle_cap[i] = true;
1625 } else if (n_idle > 0 &&
1626 capabilities[i]->running_task == NULL) {
1627 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1628 n_idle--;
1629 idle_cap[i] = true;
1630 } else {
1631 idle_cap[i] = false;
1632 }
1633 }
1634 // If we didn't find enough inactive capabilities, just pick some
1635 // more to be idle.
1636 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1637 if (!idle_cap[i] && i != cap->no) {
1638 idle_cap[i] = true;
1639 n_idle--;
1640 }
1641 }
1642 ASSERT(n_idle == 0);
1643
1644 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1645 cap = *pcap;
1646 if (was_syncing) {
1647 stgFree(idle_cap);
1648 }
1649 if (was_syncing &&
1650 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1651 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1652 // someone else had a pending sync request for a GC, so
1653 // let's assume GC has been done and we don't need to GC
1654 // again.
1655 // Exception to this: if SCHED_INTERRUPTING, then we still
1656 // need to do the final GC.
1657 return;
1658 }
1659 if (sched_state == SCHED_SHUTTING_DOWN) {
1660 // The scheduler might now be shutting down. We tested
1661 // this above, but it might have become true since then as
1662 // we yielded the capability in requestSync().
1663 return;
1664 }
1665 } while (was_syncing);
1666 }
1667
1668 stat_startGCSync(gc_threads[cap->no]);
1669
1670 #ifdef DEBUG
1671 unsigned int old_n_capabilities = n_capabilities;
1672 #endif
1673
1674 interruptAllCapabilities();
1675
1676 // The final shutdown GC is always single-threaded, because it's
1677 // possible that some of the Capabilities have no worker threads.
1678
1679 if (gc_type == SYNC_GC_SEQ) {
1680 traceEventRequestSeqGc(cap);
1681 } else {
1682 traceEventRequestParGc(cap);
1683 }
1684
1685 if (gc_type == SYNC_GC_SEQ) {
1686 // single-threaded GC: grab all the capabilities
1687 acquireAllCapabilities(cap,task);
1688 }
1689 else
1690 {
1691 // If we are load-balancing collections in this
1692 // generation, then we require all GC threads to participate
1693 // in the collection. Otherwise, we only require active
1694 // threads to participate, and we set gc_threads[i]->idle for
1695 // any idle capabilities. The rationale here is that waking
1696 // up an idle Capability takes much longer than just doing any
1697 // GC work on its behalf.
1698
1699 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1700 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1701 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1702 {
1703 for (i=0; i < n_capabilities; i++) {
1704 if (capabilities[i]->disabled) {
1705 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1706 if (idle_cap[i]) {
1707 n_idle_caps++;
1708 }
1709 } else {
1710 if (i != cap->no && idle_cap[i]) {
1711 Capability *tmpcap = capabilities[i];
1712 task->cap = tmpcap;
1713 waitForCapability(&tmpcap, task);
1714 n_idle_caps++;
1715 }
1716 }
1717 }
1718 }
1719 else
1720 {
1721 for (i=0; i < n_capabilities; i++) {
1722 if (capabilities[i]->disabled) {
1723 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1724 if (idle_cap[i]) {
1725 n_idle_caps++;
1726 }
1727 } else if (i != cap->no &&
1728 capabilities[i]->idle >=
1729 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1730 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1731 if (idle_cap[i]) {
1732 n_idle_caps++;
1733 } else {
1734 n_failed_trygrab_idles++;
1735 }
1736 }
1737 }
1738 }
1739 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1740
1741 for (i=0; i < n_capabilities; i++) {
1742 capabilities[i]->idle++;
1743 }
1744
1745 // For all capabilities participating in this GC, wait until
1746 // they have stopped mutating and are standing by for GC.
1747 waitForGcThreads(cap, idle_cap);
1748
1749 #if defined(THREADED_RTS)
1750 // Stable point where we can do a global check on our spark counters
1751 ASSERT(checkSparkCountInvariant());
1752 #endif
1753 }
1754
1755 #endif
1756
1757 IF_DEBUG(scheduler, printAllThreads());
1758
1759 delete_threads_and_gc:
1760 /*
1761 * We now have all the capabilities; if we're in an interrupting
1762 * state, then we should take the opportunity to delete all the
1763 * threads in the system.
1764 * Checking for major_gc ensures that the last GC is major.
1765 */
1766 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1767 deleteAllThreads(cap);
1768 #if defined(THREADED_RTS)
1769 // Discard all the sparks from every Capability. Why?
1770 // They'll probably be GC'd anyway since we've killed all the
1771 // threads. It just avoids the GC having to do any work to
1772 // figure out that any remaining sparks are garbage.
1773 for (i = 0; i < n_capabilities; i++) {
1774 capabilities[i]->spark_stats.gcd +=
1775 sparkPoolSize(capabilities[i]->sparks);
1776 // No race here since all Caps are stopped.
1777 discardSparksCap(capabilities[i]);
1778 }
1779 #endif
1780 sched_state = SCHED_SHUTTING_DOWN;
1781 }
1782
1783 /*
1784 * When there are disabled capabilities, we want to migrate any
1785 * threads away from them. Normally this happens in the
1786 * scheduler's loop, but only for unbound threads - it's really
1787 * hard for a bound thread to migrate itself. So we have another
1788 * go here.
1789 */
1790 #if defined(THREADED_RTS)
1791 for (i = enabled_capabilities; i < n_capabilities; i++) {
1792 Capability *tmp_cap, *dest_cap;
1793 tmp_cap = capabilities[i];
1794 ASSERT(tmp_cap->disabled);
1795 if (i != cap->no) {
1796 dest_cap = capabilities[i % enabled_capabilities];
1797 while (!emptyRunQueue(tmp_cap)) {
1798 tso = popRunQueue(tmp_cap);
1799 migrateThread(tmp_cap, tso, dest_cap);
1800 if (tso->bound) {
1801 traceTaskMigrate(tso->bound->task,
1802 tso->bound->task->cap,
1803 dest_cap);
1804 tso->bound->task->cap = dest_cap;
1805 }
1806 }
1807 }
1808 }
1809 #endif
1810
1811 #if defined(THREADED_RTS)
1812 // reset pending_sync *before* GC, so that when the GC threads
1813 // emerge they don't immediately re-enter the GC.
1814 pending_sync = 0;
1815 GarbageCollect(collect_gen, heap_census, gc_type, cap, idle_cap);
1816 #else
1817 GarbageCollect(collect_gen, heap_census, 0, cap, NULL);
1818 #endif
1819
1820 traceSparkCounters(cap);
1821
1822 switch (recent_activity) {
1823 case ACTIVITY_INACTIVE:
1824 if (force_major) {
1825 // We are doing a GC because the system has been idle for a
1826 // timeslice and we need to check for deadlock. Record the
1827 // fact that we've done a GC and turn off the timer signal;
1828 // it will get re-enabled if we run any threads after the GC.
1829 recent_activity = ACTIVITY_DONE_GC;
1830 #ifndef PROFILING
1831 stopTimer();
1832 #endif
1833 break;
1834 }
1835 // fall through...
1836
1837 case ACTIVITY_MAYBE_NO:
1838 // the GC might have taken long enough for the timer to set
1839 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1840 // but we aren't necessarily deadlocked:
1841 recent_activity = ACTIVITY_YES;
1842 break;
1843
1844 case ACTIVITY_DONE_GC:
1845 // If we are actually active, the scheduler will reset the
1846 // recent_activity flag and re-enable the timer.
1847 break;
1848 }
1849
1850 #if defined(THREADED_RTS)
1851 // Stable point where we can do a global check on our spark counters
1852 ASSERT(checkSparkCountInvariant());
1853 #endif
1854
1855 // The heap census itself is done during GarbageCollect().
1856 if (heap_census) {
1857 performHeapProfile = false;
1858 }
1859
1860 #if defined(THREADED_RTS)
1861
1862 // If n_capabilities has changed during GC, we're in trouble.
1863 ASSERT(n_capabilities == old_n_capabilities);
1864
1865 if (gc_type == SYNC_GC_PAR)
1866 {
1867 for (i = 0; i < n_capabilities; i++) {
1868 if (i != cap->no) {
1869 if (idle_cap[i]) {
1870 ASSERT(capabilities[i]->running_task == task);
1871 task->cap = capabilities[i];
1872 releaseCapability(capabilities[i]);
1873 } else {
1874 ASSERT(capabilities[i]->running_task != task);
1875 }
1876 }
1877 }
1878 task->cap = cap;
1879
1880 // releaseGCThreads() happens *after* we have released idle
1881 // capabilities. Otherwise what can happen is one of the released
1882 // threads starts a new GC, and finds that it can't acquire some of
1883 // the disabled capabilities, because the previous GC still holds
1884 // them, so those disabled capabilities will not be idle during the
1885 // next GC round. However, if we release the capabilities first,
1886 // then they will be free (because they're disabled) when the next
1887 // GC cycle happens.
1888 releaseGCThreads(cap, idle_cap);
1889 }
1890 #endif
1891
1892 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1893 // GC set the heap_overflow flag, so we should proceed with
1894 // an orderly shutdown now. Ultimately we want the main
1895 // thread to return to its caller with HeapExhausted, at which
1896 // point the caller should call hs_exit(). The first step is
1897 // to delete all the threads.
1898 //
1899 // Another way to do this would be to raise an exception in
1900 // the main thread, which we really should do because it gives
1901 // the program a chance to clean up. But how do we find the
1902 // main thread? It should presumably be the same one that
1903 // gets ^C exceptions, but that's all done on the Haskell side
1904 // (GHC.TopHandler).
1905 sched_state = SCHED_INTERRUPTING;
1906 goto delete_threads_and_gc;
1907 }
1908
1909 #ifdef SPARKBALANCE
1910 /* JB
1911 Once we are all together... this would be the place to balance all
1912 spark pools. No concurrent stealing or adding of new sparks can
1913 occur. Should be defined in Sparks.c. */
1914 balanceSparkPoolsCaps(n_capabilities, capabilities);
1915 #endif
1916
1917 #if defined(THREADED_RTS)
1918 stgFree(idle_cap);
1919
1920 if (gc_type == SYNC_GC_SEQ) {
1921 // release our stash of capabilities.
1922 releaseAllCapabilities(n_capabilities, cap, task);
1923 }
1924 #endif
1925
1926 return;
1927 }
1928
1929 /* ---------------------------------------------------------------------------
1930 * Singleton fork(). Do not copy any running threads.
1931 * ------------------------------------------------------------------------- */
1932
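/* Illustrative sketch of the lock discipline used below (not part of the
 * build; 'm' stands in for each RTS mutex): every lock that another thread
 * could be holding is acquired before fork() and re-initialised in the
 * child, so the child never inherits a mutex stuck in the locked state.
 *
 *     ACQUIRE_LOCK(&m);       // parent: pin the lock's state before forking
 *     pid = fork();
 *     if (pid) {              // parent: continue as before
 *         RELEASE_LOCK(&m);
 *     } else {                // child: the other OS threads no longer exist,
 *         initMutex(&m);      // so rebuild the mutex from scratch
 *     }
 */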
1933 pid_t
1934 forkProcess(HsStablePtr *entry
1935 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1936 STG_UNUSED
1937 #endif
1938 )
1939 {
1940 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1941 pid_t pid;
1942 StgTSO* t,*next;
1943 Capability *cap;
1944 uint32_t g;
1945 Task *task = NULL;
1946 uint32_t i;
1947
1948 debugTrace(DEBUG_sched, "forking!");
1949
1950 task = newBoundTask();
1951
1952 cap = NULL;
1953 waitForCapability(&cap, task);
1954
1955 #ifdef THREADED_RTS
1956 stopAllCapabilities(&cap, task);
1957 #endif
1958
1959 // no funny business: hold locks while we fork, otherwise if some
1960 // other thread is holding a lock when the fork happens, the data
1961 // structure protected by the lock will forever be in an
1962 // inconsistent state in the child. See also #1391.
1963 ACQUIRE_LOCK(&sched_mutex);
1964 ACQUIRE_LOCK(&sm_mutex);
1965 ACQUIRE_LOCK(&stable_mutex);
1966 ACQUIRE_LOCK(&task->lock);
1967
1968 for (i=0; i < n_capabilities; i++) {
1969 ACQUIRE_LOCK(&capabilities[i]->lock);
1970 }
1971
1972 #ifdef THREADED_RTS
1973 ACQUIRE_LOCK(&all_tasks_mutex);
1974 #endif
1975
1976 stopTimer(); // See #4074
1977
1978 #if defined(TRACING)
1979 flushEventLog(); // so that child won't inherit dirty file buffers
1980 #endif
1981
1982 pid = fork();
1983
1984 if (pid) { // parent
1985
1986 startTimer(); // #4074
1987
1988 RELEASE_LOCK(&sched_mutex);
1989 RELEASE_LOCK(&sm_mutex);
1990 RELEASE_LOCK(&stable_mutex);
1991 RELEASE_LOCK(&task->lock);
1992
1993 for (i=0; i < n_capabilities; i++) {
1994 releaseCapability_(capabilities[i],false);
1995 RELEASE_LOCK(&capabilities[i]->lock);
1996 }
1997
1998 #ifdef THREADED_RTS
1999 RELEASE_LOCK(&all_tasks_mutex);
2000 #endif
2001
2002 boundTaskExiting(task);
2003
2004 // just return the pid
2005 return pid;
2006
2007 } else { // child
2008
2009 #if defined(THREADED_RTS)
2010 initMutex(&sched_mutex);
2011 initMutex(&sm_mutex);
2012 initMutex(&stable_mutex);
2013 initMutex(&task->lock);
2014
2015 for (i=0; i < n_capabilities; i++) {
2016 initMutex(&capabilities[i]->lock);
2017 }
2018
2019 initMutex(&all_tasks_mutex);
2020 #endif
2021
2022 #ifdef TRACING
2023 resetTracing();
2024 #endif
2025
2026 // Now, all OS threads except the thread that forked are
2027 // stopped. We need to stop all Haskell threads, including
2028 // those involved in foreign calls. Also we need to delete
2029 // all Tasks, because they correspond to OS threads that are
2030 // now gone.
2031
2032 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2033 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2034 next = t->global_link;
2035 // don't allow threads to catch the ThreadKilled
2036 // exception, but we do want to raiseAsync() because these
2037 // threads may be evaluating thunks that we need later.
2038 deleteThread_(t->cap,t);
2039
2040 // stop the GC from updating the InCall to point to
2041 // the TSO. This is only necessary because the
2042 // OSThread bound to the TSO has been killed, and
2043 // won't get a chance to exit in the usual way (see
2044 // also scheduleHandleThreadFinished).
2045 t->bound = NULL;
2046 }
2047 }
2048
2049 discardTasksExcept(task);
2050
2051 for (i=0; i < n_capabilities; i++) {
2052 cap = capabilities[i];
2053
2054 // Empty the run queue. It seems tempting to let all the
2055 // killed threads stay on the run queue as zombies to be
2056 // cleaned up later, but some of them may correspond to
2057 // bound threads for which the corresponding Task does not
2058 // exist.
2059 truncateRunQueue(cap);
2060 cap->n_run_queue = 0;
2061
2062 // Any suspended C-calling Tasks are no more, their OS threads
2063 // don't exist now:
2064 cap->suspended_ccalls = NULL;
2065 cap->n_suspended_ccalls = 0;
2066
2067 #if defined(THREADED_RTS)
2068 // Wipe our spare workers list, they no longer exist. New
2069 // workers will be created if necessary.
2070 cap->spare_workers = NULL;
2071 cap->n_spare_workers = 0;
2072 cap->returning_tasks_hd = NULL;
2073 cap->returning_tasks_tl = NULL;
2074 cap->n_returning_tasks = 0;
2075 #endif
2076
2077 // Release all caps except 0; we'll use cap 0 for starting
2078 // the IO manager and running the client action below.
2079 if (cap->no != 0) {
2080 task->cap = cap;
2081 releaseCapability(cap);
2082 }
2083 }
2084 cap = capabilities[0];
2085 task->cap = cap;
2086
2087 // Empty the threads lists. Otherwise, the garbage
2088 // collector may attempt to resurrect some of these threads.
2089 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2090 generations[g].threads = END_TSO_QUEUE;
2091 }
2092
2093 // On Unix, all timers are reset in the child, so we need to start
2094 // the timer again.
2095 initTimer();
2096 startTimer();
2097
2098 // TODO: need to trace various other things in the child
2099 // like startup event, capabilities, process info etc
2100 traceTaskCreate(task, cap);
2101
2102 #if defined(THREADED_RTS)
2103 ioManagerStartCap(&cap);
2104 #endif
2105
2106 // Install the toplevel exception handlers, so that an interruption
2107 // signal will be sent to the main thread.
2108 // See Trac #12903
2109 rts_evalStableIOMain(&cap, entry, NULL); // run the action
2110 rts_checkSchedStatus("forkProcess",cap);
2111
2112 rts_unlock(cap);
2113 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2114 }
2115 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2116 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2117 #endif
2118 }
2119
2120 /* ---------------------------------------------------------------------------
2121 * Changing the number of Capabilities
2122 *
2123 * Changing the number of Capabilities is very tricky! We can only do
2124 * it with the system fully stopped, so we do a full sync with
2125 * requestSync(SYNC_OTHER) and grab all the capabilities.
2126 *
2127 * Then we resize the appropriate data structures, and update all
2128 * references to the old data structures which have now moved.
2129 * Finally we release the Capabilities we are holding, and start
2130 * worker Tasks on the new Capabilities we created.
2131 *
2132 * ------------------------------------------------------------------------- */
2133
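/* Caller-side sketch (illustrative only): the capability count can be
 * changed at runtime either from Haskell, via GHC.Conc.setNumCapabilities
 * (a foreign import of this function), or from foreign code linked with
 * the RTS:
 *
 *     setNumCapabilities(8);   // grow (or shrink) to 8 capabilities;
 *                              // the full sync below makes this safe to
 *                              // call while Haskell threads are running
 */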
2134 void
2135 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2136 {
2137 #if !defined(THREADED_RTS)
2138 if (new_n_capabilities != 1) {
2139 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2140 }
2141 return;
2142 #elif defined(NOSMP)
2143 if (new_n_capabilities != 1) {
2144 errorBelch("setNumCapabilities: not supported on this platform");
2145 }
2146 return;
2147 #else
2148 Task *task;
2149 Capability *cap;
2150 uint32_t n;
2151 Capability *old_capabilities = NULL;
2152 uint32_t old_n_capabilities = n_capabilities;
2153
2154 if (new_n_capabilities == enabled_capabilities) return;
2155
2156 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2157 enabled_capabilities, new_n_capabilities);
2158
2159 cap = rts_lock();
2160 task = cap->running_task;
2161
2162 stopAllCapabilities(&cap, task);
2163
2164 if (new_n_capabilities < enabled_capabilities)
2165 {
2166 // Reducing the number of capabilities: we do not actually
2167 // remove the extra capabilities, we just mark them as
2168 // "disabled". This has the following effects:
2169 //
2170 // - threads on a disabled capability are migrated away by the
2171 // scheduler loop
2172 //
2173 // - disabled capabilities do not participate in GC
2174 // (see scheduleDoGC())
2175 //
2176 // - No spark threads are created on this capability
2177 // (see scheduleActivateSpark())
2178 //
2179 // - We do not attempt to migrate threads *to* a disabled
2180 // capability (see schedulePushWork()).
2181 //
2182 // but in other respects, a disabled capability remains
2183 // alive. Threads may be woken up on a disabled capability,
2184 // but they will be immediately migrated away.
2185 //
2186 // This approach is much easier than trying to actually remove
2187 // the capability; we don't have to worry about GC data
2188 // structures, the nursery, etc.
2189 //
2190 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2191 capabilities[n]->disabled = true;
2192 traceCapDisable(capabilities[n]);
2193 }
2194 enabled_capabilities = new_n_capabilities;
2195 }
2196 else
2197 {
2198 // Increasing the number of enabled capabilities.
2199 //
2200 // enable any disabled capabilities, up to the required number
2201 for (n = enabled_capabilities;
2202 n < new_n_capabilities && n < n_capabilities; n++) {
2203 capabilities[n]->disabled = false;
2204 traceCapEnable(capabilities[n]);
2205 }
2206 enabled_capabilities = n;
2207
2208 if (new_n_capabilities > n_capabilities) {
2209 #if defined(TRACING)
2210 // Allocate eventlog buffers for the new capabilities. Note this
2211 // must be done before calling moreCapabilities(), because that
2212 // will emit events about creating the new capabilities and adding
2213 // them to existing capsets.
2214 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2215 #endif
2216
2217 // Resize the capabilities array
2218 // NB. after this, capabilities points somewhere new. Any pointers
2219 // of type (Capability *) are now invalid.
2220 moreCapabilities(n_capabilities, new_n_capabilities);
2221
2222 // Resize and update storage manager data structures
2223 storageAddCapabilities(n_capabilities, new_n_capabilities);
2224 }
2225 }
2226
2227 // update n_capabilities before things start running
2228 if (new_n_capabilities > n_capabilities) {
2229 n_capabilities = enabled_capabilities = new_n_capabilities;
2230 }
2231
2232 // We're done: release the original Capabilities
2233 releaseAllCapabilities(old_n_capabilities, cap,task);
2234
2235 // We can't free the old array until now, because we access it
2236 // while updating pointers in updateCapabilityRefs().
2237 if (old_capabilities) {
2238 stgFree(old_capabilities);
2239 }
2240
2241 // Notify IO manager that the number of capabilities has changed.
2242 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2243
2244 rts_unlock(cap);
2245
2246 #endif // THREADED_RTS
2247 }
2248
2249
2250
2251 /* ---------------------------------------------------------------------------
2252 * Delete all the threads in the system
2253 * ------------------------------------------------------------------------- */
2254
2255 static void
2256 deleteAllThreads ( Capability *cap )
2257 {
2258 // NOTE: only safe to call if we own all capabilities.
2259
2260 StgTSO* t, *next;
2261 uint32_t g;
2262
2263 debugTrace(DEBUG_sched,"deleting all threads");
2264 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2265 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2266 next = t->global_link;
2267 deleteThread(cap,t);
2268 }
2269 }
2270
2271 // The run queue now contains a bunch of ThreadKilled threads. We
2272 // must not throw these away: the main thread(s) will be in there
2273 // somewhere, and the main scheduler loop has to deal with it.
2274 // Also, the run queue is the only thing keeping these threads from
2275 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2276
2277 #if !defined(THREADED_RTS)
2278 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2279 ASSERT(sleeping_queue == END_TSO_QUEUE);
2280 #endif
2281 }
2282
2283 /* -----------------------------------------------------------------------------
2284 Managing the suspended_ccalls list.
2285 Locks required: sched_mutex
2286 -------------------------------------------------------------------------- */
2287
2288 STATIC_INLINE void
2289 suspendTask (Capability *cap, Task *task)
2290 {
2291 InCall *incall;
2292
2293 incall = task->incall;
2294 ASSERT(incall->next == NULL && incall->prev == NULL);
2295 incall->next = cap->suspended_ccalls;
2296 incall->prev = NULL;
2297 if (cap->suspended_ccalls) {
2298 cap->suspended_ccalls->prev = incall;
2299 }
2300 cap->suspended_ccalls = incall;
2301 cap->n_suspended_ccalls++;
2302 }
2303
2304 STATIC_INLINE void
2305 recoverSuspendedTask (Capability *cap, Task *task)
2306 {
2307 InCall *incall;
2308
2309 incall = task->incall;
2310 if (incall->prev) {
2311 incall->prev->next = incall->next;
2312 } else {
2313 ASSERT(cap->suspended_ccalls == incall);
2314 cap->suspended_ccalls = incall->next;
2315 }
2316 if (incall->next) {
2317 incall->next->prev = incall->prev;
2318 }
2319 incall->next = incall->prev = NULL;
2320 cap->n_suspended_ccalls--;
2321 }
2322
2323 /* ---------------------------------------------------------------------------
2324 * Suspending & resuming Haskell threads.
2325 *
2326 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2327 * its capability before calling the C function. This allows another
2328 * task to pick up the capability and carry on running Haskell
2329 * threads. It also means that if the C call blocks, it won't lock
2330 * the whole system.
2331 *
2332 * The Haskell thread making the C call is put to sleep for the
2333 * duration of the call, on the suspended_ccalling_threads queue. We
2334 * give out a token to the task, which it can use to resume the thread
2335 * on return from the C function.
2336 *
2337 * If this is an interruptible C call, the FFI call may be
2338 * unceremoniously terminated, and so it should be scheduled on an
2339 * unbound worker thread.
2340 * ------------------------------------------------------------------------- */
2341
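/* Illustrative sketch of how a compiled "safe" foreign call uses these two
 * entry points (the real wrapper is emitted by the code generator; the
 * variable names here are hypothetical):
 *
 *     void *tok;
 *     tok = suspendThread(&cap->r, false);   // hand back the Capability
 *     r   = some_blocking_c_fn(arg);         // other Tasks can run Haskell
 *                                            // code while this call blocks
 *     reg = resumeThread(tok);               // wait for a Capability again
 */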
2342 void *
2343 suspendThread (StgRegTable *reg, bool interruptible)
2344 {
2345 Capability *cap;
2346 int saved_errno;
2347 StgTSO *tso;
2348 Task *task;
2349 #if mingw32_HOST_OS
2350 StgWord32 saved_winerror;
2351 #endif
2352
2353 saved_errno = errno;
2354 #if mingw32_HOST_OS
2355 saved_winerror = GetLastError();
2356 #endif
2357
2358 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2359 */
2360 cap = regTableToCapability(reg);
2361
2362 task = cap->running_task;
2363 tso = cap->r.rCurrentTSO;
2364
2365 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2366
2367 // XXX this might not be necessary --SDM
2368 tso->what_next = ThreadRunGHC;
2369
2370 threadPaused(cap,tso);
2371
2372 if (interruptible) {
2373 tso->why_blocked = BlockedOnCCall_Interruptible;
2374 } else {
2375 tso->why_blocked = BlockedOnCCall;
2376 }
2377
2378 // Hand back capability
2379 task->incall->suspended_tso = tso;
2380 task->incall->suspended_cap = cap;
2381
2382 // Otherwise allocate() will write to invalid memory.
2383 cap->r.rCurrentTSO = NULL;
2384
2385 ACQUIRE_LOCK(&cap->lock);
2386
2387 suspendTask(cap,task);
2388 cap->in_haskell = false;
2389 releaseCapability_(cap,false);
2390
2391 RELEASE_LOCK(&cap->lock);
2392
2393 errno = saved_errno;
2394 #if mingw32_HOST_OS
2395 SetLastError(saved_winerror);
2396 #endif
2397 return task;
2398 }
2399
2400 StgRegTable *
2401 resumeThread (void *task_)
2402 {
2403 StgTSO *tso;
2404 InCall *incall;
2405 Capability *cap;
2406 Task *task = task_;
2407 int saved_errno;
2408 #if mingw32_HOST_OS
2409 StgWord32 saved_winerror;
2410 #endif
2411
2412 saved_errno = errno;
2413 #if mingw32_HOST_OS
2414 saved_winerror = GetLastError();
2415 #endif
2416
2417 incall = task->incall;
2418 cap = incall->suspended_cap;
2419 task->cap = cap;
2420
2421 // Wait for permission to re-enter the RTS with the result.
2422 waitForCapability(&cap,task);
2423 // we might be on a different capability now... but if so, our
2424 // entry on the suspended_ccalls list will also have been
2425 // migrated.
2426
2427 // Remove the thread from the suspended list
2428 recoverSuspendedTask(cap,task);
2429
2430 tso = incall->suspended_tso;
2431 incall->suspended_tso = NULL;
2432 incall->suspended_cap = NULL;
2433 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2434
2435 traceEventRunThread(cap, tso);
2436
2437 /* Reset blocking status */
2438 tso->why_blocked = NotBlocked;
2439
2440 if ((tso->flags & TSO_BLOCKEX) == 0) {
2441 // avoid locking the TSO if we don't have to
2442 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2443 maybePerformBlockedException(cap,tso);
2444 }
2445 }
2446
2447 cap->r.rCurrentTSO = tso;
2448 cap->in_haskell = true;
2449 errno = saved_errno;
2450 #if mingw32_HOST_OS
2451 SetLastError(saved_winerror);
2452 #endif
2453
2454 /* We might have GC'd, mark the TSO dirty again */
2455 dirty_TSO(cap,tso);
2456 dirty_STACK(cap,tso->stackobj);
2457
2458 IF_DEBUG(sanity, checkTSO(tso));
2459
2460 return &cap->r;
2461 }
2462
2463 /* ---------------------------------------------------------------------------
2464 * scheduleThread()
2465 *
2466 * scheduleThread puts a thread on the end of the runnable queue.
2467 * This will usually be done immediately after a thread is created.
2468 * The caller of scheduleThread must create the thread using e.g.
2469 * createThread and push an appropriate closure
2470 * on this thread's stack before the scheduler is invoked.
2471 * ------------------------------------------------------------------------ */
2472
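/* Caller-side sketch, roughly what rts_evalLazyIO() in RtsAPI.c does
 * (illustrative only): build the TSO with its closure already pushed,
 * then either enqueue it and wait for the result, or just enqueue it.
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
 *     scheduleWaitThread(tso, &ret, &cap);   // bound: block until finished
 *     // ...or scheduleThread(cap, tso);     // fire-and-forget
 */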
2473 void
2474 scheduleThread(Capability *cap, StgTSO *tso)
2475 {
2476 // The thread goes at the *end* of the run-queue, to avoid possible
2477 // starvation of any threads already on the queue.
2478 appendToRunQueue(cap,tso);
2479 }
2480
2481 void
2482 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2483 {
2484 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2485 // move this thread from now on.
2486 #if defined(THREADED_RTS)
2487 cpu %= enabled_capabilities;
2488 if (cpu == cap->no) {
2489 appendToRunQueue(cap,tso);
2490 } else {
2491 migrateThread(cap, tso, capabilities[cpu]);
2492 }
2493 #else
2494 appendToRunQueue(cap,tso);
2495 #endif
2496 }
2497
2498 void
2499 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2500 {
2501 Task *task;
2502 DEBUG_ONLY( StgThreadID id );
2503 Capability *cap;
2504
2505 cap = *pcap;
2506
2507 // We already created/initialised the Task
2508 task = cap->running_task;
2509
2510 // This TSO is now a bound thread; make the Task and TSO
2511 // point to each other.
2512 tso->bound = task->incall;
2513 tso->cap = cap;
2514
2515 task->incall->tso = tso;
2516 task->incall->ret = ret;
2517 task->incall->rstat = NoStatus;
2518
2519 appendToRunQueue(cap,tso);
2520
2521 DEBUG_ONLY( id = tso->id );
2522 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2523
2524 cap = schedule(cap,task);
2525
2526 ASSERT(task->incall->rstat != NoStatus);
2527 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2528
2529 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2530 *pcap = cap;
2531 }
2532
2533 /* ----------------------------------------------------------------------------
2534 * Starting Tasks
2535 * ------------------------------------------------------------------------- */
2536
2537 #if defined(THREADED_RTS)
2538 void scheduleWorker (Capability *cap, Task *task)
2539 {
2540 // schedule() runs without a lock.
2541 cap = schedule(cap,task);
2542
2543 // On exit from schedule(), we have a Capability, but possibly not
2544 // the same one we started with.
2545
2546 // During shutdown, the requirement is that after all the
2547 // Capabilities are shut down, all workers that are shutting down
2548 // have finished workerTaskStop(). This is why we hold on to
2549 // cap->lock until we've finished workerTaskStop() below.
2550 //
2551 // There may be workers still involved in foreign calls; those
2552 // will just block in waitForCapability() because the
2553 // Capability has been shut down.
2554 //
2555 ACQUIRE_LOCK(&cap->lock);
2556 releaseCapability_(cap,false);
2557 workerTaskStop(task);
2558 RELEASE_LOCK(&cap->lock);
2559 }
2560 #endif
2561
2562 /* ---------------------------------------------------------------------------
2563 * Start new worker tasks on Capabilities from--to
2564 * -------------------------------------------------------------------------- */
2565
2566 static void
2567 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2568 {
2569 #if defined(THREADED_RTS)
2570 uint32_t i;
2571 Capability *cap;
2572
2573 for (i = from; i < to; i++) {
2574 cap = capabilities[i];
2575 ACQUIRE_LOCK(&cap->lock);
2576 startWorkerTask(cap);
2577 RELEASE_LOCK(&cap->lock);
2578 }
2579 #endif
2580 }
2581
2582 /* ---------------------------------------------------------------------------
2583 * initScheduler()
2584 *
2585 * Initialise the scheduler. This resets all the queues - if the
2586 * queues contained any threads, they'll be garbage collected at the
2587 * next pass.
2588 *
2589 * ------------------------------------------------------------------------ */
2590
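/* Rough start-up/shut-down ordering, as driven by RtsStartup.c
 * (illustrative only):
 *
 *     initScheduler();       // queues, sched_mutex, Capabilities, Tasks
 *     ... run Haskell ...
 *     exitScheduler(true);   // interrupt, final GC, shut down Capabilities
 *     freeScheduler();       // free the Task manager and Capabilities
 */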
2591 void
2592 initScheduler(void)
2593 {
2594 #if !defined(THREADED_RTS)
2595 blocked_queue_hd = END_TSO_QUEUE;
2596 blocked_queue_tl = END_TSO_QUEUE;
2597 sleeping_queue = END_TSO_QUEUE;
2598 #endif
2599
2600 sched_state = SCHED_RUNNING;
2601 recent_activity = ACTIVITY_YES;
2602
2603 #if defined(THREADED_RTS)
2604 /* Initialise the mutex and condition variables used by
2605 * the scheduler. */
2606 initMutex(&sched_mutex);
2607 #endif
2608
2609 ACQUIRE_LOCK(&sched_mutex);
2610
2611 /* A capability holds the state a native thread needs in
2612 * order to execute STG code. At least one capability is
2613 * floating around (only THREADED_RTS builds have more than one).
2614 */
2615 initCapabilities();
2616
2617 initTaskManager();
2618
2619 /*
2620 * Eagerly start one worker to run each Capability, except for
2621 * Capability 0. The idea is that we're probably going to start a
2622 * bound thread on Capability 0 pretty soon, so we don't want a
2623 * worker task hogging it.
2624 */
2625 startWorkerTasks(1, n_capabilities);
2626
2627 RELEASE_LOCK(&sched_mutex);
2628
2629 }
2630
2631 void
2632 exitScheduler (bool wait_foreign USED_IF_THREADS)
2633 /* see Capability.c, shutdownCapability() */
2634 {
2635 Task *task = NULL;
2636
2637 task = newBoundTask();
2638
2639 // If we haven't killed all the threads yet, do it now.
2640 if (sched_state < SCHED_SHUTTING_DOWN) {
2641 sched_state = SCHED_INTERRUPTING;
2642 Capability *cap = task->cap;
2643 waitForCapability(&cap,task);
2644 scheduleDoGC(&cap,task,true);
2645 ASSERT(task->incall->tso == NULL);
2646 releaseCapability(cap);
2647 }
2648 sched_state = SCHED_SHUTTING_DOWN;
2649
2650 shutdownCapabilities(task, wait_foreign);
2651
2652 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2653 // n_failed_trygrab_idles, n_idle_caps);
2654
2655 boundTaskExiting(task);
2656 }
2657
2658 void
2659 freeScheduler( void )
2660 {
2661 uint32_t still_running;
2662
2663 ACQUIRE_LOCK(&sched_mutex);
2664 still_running = freeTaskManager();
2665 // We can only free the Capabilities if there are no Tasks still
2666 // running. We might have a Task about to return from a foreign
2667 // call into waitForCapability(), for example (actually,
2668 // this should be the *only* thing that a still-running Task can
2669 // do at this point, and it will block waiting for the
2670 // Capability).
2671 if (still_running == 0) {
2672 freeCapabilities();
2673 }
2674 RELEASE_LOCK(&sched_mutex);
2675 #if defined(THREADED_RTS)
2676 closeMutex(&sched_mutex);
2677 #endif
2678 }
2679
2680 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2681 void *user USED_IF_NOT_THREADS)
2682 {
2683 #if !defined(THREADED_RTS)
2684 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2685 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2686 evac(user, (StgClosure **)(void *)&sleeping_queue);
2687 #endif
2688 }
2689
2690 /* -----------------------------------------------------------------------------
2691 performGC
2692
2693 This is the interface to the garbage collector from Haskell land.
2694 We provide this so that external C code can allocate and garbage
2695 collect when called from Haskell via _ccall_GC.
2696 -------------------------------------------------------------------------- */
2697
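/* Caller-side sketch (illustrative only): foreign code running inside a
 * Haskell process can request a collection directly,
 *
 *     performGC();         // ordinary collection request
 *     performMajorGC();    // force a major (full) collection
 *
 * and System.Mem on the Haskell side reaches the same entry points via
 * foreign imports.
 */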
2698 static void
2699 performGC_(bool force_major)
2700 {
2701 Task *task;
2702 Capability *cap = NULL;
2703
2704 // We must grab a new Task here, because the existing Task may be
2705 // associated with a particular Capability, and chained onto the
2706 // suspended_ccalls queue.
2707 task = newBoundTask();
2708
2709 // TODO: do we need to traceTask*() here?
2710
2711 waitForCapability(&cap,task);
2712 scheduleDoGC(&cap,task,force_major);
2713 releaseCapability(cap);
2714 boundTaskExiting(task);
2715 }
2716
2717 void
2718 performGC(void)
2719 {
2720 performGC_(false);
2721 }
2722
2723 void
2724 performMajorGC(void)
2725 {
2726 performGC_(true);
2727 }
2728
2729 /* ---------------------------------------------------------------------------
2730 Interrupt execution.
2731 Might be called inside a signal handler so it mustn't do anything fancy.
2732 ------------------------------------------------------------------------ */
2733
2734 void
2735 interruptStgRts(void)
2736 {
2737 sched_state = SCHED_INTERRUPTING;
2738 interruptAllCapabilities();
2739 #if defined(THREADED_RTS)
2740 wakeUpRts();
2741 #endif
2742 }
2743
2744 /* -----------------------------------------------------------------------------
2745 Wake up the RTS
2746
2747 This function causes at least one OS thread to wake up and run the
2748 scheduler loop. It is invoked when the RTS might be deadlocked, or
2749 an external event has arrived that may need servicing (e.g. a
2750 keyboard interrupt).
2751
2752 In the single-threaded RTS we don't do anything here; we only have
2753 one thread anyway, and the event that caused us to want to wake up
2754 will have interrupted any blocking system call in progress anyway.
2755 -------------------------------------------------------------------------- */
2756
2757 #if defined(THREADED_RTS)
2758 void wakeUpRts(void)
2759 {
2760 // This forces the IO Manager thread to wakeup, which will
2761 // in turn ensure that some OS thread wakes up and runs the
2762 // scheduler loop, which will cause a GC and deadlock check.
2763 ioManagerWakeup();
2764 }
2765 #endif
2766
2767 /* -----------------------------------------------------------------------------
2768 Deleting threads
2769
2770 This is used for interruption (^C) and forking, and corresponds to
2771 raising an exception but without letting the thread catch the
2772 exception.
2773 -------------------------------------------------------------------------- */
2774
2775 static void
2776 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2777 {
2778 // NOTE: must only be called on a TSO that we have exclusive
2779 // access to, because we will call throwToSingleThreaded() below.
2780 // The TSO must be on the run queue of the Capability we own, or
2781 // we must own all Capabilities.
2782
2783 if (tso->why_blocked != BlockedOnCCall &&
2784 tso->why_blocked != BlockedOnCCall_Interruptible) {
2785 throwToSingleThreaded(tso->cap,tso,NULL);
2786 }
2787 }
2788
2789 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2790 static void
2791 deleteThread_(Capability *cap, StgTSO *tso)
2792 { // for forkProcess only:
2793 // like deleteThread(), but we delete threads in foreign calls, too.
2794
2795 if (tso->why_blocked == BlockedOnCCall ||
2796 tso->why_blocked == BlockedOnCCall_Interruptible) {
2797 tso->what_next = ThreadKilled;
2798 appendToRunQueue(tso->cap, tso);
2799 } else {
2800 deleteThread(cap,tso);
2801 }
2802 }
2803 #endif
2804
2805 /* -----------------------------------------------------------------------------
2806 raiseExceptionHelper
2807
2808 This function is called by the raise# primitive, just so that we can
2809 move some of the tricky bits of raising an exception from C-- into
2810 C. Who knows, it might be a useful reusable thing here too.
2811 -------------------------------------------------------------------------- */
2812
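/* Sketch of how the caller consumes the return value (the real caller,
 * stg_raisezh in PrimOps.cmm, is written in Cmm; this is C-style
 * pseudocode for illustration):
 *
 *     switch (raiseExceptionHelper(BaseReg, CurrentTSO, exception)) {
 *     case CATCH_FRAME:      // enter the handler stored in the frame
 *     case CATCH_STM_FRAME:  // likewise, inside an STM transaction
 *     case ATOMICALLY_FRAME: // exception reached the edge of a memory
 *                            // transaction; handled specially
 *     case STOP_FRAME:       // uncaught: the thread is finished
 *         ...
 *     }
 */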
2813 StgWord
2814 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2815 {
2816 Capability *cap = regTableToCapability(reg);
2817 StgThunk *raise_closure = NULL;
2818 StgPtr p, next;
2819 const StgRetInfoTable *info;
2820 //
2821 // This closure represents the expression 'raise# E' where E
2822 // is the exception being raised. It is used to overwrite all the
2823 // thunks which are currently under evaluation.
2824 //
2825
2826 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2827 // LDV profiling: stg_raise_info has THUNK as its closure
2828 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2829 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2830 // 1 does not cause any problem unless profiling is performed.
2831 // However, when LDV profiling goes on, we need to linearly scan the
2832 // small object pool, where raise_closure is stored, so we should
2833 // use MIN_UPD_SIZE.
2834 //
2835 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2836 // sizeofW(StgClosure)+1);
2837 //
2838
2839 //
2840 // Walk up the stack, looking for the catch frame. On the way,
2841 // we update any closures pointed to from update frames with the
2842 // raise closure that we just built.
2843 //
2844 p = tso->stackobj->sp;
2845 while(1) {
2846 info = get_ret_itbl((StgClosure *)p);
2847 next = p + stack_frame_sizeW((StgClosure *)p);
2848 switch (info->i.type) {
2849
2850 case UPDATE_FRAME:
2851 // Only create raise_closure if we need to.
2852 if (raise_closure == NULL) {
2853 raise_closure =
2854 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2855 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2856 raise_closure->payload[0] = exception;
2857 }
2858 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2859 (StgClosure *)raise_closure);
2860 p = next;
2861 continue;
2862
2863 case ATOMICALLY_FRAME:
2864 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2865 tso->stackobj->sp = p;
2866 return ATOMICALLY_FRAME;
2867
2868 case CATCH_FRAME:
2869 tso->stackobj->sp = p;
2870 return CATCH_FRAME;
2871
2872 case CATCH_STM_FRAME:
2873 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2874 tso->stackobj->sp = p;
2875 return CATCH_STM_FRAME;
2876
2877 case UNDERFLOW_FRAME:
2878 tso->stackobj->sp = p;
2879 threadStackUnderflow(cap,tso);
2880 p = tso->stackobj->sp;
2881 continue;
2882
2883 case STOP_FRAME:
2884 tso->stackobj->sp = p;
2885 return STOP_FRAME;
2886
2887 case CATCH_RETRY_FRAME: {
2888 StgTRecHeader *trec = tso -> trec;
2889 StgTRecHeader *outer = trec -> enclosing_trec;
2890 debugTrace(DEBUG_stm,
2891 "found CATCH_RETRY_FRAME at %p during raise", p);
2892 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2893 stmAbortTransaction(cap, trec);
2894 stmFreeAbortedTRec(cap, trec);
2895 tso -> trec = outer;
2896 p = next;
2897 continue;
2898 }
2899
2900 default:
2901 p = next;
2902 continue;
2903 }
2904 }
2905 }
2906
2907
2908 /* -----------------------------------------------------------------------------
2909 findRetryFrameHelper
2910
2911 This function is called by the retry# primitive. It traverses the stack
2912 leaving tso->sp referring to the frame which should handle the retry.
2913
2914 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2915 or an ATOMICALLY_FRAME (if the retry# reaches the top level).
2916
2917 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2918 create) because retries are not considered to be exceptions, despite the
2919 similar implementation.
2920
2921 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2922 not be created within memory transactions.
2923 -------------------------------------------------------------------------- */
2924
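/* Sketch of how the caller consumes the return value (the real caller,
 * stg_retryzh in PrimOps.cmm, is written in Cmm; C-style pseudocode for
 * illustration):
 *
 *     switch (findRetryFrameHelper(MyCapability(), CurrentTSO)) {
 *     case ATOMICALLY_FRAME:   // top level: block the thread until one of
 *                              // the TVars it read is written
 *     case CATCH_RETRY_FRAME:  // inside orElse#: try the other branch, or
 *                              // propagate the retry outwards
 *         ...
 *     }
 */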
2925 StgWord
2926 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2927 {
2928 const StgRetInfoTable *info;
2929 StgPtr p, next;
2930
2931 p = tso->stackobj->sp;
2932 while (1) {
2933 info = get_ret_itbl((const StgClosure *)p);
2934 next = p + stack_frame_sizeW((StgClosure *)p);
2935 switch (info->i.type) {
2936
2937 case ATOMICALLY_FRAME:
2938 debugTrace(DEBUG_stm,
2939 "found ATOMICALLY_FRAME at %p during retry", p);
2940 tso->stackobj->sp = p;
2941 return ATOMICALLY_FRAME;
2942
2943 case CATCH_RETRY_FRAME:
2944 debugTrace(DEBUG_stm,
2945 "found CATCH_RETRY_FRAME at %p during retry", p);
2946 tso->stackobj->sp = p;
2947 return CATCH_RETRY_FRAME;
2948
2949 case CATCH_STM_FRAME: {
2950 StgTRecHeader *trec = tso -> trec;
2951 StgTRecHeader *outer = trec -> enclosing_trec;
2952 debugTrace(DEBUG_stm,
2953 "found CATCH_STM_FRAME at %p during retry", p);
2954 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2955 stmAbortTransaction(cap, trec);
2956 stmFreeAbortedTRec(cap, trec);
2957 tso -> trec = outer;
2958 p = next;
2959 continue;
2960 }
2961
2962 case UNDERFLOW_FRAME:
2963 tso->stackobj->sp = p;
2964 threadStackUnderflow(cap,tso);
2965 p = tso->stackobj->sp;
2966 continue;
2967
2968 default:
2969 ASSERT(info->i.type != CATCH_FRAME);
2970 ASSERT(info->i.type != STOP_FRAME);
2971 p = next;
2972 continue;
2973 }
2974 }
2975 }
2976
2977 /* -----------------------------------------------------------------------------
2978 resurrectThreads is called after garbage collection on the list of
2979 threads found to be garbage. Each of these threads will be woken
2980 up and sent an exception: BlockedIndefinitelyOnMVar if the thread
2981 was blocked on an MVar, BlockedIndefinitelyOnSTM if blocked on an
2982 STM transaction, or NonTermination if blocked on a Black Hole.
2983
2984 Locks: assumes we hold *all* the capabilities.
2985 -------------------------------------------------------------------------- */
2986
2987 void
2988 resurrectThreads (StgTSO *threads)
2989 {
2990 StgTSO *tso, *next;
2991 Capability *cap;
2992 generation *gen;
2993
2994 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2995 next = tso->global_link;
2996
2997 gen = Bdescr((P_)tso)->gen;
2998 tso->global_link = gen->threads;
2999 gen->threads = tso;
3000
3001 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3002
3003 // Wake up the thread on the Capability it was last on
3004 cap = tso->cap;
3005
3006 switch (tso->why_blocked) {
3007 case BlockedOnMVar:
3008 case BlockedOnMVarRead:
3009 /* Called by GC - sched_mutex lock is currently held. */
3010 throwToSingleThreaded(cap, tso,
3011 (StgClosure *)blockedIndefinitelyOnMVar_closure);
3012 break;
3013 case BlockedOnBlackHole:
3014 throwToSingleThreaded(cap, tso,
3015 (StgClosure *)nonTermination_closure);
3016 break;
3017 case BlockedOnSTM:
3018 throwToSingleThreaded(cap, tso,
3019 (StgClosure *)blockedIndefinitelyOnSTM_closure);
3020 break;
3021 case NotBlocked:
3022 /* This might happen if the thread was blocked on a black hole
3023 * belonging to a thread that we've just woken up (raiseAsync
3024 * can wake up threads, remember...).
3025 */
3026 continue;
3027 case BlockedOnMsgThrowTo:
3028 // This can happen if the target is masking, blocks on a
3029 // black hole, and then is found to be unreachable. In
3030 // this case, we want to let the target wake up and carry
3031 // on, and do nothing to this thread.
3032 continue;
3033 default:
3034 barf("resurrectThreads: thread blocked in a strange way: %d",
3035 tso->why_blocked);
3036 }
3037 }
3038 }