Another try to get thread migration right
[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
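// A rough sketch of how this flag evolves over time (the MAYBE_NO step is
// driven by handle_tick() in Timer.c; see also scheduleDetectDeadlock()
// and scheduleDoGC() below):
//
//   ACTIVITY_YES      -- a thread ran recently
//   ACTIVITY_MAYBE_NO -- a timer tick passed with no activity
//   ACTIVITY_INACTIVE -- another tick passed; trigger the deadlock check
//   ACTIVITY_DONE_GC  -- the deadlock-check GC ran and the timer signal
//                        was turned off until a thread runs again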
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /*
95 * This mutex protects most of the global scheduler data in
96 * the THREADED_RTS runtime.
97 */
98 #if defined(THREADED_RTS)
99 Mutex sched_mutex;
100 #endif
101
102 #if !defined(mingw32_HOST_OS)
103 #define FORKPROCESS_PRIMOP_SUPPORTED
104 #endif
105
106 /* -----------------------------------------------------------------------------
107 * static function prototypes
108 * -------------------------------------------------------------------------- */
109
110 static Capability *schedule (Capability *initialCapability, Task *task);
111
112 //
113 // These functions all encapsulate parts of the scheduler loop, and are
114 // abstracted only to make the structure and control flow of the
115 // scheduler clearer.
116 //
117 static void schedulePreLoop (void);
118 static void scheduleFindWork (Capability **pcap);
119 #if defined(THREADED_RTS)
120 static void scheduleYield (Capability **pcap, Task *task);
121 #endif
122 #if defined(THREADED_RTS)
123 static rtsBool requestSync (Capability **pcap, Task *task,
124 PendingSync *sync_type, SyncType *prev_sync_type);
125 static void acquireAllCapabilities(Capability *cap, Task *task);
126 static void releaseAllCapabilities(nat n, Capability *cap, Task *task);
127 static void stopAllCapabilities(Capability **pCap, Task *task);
128 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
129 #endif
130 static void scheduleStartSignalHandlers (Capability *cap);
131 static void scheduleCheckBlockedThreads (Capability *cap);
132 static void scheduleProcessInbox(Capability **cap);
133 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
134 static void schedulePushWork(Capability *cap, Task *task);
135 #if defined(THREADED_RTS)
136 static void scheduleActivateSpark(Capability *cap);
137 #endif
138 static void schedulePostRunThread(Capability *cap, StgTSO *t);
139 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
140 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
141 nat prev_what_next );
142 static void scheduleHandleThreadBlocked( StgTSO *t );
143 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
144 StgTSO *t );
145 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
146 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
147
148 static void deleteThread (Capability *cap, StgTSO *tso);
149 static void deleteAllThreads (Capability *cap);
150
151 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
152 static void deleteThread_(Capability *cap, StgTSO *tso);
153 #endif
154
155 /* ---------------------------------------------------------------------------
156 Main scheduling loop.
157
158 We use round-robin scheduling, each thread returning to the
159 scheduler loop when one of these conditions is detected:
160
161 * out of heap space
162 * timer expires (thread yields)
163 * thread blocks
164 * thread ends
165 * stack overflow
166
167 ------------------------------------------------------------------------ */
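/* A sketch (not a specification) of one iteration of the loop in
   schedule() below; THREADED_RTS-only steps are marked:

       scheduleFindWork();  schedulePushWork();  scheduleDetectDeadlock();
       scheduleYield();                          // THREADED_RTS only
       t = popRunQueue(cap);
       ret = <run t until it returns to the scheduler>;
       handle ret: HeapOverflow | StackOverflow | ThreadYielding
                 | ThreadBlocked | ThreadFinished;
       if (ready_to_gc || heap profile needed) scheduleDoGC();
 */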
168
169 static Capability *
170 schedule (Capability *initialCapability, Task *task)
171 {
172 StgTSO *t;
173 Capability *cap;
174 StgThreadReturnCode ret;
175 nat prev_what_next;
176 rtsBool ready_to_gc;
177 #if defined(THREADED_RTS)
178 rtsBool first = rtsTrue;
179 #endif
180
181 cap = initialCapability;
182
183 // Pre-condition: this task owns initialCapability.
184 // The sched_mutex is *NOT* held
185 // NB. on return, we still hold a capability.
186
187 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
188
189 schedulePreLoop();
190
191 // -----------------------------------------------------------
192 // Scheduler loop starts here:
193
194 while (1) {
195
196 // Check whether we have re-entered the RTS from Haskell without
197 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
198 // call).
199 if (cap->in_haskell) {
200 errorBelch("schedule: re-entered unsafely.\n"
201 " Perhaps a 'foreign import unsafe' should be 'safe'?");
202 stg_exit(EXIT_FAILURE);
203 }
204
205 // The interruption / shutdown sequence.
206 //
207 // In order to cleanly shut down the runtime, we want to:
208 // * make sure that all main threads return to their callers
209 // with the state 'Interrupted'.
210 // * clean up all OS threads associated with the runtime
211 // * free all memory etc.
212 //
213 // So the sequence for ^C goes like this:
214 //
215 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
216 // arranges for some Capability to wake up
217 //
218 // * all threads in the system are halted, and the zombies are
219 // placed on the run queue for cleaning up. We acquire all
220 // the capabilities in order to delete the threads, this is
221 // done by scheduleDoGC() for convenience (because GC already
222 // needs to acquire all the capabilities). We can't kill
223 // threads involved in foreign calls.
224 //
225 // * somebody calls shutdownHaskell(), which calls exitScheduler()
226 //
227 // * sched_state := SCHED_SHUTTING_DOWN
228 //
229 // * all workers exit when the run queue on their capability
230 // drains. All main threads will also exit when their TSO
231 // reaches the head of the run queue and they can return.
232 //
233 // * eventually all Capabilities will shut down, and the RTS can
234 // exit.
235 //
236 // * We might be left with threads blocked in foreign calls;
237 // we should really attempt to kill these somehow (TODO).
238
239 switch (sched_state) {
240 case SCHED_RUNNING:
241 break;
242 case SCHED_INTERRUPTING:
243 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
244 /* scheduleDoGC() deletes all the threads */
245 scheduleDoGC(&cap,task,rtsTrue);
246
247 // after scheduleDoGC(), we must be shutting down. Either some
248 // other Capability did the final GC, or we did it above;
249 // either way we can fall through to the SCHED_SHUTTING_DOWN
250 // case now.
251 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
252 // fall through
253
254 case SCHED_SHUTTING_DOWN:
255 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
256 // If we are a worker, just exit. If we're a bound thread
257 // then we will exit below when we've removed our TSO from
258 // the run queue.
259 if (!isBoundTask(task) && emptyRunQueue(cap)) {
260 return cap;
261 }
262 break;
263 default:
264 barf("sched_state: %d", sched_state);
265 }
266
267 scheduleFindWork(&cap);
268
269 /* work pushing, currently relevant only for THREADED_RTS:
270 (pushes threads, wakes up idle capabilities for stealing) */
271 schedulePushWork(cap,task);
272
273 scheduleDetectDeadlock(&cap,task);
274
275 // Normally, the only way we can get here with no threads to
276 // run is if a keyboard interrupt was received during
277 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
278 // Additionally, it is not fatal for the
279 // threaded RTS to reach here with no threads to run.
280 //
281 // win32: might be here due to awaitEvent() being abandoned
282 // as a result of a console event having been delivered.
283
284 #if defined(THREADED_RTS)
285 if (first)
286 {
287 // XXX: ToDo
288 // // don't yield the first time, we want a chance to run this
289 // // thread for a bit, even if there are others banging at the
290 // // door.
291 // first = rtsFalse;
292 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
293 }
294
295 scheduleYield(&cap,task);
296
297 if (emptyRunQueue(cap)) continue; // look for work again
298 #endif
299
300 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
301 if ( emptyRunQueue(cap) ) {
302 ASSERT(sched_state >= SCHED_INTERRUPTING);
303 }
304 #endif
305
306 //
307 // Get a thread to run
308 //
309 t = popRunQueue(cap);
310
311 // Sanity check the thread we're about to run. This can be
312 // expensive if there is lots of thread switching going on...
313 IF_DEBUG(sanity,checkTSO(t));
314
315 #if defined(THREADED_RTS)
316 // Check whether we can run this thread in the current task.
317 // If not, we have to pass our capability to the right task.
318 {
319 InCall *bound = t->bound;
320
321 if (bound) {
322 if (bound->task == task) {
323 // yes, the Haskell thread is bound to the current native thread
324 } else {
325 debugTrace(DEBUG_sched,
326 "thread %lu bound to another OS thread",
327 (unsigned long)t->id);
328 // no, bound to a different Haskell thread: pass to that thread
329 pushOnRunQueue(cap,t);
330 continue;
331 }
332 } else {
333 // The thread we want to run is unbound.
334 if (task->incall->tso) {
335 debugTrace(DEBUG_sched,
336 "this OS thread cannot run thread %lu",
337 (unsigned long)t->id);
338 // no, the current native thread is bound to a different
339 // Haskell thread, so pass it to any worker thread
340 pushOnRunQueue(cap,t);
341 continue;
342 }
343 }
344 }
345 #endif
346
347 // If we're shutting down, and this thread has not yet been
348 // killed, kill it now. This sometimes happens when a finalizer
349 // thread is created by the final GC, or a thread previously
350 // in a foreign call returns.
351 if (sched_state >= SCHED_INTERRUPTING &&
352 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
353 deleteThread(cap,t);
354 }
355
356 // If this capability is disabled, migrate the thread away rather
357 // than running it. NB. but not if the thread is bound: it is
358 // really hard for a bound thread to migrate itself. Believe me,
359 // I tried several ways and couldn't find a way to do it.
360 // Instead, when everything is stopped for GC, we migrate all the
361 // threads on the run queue then (see scheduleDoGC()).
362 //
363 // ToDo: what about TSO_LOCKED? Currently we're migrating those
364 // when the number of capabilities drops, but we never migrate
365 // them back if it rises again. Presumably we should, but after
366 // the thread has been migrated we no longer know what capability
367 // it was originally on.
368 #ifdef THREADED_RTS
369 if (cap->disabled && !t->bound) {
370 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
371 migrateThread(cap, t, dest_cap);
372 continue;
373 }
374 #endif
375
376 /* context switches are initiated by the timer signal, unless
377 * the user specified "context switch as often as possible", with
378 * +RTS -C0
379 */
380 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
381 && !emptyThreadQueues(cap)) {
382 cap->context_switch = 1;
383 }
384
385 run_thread:
386
387 // CurrentTSO is the thread to run. It might be different if we
388 // loop back to run_thread, so make sure to set CurrentTSO after
389 // that.
390 cap->r.rCurrentTSO = t;
391
392 startHeapProfTimer();
393
394 // ----------------------------------------------------------------------
395 // Run the current thread
396
397 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
398 ASSERT(t->cap == cap);
399 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
400
401 prev_what_next = t->what_next;
402
403 errno = t->saved_errno;
404 #if mingw32_HOST_OS
405 SetLastError(t->saved_winerror);
406 #endif
407
408 // reset the interrupt flag before running Haskell code
409 cap->interrupt = 0;
410
411 cap->in_haskell = rtsTrue;
412 cap->idle = 0;
413
414 dirty_TSO(cap,t);
415 dirty_STACK(cap,t->stackobj);
416
417 switch (recent_activity)
418 {
419 case ACTIVITY_DONE_GC: {
420 // ACTIVITY_DONE_GC means we turned off the timer signal to
421 // conserve power (see #1623). Re-enable it here.
422 nat prev;
423 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
424 if (prev == ACTIVITY_DONE_GC) {
425 #ifndef PROFILING
426 startTimer();
427 #endif
428 }
429 break;
430 }
431 case ACTIVITY_INACTIVE:
432 // If we reached ACTIVITY_INACTIVE, then don't reset it until
433 // we've done the GC. The thread running here might just be
434 // the IO manager thread that handle_tick() woke up via
435 // wakeUpRts().
436 break;
437 default:
438 recent_activity = ACTIVITY_YES;
439 }
440
441 traceEventRunThread(cap, t);
442
443 switch (prev_what_next) {
444
445 case ThreadKilled:
446 case ThreadComplete:
447 /* Thread already finished, return to scheduler. */
448 ret = ThreadFinished;
449 break;
450
451 case ThreadRunGHC:
452 {
453 StgRegTable *r;
454 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
455 cap = regTableToCapability(r);
456 ret = r->rRet;
457 break;
458 }
459
460 case ThreadInterpret:
461 cap = interpretBCO(cap);
462 ret = cap->r.rRet;
463 break;
464
465 default:
466 barf("schedule: invalid what_next field");
467 }
468
469 cap->in_haskell = rtsFalse;
470
471 // The TSO might have moved, eg. if it re-entered the RTS and a GC
472 // happened. So find the new location:
473 t = cap->r.rCurrentTSO;
474
475 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
476 // don't want it set when not running a Haskell thread.
477 cap->r.rCurrentTSO = NULL;
478
479 // And save the current errno in this thread.
480 // XXX: possibly bogus for SMP because this thread might already
481 // be running again, see code below.
482 t->saved_errno = errno;
483 #if mingw32_HOST_OS
484 // Similarly for Windows error code
485 t->saved_winerror = GetLastError();
486 #endif
487
488 if (ret == ThreadBlocked) {
489 if (t->why_blocked == BlockedOnBlackHole) {
490 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
491 traceEventStopThread(cap, t, t->why_blocked + 6,
492 owner != NULL ? owner->id : 0);
493 } else {
494 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
495 }
496 } else {
497 traceEventStopThread(cap, t, ret, 0);
498 }
499
500 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
501 ASSERT(t->cap == cap);
502
503 // ----------------------------------------------------------------------
504
505 // Costs for the scheduler are assigned to CCS_SYSTEM
506 stopHeapProfTimer();
507 #if defined(PROFILING)
508 cap->r.rCCCS = CCS_SYSTEM;
509 #endif
510
511 schedulePostRunThread(cap,t);
512
513 ready_to_gc = rtsFalse;
514
515 switch (ret) {
516 case HeapOverflow:
517 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
518 break;
519
520 case StackOverflow:
521 // just adjust the stack for this thread, then pop it back
522 // on the run queue.
523 threadStackOverflow(cap, t);
524 pushOnRunQueue(cap,t);
525 break;
526
527 case ThreadYielding:
528 if (scheduleHandleYield(cap, t, prev_what_next)) {
529 // shortcut for switching between compiler/interpreter:
530 goto run_thread;
531 }
532 break;
533
534 case ThreadBlocked:
535 scheduleHandleThreadBlocked(t);
536 break;
537
538 case ThreadFinished:
539 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
540 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
541 break;
542
543 default:
544 barf("schedule: invalid thread return code %d", (int)ret);
545 }
546
547 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
548 scheduleDoGC(&cap,task,rtsFalse);
549 }
550 } /* end of while() */
551 }
552
553 /* -----------------------------------------------------------------------------
554 * Run queue operations
555 * -------------------------------------------------------------------------- */
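/* The run queue is a doubly-linked list: tso->_link points towards the
   tail and tso->block_info.prev points towards the head, with
   END_TSO_QUEUE terminating both directions.  A minimal sketch of the
   unlink performed by removeFromRunQueue() below, assuming that
   representation:

       prev = tso->block_info.prev;  next = tso->_link;
       if (prev == END_TSO_QUEUE) cap->run_queue_hd = next;
       else                       setTSOLink(cap, prev, next);
       if (next == END_TSO_QUEUE) cap->run_queue_tl = prev;
       else                       setTSOPrev(cap, next, prev);
 */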
556
557 void
558 removeFromRunQueue (Capability *cap, StgTSO *tso)
559 {
560 if (tso->block_info.prev == END_TSO_QUEUE) {
561 ASSERT(cap->run_queue_hd == tso);
562 cap->run_queue_hd = tso->_link;
563 } else {
564 setTSOLink(cap, tso->block_info.prev, tso->_link);
565 }
566 if (tso->_link == END_TSO_QUEUE) {
567 ASSERT(cap->run_queue_tl == tso);
568 cap->run_queue_tl = tso->block_info.prev;
569 } else {
570 setTSOPrev(cap, tso->_link, tso->block_info.prev);
571 }
572 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
573 cap->n_run_queue--;
574
575 IF_DEBUG(sanity, checkRunQueue(cap));
576 }
577
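// Move a thread to the front of its capability's run queue.  (This relies
// on the assumption that pushOnRunQueue() adds at the head while
// appendToRunQueue() adds at the tail, so remove-then-push promotes tso.)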
578 void
579 promoteInRunQueue (Capability *cap, StgTSO *tso)
580 {
581 removeFromRunQueue(cap, tso);
582 pushOnRunQueue(cap, tso);
583 }
584
585 /* ----------------------------------------------------------------------------
586 * Setting up the scheduler loop
587 * ------------------------------------------------------------------------- */
588
589 static void
590 schedulePreLoop(void)
591 {
592 // initialisation for scheduler - what cannot go into initScheduler()
593
594 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
595 win32AllocStack();
596 #endif
597 }
598
599 /* -----------------------------------------------------------------------------
600 * scheduleFindWork()
601 *
602 * Search for work to do, and handle messages from elsewhere.
603 * -------------------------------------------------------------------------- */
604
605 static void
606 scheduleFindWork (Capability **pcap)
607 {
608 scheduleStartSignalHandlers(*pcap);
609
610 scheduleProcessInbox(pcap);
611
612 scheduleCheckBlockedThreads(*pcap);
613
614 #if defined(THREADED_RTS)
615 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
616 #endif
617 }
618
619 #if defined(THREADED_RTS)
620 STATIC_INLINE rtsBool
621 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
622 {
623 // we need to yield this capability to someone else if..
624 // - another thread is initiating a GC, and we didn't just do a GC
625 // (see Note [GC livelock])
626 // - another Task is returning from a foreign call
627 // - the thread at the head of the run queue cannot be run
628 // by this Task (it is bound to another Task, or it is unbound
629 // and this Task is bound).
630 //
631 // Note [GC livelock]
632 //
633 // If we are interrupted to do a GC, then we do not immediately do
634 // another one. This avoids a starvation situation where one
635 // Capability keeps forcing a GC and the other Capabilities make no
636 // progress at all.
637
638 return ((pending_sync && !didGcLast) ||
639 cap->n_returning_tasks != 0 ||
640 (!emptyRunQueue(cap) && (task->incall->tso == NULL
641 ? peekRunQueue(cap)->bound != NULL
642 : peekRunQueue(cap)->bound != task->incall)));
643 }
644
645 // This is the single place where a Task goes to sleep. There are
646 // two reasons it might need to sleep:
647 // - there are no threads to run
648 // - we need to yield this Capability to someone else
649 // (see shouldYieldCapability())
650 //
651 // Careful: the scheduler loop is quite delicate. Make sure you run
652 // the tests in testsuite/concurrent (all ways) after modifying this,
653 // and also check the benchmarks in nofib/parallel for regressions.
654
655 static void
656 scheduleYield (Capability **pcap, Task *task)
657 {
658 Capability *cap = *pcap;
659 int didGcLast = rtsFalse;
660
661 // if we have work, and we don't need to give up the Capability, continue.
662 //
663 if (!shouldYieldCapability(cap,task,rtsFalse) &&
664 (!emptyRunQueue(cap) ||
665 !emptyInbox(cap) ||
666 sched_state >= SCHED_INTERRUPTING)) {
667 return;
668 }
669
670 // otherwise yield (sleep), and keep yielding if necessary.
671 do {
672 didGcLast = yieldCapability(&cap,task, !didGcLast);
673 }
674 while (shouldYieldCapability(cap,task,didGcLast));
675
676 // note there may still be no threads on the run queue at this
677 // point, the caller has to check.
678
679 *pcap = cap;
680 return;
681 }
682 #endif
683
684 /* -----------------------------------------------------------------------------
685 * schedulePushWork()
686 *
687 * Push work to other Capabilities if we have some.
688 * -------------------------------------------------------------------------- */
689
690 static void
691 schedulePushWork(Capability *cap USED_IF_THREADS,
692 Task *task USED_IF_THREADS)
693 {
694 /* The following code is not for PARALLEL_HASKELL. I kept the call general;
695 future GUM versions might use pushing in a distributed setup. */
696 #if defined(THREADED_RTS)
697
698 Capability *free_caps[n_capabilities], *cap0;
699 uint32_t i, n_wanted_caps, n_free_caps;
700
701 uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
702
703 // migration can be turned off with +RTS -qm
704 if (!RtsFlags.ParFlags.migrate) {
705 spare_threads = 0;
706 }
707
708 // Figure out how many capabilities we want to wake up. We need at least
709 // sparkPoolSize(cap) plus the number of spare threads we have.
710 n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
711 if (n_wanted_caps == 0) return;
712
713 // First grab as many free Capabilities as we can.
714 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
715 n_free_caps < n_wanted_caps && i != cap->no;
716 i = (i + 1) % n_capabilities) {
717 cap0 = capabilities[i];
718 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
719 if (!emptyRunQueue(cap0)
720 || cap0->n_returning_tasks != 0
721 || cap0->inbox != (Message*)END_TSO_QUEUE) {
722 // it already has some work, we just grabbed it at
723 // the wrong moment. Or maybe it's deadlocked!
724 releaseCapability(cap0);
725 } else {
726 free_caps[n_free_caps++] = cap0;
727 }
728 }
729 }
730
731 // We now have n_free_caps free capabilities stashed in
732 // free_caps[]. Attempt to share our run queue equally with them.
733 // This is complicated slightly by the fact that we can't move
734 // some threads:
735 //
736 // - threads that have TSO_LOCKED cannot migrate
737 // - a thread that is bound to the current Task cannot be migrated
738 //
739 // This is about the simplest thing we could do; improvements we
740 // might want to do include:
741 //
742 // - giving high priority to moving relatively new threads, on
743 // the grounds that they haven't had time to build up a
744 // working set in the cache on this CPU/Capability.
745 //
746 // - giving low priority to moving long-lived threads
747
748 if (n_free_caps > 0) {
749 StgTSO *prev, *t, *next;
750
751 debugTrace(DEBUG_sched,
752 "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
753 cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
754 n_free_caps);
755
756 // There are n_free_caps+1 caps in total. We will share the threads
757 // evenly between them, *except* that if the run queue does not divide
758 // evenly by n_free_caps+1 then we bias towards the current capability.
759 // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
760 uint32_t keep_threads =
761 (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
762
763 // This also ensures that we don't give away all our threads, since
764 // (x + y) / (y + 1) >= 1 when x >= 1.
765
766 // The number of threads we have left.
767 uint32_t n = cap->n_run_queue;
768
769 // prev = the previous thread on this cap's run queue
770 prev = END_TSO_QUEUE;
771
772 // We're going to walk through the run queue, migrating threads to other
773 // capabilities until we have only keep_threads left. We might
774 // encounter a thread that cannot be migrated, in which case we add it
775 // to the current run queue and decrement keep_threads.
776 for (t = cap->run_queue_hd, i = 0;
777 t != END_TSO_QUEUE && n > keep_threads;
778 t = next)
779 {
780 next = t->_link;
781 t->_link = END_TSO_QUEUE;
782
783 // Should we keep this thread?
784 if (t->bound == task->incall // don't move my bound thread
785 || tsoLocked(t) // don't move a locked thread
786 ) {
787 if (prev == END_TSO_QUEUE) {
788 cap->run_queue_hd = t;
789 } else {
790 setTSOLink(cap, prev, t);
791 }
792 setTSOPrev(cap, t, prev);
793 prev = t;
794 if (keep_threads > 0) keep_threads--;
795 }
796
797 // Or migrate it?
798 else {
799 appendToRunQueue(free_caps[i],t);
800 traceEventMigrateThread (cap, t, free_caps[i]->no);
801
802 if (t->bound) { t->bound->task->cap = free_caps[i]; }
803 t->cap = free_caps[i];
804 n--; // we have one fewer threads now
805 i++; // move on to the next free_cap
806 if (i == n_free_caps) i = 0;
807 }
808 }
809
810 // Join up the beginning of the queue (prev)
811 // with the rest of the queue (t)
812 if (t == END_TSO_QUEUE) {
813 cap->run_queue_tl = prev;
814 } else {
815 setTSOPrev(cap, t, prev);
816 }
817 if (prev == END_TSO_QUEUE) {
818 cap->run_queue_hd = t;
819 } else {
820 setTSOLink(cap, prev, t);
821 }
822 cap->n_run_queue = n;
823
824 IF_DEBUG(sanity, checkRunQueue(cap));
825
826 // release the capabilities
827 for (i = 0; i < n_free_caps; i++) {
828 task->cap = free_caps[i];
829 if (sparkPoolSizeCap(cap) > 0) {
830 // If we have sparks to steal, wake up a worker on the
831 // capability, even if it has no threads to run.
832 releaseAndWakeupCapability(free_caps[i]);
833 } else {
834 releaseCapability(free_caps[i]);
835 }
836 }
837 }
838 task->cap = cap; // reset to point to our Capability.
839
840 #endif /* THREADED_RTS */
841
842 }
843
844 /* ----------------------------------------------------------------------------
845 * Start any pending signal handlers
846 * ------------------------------------------------------------------------- */
847
848 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
849 static void
850 scheduleStartSignalHandlers(Capability *cap)
851 {
852 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
853 // safe outside the lock
854 startSignalHandlers(cap);
855 }
856 }
857 #else
858 static void
859 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
860 {
861 }
862 #endif
863
864 /* ----------------------------------------------------------------------------
865 * Check for blocked threads that can be woken up.
866 * ------------------------------------------------------------------------- */
867
868 static void
869 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
870 {
871 #if !defined(THREADED_RTS)
872 //
873 // Check whether any waiting threads need to be woken up. If the
874 // run queue is empty, and there are no other tasks running, we
875 // can wait indefinitely for something to happen.
876 //
877 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
878 {
879 awaitEvent (emptyRunQueue(cap));
880 }
881 #endif
882 }
883
884 /* ----------------------------------------------------------------------------
885 * Detect deadlock conditions and attempt to resolve them.
886 * ------------------------------------------------------------------------- */
887
888 static void
889 scheduleDetectDeadlock (Capability **pcap, Task *task)
890 {
891 Capability *cap = *pcap;
892 /*
893 * Detect deadlock: when we have no threads to run, there are no
894 * threads blocked, waiting for I/O, or sleeping, and all the
895 * other tasks are waiting for work, we must have a deadlock of
896 * some description.
897 */
898 if ( emptyThreadQueues(cap) )
899 {
900 #if defined(THREADED_RTS)
901 /*
902 * In the threaded RTS, we only check for deadlock if there
903 * has been no activity in a complete timeslice. This means
904 * we won't eagerly start a full GC just because we don't have
905 * any threads to run currently.
906 */
907 if (recent_activity != ACTIVITY_INACTIVE) return;
908 #endif
909
910 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
911
912 // Garbage collection can release some new threads due to
913 // either (a) finalizers or (b) threads resurrected because
914 // they are unreachable and will therefore be sent an
915 // exception. Any threads thus released will be immediately
916 // runnable.
917 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
918 cap = *pcap;
919 // when force_major == rtsTrue, scheduleDoGC() sets
920 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
921 // signal.
922
923 if ( !emptyRunQueue(cap) ) return;
924
925 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
926 /* If we have user-installed signal handlers, then wait
927 * for signals to arrive rather than bombing out with a
928 * deadlock.
929 */
930 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
931 debugTrace(DEBUG_sched,
932 "still deadlocked, waiting for signals...");
933
934 awaitUserSignals();
935
936 if (signals_pending()) {
937 startSignalHandlers(cap);
938 }
939
940 // either we have threads to run, or we were interrupted:
941 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
942
943 return;
944 }
945 #endif
946
947 #if !defined(THREADED_RTS)
948 /* Probably a real deadlock. Send the current main thread the
949 * Deadlock exception.
950 */
951 if (task->incall->tso) {
952 switch (task->incall->tso->why_blocked) {
953 case BlockedOnSTM:
954 case BlockedOnBlackHole:
955 case BlockedOnMsgThrowTo:
956 case BlockedOnMVar:
957 case BlockedOnMVarRead:
958 throwToSingleThreaded(cap, task->incall->tso,
959 (StgClosure *)nonTermination_closure);
960 return;
961 default:
962 barf("deadlock: main thread blocked in a strange way");
963 }
964 }
965 return;
966 #endif
967 }
968 }
969
970
971 /* ----------------------------------------------------------------------------
972 * Process message in the current Capability's inbox
973 * ------------------------------------------------------------------------- */
974
975 static void
976 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
977 {
978 #if defined(THREADED_RTS)
979 Message *m, *next;
980 int r;
981 Capability *cap = *pcap;
982
983 while (!emptyInbox(cap)) {
984 if (cap->r.rCurrentNursery->link == NULL ||
985 g0->n_new_large_words >= large_alloc_lim) {
986 scheduleDoGC(pcap, cap->running_task, rtsFalse);
987 cap = *pcap;
988 }
989
990 // don't use a blocking acquire; if the lock is held by
991 // another thread then just carry on. This seems to avoid
992 // getting stuck in a message ping-pong situation with other
993 // processors. We'll check the inbox again later anyway.
994 //
995 // We should really use a more efficient queue data structure
996 // here. The trickiness is that we must ensure a Capability
997 // never goes idle if the inbox is non-empty, which is why we
998 // use cap->lock (cap->lock is released as the last thing
999 // before going idle; see Capability.c:releaseCapability()).
1000 r = TRY_ACQUIRE_LOCK(&cap->lock);
1001 if (r != 0) return;
1002
1003 m = cap->inbox;
1004 cap->inbox = (Message*)END_TSO_QUEUE;
1005
1006 RELEASE_LOCK(&cap->lock);
1007
1008 while (m != (Message*)END_TSO_QUEUE) {
1009 next = m->link;
1010 executeMessage(cap, m);
1011 m = next;
1012 }
1013 }
1014 #endif
1015 }
1016
1017 /* ----------------------------------------------------------------------------
1018 * Activate spark threads (THREADED_RTS)
1019 * ------------------------------------------------------------------------- */
1020
1021 #if defined(THREADED_RTS)
1022 static void
1023 scheduleActivateSpark(Capability *cap)
1024 {
1025 if (anySparks() && !cap->disabled)
1026 {
1027 createSparkThread(cap);
1028 debugTrace(DEBUG_sched, "creating a spark thread");
1029 }
1030 }
1031 #endif // THREADED_RTS
1032
1033 /* ----------------------------------------------------------------------------
1034 * After running a thread...
1035 * ------------------------------------------------------------------------- */
1036
1037 static void
1038 schedulePostRunThread (Capability *cap, StgTSO *t)
1039 {
1040 // We have to be able to catch transactions that are in an
1041 // infinite loop as a result of seeing an inconsistent view of
1042 // memory, e.g.
1043 //
1044 // atomically $ do
1045 // [a,b] <- mapM readTVar [ta,tb]
1046 // when (a == b) loop
1047 //
1048 // and a is never equal to b given a consistent view of memory.
1049 //
1050 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1051 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1052 debugTrace(DEBUG_sched | DEBUG_stm,
1053 "trec %p found wasting its time", t);
1054
1055 // strip the stack back to the
1056 // ATOMICALLY_FRAME, aborting the (nested)
1057 // transaction, and saving the stack of any
1058 // partially-evaluated thunks on the heap.
1059 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1060
1061 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1062 }
1063 }
1064
1065 //
1066 // If the current thread's allocation limit has run out, send it
1067 // the AllocationLimitExceeded exception.
1068
1069 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1070 // Use a throwToSelf rather than a throwToSingleThreaded, because
1071 // it correctly handles the case where the thread is currently
1072 // inside mask. Also the thread might be blocked (e.g. on an
1073 // MVar), and throwToSingleThreaded doesn't unblock it
1074 // correctly in that case.
1075 throwToSelf(cap, t, allocationLimitExceeded_closure);
1076 ASSIGN_Int64((W_*)&(t->alloc_limit),
1077 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1078 }
1079
1080 /* some statistics gathering in the parallel case */
1081 }
1082
1083 /* -----------------------------------------------------------------------------
1084 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1085 * -------------------------------------------------------------------------- */
1086
1087 static rtsBool
1088 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1089 {
1090 if (cap->r.rHpLim == NULL || cap->context_switch) {
1091 // Sometimes we miss a context switch, e.g. when calling
1092 // primitives in a tight loop, MAYBE_GC() doesn't check the
1093 // context switch flag, and we end up waiting for a GC.
1094 // See #1984, and concurrent/should_run/1984
1095 cap->context_switch = 0;
1096 appendToRunQueue(cap,t);
1097 } else {
1098 pushOnRunQueue(cap,t);
1099 }
1100
1101 // did the task ask for a large block?
1102 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1103 // if so, get one and push it on the front of the nursery.
1104 bdescr *bd;
1105 W_ blocks;
1106
1107 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
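        // For illustration only: with the default 4KB block size, an
        // rHpAlloc of 10000 bytes rounds up to 12KB, i.e. blocks == 3.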
1108
1109 if (blocks > BLOCKS_PER_MBLOCK) {
1110 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1111 }
1112
1113 debugTrace(DEBUG_sched,
1114 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1115 (long)t->id, what_next_strs[t->what_next], blocks);
1116
1117 // don't do this if the nursery is (nearly) full, we'll GC first.
1118 if (cap->r.rCurrentNursery->link != NULL ||
1119 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1120 // infinite loop if the
1121 // nursery has only one
1122 // block.
1123
1124 bd = allocGroup_lock(blocks);
1125 cap->r.rNursery->n_blocks += blocks;
1126
1127 // link the new group after CurrentNursery
1128 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1129
1130 // initialise it as a nursery block. We initialise the
1131 // step, gen_no, and flags field of *every* sub-block in
1132 // this large block, because this is easier than making
1133 // sure that we always find the block head of a large
1134 // block whenever we call Bdescr() (eg. evacuate() and
1135 // isAlive() in the GC would both have to do this, at
1136 // least).
1137 {
1138 bdescr *x;
1139 for (x = bd; x < bd + blocks; x++) {
1140 initBdescr(x,g0,g0);
1141 x->free = x->start;
1142 x->flags = 0;
1143 }
1144 }
1145
1146 // This assert can be a killer if the app is doing lots
1147 // of large block allocations.
1148 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1149
1150 // now update the nursery to point to the new block
1151 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1152 cap->r.rCurrentNursery = bd;
1153
1154 // we might be unlucky and have another thread get on the
1155 // run queue before us and steal the large block, but in that
1156 // case the thread will just end up requesting another large
1157 // block.
1158 return rtsFalse; /* not actually GC'ing */
1159 }
1160 }
1161
1162 // if we got here because we exceeded large_alloc_lim, then
1163 // proceed straight to GC.
1164 if (g0->n_new_large_words >= large_alloc_lim) {
1165 return rtsTrue;
1166 }
1167
1168 // Otherwise, we just ran out of space in the current nursery.
1169 // Grab another nursery if we can.
1170 if (getNewNursery(cap)) {
1171 debugTrace(DEBUG_sched, "thread %ld got a new nursery", t->id);
1172 return rtsFalse;
1173 }
1174
1175 return rtsTrue;
1176 /* actual GC is done at the end of the while loop in schedule() */
1177 }
1178
1179 /* -----------------------------------------------------------------------------
1180 * Handle a thread that returned to the scheduler with ThreadYielding
1181 * -------------------------------------------------------------------------- */
1182
1183 static rtsBool
1184 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1185 {
1186 /* put the thread back on the run queue. Then, if we're ready to
1187 * GC, check whether this is the last task to stop. If so, wake
1188 * up the GC thread. getThread will block during a GC until the
1189 * GC is finished.
1190 */
1191
1192 ASSERT(t->_link == END_TSO_QUEUE);
1193
1194 // Shortcut if we're just switching evaluators: don't bother
1195 // doing stack squeezing (which can be expensive), just run the
1196 // thread.
1197 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1198 debugTrace(DEBUG_sched,
1199 "--<< thread %ld (%s) stopped to switch evaluators",
1200 (long)t->id, what_next_strs[t->what_next]);
1201 return rtsTrue;
1202 }
1203
1204 // Reset the context switch flag. We don't do this just before
1205 // running the thread, because that would mean we would lose ticks
1206 // during GC, which can lead to unfair scheduling (a thread hogs
1207 // the CPU because the tick always arrives during GC). This way
1208 // penalises threads that do a lot of allocation, but that seems
1209 // better than the alternative.
1210 if (cap->context_switch != 0) {
1211 cap->context_switch = 0;
1212 appendToRunQueue(cap,t);
1213 } else {
1214 pushOnRunQueue(cap,t);
1215 }
1216
1217 IF_DEBUG(sanity,
1218 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1219 checkTSO(t));
1220
1221 return rtsFalse;
1222 }
1223
1224 /* -----------------------------------------------------------------------------
1225 * Handle a thread that returned to the scheduler with ThreadBlocked
1226 * -------------------------------------------------------------------------- */
1227
1228 static void
1229 scheduleHandleThreadBlocked( StgTSO *t
1230 #if !defined(DEBUG)
1231 STG_UNUSED
1232 #endif
1233 )
1234 {
1235
1236 // We don't need to do anything. The thread is blocked, and it
1237 // has tidied up its stack and placed itself on whatever queue
1238 // it needs to be on.
1239
1240 // ASSERT(t->why_blocked != NotBlocked);
1241 // Not true: for example,
1242 // - the thread may have woken itself up already, because
1243 // threadPaused() might have raised a blocked throwTo
1244 // exception, see maybePerformBlockedException().
1245
1246 #ifdef DEBUG
1247 traceThreadStatus(DEBUG_sched, t);
1248 #endif
1249 }
1250
1251 /* -----------------------------------------------------------------------------
1252 * Handle a thread that returned to the scheduler with ThreadFinished
1253 * -------------------------------------------------------------------------- */
1254
1255 static rtsBool
1256 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1257 {
1258 /* Need to check whether this was a main thread, and if so,
1259 * return with the return value.
1260 *
1261 * We also end up here if the thread kills itself with an
1262 * uncaught exception, see Exception.cmm.
1263 */
1264
1265 // blocked exceptions can now complete, even if the thread was in
1266 // blocked mode (see #2910).
1267 awakenBlockedExceptionQueue (cap, t);
1268
1269 //
1270 // Check whether the thread that just completed was a bound
1271 // thread, and if so return with the result.
1272 //
1273 // There is an assumption here that all thread completion goes
1274 // through this point; we need to make sure that if a thread
1275 // ends up in the ThreadKilled state, that it stays on the run
1276 // queue so it can be dealt with here.
1277 //
1278
1279 if (t->bound) {
1280
1281 if (t->bound != task->incall) {
1282 #if !defined(THREADED_RTS)
1283 // Must be a bound thread that is not the topmost one. Leave
1284 // it on the run queue until the stack has unwound to the
1285 // point where we can deal with this. Leaving it on the run
1286 // queue also ensures that the garbage collector knows about
1287 // this thread and its return value (it gets dropped from the
1288 // step->threads list so there's no other way to find it).
1289 appendToRunQueue(cap,t);
1290 return rtsFalse;
1291 #else
1292 // this cannot happen in the threaded RTS, because a
1293 // bound thread can only be run by the appropriate Task.
1294 barf("finished bound thread that isn't mine");
1295 #endif
1296 }
1297
1298 ASSERT(task->incall->tso == t);
1299
1300 if (t->what_next == ThreadComplete) {
1301 if (task->incall->ret) {
1302 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1303 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1304 }
1305 task->incall->rstat = Success;
1306 } else {
1307 if (task->incall->ret) {
1308 *(task->incall->ret) = NULL;
1309 }
1310 if (sched_state >= SCHED_INTERRUPTING) {
1311 if (heap_overflow) {
1312 task->incall->rstat = HeapExhausted;
1313 } else {
1314 task->incall->rstat = Interrupted;
1315 }
1316 } else {
1317 task->incall->rstat = Killed;
1318 }
1319 }
1320 #ifdef DEBUG
1321 removeThreadLabel((StgWord)task->incall->tso->id);
1322 #endif
1323
1324 // We no longer consider this thread and task to be bound to
1325 // each other. The TSO lives on until it is GC'd, but the
1326 // task is about to be released by the caller, and we don't
1327 // want anyone following the pointer from the TSO to the
1328 // defunct task (which might have already been
1329 // re-used). This was a real bug: the GC updated
1330 // tso->bound->tso which led to a deadlock.
1331 t->bound = NULL;
1332 task->incall->tso = NULL;
1333
1334 return rtsTrue; // tells schedule() to return
1335 }
1336
1337 return rtsFalse;
1338 }
1339
1340 /* -----------------------------------------------------------------------------
1341 * Perform a heap census
1342 * -------------------------------------------------------------------------- */
1343
1344 static rtsBool
1345 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1346 {
1347 // When we have +RTS -i0 and we're heap profiling, do a census at
1348 // every GC. This lets us get repeatable runs for debugging.
1349 if (performHeapProfile ||
1350 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1351 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1352 return rtsTrue;
1353 } else {
1354 return rtsFalse;
1355 }
1356 }
1357
1358 /* -----------------------------------------------------------------------------
1359 * stopAllCapabilities()
1360 *
1361 * Stop all Haskell execution. This is used when we need to make some global
1362 * change to the system, such as altering the number of capabilities, or
1363 * forking.
1364 *
1365 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1366 * -------------------------------------------------------------------------- */
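/* A sketch of the intended calling pattern (assuming the caller already
   owns *pCap), e.g. around changing the number of capabilities or forking
   as described above:

       stopAllCapabilities(&cap, task);
       ... make the global change ...
       releaseAllCapabilities(n_capabilities, cap, task);
 */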
1367
1368 #if defined(THREADED_RTS)
1369 static void stopAllCapabilities (Capability **pCap, Task *task)
1370 {
1371 rtsBool was_syncing;
1372 SyncType prev_sync_type;
1373
1374 PendingSync sync = {
1375 .type = SYNC_OTHER,
1376 .idle = NULL,
1377 .task = task
1378 };
1379
1380 do {
1381 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1382 } while (was_syncing);
1383
1384 acquireAllCapabilities(*pCap,task);
1385
1386 pending_sync = 0;
1387 }
1388 #endif
1389
1390 /* -----------------------------------------------------------------------------
1391 * requestSync()
1392 *
1393 * Commence a synchronisation between all capabilities. Normally not called
1394 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1395 * has some special synchronisation requirements.
1396 *
1397 * Returns:
1398 * rtsFalse if we successfully got a sync
1399 * rtsTrue if there was another sync request in progress,
1400 * and we yielded to it. The value returned is the
1401 * type of the other sync request.
1402 * -------------------------------------------------------------------------- */
1403
1404 #if defined(THREADED_RTS)
1405 static rtsBool requestSync (
1406 Capability **pcap, Task *task, PendingSync *new_sync,
1407 SyncType *prev_sync_type)
1408 {
1409 PendingSync *sync;
1410
1411 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1412 (StgWord)NULL,
1413 (StgWord)new_sync);
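    // In effect the cas() above performs, atomically (sketch only):
    //
    //     old = pending_sync;
    //     if (old == NULL) pending_sync = new_sync;
    //     return old;
    //
    // so sync == NULL below means this Task now owns the pending sync,
    // and a non-NULL sync means another Task's request is in progress.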
1414
1415 if (sync != NULL)
1416 {
1417 // sync is valid until we have called yieldCapability().
1418 // After the sync is completed, we cannot read that struct any
1419 // more because it has been freed.
1420 *prev_sync_type = sync->type;
1421 do {
1422 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1423 sync->type);
1424 ASSERT(*pcap);
1425 yieldCapability(pcap,task,rtsTrue);
1426 sync = pending_sync;
1427 } while (sync != NULL);
1428
1429 // NOTE: task->cap might have changed now
1430 return rtsTrue;
1431 }
1432 else
1433 {
1434 return rtsFalse;
1435 }
1436 }
1437 #endif
1438
1439 /* -----------------------------------------------------------------------------
1440 * acquireAllCapabilities()
1441 *
1442 * Grab all the capabilities except the one we already hold. Used
1443 * when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1444 * before a fork (SYNC_OTHER).
1445 *
1446 * Only call this after requestSync(), otherwise a deadlock might
1447 * ensue if another thread is trying to synchronise.
1448 * -------------------------------------------------------------------------- */
1449
1450 #ifdef THREADED_RTS
1451 static void acquireAllCapabilities(Capability *cap, Task *task)
1452 {
1453 Capability *tmpcap;
1454 nat i;
1455
1456 ASSERT(pending_sync != NULL);
1457 for (i=0; i < n_capabilities; i++) {
1458 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1459 i, n_capabilities);
1460 tmpcap = capabilities[i];
1461 if (tmpcap != cap) {
1462 // we better hope this task doesn't get migrated to
1463 // another Capability while we're waiting for this one.
1464 // It won't, because load balancing happens while we have
1465 // all the Capabilities, but even so it's a slightly
1466 // unsavoury invariant.
1467 task->cap = tmpcap;
1468 waitForCapability(&tmpcap, task);
1469 if (tmpcap->no != i) {
1470 barf("acquireAllCapabilities: got the wrong capability");
1471 }
1472 }
1473 }
1474 task->cap = cap;
1475 }
1476 #endif
1477
1478 /* -----------------------------------------------------------------------------
1479 * releaseAllCapabilities()
1480 *
1481 * Assuming this thread holds all the capabilities, release them all except for
1482 * the one passed in as cap.
1483 * -------------------------------------------------------------------------- */
1484
1485 #ifdef THREADED_RTS
1486 static void releaseAllCapabilities(nat n, Capability *cap, Task *task)
1487 {
1488 nat i;
1489
1490 for (i = 0; i < n; i++) {
1491 if (cap->no != i) {
1492 task->cap = capabilities[i];
1493 releaseCapability(capabilities[i]);
1494 }
1495 }
1496 task->cap = cap;
1497 }
1498 #endif
1499
1500 /* -----------------------------------------------------------------------------
1501 * Perform a garbage collection if necessary
1502 * -------------------------------------------------------------------------- */
1503
1504 static void
1505 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1506 rtsBool force_major)
1507 {
1508 Capability *cap = *pcap;
1509 rtsBool heap_census;
1510 nat collect_gen;
1511 rtsBool major_gc;
1512 #ifdef THREADED_RTS
1513 nat gc_type;
1514 nat i;
1515 nat need_idle;
1516 nat n_idle_caps = 0, n_failed_trygrab_idles = 0;
1517 StgTSO *tso;
1518 rtsBool *idle_cap;
1519 #endif
1520
1521 if (sched_state == SCHED_SHUTTING_DOWN) {
1522 // The final GC has already been done, and the system is
1523 // shutting down. We'll probably deadlock if we try to GC
1524 // now.
1525 return;
1526 }
1527
1528 heap_census = scheduleNeedHeapProfile(rtsTrue);
1529
1530 // Figure out which generation we are collecting, so that we can
1531 // decide whether this is a parallel GC or not.
1532 collect_gen = calcNeeded(force_major || heap_census, NULL);
1533 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1534
1535 #ifdef THREADED_RTS
1536 if (sched_state < SCHED_INTERRUPTING
1537 && RtsFlags.ParFlags.parGcEnabled
1538 && collect_gen >= RtsFlags.ParFlags.parGcGen
1539 && ! oldest_gen->mark)
1540 {
1541 gc_type = SYNC_GC_PAR;
1542 } else {
1543 gc_type = SYNC_GC_SEQ;
1544 }
1545
1546 if (gc_type == SYNC_GC_PAR && RtsFlags.ParFlags.parGcThreads > 0) {
1547 need_idle = stg_max(0, enabled_capabilities -
1548 RtsFlags.ParFlags.parGcThreads);
1549 } else {
1550 need_idle = 0;
1551 }
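    // For illustration only: with 8 enabled capabilities and +RTS -qn4
    // (i.e. parGcThreads == 4), need_idle == 4, so four capabilities are
    // asked to sit out this parallel GC.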
1552
1553 // In order to GC, there must be no threads running Haskell code.
1554 // Therefore, the GC thread needs to hold *all* the capabilities,
1555 // and release them after the GC has completed.
1556 //
1557 // This seems to be the simplest way: previous attempts involved
1558 // making all the threads with capabilities give up their
1559 // capabilities and sleep except for the *last* one, which
1560 // actually did the GC. But it's quite hard to arrange for all
1561 // the other tasks to sleep and stay asleep.
1562 //
1563
1564 /* Other capabilities are prevented from running yet more Haskell
1565 threads if pending_sync is set. Tested inside
1566 yieldCapability() and releaseCapability() in Capability.c */
1567
1568 PendingSync sync = {
1569 .type = gc_type,
1570 .idle = NULL,
1571 .task = task
1572 };
1573
1574 {
1575 SyncType prev_sync = 0;
1576 rtsBool was_syncing;
1577 do {
1578 // We need an array of size n_capabilities, but since this may
1579 // change each time around the loop we must allocate it afresh.
1580 idle_cap = (rtsBool *)stgMallocBytes(n_capabilities *
1581 sizeof(rtsBool),
1582 "scheduleDoGC");
1583 sync.idle = idle_cap;
1584
1585 // When using +RTS -qn, we need some capabilities to be idle during
1586 // GC. The best bet is to choose some inactive ones, so we look for
1587 // those first:
1588 nat n_idle = need_idle;
1589 for (i=0; i < n_capabilities; i++) {
1590 if (capabilities[i]->disabled) {
1591 idle_cap[i] = rtsTrue;
1592 } else if (n_idle > 0 &&
1593 capabilities[i]->running_task == NULL) {
1594 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1595 n_idle--;
1596 idle_cap[i] = rtsTrue;
1597 } else {
1598 idle_cap[i] = rtsFalse;
1599 }
1600 }
1601 // If we didn't find enough inactive capabilities, just pick some
1602 // more to be idle.
1603 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1604 if (!idle_cap[i] && i != cap->no) {
1605 idle_cap[i] = rtsTrue;
1606 n_idle--;
1607 }
1608 }
1609 ASSERT(n_idle == 0);
1610
1611 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1612 cap = *pcap;
1613 if (was_syncing) {
1614 stgFree(idle_cap);
1615 }
1616 if (was_syncing &&
1617 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1618 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1619 // someone else had a pending sync request for a GC, so
1620 // let's assume GC has been done and we don't need to GC
1621 // again.
1622 // Exception to this: if SCHED_INTERRUPTING, then we still
1623 // need to do the final GC.
1624 return;
1625 }
1626 if (sched_state == SCHED_SHUTTING_DOWN) {
1627 // The scheduler might now be shutting down. We tested
1628 // this above, but it might have become true since then as
1629 // we yielded the capability in requestSync().
1630 return;
1631 }
1632 } while (was_syncing);
1633 }
1634
1635 stat_startGCSync(gc_threads[cap->no]);
1636
1637 #ifdef DEBUG
1638 unsigned int old_n_capabilities = n_capabilities;
1639 #endif
1640
1641 interruptAllCapabilities();
1642
1643 // The final shutdown GC is always single-threaded, because it's
1644 // possible that some of the Capabilities have no worker threads.
1645
1646 if (gc_type == SYNC_GC_SEQ) {
1647 traceEventRequestSeqGc(cap);
1648 } else {
1649 traceEventRequestParGc(cap);
1650 }
1651
1652 if (gc_type == SYNC_GC_SEQ) {
1653 // single-threaded GC: grab all the capabilities
1654 acquireAllCapabilities(cap,task);
1655 }
1656 else
1657 {
1658 // If we are load-balancing collections in this
1659 // generation, then we require all GC threads to participate
1660 // in the collection. Otherwise, we only require active
1661 // threads to participate, and we set gc_threads[i]->idle for
1662 // any idle capabilities. The rationale here is that waking
1663 // up an idle Capability takes much longer than just doing any
1664 // GC work on its behalf.
1665
1666 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1667 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1668 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1669 {
1670 for (i=0; i < n_capabilities; i++) {
1671 if (capabilities[i]->disabled) {
1672 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1673 if (idle_cap[i]) {
1674 n_idle_caps++;
1675 }
1676 } else {
1677 if (i != cap->no && idle_cap[i]) {
1678 Capability *tmpcap = capabilities[i];
1679 task->cap = tmpcap;
1680 waitForCapability(&tmpcap, task);
1681 n_idle_caps++;
1682 }
1683 }
1684 }
1685 }
1686 else
1687 {
1688 for (i=0; i < n_capabilities; i++) {
1689 if (capabilities[i]->disabled) {
1690 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1691 if (idle_cap[i]) {
1692 n_idle_caps++;
1693 }
1694 } else if (i != cap->no &&
1695 capabilities[i]->idle >=
1696 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1697 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1698 if (idle_cap[i]) {
1699 n_idle_caps++;
1700 } else {
1701 n_failed_trygrab_idles++;
1702 }
1703 }
1704 }
1705 }
1706 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1707
1708 // We set the gc_thread[i]->idle flag if that
1709 // capability/thread is not participating in this collection.
1710 // We also keep a local record of which capabilities are idle
1711 // in idle_cap[], because scheduleDoGC() is re-entrant:
1712 // another thread might start a GC as soon as we've finished
1713 // this one, and thus the gc_thread[]->idle flags are invalid
1714 // as soon as we release any threads after GC. Getting this
1715 // wrong leads to a rare and hard to debug deadlock!
1716
1717 for (i=0; i < n_capabilities; i++) {
1718 gc_threads[i]->idle = idle_cap[i];
1719 capabilities[i]->idle++;
1720 }
1721
1722 // For all capabilities participating in this GC, wait until
1723 // they have stopped mutating and are standing by for GC.
1724 waitForGcThreads(cap);
1725
1726 #if defined(THREADED_RTS)
1727 // Stable point where we can do a global check on our spark counters
1728 ASSERT(checkSparkCountInvariant());
1729 #endif
1730 }
1731
1732 #endif
1733
1734 IF_DEBUG(scheduler, printAllThreads());
1735
1736 delete_threads_and_gc:
1737 /*
1738 * We now have all the capabilities; if we're in an interrupting
1739 * state, then we should take the opportunity to delete all the
1740 * threads in the system.
1741 * Checking for major_gc ensures that the last GC is major.
1742 */
1743 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1744 deleteAllThreads(cap);
1745 #if defined(THREADED_RTS)
1746 // Discard all the sparks from every Capability. Why?
1747 // They'll probably be GC'd anyway since we've killed all the
1748 // threads. It just avoids the GC having to do any work to
1749 // figure out that any remaining sparks are garbage.
1750 for (i = 0; i < n_capabilities; i++) {
1751 capabilities[i]->spark_stats.gcd +=
1752 sparkPoolSize(capabilities[i]->sparks);
1753 // No race here since all Caps are stopped.
1754 discardSparksCap(capabilities[i]);
1755 }
1756 #endif
1757 sched_state = SCHED_SHUTTING_DOWN;
1758 }
1759
1760 /*
1761 * When there are disabled capabilities, we want to migrate any
1762 * threads away from them. Normally this happens in the
1763 * scheduler's loop, but only for unbound threads - it's really
1764 * hard for a bound thread to migrate itself. So we have another
1765 * go here.
1766 */
1767 #if defined(THREADED_RTS)
1768 for (i = enabled_capabilities; i < n_capabilities; i++) {
1769 Capability *tmp_cap, *dest_cap;
1770 tmp_cap = capabilities[i];
1771 ASSERT(tmp_cap->disabled);
1772 if (i != cap->no) {
1773 dest_cap = capabilities[i % enabled_capabilities];
1774 while (!emptyRunQueue(tmp_cap)) {
1775 tso = popRunQueue(tmp_cap);
1776 migrateThread(tmp_cap, tso, dest_cap);
1777 if (tso->bound) {
1778 traceTaskMigrate(tso->bound->task,
1779 tso->bound->task->cap,
1780 dest_cap);
1781 tso->bound->task->cap = dest_cap;
1782 }
1783 }
1784 }
1785 }
1786 #endif
1787
1788 #if defined(THREADED_RTS)
1789 // reset pending_sync *before* GC, so that when the GC threads
1790 // emerge they don't immediately re-enter the GC.
1791 pending_sync = 0;
1792 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1793 #else
1794 GarbageCollect(collect_gen, heap_census, 0, cap);
1795 #endif
1796
1797 traceSparkCounters(cap);
1798
1799 switch (recent_activity) {
1800 case ACTIVITY_INACTIVE:
1801 if (force_major) {
1802 // We are doing a GC because the system has been idle for a
1803 // timeslice and we need to check for deadlock. Record the
1804 // fact that we've done a GC and turn off the timer signal;
1805 // it will get re-enabled if we run any threads after the GC.
1806 recent_activity = ACTIVITY_DONE_GC;
1807 #ifndef PROFILING
1808 stopTimer();
1809 #endif
1810 break;
1811 }
1812 // fall through...
1813
1814 case ACTIVITY_MAYBE_NO:
1815 // the GC might have taken long enough for the timer to set
1816 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1817 // but we aren't necessarily deadlocked:
1818 recent_activity = ACTIVITY_YES;
1819 break;
1820
1821 case ACTIVITY_DONE_GC:
1822 // If we are actually active, the scheduler will reset the
1823 // recent_activity flag and re-enable the timer.
1824 break;
1825 }
1826
1827 #if defined(THREADED_RTS)
1828 // Stable point where we can do a global check on our spark counters
1829 ASSERT(checkSparkCountInvariant());
1830 #endif
1831
1832 // The heap census itself is done during GarbageCollect().
1833 if (heap_census) {
1834 performHeapProfile = rtsFalse;
1835 }
1836
1837 #if defined(THREADED_RTS)
1838
1839 // If n_capabilities has changed during GC, we're in trouble.
1840 ASSERT(n_capabilities == old_n_capabilities);
1841
1842 if (gc_type == SYNC_GC_PAR)
1843 {
1844 releaseGCThreads(cap);
1845 for (i = 0; i < n_capabilities; i++) {
1846 if (i != cap->no) {
1847 if (idle_cap[i]) {
1848 ASSERT(capabilities[i]->running_task == task);
1849 task->cap = capabilities[i];
1850 releaseCapability(capabilities[i]);
1851 } else {
1852 ASSERT(capabilities[i]->running_task != task);
1853 }
1854 }
1855 }
1856 task->cap = cap;
1857 }
1858 #endif
1859
1860 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1861 // GC set the heap_overflow flag, so we should proceed with
1862 // an orderly shutdown now. Ultimately we want the main
1863 // thread to return to its caller with HeapExhausted, at which
1864 // point the caller should call hs_exit(). The first step is
1865 // to delete all the threads.
1866 //
1867 // Another way to do this would be to raise an exception in
1868 // the main thread, which we really should do because it gives
1869 // the program a chance to clean up. But how do we find the
1870 // main thread? It should presumably be the same one that
1871 // gets ^C exceptions, but that's all done on the Haskell side
1872 // (GHC.TopHandler).
1873 sched_state = SCHED_INTERRUPTING;
1874 goto delete_threads_and_gc;
1875 }
1876
1877 #ifdef SPARKBALANCE
1878 /* JB
1879 Once we are all together... this would be the place to balance all
1880 spark pools. No concurrent stealing or adding of new sparks can
1881 occur. Should be defined in Sparks.c. */
1882 balanceSparkPoolsCaps(n_capabilities, capabilities);
1883 #endif
1884
1885 #if defined(THREADED_RTS)
1886 stgFree(idle_cap);
1887
1888 if (gc_type == SYNC_GC_SEQ) {
1889 // release our stash of capabilities.
1890 releaseAllCapabilities(n_capabilities, cap, task);
1891 }
1892 #endif
1893
1894 return;
1895 }
1896
1897 /* ---------------------------------------------------------------------------
1898 * Singleton fork(). Do not copy any running threads.
1899 * ------------------------------------------------------------------------- */
1900
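/* Illustrative sketch (hypothetical caller; names are not from this file):
 * the entry argument identifies the IO action the child process should run.
 *
 *     pid_t pid = forkProcess(entry);
 *     // parent: forkProcess() returns the child's pid
 *     // child:  forkProcess() does not return; it runs the action via
 *     //         rts_evalStableIO() and then shuts the RTS down and exits
 */
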
1901 pid_t
1902 forkProcess(HsStablePtr *entry
1903 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1904 STG_UNUSED
1905 #endif
1906 )
1907 {
1908 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1909 pid_t pid;
1910 StgTSO* t,*next;
1911 Capability *cap;
1912 nat g;
1913 Task *task = NULL;
1914 nat i;
1915
1916 debugTrace(DEBUG_sched, "forking!");
1917
1918 task = newBoundTask();
1919
1920 cap = NULL;
1921 waitForCapability(&cap, task);
1922
1923 #ifdef THREADED_RTS
1924 stopAllCapabilities(&cap, task);
1925 #endif
1926
1927 // no funny business: hold locks while we fork, otherwise if some
1928 // other thread is holding a lock when the fork happens, the data
1929 // structure protected by the lock will forever be in an
1930 // inconsistent state in the child. See also #1391.
1931 ACQUIRE_LOCK(&sched_mutex);
1932 ACQUIRE_LOCK(&sm_mutex);
1933 ACQUIRE_LOCK(&stable_mutex);
1934 ACQUIRE_LOCK(&task->lock);
1935
1936 for (i=0; i < n_capabilities; i++) {
1937 ACQUIRE_LOCK(&capabilities[i]->lock);
1938 }
1939
1940 #ifdef THREADED_RTS
1941 ACQUIRE_LOCK(&all_tasks_mutex);
1942 #endif
1943
1944 stopTimer(); // See #4074
1945
1946 #if defined(TRACING)
1947 flushEventLog(); // so that child won't inherit dirty file buffers
1948 #endif
1949
1950 pid = fork();
1951
1952 if (pid) { // parent
1953
1954 startTimer(); // #4074
1955
1956 RELEASE_LOCK(&sched_mutex);
1957 RELEASE_LOCK(&sm_mutex);
1958 RELEASE_LOCK(&stable_mutex);
1959 RELEASE_LOCK(&task->lock);
1960
1961 for (i=0; i < n_capabilities; i++) {
1962 releaseCapability_(capabilities[i],rtsFalse);
1963 RELEASE_LOCK(&capabilities[i]->lock);
1964 }
1965
1966 #ifdef THREADED_RTS
1967 RELEASE_LOCK(&all_tasks_mutex);
1968 #endif
1969
1970 boundTaskExiting(task);
1971
1972 // just return the pid
1973 return pid;
1974
1975 } else { // child
1976
1977 #if defined(THREADED_RTS)
1978 initMutex(&sched_mutex);
1979 initMutex(&sm_mutex);
1980 initMutex(&stable_mutex);
1981 initMutex(&task->lock);
1982
1983 for (i=0; i < n_capabilities; i++) {
1984 initMutex(&capabilities[i]->lock);
1985 }
1986
1987 initMutex(&all_tasks_mutex);
1988 #endif
1989
1990 #ifdef TRACING
1991 resetTracing();
1992 #endif
1993
1994 // Now, all OS threads except the thread that forked are
1995 // stopped. We need to stop all Haskell threads, including
1996 // those involved in foreign calls. Also we need to delete
1997 // all Tasks, because they correspond to OS threads that are
1998 // now gone.
1999
2000 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2001 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2002 next = t->global_link;
2003 // don't allow threads to catch the ThreadKilled
2004 // exception, but we do want to raiseAsync() because these
2005 // threads may be evaluating thunks that we need later.
2006 deleteThread_(t->cap,t);
2007
2008 // stop the GC from updating the InCall to point to
2009 // the TSO. This is only necessary because the
2010 // OSThread bound to the TSO has been killed, and
2011 // won't get a chance to exit in the usual way (see
2012 // also scheduleHandleThreadFinished).
2013 t->bound = NULL;
2014 }
2015 }
2016
2017 discardTasksExcept(task);
2018
2019 for (i=0; i < n_capabilities; i++) {
2020 cap = capabilities[i];
2021
2022 // Empty the run queue. It seems tempting to let all the
2023 // killed threads stay on the run queue as zombies to be
2024 // cleaned up later, but some of them may correspond to
2025 // bound threads for which the corresponding Task does not
2026 // exist.
2027 truncateRunQueue(cap);
2028 cap->n_run_queue = 0;
2029
2030         // Any suspended C-calling Tasks are gone; their OS threads
2031         // no longer exist:
2032 cap->suspended_ccalls = NULL;
2033 cap->n_suspended_ccalls = 0;
2034
2035 #if defined(THREADED_RTS)
2036         // Wipe our spare workers list; those workers no longer exist. New
2037         // workers will be created if necessary.
2038 cap->spare_workers = NULL;
2039 cap->n_spare_workers = 0;
2040 cap->returning_tasks_hd = NULL;
2041 cap->returning_tasks_tl = NULL;
2042 cap->n_returning_tasks = 0;
2043 #endif
2044
2045 // Release all caps except 0, we'll use that for starting
2046 // the IO manager and running the client action below.
2047 if (cap->no != 0) {
2048 task->cap = cap;
2049 releaseCapability(cap);
2050 }
2051 }
2052 cap = capabilities[0];
2053 task->cap = cap;
2054
2055 // Empty the threads lists. Otherwise, the garbage
2056 // collector may attempt to resurrect some of these threads.
2057 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2058 generations[g].threads = END_TSO_QUEUE;
2059 }
2060
2061 // On Unix, all timers are reset in the child, so we need to start
2062 // the timer again.
2063 initTimer();
2064 startTimer();
2065
2066 // TODO: need to trace various other things in the child
2067 // like startup event, capabilities, process info etc
2068 traceTaskCreate(task, cap);
2069
2070 #if defined(THREADED_RTS)
2071 ioManagerStartCap(&cap);
2072 #endif
2073
2074 rts_evalStableIO(&cap, entry, NULL); // run the action
2075 rts_checkSchedStatus("forkProcess",cap);
2076
2077 rts_unlock(cap);
2078 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2079 }
2080 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2081 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2082 #endif
2083 }
2084
2085 /* ---------------------------------------------------------------------------
2086 * Changing the number of Capabilities
2087 *
2088 * Changing the number of Capabilities is very tricky! We can only do
2089 * it with the system fully stopped, so we do a full sync with
2090 * requestSync(SYNC_OTHER) and grab all the capabilities.
2091 *
2092 * Then we resize the appropriate data structures, and update all
2093 * references to the old data structures which have now moved.
2094 * Finally we release the Capabilities we are holding, and start
2095 * worker Tasks on the new Capabilities we created.
2096 *
2097 * ------------------------------------------------------------------------- */
2098
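/* Illustrative sketch (assumes an embedding program linked against the
 * threaded RTS that has already called hs_init(); this is also the entry
 * point used by Control.Concurrent.setNumCapabilities via the FFI):
 *
 *     // Run Haskell on 8 capabilities from now on. Growing allocates new
 *     // Capabilities and worker tasks; shrinking just marks the surplus
 *     // ones as disabled, as described above.
 *     setNumCapabilities(8);
 */
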
2099 void
2100 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
2101 {
2102 #if !defined(THREADED_RTS)
2103 if (new_n_capabilities != 1) {
2104 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2105 }
2106 return;
2107 #elif defined(NOSMP)
2108 if (new_n_capabilities != 1) {
2109 errorBelch("setNumCapabilities: not supported on this platform");
2110 }
2111 return;
2112 #else
2113 Task *task;
2114 Capability *cap;
2115 nat n;
2116 Capability *old_capabilities = NULL;
2117 nat old_n_capabilities = n_capabilities;
2118
2119 if (new_n_capabilities == enabled_capabilities) return;
2120
2121 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2122 enabled_capabilities, new_n_capabilities);
2123
2124 cap = rts_lock();
2125 task = cap->running_task;
2126
2127 stopAllCapabilities(&cap, task);
2128
2129 if (new_n_capabilities < enabled_capabilities)
2130 {
2131 // Reducing the number of capabilities: we do not actually
2132 // remove the extra capabilities, we just mark them as
2133 // "disabled". This has the following effects:
2134 //
2135 // - threads on a disabled capability are migrated away by the
2136 // scheduler loop
2137 //
2138 // - disabled capabilities do not participate in GC
2139 // (see scheduleDoGC())
2140 //
2141 // - No spark threads are created on this capability
2142 // (see scheduleActivateSpark())
2143 //
2144 // - We do not attempt to migrate threads *to* a disabled
2145 // capability (see schedulePushWork()).
2146 //
2147 // but in other respects, a disabled capability remains
2148 // alive. Threads may be woken up on a disabled capability,
2149 // but they will be immediately migrated away.
2150 //
2151 // This approach is much easier than trying to actually remove
2152 // the capability; we don't have to worry about GC data
2153 // structures, the nursery, etc.
2154 //
2155 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2156 capabilities[n]->disabled = rtsTrue;
2157 traceCapDisable(capabilities[n]);
2158 }
2159 enabled_capabilities = new_n_capabilities;
2160 }
2161 else
2162 {
2163 // Increasing the number of enabled capabilities.
2164 //
2165 // enable any disabled capabilities, up to the required number
2166 for (n = enabled_capabilities;
2167 n < new_n_capabilities && n < n_capabilities; n++) {
2168 capabilities[n]->disabled = rtsFalse;
2169 traceCapEnable(capabilities[n]);
2170 }
2171 enabled_capabilities = n;
2172
2173 if (new_n_capabilities > n_capabilities) {
2174 #if defined(TRACING)
2175 // Allocate eventlog buffers for the new capabilities. Note this
2176 // must be done before calling moreCapabilities(), because that
2177 // will emit events about creating the new capabilities and adding
2178 // them to existing capsets.
2179 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2180 #endif
2181
2182 // Resize the capabilities array
2183 // NB. after this, capabilities points somewhere new. Any pointers
2184 // of type (Capability *) are now invalid.
2185 moreCapabilities(n_capabilities, new_n_capabilities);
2186
2187 // Resize and update storage manager data structures
2188 storageAddCapabilities(n_capabilities, new_n_capabilities);
2189 }
2190 }
2191
2192 // update n_capabilities before things start running
2193 if (new_n_capabilities > n_capabilities) {
2194 n_capabilities = enabled_capabilities = new_n_capabilities;
2195 }
2196
2197 // We're done: release the original Capabilities
2198 releaseAllCapabilities(old_n_capabilities, cap,task);
2199
2200 // We can't free the old array until now, because we access it
2201 // while updating pointers in updateCapabilityRefs().
2202 if (old_capabilities) {
2203 stgFree(old_capabilities);
2204 }
2205
2206 // Notify IO manager that the number of capabilities has changed.
2207 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2208
2209 rts_unlock(cap);
2210
2211 #endif // THREADED_RTS
2212 }
2213
2214
2215
2216 /* ---------------------------------------------------------------------------
2217 * Delete all the threads in the system
2218 * ------------------------------------------------------------------------- */
2219
2220 static void
2221 deleteAllThreads ( Capability *cap )
2222 {
2223 // NOTE: only safe to call if we own all capabilities.
2224
2225 StgTSO* t, *next;
2226 nat g;
2227
2228 debugTrace(DEBUG_sched,"deleting all threads");
2229 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2230 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2231 next = t->global_link;
2232 deleteThread(cap,t);
2233 }
2234 }
2235
2236 // The run queue now contains a bunch of ThreadKilled threads. We
2237 // must not throw these away: the main thread(s) will be in there
2238 // somewhere, and the main scheduler loop has to deal with it.
2239 // Also, the run queue is the only thing keeping these threads from
2240 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2241
2242 #if !defined(THREADED_RTS)
2243 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2244 ASSERT(sleeping_queue == END_TSO_QUEUE);
2245 #endif
2246 }
2247
2248 /* -----------------------------------------------------------------------------
2249 Managing the suspended_ccalls list.
2250 Locks required: sched_mutex
2251 -------------------------------------------------------------------------- */
2252
2253 STATIC_INLINE void
2254 suspendTask (Capability *cap, Task *task)
2255 {
2256 InCall *incall;
2257
2258 incall = task->incall;
2259 ASSERT(incall->next == NULL && incall->prev == NULL);
2260 incall->next = cap->suspended_ccalls;
2261 incall->prev = NULL;
2262 if (cap->suspended_ccalls) {
2263 cap->suspended_ccalls->prev = incall;
2264 }
2265 cap->suspended_ccalls = incall;
2266 cap->n_suspended_ccalls++;
2267 }
2268
2269 STATIC_INLINE void
2270 recoverSuspendedTask (Capability *cap, Task *task)
2271 {
2272 InCall *incall;
2273
2274 incall = task->incall;
2275 if (incall->prev) {
2276 incall->prev->next = incall->next;
2277 } else {
2278 ASSERT(cap->suspended_ccalls == incall);
2279 cap->suspended_ccalls = incall->next;
2280 }
2281 if (incall->next) {
2282 incall->next->prev = incall->prev;
2283 }
2284 incall->next = incall->prev = NULL;
2285 cap->n_suspended_ccalls--;
2286 }
2287
2288 /* ---------------------------------------------------------------------------
2289 * Suspending & resuming Haskell threads.
2290 *
2291 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2292 * its capability before calling the C function. This allows another
2293 * task to pick up the capability and carry on running Haskell
2294 * threads. It also means that if the C call blocks, it won't lock
2295 * the whole system.
2296 *
2297 * The Haskell thread making the C call is put to sleep for the
2298  * duration of the call, on the suspended_ccalls list. We
2299 * give out a token to the task, which it can use to resume the thread
2300 * on return from the C function.
2301 *
2302  * If this is an interruptible C call, the FFI call may be
2303  * unceremoniously terminated, and so it should be run on an
2304  * unbound worker thread.
2305 * ------------------------------------------------------------------------- */
2306
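/* Illustrative sketch (not actual code-generator output; names other than
 * suspendThread/resumeThread are made up): a "safe" foreign call is
 * bracketed roughly like this, with the returned token identifying the Task:
 *
 *     void *token = suspendThread(&cap->r, interruptible);
 *     result = some_c_function(args);   // may block; other Tasks can keep
 *                                       // running Haskell on this Capability
 *     reg = resumeThread(token);        // re-acquire a Capability and
 *                                       // reinstate the suspended thread
 */
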
2307 void *
2308 suspendThread (StgRegTable *reg, rtsBool interruptible)
2309 {
2310 Capability *cap;
2311 int saved_errno;
2312 StgTSO *tso;
2313 Task *task;
2314 #if mingw32_HOST_OS
2315 StgWord32 saved_winerror;
2316 #endif
2317
2318 saved_errno = errno;
2319 #if mingw32_HOST_OS
2320 saved_winerror = GetLastError();
2321 #endif
2322
2323 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2324 */
2325 cap = regTableToCapability(reg);
2326
2327 task = cap->running_task;
2328 tso = cap->r.rCurrentTSO;
2329
2330 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2331
2332 // XXX this might not be necessary --SDM
2333 tso->what_next = ThreadRunGHC;
2334
2335 threadPaused(cap,tso);
2336
2337 if (interruptible) {
2338 tso->why_blocked = BlockedOnCCall_Interruptible;
2339 } else {
2340 tso->why_blocked = BlockedOnCCall;
2341 }
2342
2343 // Hand back capability
2344 task->incall->suspended_tso = tso;
2345 task->incall->suspended_cap = cap;
2346
2347 // Otherwise allocate() will write to invalid memory.
2348 cap->r.rCurrentTSO = NULL;
2349
2350 ACQUIRE_LOCK(&cap->lock);
2351
2352 suspendTask(cap,task);
2353 cap->in_haskell = rtsFalse;
2354 releaseCapability_(cap,rtsFalse);
2355
2356 RELEASE_LOCK(&cap->lock);
2357
2358 errno = saved_errno;
2359 #if mingw32_HOST_OS
2360 SetLastError(saved_winerror);
2361 #endif
2362 return task;
2363 }
2364
2365 StgRegTable *
2366 resumeThread (void *task_)
2367 {
2368 StgTSO *tso;
2369 InCall *incall;
2370 Capability *cap;
2371 Task *task = task_;
2372 int saved_errno;
2373 #if mingw32_HOST_OS
2374 StgWord32 saved_winerror;
2375 #endif
2376
2377 saved_errno = errno;
2378 #if mingw32_HOST_OS
2379 saved_winerror = GetLastError();
2380 #endif
2381
2382 incall = task->incall;
2383 cap = incall->suspended_cap;
2384 task->cap = cap;
2385
2386 // Wait for permission to re-enter the RTS with the result.
2387 waitForCapability(&cap,task);
2388 // we might be on a different capability now... but if so, our
2389 // entry on the suspended_ccalls list will also have been
2390 // migrated.
2391
2392 // Remove the thread from the suspended list
2393 recoverSuspendedTask(cap,task);
2394
2395 tso = incall->suspended_tso;
2396 incall->suspended_tso = NULL;
2397 incall->suspended_cap = NULL;
2398 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2399
2400 traceEventRunThread(cap, tso);
2401
2402 /* Reset blocking status */
2403 tso->why_blocked = NotBlocked;
2404
2405 if ((tso->flags & TSO_BLOCKEX) == 0) {
2406 // avoid locking the TSO if we don't have to
2407 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2408 maybePerformBlockedException(cap,tso);
2409 }
2410 }
2411
2412 cap->r.rCurrentTSO = tso;
2413 cap->in_haskell = rtsTrue;
2414 errno = saved_errno;
2415 #if mingw32_HOST_OS
2416 SetLastError(saved_winerror);
2417 #endif
2418
2419 /* We might have GC'd, mark the TSO dirty again */
2420 dirty_TSO(cap,tso);
2421 dirty_STACK(cap,tso->stackobj);
2422
2423 IF_DEBUG(sanity, checkTSO(tso));
2424
2425 return &cap->r;
2426 }
2427
2428 /* ---------------------------------------------------------------------------
2429 * scheduleThread()
2430 *
2431 * scheduleThread puts a thread on the end of the runnable queue.
2432 * This will usually be done immediately after a thread is created.
2433 * The caller of scheduleThread must create the thread using e.g.
2434 * createThread and push an appropriate closure
2435 * on this thread's stack before the scheduler is invoked.
2436 * ------------------------------------------------------------------------ */
2437
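/* Illustrative sketch (a minimal example, assuming we already hold cap and
 * have an IO () closure in hand; createIOThread() builds the TSO and pushes
 * the closure for us):
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, closure);
 *     scheduleThread(cap, tso);   // tso now sits at the end of cap's run queue
 */
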
2438 void
2439 scheduleThread(Capability *cap, StgTSO *tso)
2440 {
2441 // The thread goes at the *end* of the run-queue, to avoid possible
2442 // starvation of any threads already on the queue.
2443 appendToRunQueue(cap,tso);
2444 }
2445
2446 void
2447 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2448 {
2449 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2450 // move this thread from now on.
2451 #if defined(THREADED_RTS)
2452 cpu %= enabled_capabilities;
2453 if (cpu == cap->no) {
2454 appendToRunQueue(cap,tso);
2455 } else {
2456 migrateThread(cap, tso, capabilities[cpu]);
2457 }
2458 #else
2459 appendToRunQueue(cap,tso);
2460 #endif
2461 }
2462
2463 void
2464 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2465 {
2466 Task *task;
2467 DEBUG_ONLY( StgThreadID id );
2468 Capability *cap;
2469
2470 cap = *pcap;
2471
2472 // We already created/initialised the Task
2473 task = cap->running_task;
2474
2475 // This TSO is now a bound thread; make the Task and TSO
2476 // point to each other.
2477 tso->bound = task->incall;
2478 tso->cap = cap;
2479
2480 task->incall->tso = tso;
2481 task->incall->ret = ret;
2482 task->incall->rstat = NoStatus;
2483
2484 appendToRunQueue(cap,tso);
2485
2486 DEBUG_ONLY( id = tso->id );
2487 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2488
2489 cap = schedule(cap,task);
2490
2491 ASSERT(task->incall->rstat != NoStatus);
2492 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2493
2494 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2495 *pcap = cap;
2496 }
2497
2498 /* ----------------------------------------------------------------------------
2499 * Starting Tasks
2500 * ------------------------------------------------------------------------- */
2501
2502 #if defined(THREADED_RTS)
2503 void scheduleWorker (Capability *cap, Task *task)
2504 {
2505 // schedule() runs without a lock.
2506 cap = schedule(cap,task);
2507
2508 // On exit from schedule(), we have a Capability, but possibly not
2509 // the same one we started with.
2510
2511 // During shutdown, the requirement is that after all the
2512 // Capabilities are shut down, all workers that are shutting down
2513 // have finished workerTaskStop(). This is why we hold on to
2514 // cap->lock until we've finished workerTaskStop() below.
2515 //
2516 // There may be workers still involved in foreign calls; those
2517 // will just block in waitForCapability() because the
2518 // Capability has been shut down.
2519 //
2520 ACQUIRE_LOCK(&cap->lock);
2521 releaseCapability_(cap,rtsFalse);
2522 workerTaskStop(task);
2523 RELEASE_LOCK(&cap->lock);
2524 }
2525 #endif
2526
2527 /* ---------------------------------------------------------------------------
2528 * Start new worker tasks on Capabilities from--to
2529 * -------------------------------------------------------------------------- */
2530
2531 static void
2532 startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
2533 {
2534 #if defined(THREADED_RTS)
2535 nat i;
2536 Capability *cap;
2537
2538 for (i = from; i < to; i++) {
2539 cap = capabilities[i];
2540 ACQUIRE_LOCK(&cap->lock);
2541 startWorkerTask(cap);
2542 RELEASE_LOCK(&cap->lock);
2543 }
2544 #endif
2545 }
2546
2547 /* ---------------------------------------------------------------------------
2548 * initScheduler()
2549 *
2550 * Initialise the scheduler. This resets all the queues - if the
2551 * queues contained any threads, they'll be garbage collected at the
2552 * next pass.
2553 *
2554 * ------------------------------------------------------------------------ */
2555
2556 void
2557 initScheduler(void)
2558 {
2559 #if !defined(THREADED_RTS)
2560 blocked_queue_hd = END_TSO_QUEUE;
2561 blocked_queue_tl = END_TSO_QUEUE;
2562 sleeping_queue = END_TSO_QUEUE;
2563 #endif
2564
2565 sched_state = SCHED_RUNNING;
2566 recent_activity = ACTIVITY_YES;
2567
2568 #if defined(THREADED_RTS)
2569     /* Initialise the mutex used to protect the scheduler's
2570      * global state. */
2571 initMutex(&sched_mutex);
2572 #endif
2573
2574 ACQUIRE_LOCK(&sched_mutex);
2575
2576 /* A capability holds the state a native thread needs in
2577 * order to execute STG code. At least one capability is
2578 * floating around (only THREADED_RTS builds have more than one).
2579 */
2580 initCapabilities();
2581
2582 initTaskManager();
2583
2584 /*
2585 * Eagerly start one worker to run each Capability, except for
2586 * Capability 0. The idea is that we're probably going to start a
2587 * bound thread on Capability 0 pretty soon, so we don't want a
2588 * worker task hogging it.
2589 */
2590 startWorkerTasks(1, n_capabilities);
2591
2592 RELEASE_LOCK(&sched_mutex);
2593
2594 }
2595
2596 void
2597 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2598 /* see Capability.c, shutdownCapability() */
2599 {
2600 Task *task = NULL;
2601
2602 task = newBoundTask();
2603
2604 // If we haven't killed all the threads yet, do it now.
2605 if (sched_state < SCHED_SHUTTING_DOWN) {
2606 sched_state = SCHED_INTERRUPTING;
2607 Capability *cap = task->cap;
2608 waitForCapability(&cap,task);
2609 scheduleDoGC(&cap,task,rtsTrue);
2610 ASSERT(task->incall->tso == NULL);
2611 releaseCapability(cap);
2612 }
2613 sched_state = SCHED_SHUTTING_DOWN;
2614
2615 shutdownCapabilities(task, wait_foreign);
2616
2617 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2618 // n_failed_trygrab_idles, n_idle_caps);
2619
2620 boundTaskExiting(task);
2621 }
2622
2623 void
2624 freeScheduler( void )
2625 {
2626 nat still_running;
2627
2628 ACQUIRE_LOCK(&sched_mutex);
2629 still_running = freeTaskManager();
2630 // We can only free the Capabilities if there are no Tasks still
2631 // running. We might have a Task about to return from a foreign
2632 // call into waitForCapability(), for example (actually,
2633 // this should be the *only* thing that a still-running Task can
2634 // do at this point, and it will block waiting for the
2635 // Capability).
2636 if (still_running == 0) {
2637 freeCapabilities();
2638 }
2639 RELEASE_LOCK(&sched_mutex);
2640 #if defined(THREADED_RTS)
2641 closeMutex(&sched_mutex);
2642 #endif
2643 }
2644
2645 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2646 void *user USED_IF_NOT_THREADS)
2647 {
2648 #if !defined(THREADED_RTS)
2649 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2650 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2651 evac(user, (StgClosure **)(void *)&sleeping_queue);
2652 #endif
2653 }
2654
2655 /* -----------------------------------------------------------------------------
2656 performGC
2657
2658 This is the interface to the garbage collector from Haskell land.
2659 We provide this so that external C code can allocate and garbage
2660 collect when called from Haskell via _ccall_GC.
2661 -------------------------------------------------------------------------- */
2662
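/* Illustrative sketch (hypothetical helper entered from Haskell via a safe
 * foreign call): forcing a full collection from C land.
 *
 *     void release_native_caches_and_gc(void)
 *     {
 *         release_native_caches();   // hypothetical application code
 *         performMajorGC();          // ask the RTS for a major collection
 *     }
 */
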
2663 static void
2664 performGC_(rtsBool force_major)
2665 {
2666 Task *task;
2667 Capability *cap = NULL;
2668
2669 // We must grab a new Task here, because the existing Task may be
2670 // associated with a particular Capability, and chained onto the
2671 // suspended_ccalls queue.
2672 task = newBoundTask();
2673
2674 // TODO: do we need to traceTask*() here?
2675
2676 waitForCapability(&cap,task);
2677 scheduleDoGC(&cap,task,force_major);
2678 releaseCapability(cap);
2679 boundTaskExiting(task);
2680 }
2681
2682 void
2683 performGC(void)
2684 {
2685 performGC_(rtsFalse);
2686 }
2687
2688 void
2689 performMajorGC(void)
2690 {
2691 performGC_(rtsTrue);
2692 }
2693
2694 /* ---------------------------------------------------------------------------
2695 Interrupt execution
2696 - usually called inside a signal handler so it mustn't do anything fancy.
2697 ------------------------------------------------------------------------ */
2698
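/* Illustrative sketch (hypothetical handler; the real handlers live in the
 * platform-specific signal code): a console-interrupt handler only needs to
 * call interruptStgRts() and return.
 *
 *     static void ctrl_c_handler(int sig STG_UNUSED)
 *     {
 *         interruptStgRts();   // set sched_state and prod the capabilities
 *     }
 */
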
2699 void
2700 interruptStgRts(void)
2701 {
2702 sched_state = SCHED_INTERRUPTING;
2703 interruptAllCapabilities();
2704 #if defined(THREADED_RTS)
2705 wakeUpRts();
2706 #endif
2707 }
2708
2709 /* -----------------------------------------------------------------------------
2710 Wake up the RTS
2711
2712 This function causes at least one OS thread to wake up and run the
2713 scheduler loop. It is invoked when the RTS might be deadlocked, or
2714    an external event has arrived that may need servicing (e.g. a
2715 keyboard interrupt).
2716
2717 In the single-threaded RTS we don't do anything here; we only have
2718 one thread anyway, and the event that caused us to want to wake up
2719 will have interrupted any blocking system call in progress anyway.
2720 -------------------------------------------------------------------------- */
2721
2722 #if defined(THREADED_RTS)
2723 void wakeUpRts(void)
2724 {
2725 // This forces the IO Manager thread to wakeup, which will
2726 // in turn ensure that some OS thread wakes up and runs the
2727 // scheduler loop, which will cause a GC and deadlock check.
2728 ioManagerWakeup();
2729 }
2730 #endif
2731
2732 /* -----------------------------------------------------------------------------
2733 Deleting threads
2734
2735 This is used for interruption (^C) and forking, and corresponds to
2736 raising an exception but without letting the thread catch the
2737 exception.
2738 -------------------------------------------------------------------------- */
2739
2740 static void
2741 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2742 {
2743 // NOTE: must only be called on a TSO that we have exclusive
2744 // access to, because we will call throwToSingleThreaded() below.
2745 // The TSO must be on the run queue of the Capability we own, or
2746 // we must own all Capabilities.
2747
2748 if (tso->why_blocked != BlockedOnCCall &&
2749 tso->why_blocked != BlockedOnCCall_Interruptible) {
2750 throwToSingleThreaded(tso->cap,tso,NULL);
2751 }
2752 }
2753
2754 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2755 static void
2756 deleteThread_(Capability *cap, StgTSO *tso)
2757 { // for forkProcess only:
2758 // like deleteThread(), but we delete threads in foreign calls, too.
2759
2760 if (tso->why_blocked == BlockedOnCCall ||
2761 tso->why_blocked == BlockedOnCCall_Interruptible) {
2762 tso->what_next = ThreadKilled;
2763 appendToRunQueue(tso->cap, tso);
2764 } else {
2765 deleteThread(cap,tso);
2766 }
2767 }
2768 #endif
2769
2770 /* -----------------------------------------------------------------------------
2771 raiseExceptionHelper
2772
2773    This function is called by the raise# primitive, just so that we can
2774    move some of the tricky bits of raising an exception from C-- into
2775    C. Who knows, it might be a useful reusable thing here too.
2776 -------------------------------------------------------------------------- */
2777
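/* Illustrative sketch (C-flavoured outline only; the real caller is the Cmm
 * code behind raise#): the caller dispatches on the frame type we return,
 * with tso->stackobj->sp already pointing at that frame.
 *
 *     StgWord frame_type = raiseExceptionHelper(reg, tso, exception);
 *     switch (frame_type) {
 *     case CATCH_FRAME:      // run the handler stored in the frame
 *     case CATCH_STM_FRAME:  // likewise, for a handler inside a transaction
 *     case ATOMICALLY_FRAME: // the exception escaped an atomically block
 *     case STOP_FRAME:       // no handler at all: the thread dies
 *         ...;
 *     }
 */
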
2778 StgWord
2779 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2780 {
2781 Capability *cap = regTableToCapability(reg);
2782 StgThunk *raise_closure = NULL;
2783 StgPtr p, next;
2784 StgRetInfoTable *info;
2785 //
2786 // This closure represents the expression 'raise# E' where E
2787    // is the exception raised. It is used to overwrite all the
2788    // thunks which are currently under evaluation.
2789 //
2790
2791 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2792 // LDV profiling: stg_raise_info has THUNK as its closure
2793 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2794    // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2795    // 1 does not cause any problem unless profiling is performed.
2796    // However, when LDV profiling goes on, we need to linearly scan
2797    // the small object pool, where raise_closure is stored, so we should
2798 // use MIN_UPD_SIZE.
2799 //
2800 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2801 // sizeofW(StgClosure)+1);
2802 //
2803
2804 //
2805 // Walk up the stack, looking for the catch frame. On the way,
2806 // we update any closures pointed to from update frames with the
2807 // raise closure that we just built.
2808 //
2809 p = tso->stackobj->sp;
2810 while(1) {
2811 info = get_ret_itbl((StgClosure *)p);
2812 next = p + stack_frame_sizeW((StgClosure *)p);
2813 switch (info->i.type) {
2814
2815 case UPDATE_FRAME:
2816 // Only create raise_closure if we need to.
2817 if (raise_closure == NULL) {
2818 raise_closure =
2819 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2820 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2821 raise_closure->payload[0] = exception;
2822 }
2823 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2824 (StgClosure *)raise_closure);
2825 p = next;
2826 continue;
2827
2828 case ATOMICALLY_FRAME:
2829 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2830 tso->stackobj->sp = p;
2831 return ATOMICALLY_FRAME;
2832
2833 case CATCH_FRAME:
2834 tso->stackobj->sp = p;
2835 return CATCH_FRAME;
2836
2837 case CATCH_STM_FRAME:
2838 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2839 tso->stackobj->sp = p;
2840 return CATCH_STM_FRAME;
2841
2842 case UNDERFLOW_FRAME:
2843 tso->stackobj->sp = p;
2844 threadStackUnderflow(cap,tso);
2845 p = tso->stackobj->sp;
2846 continue;
2847
2848 case STOP_FRAME:
2849 tso->stackobj->sp = p;
2850 return STOP_FRAME;
2851
2852 case CATCH_RETRY_FRAME: {
2853 StgTRecHeader *trec = tso -> trec;
2854 StgTRecHeader *outer = trec -> enclosing_trec;
2855 debugTrace(DEBUG_stm,
2856 "found CATCH_RETRY_FRAME at %p during raise", p);
2857 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2858 stmAbortTransaction(cap, trec);
2859 stmFreeAbortedTRec(cap, trec);
2860 tso -> trec = outer;
2861 p = next;
2862 continue;
2863 }
2864
2865 default:
2866 p = next;
2867 continue;
2868 }
2869 }
2870 }
2871
2872
2873 /* -----------------------------------------------------------------------------
2874 findRetryFrameHelper
2875
2876 This function is called by the retry# primitive. It traverses the stack
2877    leaving tso->stackobj->sp referring to the frame which should handle the retry.
2878
2879 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2880    or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2881
2882 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2883 create) because retries are not considered to be exceptions, despite the
2884 similar implementation.
2885
2886 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2887 not be created within memory transactions.
2888 -------------------------------------------------------------------------- */
2889
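/* Illustrative sketch (C-flavoured outline only; the real caller is the Cmm
 * code behind retry#):
 *
 *     StgWord frame_type = findRetryFrameHelper(cap, tso);
 *     if (frame_type == CATCH_RETRY_FRAME) {
 *         // retry# inside orElse#: run the alternative branch
 *     } else {
 *         // ATOMICALLY_FRAME: block the thread until a TVar it has read
 *         // changes, then restart the whole transaction
 *     }
 */
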
2890 StgWord
2891 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2892 {
2893 StgPtr p, next;
2894 StgRetInfoTable *info;
2895
2896 p = tso->stackobj->sp;
2897 while (1) {
2898 info = get_ret_itbl((StgClosure *)p);
2899 next = p + stack_frame_sizeW((StgClosure *)p);
2900 switch (info->i.type) {
2901
2902 case ATOMICALLY_FRAME:
2903 debugTrace(DEBUG_stm,
2904 "found ATOMICALLY_FRAME at %p during retry", p);
2905 tso->stackobj->sp = p;
2906 return ATOMICALLY_FRAME;
2907
2908 case CATCH_RETRY_FRAME:
2909 debugTrace(DEBUG_stm,
2910 "found CATCH_RETRY_FRAME at %p during retry", p);
2911 tso->stackobj->sp = p;
2912 return CATCH_RETRY_FRAME;
2913
2914 case CATCH_STM_FRAME: {
2915 StgTRecHeader *trec = tso -> trec;
2916 StgTRecHeader *outer = trec -> enclosing_trec;
2917 debugTrace(DEBUG_stm,
2918 "found CATCH_STM_FRAME at %p during retry", p);
2919 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2920 stmAbortTransaction(cap, trec);
2921 stmFreeAbortedTRec(cap, trec);
2922 tso -> trec = outer;
2923 p = next;
2924 continue;
2925 }
2926
2927 case UNDERFLOW_FRAME:
2928 tso->stackobj->sp = p;
2929 threadStackUnderflow(cap,tso);
2930 p = tso->stackobj->sp;
2931 continue;
2932
2933 default:
2934 ASSERT(info->i.type != CATCH_FRAME);
2935 ASSERT(info->i.type != STOP_FRAME);
2936 p = next;
2937 continue;
2938 }
2939 }
2940 }
2941
2942 /* -----------------------------------------------------------------------------
2943 resurrectThreads is called after garbage collection on the list of
2944 threads found to be garbage. Each of these threads will be woken
2945    up and sent an exception: BlockedIndefinitelyOnMVar if the thread was
2946    blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked on an STM
2947    transaction, or NonTermination if it was blocked on a black hole.
2948
2949 Locks: assumes we hold *all* the capabilities.
2950 -------------------------------------------------------------------------- */
2951
2952 void
2953 resurrectThreads (StgTSO *threads)
2954 {
2955 StgTSO *tso, *next;
2956 Capability *cap;
2957 generation *gen;
2958
2959 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2960 next = tso->global_link;
2961
2962 gen = Bdescr((P_)tso)->gen;
2963 tso->global_link = gen->threads;
2964 gen->threads = tso;
2965
2966 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2967
2968 // Wake up the thread on the Capability it was last on
2969 cap = tso->cap;
2970
2971 switch (tso->why_blocked) {
2972 case BlockedOnMVar:
2973 case BlockedOnMVarRead:
2974 /* Called by GC - sched_mutex lock is currently held. */
2975 throwToSingleThreaded(cap, tso,
2976 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2977 break;
2978 case BlockedOnBlackHole:
2979 throwToSingleThreaded(cap, tso,
2980 (StgClosure *)nonTermination_closure);
2981 break;
2982 case BlockedOnSTM:
2983 throwToSingleThreaded(cap, tso,
2984 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2985 break;
2986 case NotBlocked:
2987 /* This might happen if the thread was blocked on a black hole
2988 * belonging to a thread that we've just woken up (raiseAsync
2989 * can wake up threads, remember...).
2990 */
2991 continue;
2992 case BlockedOnMsgThrowTo:
2993 // This can happen if the target is masking, blocks on a
2994 // black hole, and then is found to be unreachable. In
2995 // this case, we want to let the target wake up and carry
2996 // on, and do nothing to this thread.
2997 continue;
2998 default:
2999 barf("resurrectThreads: thread blocked in a strange way: %d",
3000 tso->why_blocked);
3001 }
3002 }
3003 }