1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
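// Roughly (a sketch of the lifecycle, partly based on Timer.c): the timer
// tick steps this from ACTIVITY_YES towards ACTIVITY_INACTIVE when nothing
// has run; the scheduler sets it back to ACTIVITY_YES whenever it runs a
// thread, and scheduleDoGC() sets ACTIVITY_DONE_GC after an idle-triggered
// GC, with the timer switched off until activity resumes.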
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /*
95 * This mutex protects most of the global scheduler data in
96 * the THREADED_RTS runtime.
97 */
98 #if defined(THREADED_RTS)
99 Mutex sched_mutex;
100 #endif
101
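// The forkProcess primop relies on POSIX fork(), so it is not supported
// on Windows (mingw32).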
102 #if !defined(mingw32_HOST_OS)
103 #define FORKPROCESS_PRIMOP_SUPPORTED
104 #endif
105
106 /* -----------------------------------------------------------------------------
107 * static function prototypes
108 * -------------------------------------------------------------------------- */
109
110 static Capability *schedule (Capability *initialCapability, Task *task);
111
112 //
113 // These functions all encapsulate parts of the scheduler loop, and are
114 // abstracted only to make the structure and control flow of the
115 // scheduler clearer.
116 //
117 static void schedulePreLoop (void);
118 static void scheduleFindWork (Capability **pcap);
119 #if defined(THREADED_RTS)
120 static void scheduleYield (Capability **pcap, Task *task);
121 #endif
122 #if defined(THREADED_RTS)
123 static rtsBool requestSync (Capability **pcap, Task *task,
124 PendingSync *sync_type, SyncType *prev_sync_type);
125 static void acquireAllCapabilities(Capability *cap, Task *task);
126 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
127 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
128 uint32_t to USED_IF_THREADS);
129 #endif
130 static void scheduleStartSignalHandlers (Capability *cap);
131 static void scheduleCheckBlockedThreads (Capability *cap);
132 static void scheduleProcessInbox(Capability **cap);
133 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
134 static void schedulePushWork(Capability *cap, Task *task);
135 #if defined(THREADED_RTS)
136 static void scheduleActivateSpark(Capability *cap);
137 #endif
138 static void schedulePostRunThread(Capability *cap, StgTSO *t);
139 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
140 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
141 uint32_t prev_what_next );
142 static void scheduleHandleThreadBlocked( StgTSO *t );
143 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
144 StgTSO *t );
145 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
146 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
147
148 static void deleteThread (Capability *cap, StgTSO *tso);
149 static void deleteAllThreads (Capability *cap);
150
151 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
152 static void deleteThread_(Capability *cap, StgTSO *tso);
153 #endif
154
155 /* ---------------------------------------------------------------------------
156 Main scheduling loop.
157
158 We use round-robin scheduling, each thread returning to the
159 scheduler loop when one of these conditions is detected:
160
161 * out of heap space
162 * timer expires (thread yields)
163 * thread blocks
164 * thread ends
165 * stack overflow
166
167 ------------------------------------------------------------------------ */
168
169 static Capability *
170 schedule (Capability *initialCapability, Task *task)
171 {
172 StgTSO *t;
173 Capability *cap;
174 StgThreadReturnCode ret;
175 uint32_t prev_what_next;
176 rtsBool ready_to_gc;
177 #if defined(THREADED_RTS)
178 rtsBool first = rtsTrue;
179 #endif
180
181 cap = initialCapability;
182
183 // Pre-condition: this task owns initialCapability.
184 // The sched_mutex is *NOT* held
185 // NB. on return, we still hold a capability.
186
187 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
188
189 schedulePreLoop();
190
191 // -----------------------------------------------------------
192 // Scheduler loop starts here:
193
194 while (1) {
195
196 // Check whether we have re-entered the RTS from Haskell without
197 // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
198 // call).
199 if (cap->in_haskell) {
200 errorBelch("schedule: re-entered unsafely.\n"
201 " Perhaps a 'foreign import unsafe' should be 'safe'?");
202 stg_exit(EXIT_FAILURE);
203 }
204
205 // Note [shutdown]: The interruption / shutdown sequence.
206 //
207 // In order to cleanly shut down the runtime, we want to:
208 // * make sure that all main threads return to their callers
209 // with the state 'Interrupted'.
210 // * clean up all OS threads associated with the runtime
211 // * free all memory etc.
212 //
213 // So the sequence goes like this:
214 //
215 // * The shutdown sequence is initiated by calling hs_exit(),
216 // interruptStgRts(), or running out of memory in the GC.
217 //
218 // * Set sched_state = SCHED_INTERRUPTING
219 //
220 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
221 // scheduleDoGC(), which halts the whole runtime by acquiring all the
222 // capabilities, does a GC and then calls deleteAllThreads() to kill all
223 // the remaining threads. The zombies are left on the run queue for
224 // cleaning up. We can't kill threads involved in foreign calls.
225 //
226 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
227 //
228 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
229 // enforced by the scheduler, which won't run any Haskell code when
230 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
231 // other capabilities by doing the GC earlier.
232 //
233 // * all workers exit when the run queue on their capability
234 // drains. All main threads will also exit when their TSO
235 // reaches the head of the run queue and they can return.
236 //
237 // * eventually all Capabilities will shut down, and the RTS can
238 // exit.
239 //
240 // * We might be left with threads blocked in foreign calls,
241 // we should really attempt to kill these somehow (TODO).
242
243 switch (sched_state) {
244 case SCHED_RUNNING:
245 break;
246 case SCHED_INTERRUPTING:
247 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
248 /* scheduleDoGC() deletes all the threads */
249 scheduleDoGC(&cap,task,rtsTrue);
250
251 // after scheduleDoGC(), we must be shutting down. Either some
252 // other Capability did the final GC, or we did it above,
253 // either way we can fall through to the SCHED_SHUTTING_DOWN
254 // case now.
255 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
256 // fall through
257
258 case SCHED_SHUTTING_DOWN:
259 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
260 // If we are a worker, just exit. If we're a bound thread
261 // then we will exit below when we've removed our TSO from
262 // the run queue.
263 if (!isBoundTask(task) && emptyRunQueue(cap)) {
264 return cap;
265 }
266 break;
267 default:
268 barf("sched_state: %d", sched_state);
269 }
270
271 scheduleFindWork(&cap);
272
273 /* work pushing, currently relevant only for THREADED_RTS:
274 (pushes threads, wakes up idle capabilities for stealing) */
275 schedulePushWork(cap,task);
276
277 scheduleDetectDeadlock(&cap,task);
278
279 // Normally, the only way we can get here with no threads to
280 // run is if a keyboard interrupt was received during
281 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
282 // Additionally, it is not fatal for the
283 // threaded RTS to reach here with no threads to run.
284 //
285 // win32: might be here due to awaitEvent() being abandoned
286 // as a result of a console event having been delivered.
287
288 #if defined(THREADED_RTS)
289 if (first)
290 {
291 // XXX: ToDo
292 // // don't yield the first time, we want a chance to run this
293 // // thread for a bit, even if there are others banging at the
294 // // door.
295 // first = rtsFalse;
296 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
297 }
298
299 scheduleYield(&cap,task);
300
301 if (emptyRunQueue(cap)) continue; // look for work again
302 #endif
303
304 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
305 if ( emptyRunQueue(cap) ) {
306 ASSERT(sched_state >= SCHED_INTERRUPTING);
307 }
308 #endif
309
310 //
311 // Get a thread to run
312 //
313 t = popRunQueue(cap);
314
315 // Sanity check the thread we're about to run. This can be
316 // expensive if there is lots of thread switching going on...
317 IF_DEBUG(sanity,checkTSO(t));
318
319 #if defined(THREADED_RTS)
320 // Check whether we can run this thread in the current task.
321 // If not, we have to pass our capability to the right task.
322 {
323 InCall *bound = t->bound;
324
325 if (bound) {
326 if (bound->task == task) {
327 // yes, the Haskell thread is bound to the current native thread
328 } else {
329 debugTrace(DEBUG_sched,
330 "thread %lu bound to another OS thread",
331 (unsigned long)t->id);
332 // no, bound to a different Haskell thread: pass to that thread
333 pushOnRunQueue(cap,t);
334 continue;
335 }
336 } else {
337 // The thread we want to run is unbound.
338 if (task->incall->tso) {
339 debugTrace(DEBUG_sched,
340 "this OS thread cannot run thread %lu",
341 (unsigned long)t->id);
342 // no, the current native thread is bound to a different
343 // Haskell thread, so pass it to any worker thread
344 pushOnRunQueue(cap,t);
345 continue;
346 }
347 }
348 }
349 #endif
350
351 // If we're shutting down, and this thread has not yet been
352 // killed, kill it now. This sometimes happens when a finalizer
353 // thread is created by the final GC, or a thread previously
354 // in a foreign call returns.
355 if (sched_state >= SCHED_INTERRUPTING &&
356 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
357 deleteThread(cap,t);
358 }
359
360 // If this capability is disabled, migrate the thread away rather
361 // than running it. NB. but not if the thread is bound: it is
362 // really hard for a bound thread to migrate itself. Believe me,
363 // I tried several ways and couldn't find a way to do it.
364 // Instead, when everything is stopped for GC, we migrate all the
365 // threads on the run queue then (see scheduleDoGC()).
366 //
367 // ToDo: what about TSO_LOCKED? Currently we're migrating those
368 // when the number of capabilities drops, but we never migrate
369 // them back if it rises again. Presumably we should, but after
370 // the thread has been migrated we no longer know what capability
371 // it was originally on.
372 #ifdef THREADED_RTS
373 if (cap->disabled && !t->bound) {
374 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
375 migrateThread(cap, t, dest_cap);
376 continue;
377 }
378 #endif
379
380 /* context switches are initiated by the timer signal, unless
381 * the user specified "context switch as often as possible", with
382 * +RTS -C0
383 */
384 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
385 && !emptyThreadQueues(cap)) {
386 cap->context_switch = 1;
387 }
388
389 run_thread:
390
391 // CurrentTSO is the thread to run. It might be different if we
392 // loop back to run_thread, so make sure to set CurrentTSO after
393 // that.
394 cap->r.rCurrentTSO = t;
395
396 startHeapProfTimer();
397
398 // ----------------------------------------------------------------------
399 // Run the current thread
400
401 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
402 ASSERT(t->cap == cap);
403 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
404
405 prev_what_next = t->what_next;
406
407 errno = t->saved_errno;
408 #if mingw32_HOST_OS
409 SetLastError(t->saved_winerror);
410 #endif
411
412 // reset the interrupt flag before running Haskell code
413 cap->interrupt = 0;
414
415 cap->in_haskell = rtsTrue;
416 cap->idle = 0;
417
418 dirty_TSO(cap,t);
419 dirty_STACK(cap,t->stackobj);
420
421 switch (recent_activity)
422 {
423 case ACTIVITY_DONE_GC: {
424 // ACTIVITY_DONE_GC means we turned off the timer signal to
425 // conserve power (see #1623). Re-enable it here.
426 uint32_t prev;
427 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
428 if (prev == ACTIVITY_DONE_GC) {
429 #ifndef PROFILING
430 startTimer();
431 #endif
432 }
433 break;
434 }
435 case ACTIVITY_INACTIVE:
436 // If we reached ACTIVITY_INACTIVE, then don't reset it until
437 // we've done the GC. The thread running here might just be
438 // the IO manager thread that handle_tick() woke up via
439 // wakeUpRts().
440 break;
441 default:
442 recent_activity = ACTIVITY_YES;
443 }
444
445 traceEventRunThread(cap, t);
446
447 switch (prev_what_next) {
448
449 case ThreadKilled:
450 case ThreadComplete:
451 /* Thread already finished, return to scheduler. */
452 ret = ThreadFinished;
453 break;
454
455 case ThreadRunGHC:
456 {
457 StgRegTable *r;
458 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
459 cap = regTableToCapability(r);
460 ret = r->rRet;
461 break;
462 }
463
464 case ThreadInterpret:
465 cap = interpretBCO(cap);
466 ret = cap->r.rRet;
467 break;
468
469 default:
470 barf("schedule: invalid what_next field");
471 }
472
473 cap->in_haskell = rtsFalse;
474
475 // The TSO might have moved, eg. if it re-entered the RTS and a GC
476 // happened. So find the new location:
477 t = cap->r.rCurrentTSO;
478
479 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
480 // don't want it set when not running a Haskell thread.
481 cap->r.rCurrentTSO = NULL;
482
483 // And save the current errno in this thread.
484 // XXX: possibly bogus for SMP because this thread might already
485 // be running again, see code below.
486 t->saved_errno = errno;
487 #if mingw32_HOST_OS
488 // Similarly for Windows error code
489 t->saved_winerror = GetLastError();
490 #endif
491
492 if (ret == ThreadBlocked) {
493 if (t->why_blocked == BlockedOnBlackHole) {
494 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
495 traceEventStopThread(cap, t, t->why_blocked + 6,
496 owner != NULL ? owner->id : 0);
497 } else {
498 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
499 }
500 } else {
501 traceEventStopThread(cap, t, ret, 0);
502 }
503
504 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
505 ASSERT(t->cap == cap);
506
507 // ----------------------------------------------------------------------
508
509 // Costs for the scheduler are assigned to CCS_SYSTEM
510 stopHeapProfTimer();
511 #if defined(PROFILING)
512 cap->r.rCCCS = CCS_SYSTEM;
513 #endif
514
515 schedulePostRunThread(cap,t);
516
517 ready_to_gc = rtsFalse;
518
519 switch (ret) {
520 case HeapOverflow:
521 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
522 break;
523
524 case StackOverflow:
525 // just adjust the stack for this thread, then push it back
526 // on the run queue.
527 threadStackOverflow(cap, t);
528 pushOnRunQueue(cap,t);
529 break;
530
531 case ThreadYielding:
532 if (scheduleHandleYield(cap, t, prev_what_next)) {
533 // shortcut for switching between compiler/interpreter:
534 goto run_thread;
535 }
536 break;
537
538 case ThreadBlocked:
539 scheduleHandleThreadBlocked(t);
540 break;
541
542 case ThreadFinished:
543 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
544 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
545 break;
546
547 default:
548 barf("schedule: invalid thread return code %d", (int)ret);
549 }
550
551 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
552 scheduleDoGC(&cap,task,rtsFalse);
553 }
554 } /* end of while() */
555 }
556
557 /* -----------------------------------------------------------------------------
558 * Run queue operations
559 * -------------------------------------------------------------------------- */
560
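// The run queue is a doubly-linked list of TSOs: tso->_link points to the
// next thread and tso->block_info.prev to the previous one, with
// END_TSO_QUEUE as the sentinel at both ends.  removeFromRunQueue() unlinks
// a TSO from anywhere in the queue, fixing up run_queue_hd/run_queue_tl as
// necessary, and keeps cap->n_run_queue in step with the queue length.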
561 static void
562 removeFromRunQueue (Capability *cap, StgTSO *tso)
563 {
564 if (tso->block_info.prev == END_TSO_QUEUE) {
565 ASSERT(cap->run_queue_hd == tso);
566 cap->run_queue_hd = tso->_link;
567 } else {
568 setTSOLink(cap, tso->block_info.prev, tso->_link);
569 }
570 if (tso->_link == END_TSO_QUEUE) {
571 ASSERT(cap->run_queue_tl == tso);
572 cap->run_queue_tl = tso->block_info.prev;
573 } else {
574 setTSOPrev(cap, tso->_link, tso->block_info.prev);
575 }
576 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
577 cap->n_run_queue--;
578
579 IF_DEBUG(sanity, checkRunQueue(cap));
580 }
581
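// Move a thread to the front of its capability's run queue: it is removed
// from wherever it currently sits and pushed back on the head, so it will
// be the next thread chosen by popRunQueue().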
582 void
583 promoteInRunQueue (Capability *cap, StgTSO *tso)
584 {
585 removeFromRunQueue(cap, tso);
586 pushOnRunQueue(cap, tso);
587 }
588
589 /* ----------------------------------------------------------------------------
590 * Setting up the scheduler loop
591 * ------------------------------------------------------------------------- */
592
593 static void
594 schedulePreLoop(void)
595 {
596 // initialisation for scheduler - what cannot go into initScheduler()
597
598 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
599 win32AllocStack();
600 #endif
601 }
602
603 /* -----------------------------------------------------------------------------
604 * scheduleFindWork()
605 *
606 * Search for work to do, and handle messages from elsewhere.
607 * -------------------------------------------------------------------------- */
608
609 static void
610 scheduleFindWork (Capability **pcap)
611 {
612 scheduleStartSignalHandlers(*pcap);
613
614 scheduleProcessInbox(pcap);
615
616 scheduleCheckBlockedThreads(*pcap);
617
618 #if defined(THREADED_RTS)
619 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
620 #endif
621 }
622
623 #if defined(THREADED_RTS)
624 STATIC_INLINE rtsBool
625 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
626 {
627 // we need to yield this capability to someone else if..
628 // - another thread is initiating a GC, and we didn't just do a GC
629 // (see Note [GC livelock])
630 // - another Task is returning from a foreign call
631 // - the thread at the head of the run queue cannot be run
632 // by this Task (it is bound to another Task, or it is unbound
633 // and this task is bound).
634 //
635 // Note [GC livelock]
636 //
637 // If we are interrupted to do a GC, then we do not immediately do
638 // another one. This avoids a starvation situation where one
639 // Capability keeps forcing a GC and the other Capabilities make no
640 // progress at all.
641
642 return ((pending_sync && !didGcLast) ||
643 cap->n_returning_tasks != 0 ||
644 (!emptyRunQueue(cap) && (task->incall->tso == NULL
645 ? peekRunQueue(cap)->bound != NULL
646 : peekRunQueue(cap)->bound != task->incall)));
647 }
648
649 // This is the single place where a Task goes to sleep. There are
650 // two reasons it might need to sleep:
651 // - there are no threads to run
652 // - we need to yield this Capability to someone else
653 // (see shouldYieldCapability())
654 //
655 // Careful: the scheduler loop is quite delicate. Make sure you run
656 // the tests in testsuite/concurrent (all ways) after modifying this,
657 // and also check the benchmarks in nofib/parallel for regressions.
658
659 static void
660 scheduleYield (Capability **pcap, Task *task)
661 {
662 Capability *cap = *pcap;
663 int didGcLast = rtsFalse;
664
665 // if we have work, and we don't need to give up the Capability, continue.
666 //
667 if (!shouldYieldCapability(cap,task,rtsFalse) &&
668 (!emptyRunQueue(cap) ||
669 !emptyInbox(cap) ||
670 sched_state >= SCHED_INTERRUPTING)) {
671 return;
672 }
673
674 // otherwise yield (sleep), and keep yielding if necessary.
675 do {
676 didGcLast = yieldCapability(&cap,task, !didGcLast);
677 }
678 while (shouldYieldCapability(cap,task,didGcLast));
679
680 // note there may still be no threads on the run queue at this
681 // point, the caller has to check.
682
683 *pcap = cap;
684 return;
685 }
686 #endif
687
688 /* -----------------------------------------------------------------------------
689 * schedulePushWork()
690 *
691 * Push work to other Capabilities if we have some.
692 * -------------------------------------------------------------------------- */
693
694 static void
695 schedulePushWork(Capability *cap USED_IF_THREADS,
696 Task *task USED_IF_THREADS)
697 {
698 /* The following code is not for PARALLEL_HASKELL. The call is kept general;
699 future GUM versions might use pushing in a distributed setup. */
700 #if defined(THREADED_RTS)
701
702 Capability *free_caps[n_capabilities], *cap0;
703 uint32_t i, n_wanted_caps, n_free_caps;
704
705 // migration can be turned off with +RTS -qm
706 if (!RtsFlags.ParFlags.migrate) return;
707
708 // Figure out how many capabilities we want to wake up. We need at least
709 // sparkPoolSize(cap) plus the number of spare threads we have.
710 n_wanted_caps = sparkPoolSizeCap(cap) + cap->n_run_queue - 1;
711
712 if (n_wanted_caps == 0) return;
713
714 // First grab as many free Capabilities as we can. ToDo: we should prefer
715 // capabilities on the same NUMA node, but not restrict ourselves to them.
716 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
717 n_free_caps < n_wanted_caps && i != cap->no;
718 i = (i + 1) % n_capabilities) {
719 cap0 = capabilities[i];
720 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
721 if (!emptyRunQueue(cap0)
722 || cap0->n_returning_tasks != 0
723 || cap0->inbox != (Message*)END_TSO_QUEUE) {
724 // it already has some work, we just grabbed it at
725 // the wrong moment. Or maybe it's deadlocked!
726 releaseCapability(cap0);
727 } else {
728 free_caps[n_free_caps++] = cap0;
729 }
730 }
731 }
732
733 // we now have n_free_caps free capabilities stashed in
734 // free_caps[]. Share our run queue equally with them. This is
735 // probably the simplest thing we could do; improvements we might
736 // want to do include:
737 //
738 // - giving high priority to moving relatively new threads, on
739 // the grounds that they haven't had time to build up a
740 // working set in the cache on this CPU/Capability.
741 //
742 // - giving low priority to moving long-lived threads
743
744 if (n_free_caps > 0) {
745 StgTSO *prev, *t, *next;
746 #ifdef SPARK_PUSHING
747 rtsBool pushed_to_all;
748 #endif
749
750 debugTrace(DEBUG_sched,
751 "cap %d: %s and %d free capabilities, sharing...",
752 cap->no,
753 (cap->n_run_queue > 1)?
754 "excess threads on run queue":"sparks to share (>=2)",
755 n_free_caps);
756
757 i = 0;
758 #ifdef SPARK_PUSHING
759 pushed_to_all = rtsFalse;
760 #endif
761
762 if (cap->run_queue_hd != END_TSO_QUEUE) {
763 prev = cap->run_queue_hd;
764 t = prev->_link;
765 prev->_link = END_TSO_QUEUE;
766 for (; t != END_TSO_QUEUE; t = next) {
767 next = t->_link;
768 t->_link = END_TSO_QUEUE;
769 if (t->bound == task->incall // don't move my bound thread
770 || tsoLocked(t)) { // don't move a locked thread
771 setTSOLink(cap, prev, t);
772 setTSOPrev(cap, t, prev);
773 prev = t;
774 } else if (i == n_free_caps) {
775 #ifdef SPARK_PUSHING
776 pushed_to_all = rtsTrue;
777 #endif
778 i = 0;
779 // keep one for us
780 setTSOLink(cap, prev, t);
781 setTSOPrev(cap, t, prev);
782 prev = t;
783 } else {
784 appendToRunQueue(free_caps[i],t);
785 cap->n_run_queue--;
786
787 traceEventMigrateThread (cap, t, free_caps[i]->no);
788
789 if (t->bound) { t->bound->task->cap = free_caps[i]; }
790 t->cap = free_caps[i];
791 i++;
792 }
793 }
794 cap->run_queue_tl = prev;
795
796 IF_DEBUG(sanity, checkRunQueue(cap));
797 }
798
799 #ifdef SPARK_PUSHING
800 /* JB I left this code in place, it would work but is not necessary */
801
802 // If there are some free capabilities that we didn't push any
803 // threads to, then try to push a spark to each one.
804 if (!pushed_to_all) {
805 StgClosure *spark;
806 // i is the next free capability to push to
807 for (; i < n_free_caps; i++) {
808 if (emptySparkPoolCap(free_caps[i])) {
809 spark = tryStealSpark(cap->sparks);
810 if (spark != NULL) {
811 /* TODO: if anyone wants to re-enable this code then
812 * they must consider the fizzledSpark(spark) case
813 * and update the per-cap spark statistics.
814 */
815 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
816
817 traceEventStealSpark(free_caps[i], t, cap->no);
818
819 newSpark(&(free_caps[i]->r), spark);
820 }
821 }
822 }
823 }
824 #endif /* SPARK_PUSHING */
825
826 // release the capabilities
827 for (i = 0; i < n_free_caps; i++) {
828 task->cap = free_caps[i];
829 if (sparkPoolSizeCap(cap) > 0) {
830 // If we have sparks to steal, wake up a worker on the
831 // capability, even if it has no threads to run.
832 releaseAndWakeupCapability(free_caps[i]);
833 } else {
834 releaseCapability(free_caps[i]);
835 }
836 }
837 }
838 task->cap = cap; // reset to point to our Capability.
839
840 #endif /* THREADED_RTS */
841
842 }
843
844 /* ----------------------------------------------------------------------------
845 * Start any pending signal handlers
846 * ------------------------------------------------------------------------- */
847
848 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
849 static void
850 scheduleStartSignalHandlers(Capability *cap)
851 {
852 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
853 // safe outside the lock
854 startSignalHandlers(cap);
855 }
856 }
857 #else
858 static void
859 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
860 {
861 }
862 #endif
863
864 /* ----------------------------------------------------------------------------
865 * Check for blocked threads that can be woken up.
866 * ------------------------------------------------------------------------- */
867
868 static void
869 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
870 {
871 #if !defined(THREADED_RTS)
872 //
873 // Check whether any waiting threads need to be woken up. If the
874 // run queue is empty, and there are no other tasks running, we
875 // can wait indefinitely for something to happen.
876 //
877 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
878 {
879 awaitEvent (emptyRunQueue(cap));
880 }
881 #endif
882 }
883
884 /* ----------------------------------------------------------------------------
885 * Detect deadlock conditions and attempt to resolve them.
886 * ------------------------------------------------------------------------- */
887
888 static void
889 scheduleDetectDeadlock (Capability **pcap, Task *task)
890 {
891 Capability *cap = *pcap;
892 /*
893 * Detect deadlock: when we have no threads to run, there are no
894 * threads blocked, waiting for I/O, or sleeping, and all the
895 * other tasks are waiting for work, we must have a deadlock of
896 * some description.
897 */
898 if ( emptyThreadQueues(cap) )
899 {
900 #if defined(THREADED_RTS)
901 /*
902 * In the threaded RTS, we only check for deadlock if there
903 * has been no activity in a complete timeslice. This means
904 * we won't eagerly start a full GC just because we don't have
905 * any threads to run currently.
906 */
907 if (recent_activity != ACTIVITY_INACTIVE) return;
908 #endif
909
910 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
911
912 // Garbage collection can release some new threads due to
913 // either (a) finalizers or (b) threads resurrected because
914 // they are unreachable and will therefore be sent an
915 // exception. Any threads thus released will be immediately
916 // runnable.
917 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
918 cap = *pcap;
919 // When force_major == rtsTrue, scheduleDoGC() sets
920 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
921 // signal.
922
923 if ( !emptyRunQueue(cap) ) return;
924
925 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
926 /* If we have user-installed signal handlers, then wait
927 * for signals to arrive rather than bombing out with a
928 * deadlock.
929 */
930 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
931 debugTrace(DEBUG_sched,
932 "still deadlocked, waiting for signals...");
933
934 awaitUserSignals();
935
936 if (signals_pending()) {
937 startSignalHandlers(cap);
938 }
939
940 // either we have threads to run, or we were interrupted:
941 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
942
943 return;
944 }
945 #endif
946
947 #if !defined(THREADED_RTS)
948 /* Probably a real deadlock. Send the current main thread the
949 * NonTermination exception (that is what the code below throws).
950 */
951 if (task->incall->tso) {
952 switch (task->incall->tso->why_blocked) {
953 case BlockedOnSTM:
954 case BlockedOnBlackHole:
955 case BlockedOnMsgThrowTo:
956 case BlockedOnMVar:
957 case BlockedOnMVarRead:
958 throwToSingleThreaded(cap, task->incall->tso,
959 (StgClosure *)nonTermination_closure);
960 return;
961 default:
962 barf("deadlock: main thread blocked in a strange way");
963 }
964 }
965 return;
966 #endif
967 }
968 }
969
970
971 /* ----------------------------------------------------------------------------
972 * Process messages in the current Capability's inbox
973 * ------------------------------------------------------------------------- */
974
975 static void
976 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
977 {
978 #if defined(THREADED_RTS)
979 Message *m, *next;
980 int r;
981 Capability *cap = *pcap;
982
983 while (!emptyInbox(cap)) {
984 if (cap->r.rCurrentNursery->link == NULL ||
985 g0->n_new_large_words >= large_alloc_lim) {
986 scheduleDoGC(pcap, cap->running_task, rtsFalse);
987 cap = *pcap;
988 }
989
990 // don't use a blocking acquire; if the lock is held by
991 // another thread then just carry on. This seems to avoid
992 // getting stuck in a message ping-pong situation with other
993 // processors. We'll check the inbox again later anyway.
994 //
995 // We should really use a more efficient queue data structure
996 // here. The trickiness is that we must ensure a Capability
997 // never goes idle if the inbox is non-empty, which is why we
998 // use cap->lock (cap->lock is released as the last thing
999 // before going idle; see Capability.c:releaseCapability()).
1000 r = TRY_ACQUIRE_LOCK(&cap->lock);
1001 if (r != 0) return;
1002
1003 m = cap->inbox;
1004 cap->inbox = (Message*)END_TSO_QUEUE;
1005
1006 RELEASE_LOCK(&cap->lock);
1007
1008 while (m != (Message*)END_TSO_QUEUE) {
1009 next = m->link;
1010 executeMessage(cap, m);
1011 m = next;
1012 }
1013 }
1014 #endif
1015 }
1016
1017 /* ----------------------------------------------------------------------------
1018 * Activate spark threads (THREADED_RTS)
1019 * ------------------------------------------------------------------------- */
1020
1021 #if defined(THREADED_RTS)
1022 static void
1023 scheduleActivateSpark(Capability *cap)
1024 {
1025 if (anySparks() && !cap->disabled)
1026 {
1027 createSparkThread(cap);
1028 debugTrace(DEBUG_sched, "creating a spark thread");
1029 }
1030 }
1031 #endif // THREADED_RTS
1032
1033 /* ----------------------------------------------------------------------------
1034 * After running a thread...
1035 * ------------------------------------------------------------------------- */
1036
1037 static void
1038 schedulePostRunThread (Capability *cap, StgTSO *t)
1039 {
1040 // We have to be able to catch transactions that are in an
1041 // infinite loop as a result of seeing an inconsistent view of
1042 // memory, e.g.
1043 //
1044 // atomically $ do
1045 // [a,b] <- mapM readTVar [ta,tb]
1046 // when (a == b) loop
1047 //
1048 // and a is never equal to b given a consistent view of memory.
1049 //
1050 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1051 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1052 debugTrace(DEBUG_sched | DEBUG_stm,
1053 "trec %p found wasting its time", t);
1054
1055 // strip the stack back to the
1056 // ATOMICALLY_FRAME, aborting the (nested)
1057 // transaction, and saving the stack of any
1058 // partially-evaluated thunks on the heap.
1059 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1060
1061 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1062 }
1063 }
1064
1065 //
1066 // If the current thread's allocation limit has run out, send it
1067 // the AllocationLimitExceeded exception.
1068
1069 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1070 // Use a throwToSelf rather than a throwToSingleThreaded, because
1071 // it correctly handles the case where the thread is currently
1072 // inside mask. Also the thread might be blocked (e.g. on an
1073 // MVar), and throwToSingleThreaded doesn't unblock it
1074 // correctly in that case.
1075 throwToSelf(cap, t, allocationLimitExceeded_closure);
1076 ASSIGN_Int64((W_*)&(t->alloc_limit),
1077 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1078 }
1079
1080 /* some statistics gathering in the parallel case */
1081 }
1082
1083 /* -----------------------------------------------------------------------------
1084 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1085 * -------------------------------------------------------------------------- */
1086
1087 static rtsBool
1088 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1089 {
1090 if (cap->r.rHpLim == NULL || cap->context_switch) {
1091 // Sometimes we miss a context switch, e.g. when calling
1092 // primitives in a tight loop: MAYBE_GC() doesn't check the
1093 // context switch flag, and we end up waiting for a GC.
1094 // See #1984, and concurrent/should_run/1984
1095 cap->context_switch = 0;
1096 appendToRunQueue(cap,t);
1097 } else {
1098 pushOnRunQueue(cap,t);
1099 }
1100
1101 // did the task ask for a large block?
1102 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1103 // if so, get one and push it on the front of the nursery.
1104 bdescr *bd;
1105 W_ blocks;
1106
1107 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1108
1109 if (blocks > BLOCKS_PER_MBLOCK) {
1110 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1111 }
1112
1113 debugTrace(DEBUG_sched,
1114 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1115 (long)t->id, what_next_strs[t->what_next], blocks);
1116
1117 // don't do this if the nursery is (nearly) full, we'll GC first.
1118 if (cap->r.rCurrentNursery->link != NULL ||
1119 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1120 // infinite loop if the
1121 // nursery has only one
1122 // block.
1123
1124 bd = allocGroupOnNode_lock(cap->node,blocks);
1125 cap->r.rNursery->n_blocks += blocks;
1126
1127 // link the new group after CurrentNursery
1128 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1129
1130 // initialise it as a nursery block. We initialise the
1131 // step, gen_no, and flags field of *every* sub-block in
1132 // this large block, because this is easier than making
1133 // sure that we always find the block head of a large
1134 // block whenever we call Bdescr() (eg. evacuate() and
1135 // isAlive() in the GC would both have to do this, at
1136 // least).
1137 {
1138 bdescr *x;
1139 for (x = bd; x < bd + blocks; x++) {
1140 initBdescr(x,g0,g0);
1141 x->free = x->start;
1142 x->flags = 0;
1143 }
1144 }
1145
1146 // This assert can be a killer if the app is doing lots
1147 // of large block allocations.
1148 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1149
1150 // now update the nursery to point to the new block
1151 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1152 cap->r.rCurrentNursery = bd;
1153
1154 // we might be unlucky and have another thread get on the
1155 // run queue before us and steal the large block, but in that
1156 // case the thread will just end up requesting another large
1157 // block.
1158 return rtsFalse; /* not actually GC'ing */
1159 }
1160 }
1161
1162 // if we got here because we exceeded large_alloc_lim, then
1163 // proceed straight to GC.
1164 if (g0->n_new_large_words >= large_alloc_lim) {
1165 return rtsTrue;
1166 }
1167
1168 // Otherwise, we just ran out of space in the current nursery.
1169 // Grab another nursery if we can.
1170 if (getNewNursery(cap)) {
1171 debugTrace(DEBUG_sched, "thread %ld got a new nursery", t->id);
1172 return rtsFalse;
1173 }
1174
1175 return rtsTrue;
1176 /* actual GC is done at the end of the while loop in schedule() */
1177 }
1178
1179 /* -----------------------------------------------------------------------------
1180 * Handle a thread that returned to the scheduler with ThreadYielding
1181 * -------------------------------------------------------------------------- */
1182
1183 static rtsBool
1184 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1185 {
1186 /* put the thread back on the run queue. Then, if we're ready to
1187 * GC, check whether this is the last task to stop. If so, wake
1188 * up the GC thread. getThread will block during a GC until the
1189 * GC is finished.
1190 */
1191
1192 ASSERT(t->_link == END_TSO_QUEUE);
1193
1194 // Shortcut if we're just switching evaluators: don't bother
1195 // doing stack squeezing (which can be expensive), just run the
1196 // thread.
1197 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1198 debugTrace(DEBUG_sched,
1199 "--<< thread %ld (%s) stopped to switch evaluators",
1200 (long)t->id, what_next_strs[t->what_next]);
1201 return rtsTrue;
1202 }
1203
1204 // Reset the context switch flag. We don't do this just before
1205 // running the thread, because that would mean we would lose ticks
1206 // during GC, which can lead to unfair scheduling (a thread hogs
1207 // the CPU because the tick always arrives during GC). This way
1208 // penalises threads that do a lot of allocation, but that seems
1209 // better than the alternative.
1210 if (cap->context_switch != 0) {
1211 cap->context_switch = 0;
1212 appendToRunQueue(cap,t);
1213 } else {
1214 pushOnRunQueue(cap,t);
1215 }
1216
1217 IF_DEBUG(sanity,
1218 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1219 checkTSO(t));
1220
1221 return rtsFalse;
1222 }
1223
1224 /* -----------------------------------------------------------------------------
1225 * Handle a thread that returned to the scheduler with ThreadBlocked
1226 * -------------------------------------------------------------------------- */
1227
1228 static void
1229 scheduleHandleThreadBlocked( StgTSO *t
1230 #if !defined(DEBUG)
1231 STG_UNUSED
1232 #endif
1233 )
1234 {
1235
1236 // We don't need to do anything. The thread is blocked, and it
1237 // has tidied up its stack and placed itself on whatever queue
1238 // it needs to be on.
1239
1240 // ASSERT(t->why_blocked != NotBlocked);
1241 // Not true: for example,
1242 // - the thread may have woken itself up already, because
1243 // threadPaused() might have raised a blocked throwTo
1244 // exception, see maybePerformBlockedException().
1245
1246 #ifdef DEBUG
1247 traceThreadStatus(DEBUG_sched, t);
1248 #endif
1249 }
1250
1251 /* -----------------------------------------------------------------------------
1252 * Handle a thread that returned to the scheduler with ThreadFinished
1253 * -------------------------------------------------------------------------- */
1254
1255 static rtsBool
1256 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1257 {
1258 /* Need to check whether this was a main thread, and if so,
1259 * return with the return value.
1260 *
1261 * We also end up here if the thread kills itself with an
1262 * uncaught exception, see Exception.cmm.
1263 */
1264
1265 // blocked exceptions can now complete, even if the thread was in
1266 // blocked mode (see #2910).
1267 awakenBlockedExceptionQueue (cap, t);
1268
1269 //
1270 // Check whether the thread that just completed was a bound
1271 // thread, and if so return with the result.
1272 //
1273 // There is an assumption here that all thread completion goes
1274 // through this point; we need to make sure that if a thread
1275 // ends up in the ThreadKilled state, that it stays on the run
1276 // queue so it can be dealt with here.
1277 //
1278
1279 if (t->bound) {
1280
1281 if (t->bound != task->incall) {
1282 #if !defined(THREADED_RTS)
1283 // Must be a bound thread that is not the topmost one. Leave
1284 // it on the run queue until the stack has unwound to the
1285 // point where we can deal with this. Leaving it on the run
1286 // queue also ensures that the garbage collector knows about
1287 // this thread and its return value (it gets dropped from the
1288 // step->threads list so there's no other way to find it).
1289 appendToRunQueue(cap,t);
1290 return rtsFalse;
1291 #else
1292 // this cannot happen in the threaded RTS, because a
1293 // bound thread can only be run by the appropriate Task.
1294 barf("finished bound thread that isn't mine");
1295 #endif
1296 }
1297
1298 ASSERT(task->incall->tso == t);
1299
1300 if (t->what_next == ThreadComplete) {
1301 if (task->incall->ret) {
1302 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1303 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1304 }
1305 task->incall->rstat = Success;
1306 } else {
1307 if (task->incall->ret) {
1308 *(task->incall->ret) = NULL;
1309 }
1310 if (sched_state >= SCHED_INTERRUPTING) {
1311 if (heap_overflow) {
1312 task->incall->rstat = HeapExhausted;
1313 } else {
1314 task->incall->rstat = Interrupted;
1315 }
1316 } else {
1317 task->incall->rstat = Killed;
1318 }
1319 }
1320 #ifdef DEBUG
1321 removeThreadLabel((StgWord)task->incall->tso->id);
1322 #endif
1323
1324 // We no longer consider this thread and task to be bound to
1325 // each other. The TSO lives on until it is GC'd, but the
1326 // task is about to be released by the caller, and we don't
1327 // want anyone following the pointer from the TSO to the
1328 // defunct task (which might have already been
1329 // re-used). This was a real bug: the GC updated
1330 // tso->bound->tso, which led to a deadlock.
1331 t->bound = NULL;
1332 task->incall->tso = NULL;
1333
1334 return rtsTrue; // tells schedule() to return
1335 }
1336
1337 return rtsFalse;
1338 }
1339
1340 /* -----------------------------------------------------------------------------
1341 * Perform a heap census
1342 * -------------------------------------------------------------------------- */
1343
1344 static rtsBool
1345 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1346 {
1347 // When we have +RTS -i0 and we're heap profiling, do a census at
1348 // every GC. This lets us get repeatable runs for debugging.
1349 if (performHeapProfile ||
1350 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1351 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1352 return rtsTrue;
1353 } else {
1354 return rtsFalse;
1355 }
1356 }
1357
1358 /* -----------------------------------------------------------------------------
1359 * stopAllCapabilities()
1360 *
1361 * Stop all Haskell execution. This is used when we need to make some global
1362 * change to the system, such as altering the number of capabilities, or
1363 * forking.
1364 *
1365 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1366 * -------------------------------------------------------------------------- */
1367
1368 #if defined(THREADED_RTS)
1369 static void stopAllCapabilities (Capability **pCap, Task *task)
1370 {
1371 rtsBool was_syncing;
1372 SyncType prev_sync_type;
1373
1374 PendingSync sync = {
1375 .type = SYNC_OTHER,
1376 .idle = NULL,
1377 .task = task
1378 };
1379
1380 do {
1381 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1382 } while (was_syncing);
1383
1384 acquireAllCapabilities(*pCap,task);
1385
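// We now hold all the Capabilities, so no Haskell code can run and it is
// safe to clear pending_sync; execution only resumes when the caller later
// calls releaseAllCapabilities().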
1386 pending_sync = 0;
1387 }
1388 #endif
1389
1390 /* -----------------------------------------------------------------------------
1391 * requestSync()
1392 *
1393 * Commence a synchronisation between all capabilities. Normally not called
1394 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1395 * has some special synchronisation requirements.
1396 *
1397 * Returns:
1398 * rtsFalse if we successfully got a sync
1399 * rtsTrue if there was another sync request in progress,
1400 * and we yielded to it. The type of the other sync
1401 * request is returned in *prev_sync_type.
1402 * -------------------------------------------------------------------------- */
1403
1404 #if defined(THREADED_RTS)
1405 static rtsBool requestSync (
1406 Capability **pcap, Task *task, PendingSync *new_sync,
1407 SyncType *prev_sync_type)
1408 {
1409 PendingSync *sync;
1410
1411 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1412 (StgWord)NULL,
1413 (StgWord)new_sync);
1414
1415 if (sync != NULL)
1416 {
1417 // sync is valid until we have called yieldCapability().
1418 // After the sync is completed, we cannot read that struct any
1419 // more because it has been freed.
1420 *prev_sync_type = sync->type;
1421 do {
1422 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1423 sync->type);
1424 ASSERT(*pcap);
1425 yieldCapability(pcap,task,rtsTrue);
1426 sync = pending_sync;
1427 } while (sync != NULL);
1428
1429 // NOTE: task->cap might have changed now
1430 return rtsTrue;
1431 }
1432 else
1433 {
1434 return rtsFalse;
1435 }
1436 }
1437 #endif
1438
1439 /* -----------------------------------------------------------------------------
1440 * acquireAllCapabilities()
1441 *
1442 * Grab all the capabilities except the one we already hold. Used
1443 * when synchronising before a single-threaded GC (SYNC_GC_SEQ), and
1444 * before a fork (SYNC_OTHER).
1445 *
1446 * Only call this after requestSync(), otherwise a deadlock might
1447 * ensue if another thread is trying to synchronise.
1448 * -------------------------------------------------------------------------- */
1449
1450 #ifdef THREADED_RTS
1451 static void acquireAllCapabilities(Capability *cap, Task *task)
1452 {
1453 Capability *tmpcap;
1454 uint32_t i;
1455
1456 ASSERT(pending_sync != NULL);
1457 for (i=0; i < n_capabilities; i++) {
1458 debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)",
1459 i, n_capabilities);
1460 tmpcap = capabilities[i];
1461 if (tmpcap != cap) {
1462 // we better hope this task doesn't get migrated to
1463 // another Capability while we're waiting for this one.
1464 // It won't, because load balancing happens while we have
1465 // all the Capabilities, but even so it's a slightly
1466 // unsavoury invariant.
1467 task->cap = tmpcap;
1468 waitForCapability(&tmpcap, task);
1469 if (tmpcap->no != i) {
1470 barf("acquireAllCapabilities: got the wrong capability");
1471 }
1472 }
1473 }
1474 task->cap = cap;
1475 }
1476 #endif
1477
1478 /* -----------------------------------------------------------------------------
1479 * releaseAllCapabilities()
1480 *
1481 * Assuming this thread holds all the capabilities, release them all except for
1482 * the one passed in as cap.
1483 * -------------------------------------------------------------------------- */
1484
1485 #ifdef THREADED_RTS
1486 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task)
1487 {
1488 uint32_t i;
1489
1490 for (i = 0; i < n; i++) {
1491 if (cap->no != i) {
1492 task->cap = capabilities[i];
1493 releaseCapability(capabilities[i]);
1494 }
1495 }
1496 task->cap = cap;
1497 }
1498 #endif
1499
1500 /* -----------------------------------------------------------------------------
1501 * Perform a garbage collection if necessary
1502 * -------------------------------------------------------------------------- */
1503
1504 static void
1505 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1506 rtsBool force_major)
1507 {
1508 Capability *cap = *pcap;
1509 rtsBool heap_census;
1510 uint32_t collect_gen;
1511 rtsBool major_gc;
1512 #ifdef THREADED_RTS
1513 uint32_t gc_type;
1514 uint32_t i;
1515 uint32_t need_idle;
1516 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1517 StgTSO *tso;
1518 rtsBool *idle_cap;
1519 #endif
1520
1521 if (sched_state == SCHED_SHUTTING_DOWN) {
1522 // The final GC has already been done, and the system is
1523 // shutting down. We'll probably deadlock if we try to GC
1524 // now.
1525 return;
1526 }
1527
1528 heap_census = scheduleNeedHeapProfile(rtsTrue);
1529
1530 // Figure out which generation we are collecting, so that we can
1531 // decide whether this is a parallel GC or not.
1532 collect_gen = calcNeeded(force_major || heap_census, NULL);
1533 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1534
1535 #ifdef THREADED_RTS
1536 if (sched_state < SCHED_INTERRUPTING
1537 && RtsFlags.ParFlags.parGcEnabled
1538 && collect_gen >= RtsFlags.ParFlags.parGcGen
1539 && ! oldest_gen->mark)
1540 {
1541 gc_type = SYNC_GC_PAR;
1542 } else {
1543 gc_type = SYNC_GC_SEQ;
1544 }
1545
1546 if (gc_type == SYNC_GC_PAR && RtsFlags.ParFlags.parGcThreads > 0) {
1547 need_idle = stg_max(0, enabled_capabilities -
1548 RtsFlags.ParFlags.parGcThreads);
1549 } else {
1550 need_idle = 0;
1551 }
1552
1553 // In order to GC, there must be no threads running Haskell code.
1554 // Therefore, the GC thread needs to hold *all* the capabilities,
1555 // and release them after the GC has completed.
1556 //
1557 // This seems to be the simplest way: previous attempts involved
1558 // making all the threads with capabilities give up their
1559 // capabilities and sleep except for the *last* one, which
1560 // actually did the GC. But it's quite hard to arrange for all
1561 // the other tasks to sleep and stay asleep.
1562 //
1563
1564 /* Other capabilities are prevented from running yet more Haskell
1565 threads if pending_sync is set. Tested inside
1566 yieldCapability() and releaseCapability() in Capability.c */
1567
1568 PendingSync sync = {
1569 .type = gc_type,
1570 .idle = NULL,
1571 .task = task
1572 };
1573
1574 {
1575 SyncType prev_sync = 0;
1576 rtsBool was_syncing;
1577 do {
1578 // We need an array of size n_capabilities, but since this may
1579 // change each time around the loop we must allocate it afresh.
1580 idle_cap = (rtsBool *)stgMallocBytes(n_capabilities *
1581 sizeof(rtsBool),
1582 "scheduleDoGC");
1583 sync.idle = idle_cap;
1584
1585 // When using +RTS -qn, we need some capabilities to be idle during
1586 // GC. The best bet is to choose some inactive ones, so we look for
1587 // those first:
1588 uint32_t n_idle = need_idle;
1589 for (i=0; i < n_capabilities; i++) {
1590 if (capabilities[i]->disabled) {
1591 idle_cap[i] = rtsTrue;
1592 } else if (n_idle > 0 &&
1593 capabilities[i]->running_task == NULL) {
1594 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1595 n_idle--;
1596 idle_cap[i] = rtsTrue;
1597 } else {
1598 idle_cap[i] = rtsFalse;
1599 }
1600 }
1601 // If we didn't find enough inactive capabilities, just pick some
1602 // more to be idle.
1603 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1604 if (!idle_cap[i] && i != cap->no) {
1605 idle_cap[i] = rtsTrue;
1606 n_idle--;
1607 }
1608 }
1609 ASSERT(n_idle == 0);
1610
1611 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1612 cap = *pcap;
1613 if (was_syncing) {
1614 stgFree(idle_cap);
1615 }
1616 if (was_syncing &&
1617 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1618 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1619 // someone else had a pending sync request for a GC, so
1620 // let's assume GC has been done and we don't need to GC
1621 // again.
1622 // Exception to this: if SCHED_INTERRUPTING, then we still
1623 // need to do the final GC.
1624 return;
1625 }
1626 if (sched_state == SCHED_SHUTTING_DOWN) {
1627 // The scheduler might now be shutting down. We tested
1628 // this above, but it might have become true since then as
1629 // we yielded the capability in requestSync().
1630 return;
1631 }
1632 } while (was_syncing);
1633 }
1634
1635 stat_startGCSync(gc_threads[cap->no]);
1636
1637 #ifdef DEBUG
1638 unsigned int old_n_capabilities = n_capabilities;
1639 #endif
1640
1641 interruptAllCapabilities();
1642
1643 // The final shutdown GC is always single-threaded, because it's
1644 // possible that some of the Capabilities have no worker threads.
1645
1646 if (gc_type == SYNC_GC_SEQ) {
1647 traceEventRequestSeqGc(cap);
1648 } else {
1649 traceEventRequestParGc(cap);
1650 }
1651
1652 if (gc_type == SYNC_GC_SEQ) {
1653 // single-threaded GC: grab all the capabilities
1654 acquireAllCapabilities(cap,task);
1655 }
1656 else
1657 {
1658 // If we are load-balancing collections in this
1659 // generation, then we require all GC threads to participate
1660 // in the collection. Otherwise, we only require active
1661 // threads to participate, and we set gc_threads[i]->idle for
1662 // any idle capabilities. The rationale here is that waking
1663 // up an idle Capability takes much longer than just doing any
1664 // GC work on its behalf.
1665
1666 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1667 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1668 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1669 {
1670 for (i=0; i < n_capabilities; i++) {
1671 if (capabilities[i]->disabled) {
1672 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1673 if (idle_cap[i]) {
1674 n_idle_caps++;
1675 }
1676 } else {
1677 if (i != cap->no && idle_cap[i]) {
1678 Capability *tmpcap = capabilities[i];
1679 task->cap = tmpcap;
1680 waitForCapability(&tmpcap, task);
1681 n_idle_caps++;
1682 }
1683 }
1684 }
1685 }
1686 else
1687 {
1688 for (i=0; i < n_capabilities; i++) {
1689 if (capabilities[i]->disabled) {
1690 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1691 if (idle_cap[i]) {
1692 n_idle_caps++;
1693 }
1694 } else if (i != cap->no &&
1695 capabilities[i]->idle >=
1696 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1697 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1698 if (idle_cap[i]) {
1699 n_idle_caps++;
1700 } else {
1701 n_failed_trygrab_idles++;
1702 }
1703 }
1704 }
1705 }
1706 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1707
1708 // We set the gc_thread[i]->idle flag if that
1709 // capability/thread is not participating in this collection.
1710 // We also keep a local record of which capabilities are idle
1711 // in idle_cap[], because scheduleDoGC() is re-entrant:
1712 // another thread might start a GC as soon as we've finished
1713 // this one, and thus the gc_thread[]->idle flags are invalid
1714 // as soon as we release any threads after GC. Getting this
1715 // wrong leads to a rare and hard to debug deadlock!
1716
1717 for (i=0; i < n_capabilities; i++) {
1718 gc_threads[i]->idle = idle_cap[i];
1719 capabilities[i]->idle++;
1720 }
1721
1722 // For all capabilities participating in this GC, wait until
1723 // they have stopped mutating and are standing by for GC.
1724 waitForGcThreads(cap);
1725
1726 #if defined(THREADED_RTS)
1727 // Stable point where we can do a global check on our spark counters
1728 ASSERT(checkSparkCountInvariant());
1729 #endif
1730 }
1731
1732 #endif
1733
1734 IF_DEBUG(scheduler, printAllThreads());
1735
1736 delete_threads_and_gc:
1737 /*
1738 * We now have all the capabilities; if we're in an interrupting
1739 * state, then we should take the opportunity to delete all the
1740 * threads in the system.
1741 * Checking for major_gc ensures that the last GC is major.
1742 */
1743 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1744 deleteAllThreads(cap);
1745 #if defined(THREADED_RTS)
1746 // Discard all the sparks from every Capability. Why?
1747 // They'll probably be GC'd anyway since we've killed all the
1748 // threads. It just avoids the GC having to do any work to
1749 // figure out that any remaining sparks are garbage.
1750 for (i = 0; i < n_capabilities; i++) {
1751 capabilities[i]->spark_stats.gcd +=
1752 sparkPoolSize(capabilities[i]->sparks);
1753 // No race here since all Caps are stopped.
1754 discardSparksCap(capabilities[i]);
1755 }
1756 #endif
1757 sched_state = SCHED_SHUTTING_DOWN;
1758 }
1759
1760 /*
1761 * When there are disabled capabilities, we want to migrate any
1762 * threads away from them. Normally this happens in the
1763 * scheduler's loop, but only for unbound threads - it's really
1764 * hard for a bound thread to migrate itself. So we have another
1765 * go here.
1766 */
1767 #if defined(THREADED_RTS)
1768 for (i = enabled_capabilities; i < n_capabilities; i++) {
1769 Capability *tmp_cap, *dest_cap;
1770 tmp_cap = capabilities[i];
1771 ASSERT(tmp_cap->disabled);
1772 if (i != cap->no) {
1773 dest_cap = capabilities[i % enabled_capabilities];
1774 while (!emptyRunQueue(tmp_cap)) {
1775 tso = popRunQueue(tmp_cap);
1776 migrateThread(tmp_cap, tso, dest_cap);
1777 if (tso->bound) {
1778 traceTaskMigrate(tso->bound->task,
1779 tso->bound->task->cap,
1780 dest_cap);
1781 tso->bound->task->cap = dest_cap;
1782 }
1783 }
1784 }
1785 }
1786 #endif
1787
1788 #if defined(THREADED_RTS)
1789 // reset pending_sync *before* GC, so that when the GC threads
1790 // emerge they don't immediately re-enter the GC.
1791 pending_sync = 0;
1792 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1793 #else
1794 GarbageCollect(collect_gen, heap_census, 0, cap);
1795 #endif
1796
1797 traceSparkCounters(cap);
1798
1799 switch (recent_activity) {
1800 case ACTIVITY_INACTIVE:
1801 if (force_major) {
1802 // We are doing a GC because the system has been idle for a
1803 // timeslice and we need to check for deadlock. Record the
1804 // fact that we've done a GC and turn off the timer signal;
1805 // it will get re-enabled if we run any threads after the GC.
1806 recent_activity = ACTIVITY_DONE_GC;
1807 #ifndef PROFILING
1808 stopTimer();
1809 #endif
1810 break;
1811 }
1812 // fall through...
1813
1814 case ACTIVITY_MAYBE_NO:
1815 // the GC might have taken long enough for the timer to set
1816 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1817 // but we aren't necessarily deadlocked:
1818 recent_activity = ACTIVITY_YES;
1819 break;
1820
1821 case ACTIVITY_DONE_GC:
1822 // If we are actually active, the scheduler will reset the
1823 // recent_activity flag and re-enable the timer.
1824 break;
1825 }
1826
1827 #if defined(THREADED_RTS)
1828 // Stable point where we can do a global check on our spark counters
1829 ASSERT(checkSparkCountInvariant());
1830 #endif
1831
1832 // The heap census itself is done during GarbageCollect().
1833 if (heap_census) {
1834 performHeapProfile = rtsFalse;
1835 }
1836
1837 #if defined(THREADED_RTS)
1838
1839 // If n_capabilities has changed during GC, we're in trouble.
1840 ASSERT(n_capabilities == old_n_capabilities);
1841
1842 if (gc_type == SYNC_GC_PAR)
1843 {
1844 releaseGCThreads(cap);
1845 for (i = 0; i < n_capabilities; i++) {
1846 if (i != cap->no) {
1847 if (idle_cap[i]) {
1848 ASSERT(capabilities[i]->running_task == task);
1849 task->cap = capabilities[i];
1850 releaseCapability(capabilities[i]);
1851 } else {
1852 ASSERT(capabilities[i]->running_task != task);
1853 }
1854 }
1855 }
1856 task->cap = cap;
1857 }
1858 #endif
1859
1860 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1861 // GC set the heap_overflow flag, so we should proceed with
1862 // an orderly shutdown now. Ultimately we want the main
1863 // thread to return to its caller with HeapExhausted, at which
1864 // point the caller should call hs_exit(). The first step is
1865 // to delete all the threads.
1866 //
1867 // Another way to do this would be to raise an exception in
1868 // the main thread, which we really should do because it gives
1869 // the program a chance to clean up. But how do we find the
1870 // main thread? It should presumably be the same one that
1871 // gets ^C exceptions, but that's all done on the Haskell side
1872 // (GHC.TopHandler).
1873 sched_state = SCHED_INTERRUPTING;
1874 goto delete_threads_and_gc;
1875 }
1876
1877 #ifdef SPARKBALANCE
1878 /* JB
1879 Once we are all together... this would be the place to balance all
1880 spark pools. No concurrent stealing or adding of new sparks can
1881 occur. Should be defined in Sparks.c. */
1882 balanceSparkPoolsCaps(n_capabilities, capabilities);
1883 #endif
1884
1885 #if defined(THREADED_RTS)
1886 stgFree(idle_cap);
1887
1888 if (gc_type == SYNC_GC_SEQ) {
1889 // release our stash of capabilities.
1890 releaseAllCapabilities(n_capabilities, cap, task);
1891 }
1892 #endif
1893
1894 return;
1895 }
1896
1897 /* ---------------------------------------------------------------------------
1898 * Singleton fork(). Do not copy any running threads.
1899 * ------------------------------------------------------------------------- */
1900
1901 pid_t
1902 forkProcess(HsStablePtr *entry
1903 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1904 STG_UNUSED
1905 #endif
1906 )
1907 {
1908 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1909 pid_t pid;
1910 StgTSO* t,*next;
1911 Capability *cap;
1912 uint32_t g;
1913 Task *task = NULL;
1914 uint32_t i;
1915
1916 debugTrace(DEBUG_sched, "forking!");
1917
1918 task = newBoundTask();
1919
1920 cap = NULL;
1921 waitForCapability(&cap, task);
1922
1923 #ifdef THREADED_RTS
1924 stopAllCapabilities(&cap, task);
1925 #endif
1926
1927 // no funny business: hold locks while we fork, otherwise if some
1928 // other thread is holding a lock when the fork happens, the data
1929 // structure protected by the lock will forever be in an
1930 // inconsistent state in the child. See also #1391.
1931 ACQUIRE_LOCK(&sched_mutex);
1932 ACQUIRE_LOCK(&sm_mutex);
1933 ACQUIRE_LOCK(&stable_mutex);
1934 ACQUIRE_LOCK(&task->lock);
1935
1936 for (i=0; i < n_capabilities; i++) {
1937 ACQUIRE_LOCK(&capabilities[i]->lock);
1938 }
1939
1940 #ifdef THREADED_RTS
1941 ACQUIRE_LOCK(&all_tasks_mutex);
1942 #endif
1943
1944 stopTimer(); // See #4074
1945
1946 #if defined(TRACING)
1947 flushEventLog(); // so that child won't inherit dirty file buffers
1948 #endif
1949
1950 pid = fork();
1951
1952 if (pid) { // parent
1953
1954 startTimer(); // #4074
1955
1956 RELEASE_LOCK(&sched_mutex);
1957 RELEASE_LOCK(&sm_mutex);
1958 RELEASE_LOCK(&stable_mutex);
1959 RELEASE_LOCK(&task->lock);
1960
1961 for (i=0; i < n_capabilities; i++) {
1962 releaseCapability_(capabilities[i],rtsFalse);
1963 RELEASE_LOCK(&capabilities[i]->lock);
1964 }
1965
1966 #ifdef THREADED_RTS
1967 RELEASE_LOCK(&all_tasks_mutex);
1968 #endif
1969
1970 boundTaskExiting(task);
1971
1972 // just return the pid
1973 return pid;
1974
1975 } else { // child
1976
1977 #if defined(THREADED_RTS)
1978 initMutex(&sched_mutex);
1979 initMutex(&sm_mutex);
1980 initMutex(&stable_mutex);
1981 initMutex(&task->lock);
1982
1983 for (i=0; i < n_capabilities; i++) {
1984 initMutex(&capabilities[i]->lock);
1985 }
1986
1987 initMutex(&all_tasks_mutex);
1988 #endif
1989
1990 #ifdef TRACING
1991 resetTracing();
1992 #endif
1993
1994 // Now, all OS threads except the thread that forked are
1995 // stopped. We need to stop all Haskell threads, including
1996 // those involved in foreign calls. Also we need to delete
1997 // all Tasks, because they correspond to OS threads that are
1998 // now gone.
1999
2000 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2001 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2002 next = t->global_link;
2003 // don't allow threads to catch the ThreadKilled
2004 // exception, but we do want to raiseAsync() because these
2005 // threads may be evaluating thunks that we need later.
2006 deleteThread_(t->cap,t);
2007
2008 // stop the GC from updating the InCall to point to
2009 // the TSO. This is only necessary because the
2010 // OSThread bound to the TSO has been killed, and
2011 // won't get a chance to exit in the usual way (see
2012 // also scheduleHandleThreadFinished).
2013 t->bound = NULL;
2014 }
2015 }
2016
2017 discardTasksExcept(task);
2018
2019 for (i=0; i < n_capabilities; i++) {
2020 cap = capabilities[i];
2021
2022 // Empty the run queue. It seems tempting to let all the
2023 // killed threads stay on the run queue as zombies to be
2024 // cleaned up later, but some of them may correspond to
2025 // bound threads for which the corresponding Task does not
2026 // exist.
2027 truncateRunQueue(cap);
2028 cap->n_run_queue = 0;
2029
2030 // Any suspended C-calling Tasks are gone; their OS threads
2031 // no longer exist:
2032 cap->suspended_ccalls = NULL;
2033 cap->n_suspended_ccalls = 0;
2034
2035 #if defined(THREADED_RTS)
2036 // Wipe our spare workers list, they no longer exist. New
2037 // workers will be created if necessary.
2038 cap->spare_workers = NULL;
2039 cap->n_spare_workers = 0;
2040 cap->returning_tasks_hd = NULL;
2041 cap->returning_tasks_tl = NULL;
2042 cap->n_returning_tasks = 0;
2043 #endif
2044
2045 // Release all caps except 0; we'll use that for starting
2046 // the IO manager and running the client action below.
2047 if (cap->no != 0) {
2048 task->cap = cap;
2049 releaseCapability(cap);
2050 }
2051 }
2052 cap = capabilities[0];
2053 task->cap = cap;
2054
2055 // Empty the threads lists. Otherwise, the garbage
2056 // collector may attempt to resurrect some of these threads.
2057 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2058 generations[g].threads = END_TSO_QUEUE;
2059 }
2060
2061 // On Unix, all timers are reset in the child, so we need to start
2062 // the timer again.
2063 initTimer();
2064 startTimer();
2065
2066 // TODO: need to trace various other things in the child
2067 // like startup event, capabilities, process info etc
2068 traceTaskCreate(task, cap);
2069
2070 #if defined(THREADED_RTS)
2071 ioManagerStartCap(&cap);
2072 #endif
2073
2074 rts_evalStableIO(&cap, entry, NULL); // run the action
2075 rts_checkSchedStatus("forkProcess",cap);
2076
2077 rts_unlock(cap);
2078 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2079 }
2080 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2081 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2082 #endif
2083 }
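
/* Illustrative sketch (not part of the RTS, kept out of the build with
 * #if 0): how a caller that already holds a StablePtr to a Haskell
 * IO () action could use forkProcess().  As the code above shows, the
 * child runs the action via rts_evalStableIO() and then exits, so
 * forkProcess() only returns in the parent.  'childAction' is a
 * hypothetical name.
 */
#if 0
static pid_t forkWithChildAction (HsStablePtr childAction)
{
    pid_t pid = forkProcess(&childAction);
    // pid is the child's pid in the parent; the child never returns
    // here because forkProcess() calls shutdownHaskellAndExit().
    return pid;
}
#endif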
2084
2085 /* ---------------------------------------------------------------------------
2086 * Changing the number of Capabilities
2087 *
2088 * Changing the number of Capabilities is very tricky! We can only do
2089 * it with the system fully stopped, so we do a full sync with
2090 * requestSync(SYNC_OTHER) and grab all the capabilities.
2091 *
2092 * Then we resize the appropriate data structures, and update all
2093 * references to the old data structures which have now moved.
2094 * Finally we release the Capabilities we are holding, and start
2095 * worker Tasks on the new Capabilities we created.
2096 *
2097 * ------------------------------------------------------------------------- */
2098
2099 void
2100 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2101 {
2102 #if !defined(THREADED_RTS)
2103 if (new_n_capabilities != 1) {
2104 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2105 }
2106 return;
2107 #elif defined(NOSMP)
2108 if (new_n_capabilities != 1) {
2109 errorBelch("setNumCapabilities: not supported on this platform");
2110 }
2111 return;
2112 #else
2113 Task *task;
2114 Capability *cap;
2115 uint32_t n;
2116 Capability *old_capabilities = NULL;
2117 uint32_t old_n_capabilities = n_capabilities;
2118
2119 if (new_n_capabilities == enabled_capabilities) return;
2120
2121 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2122 enabled_capabilities, new_n_capabilities);
2123
2124 cap = rts_lock();
2125 task = cap->running_task;
2126
2127 stopAllCapabilities(&cap, task);
2128
2129 if (new_n_capabilities < enabled_capabilities)
2130 {
2131 // Reducing the number of capabilities: we do not actually
2132 // remove the extra capabilities, we just mark them as
2133 // "disabled". This has the following effects:
2134 //
2135 // - threads on a disabled capability are migrated away by the
2136 // scheduler loop
2137 //
2138 // - disabled capabilities do not participate in GC
2139 // (see scheduleDoGC())
2140 //
2141 // - No spark threads are created on this capability
2142 // (see scheduleActivateSpark())
2143 //
2144 // - We do not attempt to migrate threads *to* a disabled
2145 // capability (see schedulePushWork()).
2146 //
2147 // but in other respects, a disabled capability remains
2148 // alive. Threads may be woken up on a disabled capability,
2149 // but they will be immediately migrated away.
2150 //
2151 // This approach is much easier than trying to actually remove
2152 // the capability; we don't have to worry about GC data
2153 // structures, the nursery, etc.
2154 //
2155 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2156 capabilities[n]->disabled = rtsTrue;
2157 traceCapDisable(capabilities[n]);
2158 }
2159 enabled_capabilities = new_n_capabilities;
2160 }
2161 else
2162 {
2163 // Increasing the number of enabled capabilities.
2164 //
2165 // enable any disabled capabilities, up to the required number
2166 for (n = enabled_capabilities;
2167 n < new_n_capabilities && n < n_capabilities; n++) {
2168 capabilities[n]->disabled = rtsFalse;
2169 traceCapEnable(capabilities[n]);
2170 }
2171 enabled_capabilities = n;
2172
2173 if (new_n_capabilities > n_capabilities) {
2174 #if defined(TRACING)
2175 // Allocate eventlog buffers for the new capabilities. Note this
2176 // must be done before calling moreCapabilities(), because that
2177 // will emit events about creating the new capabilities and adding
2178 // them to existing capsets.
2179 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2180 #endif
2181
2182 // Resize the capabilities array
2183 // NB. after this, capabilities points somewhere new. Any pointers
2184 // of type (Capability *) are now invalid.
2185 moreCapabilities(n_capabilities, new_n_capabilities);
2186
2187 // Resize and update storage manager data structures
2188 storageAddCapabilities(n_capabilities, new_n_capabilities);
2189 }
2190 }
2191
2192 // update n_capabilities before things start running
2193 if (new_n_capabilities > n_capabilities) {
2194 n_capabilities = enabled_capabilities = new_n_capabilities;
2195 }
2196
2197 // Start worker tasks on the new Capabilities
2198 startWorkerTasks(old_n_capabilities, new_n_capabilities);
2199
2200 // We're done: release the original Capabilities
2201 releaseAllCapabilities(old_n_capabilities, cap,task);
2202
2203 // We can't free the old array until now, because we access it
2204 // while updating pointers in updateCapabilityRefs().
2205 if (old_capabilities) {
2206 stgFree(old_capabilities);
2207 }
2208
2209 // Notify IO manager that the number of capabilities has changed.
2210 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2211
2212 rts_unlock(cap);
2213
2214 #endif // THREADED_RTS
2215 }
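
/* Illustrative sketch (not part of the RTS, kept out of the build with
 * #if 0): setNumCapabilities() is normally reached from Haskell via
 * Control.Concurrent.setNumCapabilities, but it can also be called
 * directly.  The wrapper below is hypothetical and only shows the call
 * shape for growing to four Capabilities at run time.
 */
#if 0
static void growToFourCapabilities (void)
{
    // Enables disabled Capabilities and/or allocates new ones, then
    // starts worker Tasks on them, as described above.
    setNumCapabilities(4);
}
#endif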
2216
2217
2218
2219 /* ---------------------------------------------------------------------------
2220 * Delete all the threads in the system
2221 * ------------------------------------------------------------------------- */
2222
2223 static void
2224 deleteAllThreads ( Capability *cap )
2225 {
2226 // NOTE: only safe to call if we own all capabilities.
2227
2228 StgTSO* t, *next;
2229 uint32_t g;
2230
2231 debugTrace(DEBUG_sched,"deleting all threads");
2232 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2233 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2234 next = t->global_link;
2235 deleteThread(cap,t);
2236 }
2237 }
2238
2239 // The run queue now contains a bunch of ThreadKilled threads. We
2240 // must not throw these away: the main thread(s) will be in there
2241 // somewhere, and the main scheduler loop has to deal with it.
2242 // Also, the run queue is the only thing keeping these threads from
2243 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2244
2245 #if !defined(THREADED_RTS)
2246 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2247 ASSERT(sleeping_queue == END_TSO_QUEUE);
2248 #endif
2249 }
2250
2251 /* -----------------------------------------------------------------------------
2252 Managing the suspended_ccalls list.
2253 Locks required: sched_mutex
2254 -------------------------------------------------------------------------- */
2255
2256 STATIC_INLINE void
2257 suspendTask (Capability *cap, Task *task)
2258 {
2259 InCall *incall;
2260
2261 incall = task->incall;
2262 ASSERT(incall->next == NULL && incall->prev == NULL);
2263 incall->next = cap->suspended_ccalls;
2264 incall->prev = NULL;
2265 if (cap->suspended_ccalls) {
2266 cap->suspended_ccalls->prev = incall;
2267 }
2268 cap->suspended_ccalls = incall;
2269 cap->n_suspended_ccalls++;
2270 }
2271
2272 STATIC_INLINE void
2273 recoverSuspendedTask (Capability *cap, Task *task)
2274 {
2275 InCall *incall;
2276
2277 incall = task->incall;
2278 if (incall->prev) {
2279 incall->prev->next = incall->next;
2280 } else {
2281 ASSERT(cap->suspended_ccalls == incall);
2282 cap->suspended_ccalls = incall->next;
2283 }
2284 if (incall->next) {
2285 incall->next->prev = incall->prev;
2286 }
2287 incall->next = incall->prev = NULL;
2288 cap->n_suspended_ccalls--;
2289 }
2290
2291 /* ---------------------------------------------------------------------------
2292 * Suspending & resuming Haskell threads.
2293 *
2294 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2295 * its capability before calling the C function. This allows another
2296 * task to pick up the capability and carry on running Haskell
2297 * threads. It also means that if the C call blocks, it won't lock
2298 * the whole system.
2299 *
2300 * The Haskell thread making the C call is put to sleep for the
2301 * duration of the call, on the suspended_ccalls list. We
2302 * give out a token to the task, which it can use to resume the thread
2303 * on return from the C function.
2304 *
2305 * If this is an interruptible C call, the FFI call may be
2306 * unceremoniously terminated, so it should be scheduled on an
2307 * unbound worker thread.
2308 * ------------------------------------------------------------------------- */
2309
2310 void *
2311 suspendThread (StgRegTable *reg, rtsBool interruptible)
2312 {
2313 Capability *cap;
2314 int saved_errno;
2315 StgTSO *tso;
2316 Task *task;
2317 #if mingw32_HOST_OS
2318 StgWord32 saved_winerror;
2319 #endif
2320
2321 saved_errno = errno;
2322 #if mingw32_HOST_OS
2323 saved_winerror = GetLastError();
2324 #endif
2325
2326 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2327 */
2328 cap = regTableToCapability(reg);
2329
2330 task = cap->running_task;
2331 tso = cap->r.rCurrentTSO;
2332
2333 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2334
2335 // XXX this might not be necessary --SDM
2336 tso->what_next = ThreadRunGHC;
2337
2338 threadPaused(cap,tso);
2339
2340 if (interruptible) {
2341 tso->why_blocked = BlockedOnCCall_Interruptible;
2342 } else {
2343 tso->why_blocked = BlockedOnCCall;
2344 }
2345
2346 // Hand back capability
2347 task->incall->suspended_tso = tso;
2348 task->incall->suspended_cap = cap;
2349
2350 // Otherwise allocate() will write to invalid memory.
2351 cap->r.rCurrentTSO = NULL;
2352
2353 ACQUIRE_LOCK(&cap->lock);
2354
2355 suspendTask(cap,task);
2356 cap->in_haskell = rtsFalse;
2357 releaseCapability_(cap,rtsFalse);
2358
2359 RELEASE_LOCK(&cap->lock);
2360
2361 errno = saved_errno;
2362 #if mingw32_HOST_OS
2363 SetLastError(saved_winerror);
2364 #endif
2365 return task;
2366 }
2367
2368 StgRegTable *
2369 resumeThread (void *task_)
2370 {
2371 StgTSO *tso;
2372 InCall *incall;
2373 Capability *cap;
2374 Task *task = task_;
2375 int saved_errno;
2376 #if mingw32_HOST_OS
2377 StgWord32 saved_winerror;
2378 #endif
2379
2380 saved_errno = errno;
2381 #if mingw32_HOST_OS
2382 saved_winerror = GetLastError();
2383 #endif
2384
2385 incall = task->incall;
2386 cap = incall->suspended_cap;
2387 task->cap = cap;
2388
2389 // Wait for permission to re-enter the RTS with the result.
2390 waitForCapability(&cap,task);
2391 // we might be on a different capability now... but if so, our
2392 // entry on the suspended_ccalls list will also have been
2393 // migrated.
2394
2395 // Remove the thread from the suspended list
2396 recoverSuspendedTask(cap,task);
2397
2398 tso = incall->suspended_tso;
2399 incall->suspended_tso = NULL;
2400 incall->suspended_cap = NULL;
2401 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2402
2403 traceEventRunThread(cap, tso);
2404
2405 /* Reset blocking status */
2406 tso->why_blocked = NotBlocked;
2407
2408 if ((tso->flags & TSO_BLOCKEX) == 0) {
2409 // avoid locking the TSO if we don't have to
2410 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2411 maybePerformBlockedException(cap,tso);
2412 }
2413 }
2414
2415 cap->r.rCurrentTSO = tso;
2416 cap->in_haskell = rtsTrue;
2417 errno = saved_errno;
2418 #if mingw32_HOST_OS
2419 SetLastError(saved_winerror);
2420 #endif
2421
2422 /* We might have GC'd, mark the TSO dirty again */
2423 dirty_TSO(cap,tso);
2424 dirty_STACK(cap,tso->stackobj);
2425
2426 IF_DEBUG(sanity, checkTSO(tso));
2427
2428 return &cap->r;
2429 }
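
/* Illustrative sketch (not part of the RTS, kept out of the build with
 * #if 0): the shape of code a "safe" foreign-call stub is expected to
 * generate around an outgoing C call, using the token returned by
 * suspendThread() to get back in via resumeThread().
 * example_blocking_call() is a hypothetical foreign function.
 */
#if 0
static int safeForeignCallSketch (StgRegTable *reg)
{
    int r;
    void *tok = suspendThread(reg, rtsFalse);  // hand back the Capability
    r = example_blocking_call();               // may block; other Haskell
                                               // threads keep running
    reg = resumeThread(tok);                   // wait for a Capability again
    return r;
}
#endif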
2430
2431 /* ---------------------------------------------------------------------------
2432 * scheduleThread()
2433 *
2434 * scheduleThread puts a thread on the end of the runnable queue.
2435 * This will usually be done immediately after a thread is created.
2436 * The caller of scheduleThread must create the thread using e.g.
2437 * createThread and push an appropriate closure
2438 * on this thread's stack before the scheduler is invoked.
2439 * ------------------------------------------------------------------------ */
2440
2441 void
2442 scheduleThread(Capability *cap, StgTSO *tso)
2443 {
2444 // The thread goes at the *end* of the run-queue, to avoid possible
2445 // starvation of any threads already on the queue.
2446 appendToRunQueue(cap,tso);
2447 }
2448
2449 void
2450 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2451 {
2452 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2453 // move this thread from now on.
2454 #if defined(THREADED_RTS)
2455 cpu %= enabled_capabilities;
2456 if (cpu == cap->no) {
2457 appendToRunQueue(cap,tso);
2458 } else {
2459 migrateThread(cap, tso, capabilities[cpu]);
2460 }
2461 #else
2462 appendToRunQueue(cap,tso);
2463 #endif
2464 }
2465
2466 void
2467 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2468 {
2469 Task *task;
2470 DEBUG_ONLY( StgThreadID id );
2471 Capability *cap;
2472
2473 cap = *pcap;
2474
2475 // We already created/initialised the Task
2476 task = cap->running_task;
2477
2478 // This TSO is now a bound thread; make the Task and TSO
2479 // point to each other.
2480 tso->bound = task->incall;
2481 tso->cap = cap;
2482
2483 task->incall->tso = tso;
2484 task->incall->ret = ret;
2485 task->incall->rstat = NoStatus;
2486
2487 appendToRunQueue(cap,tso);
2488
2489 DEBUG_ONLY( id = tso->id );
2490 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2491
2492 cap = schedule(cap,task);
2493
2494 ASSERT(task->incall->rstat != NoStatus);
2495 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2496
2497 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2498 *pcap = cap;
2499 }
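
/* Illustrative sketch (not part of the RTS, kept out of the build with
 * #if 0): roughly how the RTS API (RtsAPI.c) uses createIOThread() and
 * scheduleWaitThread() to run an IO closure to completion on a bound
 * thread.  'closure' stands for some IO () closure; the local names are
 * hypothetical and error handling is omitted.
 */
#if 0
static void evalIOSketch (Capability **pcap, StgClosure *closure)
{
    HaskellObj ret;
    StgTSO *tso = createIOThread(*pcap, RtsFlags.GcFlags.initialStkSize,
                                 closure);
    scheduleWaitThread(tso, &ret, pcap);  // returns when the thread finishes
}
#endif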
2500
2501 /* ----------------------------------------------------------------------------
2502 * Starting Tasks
2503 * ------------------------------------------------------------------------- */
2504
2505 #if defined(THREADED_RTS)
2506 void scheduleWorker (Capability *cap, Task *task)
2507 {
2508 // schedule() runs without a lock.
2509 cap = schedule(cap,task);
2510
2511 // On exit from schedule(), we have a Capability, but possibly not
2512 // the same one we started with.
2513
2514 // During shutdown, the requirement is that after all the
2515 // Capabilities are shut down, all workers that are shutting down
2516 // have finished workerTaskStop(). This is why we hold on to
2517 // cap->lock until we've finished workerTaskStop() below.
2518 //
2519 // There may be workers still involved in foreign calls; those
2520 // will just block in waitForCapability() because the
2521 // Capability has been shut down.
2522 //
2523 ACQUIRE_LOCK(&cap->lock);
2524 releaseCapability_(cap,rtsFalse);
2525 workerTaskStop(task);
2526 RELEASE_LOCK(&cap->lock);
2527 }
2528 #endif
2529
2530 /* ---------------------------------------------------------------------------
2531 * Start new worker tasks on Capabilities from--to
2532 * -------------------------------------------------------------------------- */
2533
2534 static void
2535 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2536 {
2537 #if defined(THREADED_RTS)
2538 uint32_t i;
2539 Capability *cap;
2540
2541 for (i = from; i < to; i++) {
2542 cap = capabilities[i];
2543 ACQUIRE_LOCK(&cap->lock);
2544 startWorkerTask(cap);
2545 RELEASE_LOCK(&cap->lock);
2546 }
2547 #endif
2548 }
2549
2550 /* ---------------------------------------------------------------------------
2551 * initScheduler()
2552 *
2553 * Initialise the scheduler. This resets all the queues - if the
2554 * queues contained any threads, they'll be garbage collected at the
2555 * next pass.
2556 *
2557 * ------------------------------------------------------------------------ */
2558
2559 void
2560 initScheduler(void)
2561 {
2562 #if !defined(THREADED_RTS)
2563 blocked_queue_hd = END_TSO_QUEUE;
2564 blocked_queue_tl = END_TSO_QUEUE;
2565 sleeping_queue = END_TSO_QUEUE;
2566 #endif
2567
2568 sched_state = SCHED_RUNNING;
2569 recent_activity = ACTIVITY_YES;
2570
2571 #if defined(THREADED_RTS)
2572 /* Initialise the mutex and condition variables used by
2573 * the scheduler. */
2574 initMutex(&sched_mutex);
2575 #endif
2576
2577 ACQUIRE_LOCK(&sched_mutex);
2578
2579 /* A capability holds the state a native thread needs in
2580 * order to execute STG code. At least one capability is
2581 * floating around (only THREADED_RTS builds have more than one).
2582 */
2583 initCapabilities();
2584
2585 initTaskManager();
2586
2587 /*
2588 * Eagerly start one worker to run each Capability, except for
2589 * Capability 0. The idea is that we're probably going to start a
2590 * bound thread on Capability 0 pretty soon, so we don't want a
2591 * worker task hogging it.
2592 */
2593 startWorkerTasks(1, n_capabilities);
2594
2595 RELEASE_LOCK(&sched_mutex);
2596
2597 }
2598
2599 void
2600 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2601 /* see Capability.c, shutdownCapability() */
2602 {
2603 Task *task = NULL;
2604
2605 task = newBoundTask();
2606
2607 // If we haven't killed all the threads yet, do it now.
2608 if (sched_state < SCHED_SHUTTING_DOWN) {
2609 sched_state = SCHED_INTERRUPTING;
2610 Capability *cap = task->cap;
2611 waitForCapability(&cap,task);
2612 scheduleDoGC(&cap,task,rtsTrue);
2613 ASSERT(task->incall->tso == NULL);
2614 releaseCapability(cap);
2615 }
2616 sched_state = SCHED_SHUTTING_DOWN;
2617
2618 shutdownCapabilities(task, wait_foreign);
2619
2620 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2621 // n_failed_trygrab_idles, n_idle_caps);
2622
2623 boundTaskExiting(task);
2624 }
2625
2626 void
2627 freeScheduler( void )
2628 {
2629 uint32_t still_running;
2630
2631 ACQUIRE_LOCK(&sched_mutex);
2632 still_running = freeTaskManager();
2633 // We can only free the Capabilities if there are no Tasks still
2634 // running. We might have a Task about to return from a foreign
2635 // call into waitForCapability(), for example (actually,
2636 // this should be the *only* thing that a still-running Task can
2637 // do at this point, and it will block waiting for the
2638 // Capability).
2639 if (still_running == 0) {
2640 freeCapabilities();
2641 }
2642 RELEASE_LOCK(&sched_mutex);
2643 #if defined(THREADED_RTS)
2644 closeMutex(&sched_mutex);
2645 #endif
2646 }
2647
2648 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2649 void *user USED_IF_NOT_THREADS)
2650 {
2651 #if !defined(THREADED_RTS)
2652 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2653 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2654 evac(user, (StgClosure **)(void *)&sleeping_queue);
2655 #endif
2656 }
2657
2658 /* -----------------------------------------------------------------------------
2659 performGC
2660
2661 This is the interface to the garbage collector from Haskell land.
2662 We provide this so that external C code can allocate and garbage
2663 collect when called from Haskell via _ccall_GC.
2664 -------------------------------------------------------------------------- */
2665
2666 static void
2667 performGC_(rtsBool force_major)
2668 {
2669 Task *task;
2670 Capability *cap = NULL;
2671
2672 // We must grab a new Task here, because the existing Task may be
2673 // associated with a particular Capability, and chained onto the
2674 // suspended_ccalls queue.
2675 task = newBoundTask();
2676
2677 // TODO: do we need to traceTask*() here?
2678
2679 waitForCapability(&cap,task);
2680 scheduleDoGC(&cap,task,force_major);
2681 releaseCapability(cap);
2682 boundTaskExiting(task);
2683 }
2684
2685 void
2686 performGC(void)
2687 {
2688 performGC_(rtsFalse);
2689 }
2690
2691 void
2692 performMajorGC(void)
2693 {
2694 performGC_(rtsTrue);
2695 }
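
/* Illustrative sketch (not part of the RTS, kept out of the build with
 * #if 0): a foreign C function, called from Haskell via a safe call,
 * that asks the RTS for a full collection before doing memory-hungry
 * native work.  The function name is hypothetical.
 */
#if 0
void exampleNativeWork (void)
{
    performMajorGC();   // same as performGC_(rtsTrue) above
    /* ... proceed with the memory-hungry native work ... */
}
#endif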
2696
2697 /* ---------------------------------------------------------------------------
2698 Interrupt execution.
2699 Might be called inside a signal handler so it mustn't do anything fancy.
2700 ------------------------------------------------------------------------ */
2701
2702 void
2703 interruptStgRts(void)
2704 {
2705 sched_state = SCHED_INTERRUPTING;
2706 interruptAllCapabilities();
2707 #if defined(THREADED_RTS)
2708 wakeUpRts();
2709 #endif
2710 }
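
/* Illustrative sketch (not part of the RTS, kept out of the build with
 * #if 0): the kind of console/^C handler that ends up calling
 * interruptStgRts().  As the comment above says, nothing fancy is
 * allowed here; the handler just flags the interruption and returns.
 * The handler name is hypothetical.
 */
#if 0
static void exampleInterruptHandler (int sig STG_UNUSED)
{
    interruptStgRts();   // sets sched_state and pokes the Capabilities
}
#endif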
2711
2712 /* -----------------------------------------------------------------------------
2713 Wake up the RTS
2714
2715 This function causes at least one OS thread to wake up and run the
2716 scheduler loop. It is invoked when the RTS might be deadlocked, or
2717 an external event has arrived that may need servicing (eg. a
2718 keyboard interrupt).
2719
2720 In the single-threaded RTS we don't do anything here; we only have
2721 one thread anyway, and the event that caused us to want to wake up
2722 will have interrupted any blocking system call in progress anyway.
2723 -------------------------------------------------------------------------- */
2724
2725 #if defined(THREADED_RTS)
2726 void wakeUpRts(void)
2727 {
2728 // This forces the IO Manager thread to wakeup, which will
2729 // in turn ensure that some OS thread wakes up and runs the
2730 // scheduler loop, which will cause a GC and deadlock check.
2731 ioManagerWakeup();
2732 }
2733 #endif
2734
2735 /* -----------------------------------------------------------------------------
2736 Deleting threads
2737
2738 This is used for interruption (^C) and forking, and corresponds to
2739 raising an exception but without letting the thread catch the
2740 exception.
2741 -------------------------------------------------------------------------- */
2742
2743 static void
2744 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2745 {
2746 // NOTE: must only be called on a TSO that we have exclusive
2747 // access to, because we will call throwToSingleThreaded() below.
2748 // The TSO must be on the run queue of the Capability we own, or
2749 // we must own all Capabilities.
2750
2751 if (tso->why_blocked != BlockedOnCCall &&
2752 tso->why_blocked != BlockedOnCCall_Interruptible) {
2753 throwToSingleThreaded(tso->cap,tso,NULL);
2754 }
2755 }
2756
2757 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2758 static void
2759 deleteThread_(Capability *cap, StgTSO *tso)
2760 { // for forkProcess only:
2761 // like deleteThread(), but we delete threads in foreign calls, too.
2762
2763 if (tso->why_blocked == BlockedOnCCall ||
2764 tso->why_blocked == BlockedOnCCall_Interruptible) {
2765 tso->what_next = ThreadKilled;
2766 appendToRunQueue(tso->cap, tso);
2767 } else {
2768 deleteThread(cap,tso);
2769 }
2770 }
2771 #endif
2772
2773 /* -----------------------------------------------------------------------------
2774 raiseExceptionHelper
2775
2776 This function is called by the raise# primitive, just so that we can
2777 move some of the tricky bits of raising an exception from C-- into
2778 C. Who knows, it might be a useful reusable thing here too.
2779 -------------------------------------------------------------------------- */
2780
2781 StgWord
2782 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2783 {
2784 Capability *cap = regTableToCapability(reg);
2785 StgThunk *raise_closure = NULL;
2786 StgPtr p, next;
2787 const StgRetInfoTable *info;
2788 //
2789 // This closure represents the expression 'raise# E' where E
2790 // is the exception being raised. It is used to overwrite all the
2791 // thunks which are currently under evaluation.
2792 //
2793
2794 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2795 // LDV profiling: stg_raise_info has THUNK as its closure
2796 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2797 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2798 // 1 does not cause any problem unless profiling is performed.
2799 // However, when LDV profiling goes on, we need to linearly scan
2800 // small object pool, where raise_closure is stored, so we should
2801 // use MIN_UPD_SIZE.
2802 //
2803 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2804 // sizeofW(StgClosure)+1);
2805 //
2806
2807 //
2808 // Walk up the stack, looking for the catch frame. On the way,
2809 // we update any closures pointed to from update frames with the
2810 // raise closure that we just built.
2811 //
2812 p = tso->stackobj->sp;
2813 while(1) {
2814 info = get_ret_itbl((StgClosure *)p);
2815 next = p + stack_frame_sizeW((StgClosure *)p);
2816 switch (info->i.type) {
2817
2818 case UPDATE_FRAME:
2819 // Only create raise_closure if we need to.
2820 if (raise_closure == NULL) {
2821 raise_closure =
2822 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2823 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2824 raise_closure->payload[0] = exception;
2825 }
2826 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2827 (StgClosure *)raise_closure);
2828 p = next;
2829 continue;
2830
2831 case ATOMICALLY_FRAME:
2832 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2833 tso->stackobj->sp = p;
2834 return ATOMICALLY_FRAME;
2835
2836 case CATCH_FRAME:
2837 tso->stackobj->sp = p;
2838 return CATCH_FRAME;
2839
2840 case CATCH_STM_FRAME:
2841 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2842 tso->stackobj->sp = p;
2843 return CATCH_STM_FRAME;
2844
2845 case UNDERFLOW_FRAME:
2846 tso->stackobj->sp = p;
2847 threadStackUnderflow(cap,tso);
2848 p = tso->stackobj->sp;
2849 continue;
2850
2851 case STOP_FRAME:
2852 tso->stackobj->sp = p;
2853 return STOP_FRAME;
2854
2855 case CATCH_RETRY_FRAME: {
2856 StgTRecHeader *trec = tso -> trec;
2857 StgTRecHeader *outer = trec -> enclosing_trec;
2858 debugTrace(DEBUG_stm,
2859 "found CATCH_RETRY_FRAME at %p during raise", p);
2860 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2861 stmAbortTransaction(cap, trec);
2862 stmFreeAbortedTRec(cap, trec);
2863 tso -> trec = outer;
2864 p = next;
2865 continue;
2866 }
2867
2868 default:
2869 p = next;
2870 continue;
2871 }
2872 }
2873 }
2874
2875
2876 /* -----------------------------------------------------------------------------
2877 findRetryFrameHelper
2878
2879 This function is called by the retry# primitive. It traverses the stack
2880 leaving tso->sp referring to the frame which should handle the retry.
2881
2882 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2883 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2884
2885 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested transactions
2886 that they create) because retries are not considered to be exceptions, despite
2887 the similar implementation.
2888
2889 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2890 not be created within memory transactions.
2891 -------------------------------------------------------------------------- */
2892
2893 StgWord
2894 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2895 {
2896 const StgRetInfoTable *info;
2897 StgPtr p, next;
2898
2899 p = tso->stackobj->sp;
2900 while (1) {
2901 info = get_ret_itbl((const StgClosure *)p);
2902 next = p + stack_frame_sizeW((StgClosure *)p);
2903 switch (info->i.type) {
2904
2905 case ATOMICALLY_FRAME:
2906 debugTrace(DEBUG_stm,
2907 "found ATOMICALLY_FRAME at %p during retry", p);
2908 tso->stackobj->sp = p;
2909 return ATOMICALLY_FRAME;
2910
2911 case CATCH_RETRY_FRAME:
2912 debugTrace(DEBUG_stm,
2913 "found CATCH_RETRY_FRAME at %p during retry", p);
2914 tso->stackobj->sp = p;
2915 return CATCH_RETRY_FRAME;
2916
2917 case CATCH_STM_FRAME: {
2918 StgTRecHeader *trec = tso -> trec;
2919 StgTRecHeader *outer = trec -> enclosing_trec;
2920 debugTrace(DEBUG_stm,
2921 "found CATCH_STM_FRAME at %p during retry", p);
2922 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2923 stmAbortTransaction(cap, trec);
2924 stmFreeAbortedTRec(cap, trec);
2925 tso -> trec = outer;
2926 p = next;
2927 continue;
2928 }
2929
2930 case UNDERFLOW_FRAME:
2931 tso->stackobj->sp = p;
2932 threadStackUnderflow(cap,tso);
2933 p = tso->stackobj->sp;
2934 continue;
2935
2936 default:
2937 ASSERT(info->i.type != CATCH_FRAME);
2938 ASSERT(info->i.type != STOP_FRAME);
2939 p = next;
2940 continue;
2941 }
2942 }
2943 }
2944
2945 /* -----------------------------------------------------------------------------
2946 resurrectThreads is called after garbage collection on the list of
2947 threads found to be garbage. Each of these threads will be woken
2948 up and sent an exception: BlockedIndefinitelyOnMVar if the thread was
2949 blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked on STM,
2950 or NonTermination if the thread was blocked on a Black Hole.
2951
2952 Locks: assumes we hold *all* the capabilities.
2953 -------------------------------------------------------------------------- */
2954
2955 void
2956 resurrectThreads (StgTSO *threads)
2957 {
2958 StgTSO *tso, *next;
2959 Capability *cap;
2960 generation *gen;
2961
2962 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2963 next = tso->global_link;
2964
2965 gen = Bdescr((P_)tso)->gen;
2966 tso->global_link = gen->threads;
2967 gen->threads = tso;
2968
2969 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2970
2971 // Wake up the thread on the Capability it was last on
2972 cap = tso->cap;
2973
2974 switch (tso->why_blocked) {
2975 case BlockedOnMVar:
2976 case BlockedOnMVarRead:
2977 /* Called by GC - sched_mutex lock is currently held. */
2978 throwToSingleThreaded(cap, tso,
2979 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2980 break;
2981 case BlockedOnBlackHole:
2982 throwToSingleThreaded(cap, tso,
2983 (StgClosure *)nonTermination_closure);
2984 break;
2985 case BlockedOnSTM:
2986 throwToSingleThreaded(cap, tso,
2987 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2988 break;
2989 case NotBlocked:
2990 /* This might happen if the thread was blocked on a black hole
2991 * belonging to a thread that we've just woken up (raiseAsync
2992 * can wake up threads, remember...).
2993 */
2994 continue;
2995 case BlockedOnMsgThrowTo:
2996 // This can happen if the target is masking, blocks on a
2997 // black hole, and then is found to be unreachable. In
2998 // this case, we want to let the target wake up and carry
2999 // on, and do nothing to this thread.
3000 continue;
3001 default:
3002 barf("resurrectThreads: thread blocked in a strange way: %d",
3003 tso->why_blocked);
3004 }
3005 }
3006 }