1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /*
95 * This mutex protects most of the global scheduler data in
96 * the THREADED_RTS runtime.
97 */
98 #if defined(THREADED_RTS)
99 Mutex sched_mutex;
100 #endif
101
102 #if !defined(mingw32_HOST_OS)
103 #define FORKPROCESS_PRIMOP_SUPPORTED
104 #endif
105
106 /* -----------------------------------------------------------------------------
107 * static function prototypes
108 * -------------------------------------------------------------------------- */
109
110 static Capability *schedule (Capability *initialCapability, Task *task);
111
112 //
113 // These functions all encapsulate parts of the scheduler loop, and are
114 // abstracted only to make the structure and control flow of the
115 // scheduler clearer.
116 //
117 static void schedulePreLoop (void);
118 static void scheduleFindWork (Capability **pcap);
119 #if defined(THREADED_RTS)
120 static void scheduleYield (Capability **pcap, Task *task);
121 #endif
122 #if defined(THREADED_RTS)
123 static rtsBool requestSync (Capability **pcap, Task *task,
124 PendingSync *sync_type, SyncType *prev_sync_type);
125 static void acquireAllCapabilities(Capability *cap, Task *task);
126 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
127 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
128 uint32_t to USED_IF_THREADS);
129 #endif
130 static void scheduleStartSignalHandlers (Capability *cap);
131 static void scheduleCheckBlockedThreads (Capability *cap);
132 static void scheduleProcessInbox(Capability **cap);
133 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
134 static void schedulePushWork(Capability *cap, Task *task);
135 #if defined(THREADED_RTS)
136 static void scheduleActivateSpark(Capability *cap);
137 #endif
138 static void schedulePostRunThread(Capability *cap, StgTSO *t);
139 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
140 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
141 uint32_t prev_what_next );
142 static void scheduleHandleThreadBlocked( StgTSO *t );
143 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
144 StgTSO *t );
145 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
146 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
147
148 static void deleteThread (Capability *cap, StgTSO *tso);
149 static void deleteAllThreads (Capability *cap);
150
151 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
152 static void deleteThread_(Capability *cap, StgTSO *tso);
153 #endif
154
155 /* ---------------------------------------------------------------------------
156 Main scheduling loop.
157
158 We use round-robin scheduling, each thread returning to the
159 scheduler loop when one of these conditions is detected:
160
161 * out of heap space
162 * timer expires (thread yields)
163 * thread blocks
164 * thread ends
165 * stack overflow
166
167 ------------------------------------------------------------------------ */
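// In outline, each iteration of the loop below does the following (a
// simplified sketch only; the real code additionally handles shutdown,
// bound threads, capability yielding and thread migration):
//
//   while (1) {
//       scheduleFindWork(&cap);              // signals, inbox, sparks
//       schedulePushWork(cap, task);         // share work (THREADED_RTS)
//       scheduleDetectDeadlock(&cap, task);
//       t = popRunQueue(cap);                // round-robin: run the head thread
//       ret = <run t via StgRun() or interpretBCO()>;
//       schedulePostRunThread(cap, t);
//       <dispatch on ret: HeapOverflow, StackOverflow, ThreadYielding,
//        ThreadBlocked, ThreadFinished>
//       if (ready_to_gc) scheduleDoGC(&cap, task, rtsFalse);
//   }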
168
169 static Capability *
170 schedule (Capability *initialCapability, Task *task)
171 {
172 StgTSO *t;
173 Capability *cap;
174 StgThreadReturnCode ret;
175 uint32_t prev_what_next;
176 rtsBool ready_to_gc;
177 #if defined(THREADED_RTS)
178 rtsBool first = rtsTrue;
179 #endif
180
181 cap = initialCapability;
182
183 // Pre-condition: this task owns initialCapability.
184 // The sched_mutex is *NOT* held
185 // NB. on return, we still hold a capability.
186
187 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
188
189 schedulePreLoop();
190
191 // -----------------------------------------------------------
192 // Scheduler loop starts here:
193
194 while (1) {
195
196 // Check whether we have re-entered the RTS from Haskell without
197 // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
198 // call).
199 if (cap->in_haskell) {
200 errorBelch("schedule: re-entered unsafely.\n"
201 " Perhaps a 'foreign import unsafe' should be 'safe'?");
202 stg_exit(EXIT_FAILURE);
203 }
204
205 // Note [shutdown]: The interruption / shutdown sequence.
206 //
207 // In order to cleanly shut down the runtime, we want to:
208 // * make sure that all main threads return to their callers
209 // with the state 'Interrupted'.
210 //   * clean up all OS threads associated with the runtime
211 // * free all memory etc.
212 //
213 // So the sequence goes like this:
214 //
215 // * The shutdown sequence is initiated by calling hs_exit(),
216 // interruptStgRts(), or running out of memory in the GC.
217 //
218 // * Set sched_state = SCHED_INTERRUPTING
219 //
220 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
221 // scheduleDoGC(), which halts the whole runtime by acquiring all the
222 // capabilities, does a GC and then calls deleteAllThreads() to kill all
223 // the remaining threads. The zombies are left on the run queue for
224 // cleaning up. We can't kill threads involved in foreign calls.
225 //
226 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
227 //
228 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
229 // enforced by the scheduler, which won't run any Haskell code when
230 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
231 // other capabilities by doing the GC earlier.
232 //
233 // * all workers exit when the run queue on their capability
234 // drains. All main threads will also exit when their TSO
235 // reaches the head of the run queue and they can return.
236 //
237 // * eventually all Capabilities will shut down, and the RTS can
238 // exit.
239 //
240 // * We might be left with threads blocked in foreign calls,
241 // we should really attempt to kill these somehow (TODO).
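//
// For reference, a minimal sketch of how an embedding C program drives this
// shutdown from the outside via the public API in HsFFI.h (the exported
// Haskell function hs_work() is hypothetical, declared in the stub header
// generated for its 'foreign export'; hs_exit() is what initiates the
// sequence described above):
//
//   #include "HsFFI.h"
//
//   int main (int argc, char *argv[])
//   {
//       hs_init(&argc, &argv);   // start the RTS and its Capabilities
//       hs_work();               // call into Haskell via a foreign export
//       hs_exit();               // leads to SCHED_INTERRUPTING, the final GC,
//                                // and SCHED_SHUTTING_DOWN as above
//       return 0;
//   }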
242
243 switch (sched_state) {
244 case SCHED_RUNNING:
245 break;
246 case SCHED_INTERRUPTING:
247 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
248 /* scheduleDoGC() deletes all the threads */
249 scheduleDoGC(&cap,task,rtsTrue);
250
251 // after scheduleDoGC(), we must be shutting down. Either some
252 // other Capability did the final GC, or we did it above,
253 // either way we can fall through to the SCHED_SHUTTING_DOWN
254 // case now.
255 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
256 // fall through
257
258 case SCHED_SHUTTING_DOWN:
259 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
260 // If we are a worker, just exit. If we're a bound thread
261 // then we will exit below when we've removed our TSO from
262 // the run queue.
263 if (!isBoundTask(task) && emptyRunQueue(cap)) {
264 return cap;
265 }
266 break;
267 default:
268 barf("sched_state: %d", sched_state);
269 }
270
271 scheduleFindWork(&cap);
272
273 /* work pushing, currently relevant only for THREADED_RTS:
274 (pushes threads, wakes up idle capabilities for stealing) */
275 schedulePushWork(cap,task);
276
277 scheduleDetectDeadlock(&cap,task);
278
279 // Normally, the only way we can get here with no threads to
280 // run is if a keyboard interrupt was received during
281 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
282 // Additionally, it is not fatal for the
283 // threaded RTS to reach here with no threads to run.
284 //
285 // win32: might be here due to awaitEvent() being abandoned
286 // as a result of a console event having been delivered.
287
288 #if defined(THREADED_RTS)
289 if (first)
290 {
291 // XXX: ToDo
292 // // don't yield the first time, we want a chance to run this
293 // // thread for a bit, even if there are others banging at the
294 // // door.
295 // first = rtsFalse;
296 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
297 }
298
299 scheduleYield(&cap,task);
300
301 if (emptyRunQueue(cap)) continue; // look for work again
302 #endif
303
304 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
305 if ( emptyRunQueue(cap) ) {
306 ASSERT(sched_state >= SCHED_INTERRUPTING);
307 }
308 #endif
309
310 //
311 // Get a thread to run
312 //
313 t = popRunQueue(cap);
314
315 // Sanity check the thread we're about to run. This can be
316 // expensive if there is lots of thread switching going on...
317 IF_DEBUG(sanity,checkTSO(t));
318
319 #if defined(THREADED_RTS)
320 // Check whether we can run this thread in the current task.
321 // If not, we have to pass our capability to the right task.
322 {
323 InCall *bound = t->bound;
324
325 if (bound) {
326 if (bound->task == task) {
327 // yes, the Haskell thread is bound to the current native thread
328 } else {
329 debugTrace(DEBUG_sched,
330 "thread %lu bound to another OS thread",
331 (unsigned long)t->id);
332 // no, bound to a different Haskell thread: pass to that thread
333 pushOnRunQueue(cap,t);
334 continue;
335 }
336 } else {
337 // The thread we want to run is unbound.
338 if (task->incall->tso) {
339 debugTrace(DEBUG_sched,
340 "this OS thread cannot run thread %lu",
341 (unsigned long)t->id);
342 // no, the current native thread is bound to a different
343 // Haskell thread, so pass it to any worker thread
344 pushOnRunQueue(cap,t);
345 continue;
346 }
347 }
348 }
349 #endif
350
351 // If we're shutting down, and this thread has not yet been
352 // killed, kill it now. This sometimes happens when a finalizer
353 // thread is created by the final GC, or a thread previously
354 // in a foreign call returns.
355 if (sched_state >= SCHED_INTERRUPTING &&
356 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
357 deleteThread(cap,t);
358 }
359
360 // If this capability is disabled, migrate the thread away rather
361 // than running it. NB. but not if the thread is bound: it is
362 // really hard for a bound thread to migrate itself. Believe me,
363 // I tried several ways and couldn't find a way to do it.
364 // Instead, when everything is stopped for GC, we migrate all the
365 // threads on the run queue then (see scheduleDoGC()).
366 //
367 // ToDo: what about TSO_LOCKED? Currently we're migrating those
368 // when the number of capabilities drops, but we never migrate
369 // them back if it rises again. Presumably we should, but after
370 // the thread has been migrated we no longer know what capability
371 // it was originally on.
372 #ifdef THREADED_RTS
373 if (cap->disabled && !t->bound) {
374 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
375 migrateThread(cap, t, dest_cap);
376 continue;
377 }
378 #endif
379
380 /* context switches are initiated by the timer signal, unless
381 * the user specified "context switch as often as possible", with
382 * +RTS -C0
383 */
384 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
385 && !emptyThreadQueues(cap)) {
386 cap->context_switch = 1;
387 }
388
389 run_thread:
390
391 // CurrentTSO is the thread to run. It might be different if we
392 // loop back to run_thread, so make sure to set CurrentTSO after
393 // that.
394 cap->r.rCurrentTSO = t;
395
396 startHeapProfTimer();
397
398 // ----------------------------------------------------------------------
399 // Run the current thread
400
401 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
402 ASSERT(t->cap == cap);
403 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
404
405 prev_what_next = t->what_next;
406
407 errno = t->saved_errno;
408 #if mingw32_HOST_OS
409 SetLastError(t->saved_winerror);
410 #endif
411
412 // reset the interrupt flag before running Haskell code
413 cap->interrupt = 0;
414
415 cap->in_haskell = rtsTrue;
416 cap->idle = 0;
417
418 dirty_TSO(cap,t);
419 dirty_STACK(cap,t->stackobj);
420
421 switch (recent_activity)
422 {
423 case ACTIVITY_DONE_GC: {
424 // ACTIVITY_DONE_GC means we turned off the timer signal to
425 // conserve power (see #1623). Re-enable it here.
426 uint32_t prev;
427 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
428 if (prev == ACTIVITY_DONE_GC) {
429 #ifndef PROFILING
430 startTimer();
431 #endif
432 }
433 break;
434 }
435 case ACTIVITY_INACTIVE:
436 // If we reached ACTIVITY_INACTIVE, then don't reset it until
437 // we've done the GC. The thread running here might just be
438 // the IO manager thread that handle_tick() woke up via
439 // wakeUpRts().
440 break;
441 default:
442 recent_activity = ACTIVITY_YES;
443 }
444
445 traceEventRunThread(cap, t);
446
447 switch (prev_what_next) {
448
449 case ThreadKilled:
450 case ThreadComplete:
451 /* Thread already finished, return to scheduler. */
452 ret = ThreadFinished;
453 break;
454
455 case ThreadRunGHC:
456 {
457 StgRegTable *r;
458 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
459 cap = regTableToCapability(r);
460 ret = r->rRet;
461 break;
462 }
463
464 case ThreadInterpret:
465 cap = interpretBCO(cap);
466 ret = cap->r.rRet;
467 break;
468
469 default:
470 barf("schedule: invalid what_next field");
471 }
472
473 cap->in_haskell = rtsFalse;
474
475 // The TSO might have moved, eg. if it re-entered the RTS and a GC
476 // happened. So find the new location:
477 t = cap->r.rCurrentTSO;
478
479 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
480 // don't want it set when not running a Haskell thread.
481 cap->r.rCurrentTSO = NULL;
482
483 // And save the current errno in this thread.
484 // XXX: possibly bogus for SMP because this thread might already
485 // be running again, see code below.
486 t->saved_errno = errno;
487 #if mingw32_HOST_OS
488 // Similarly for Windows error code
489 t->saved_winerror = GetLastError();
490 #endif
491
492 if (ret == ThreadBlocked) {
493 if (t->why_blocked == BlockedOnBlackHole) {
494 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
495 traceEventStopThread(cap, t, t->why_blocked + 6,
496 owner != NULL ? owner->id : 0);
497 } else {
498 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
499 }
500 } else {
501 traceEventStopThread(cap, t, ret, 0);
502 }
503
504 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
505 ASSERT(t->cap == cap);
506
507 // ----------------------------------------------------------------------
508
509 // Costs for the scheduler are assigned to CCS_SYSTEM
510 stopHeapProfTimer();
511 #if defined(PROFILING)
512 cap->r.rCCCS = CCS_SYSTEM;
513 #endif
514
515 schedulePostRunThread(cap,t);
516
517 ready_to_gc = rtsFalse;
518
519 switch (ret) {
520 case HeapOverflow:
521 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
522 break;
523
524 case StackOverflow:
525 // just adjust the stack for this thread, then pop it back
526 // on the run queue.
527 threadStackOverflow(cap, t);
528 pushOnRunQueue(cap,t);
529 break;
530
531 case ThreadYielding:
532 if (scheduleHandleYield(cap, t, prev_what_next)) {
533 // shortcut for switching between compiler/interpreter:
534 goto run_thread;
535 }
536 break;
537
538 case ThreadBlocked:
539 scheduleHandleThreadBlocked(t);
540 break;
541
542 case ThreadFinished:
543 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
544 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
545 break;
546
547 default:
548 barf("schedule: invalid thread return code %d", (int)ret);
549 }
550
551 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
552 scheduleDoGC(&cap,task,rtsFalse);
553 }
554 } /* end of while() */
555 }
556
557 /* -----------------------------------------------------------------------------
558 * Run queue operations
559 * -------------------------------------------------------------------------- */
560
561 static void
562 removeFromRunQueue (Capability *cap, StgTSO *tso)
563 {
564 if (tso->block_info.prev == END_TSO_QUEUE) {
565 ASSERT(cap->run_queue_hd == tso);
566 cap->run_queue_hd = tso->_link;
567 } else {
568 setTSOLink(cap, tso->block_info.prev, tso->_link);
569 }
570 if (tso->_link == END_TSO_QUEUE) {
571 ASSERT(cap->run_queue_tl == tso);
572 cap->run_queue_tl = tso->block_info.prev;
573 } else {
574 setTSOPrev(cap, tso->_link, tso->block_info.prev);
575 }
576 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
577
578 IF_DEBUG(sanity, checkRunQueue(cap));
579 }
580
581 void
582 promoteInRunQueue (Capability *cap, StgTSO *tso)
583 {
584 removeFromRunQueue(cap, tso);
585 pushOnRunQueue(cap, tso);
586 }
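// The run queue is doubly linked: tso->_link points forwards and
// tso->block_info.prev points backwards, with END_TSO_QUEUE terminating both
// directions. A minimal sketch of the invariant that removeFromRunQueue()
// preserves (the real checker is checkRunQueue(), used by the sanity check
// above):
//
//   StgTSO *t, *prev = END_TSO_QUEUE;
//   for (t = cap->run_queue_hd; t != END_TSO_QUEUE; prev = t, t = t->_link) {
//       ASSERT(t->block_info.prev == prev);   // back pointer mirrors _link
//   }
//   ASSERT(cap->run_queue_tl == prev);        // tail is the last element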
587
588 /* ----------------------------------------------------------------------------
589 * Setting up the scheduler loop
590 * ------------------------------------------------------------------------- */
591
592 static void
593 schedulePreLoop(void)
594 {
595 // initialisation for scheduler - what cannot go into initScheduler()
596
597 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
598 win32AllocStack();
599 #endif
600 }
601
602 /* -----------------------------------------------------------------------------
603 * scheduleFindWork()
604 *
605 * Search for work to do, and handle messages from elsewhere.
606 * -------------------------------------------------------------------------- */
607
608 static void
609 scheduleFindWork (Capability **pcap)
610 {
611 scheduleStartSignalHandlers(*pcap);
612
613 scheduleProcessInbox(pcap);
614
615 scheduleCheckBlockedThreads(*pcap);
616
617 #if defined(THREADED_RTS)
618 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
619 #endif
620 }
621
622 #if defined(THREADED_RTS)
623 STATIC_INLINE rtsBool
624 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
625 {
626 // we need to yield this capability to someone else if..
627 // - another thread is initiating a GC, and we didn't just do a GC
628 // (see Note [GC livelock])
629 // - another Task is returning from a foreign call
630 // - the thread at the head of the run queue cannot be run
631 // by this Task (it is bound to another Task, or it is unbound
632 // and this task is bound).
633 //
634 // Note [GC livelock]
635 //
636 // If we are interrupted to do a GC, then we do not immediately do
637 // another one. This avoids a starvation situation where one
638 // Capability keeps forcing a GC and the other Capabilities make no
639 // progress at all.
640
641 return ((pending_sync && !didGcLast) ||
642 cap->returning_tasks_hd != NULL ||
643 (!emptyRunQueue(cap) && (task->incall->tso == NULL
644 ? peekRunQueue(cap)->bound != NULL
645 : peekRunQueue(cap)->bound != task->incall)));
646 }
647
648 // This is the single place where a Task goes to sleep. There are
649 // two reasons it might need to sleep:
650 // - there are no threads to run
651 // - we need to yield this Capability to someone else
652 // (see shouldYieldCapability())
653 //
654 // Careful: the scheduler loop is quite delicate. Make sure you run
655 // the tests in testsuite/concurrent (all ways) after modifying this,
656 // and also check the benchmarks in nofib/parallel for regressions.
657
658 static void
659 scheduleYield (Capability **pcap, Task *task)
660 {
661 Capability *cap = *pcap;
662 int didGcLast = rtsFalse;
663
664 // if we have work, and we don't need to give up the Capability, continue.
665 //
666 if (!shouldYieldCapability(cap,task,rtsFalse) &&
667 (!emptyRunQueue(cap) ||
668 !emptyInbox(cap) ||
669 sched_state >= SCHED_INTERRUPTING)) {
670 return;
671 }
672
673 // otherwise yield (sleep), and keep yielding if necessary.
674 do {
675 didGcLast = yieldCapability(&cap,task, !didGcLast);
676 }
677 while (shouldYieldCapability(cap,task,didGcLast));
678
679 // note there may still be no threads on the run queue at this
680 // point, the caller has to check.
681
682 *pcap = cap;
683 return;
684 }
685 #endif
686
687 /* -----------------------------------------------------------------------------
688 * schedulePushWork()
689 *
690 * Push work to other Capabilities if we have some.
691 * -------------------------------------------------------------------------- */
692
693 static void
694 schedulePushWork(Capability *cap USED_IF_THREADS,
695 Task *task USED_IF_THREADS)
696 {
697 /* following code not for PARALLEL_HASKELL. I kept the call general;
698 future GUM versions might use pushing in a distributed setup */
699 #if defined(THREADED_RTS)
700
701 Capability *free_caps[n_capabilities], *cap0;
702 uint32_t i, n_wanted_caps, n_free_caps;
703 StgTSO *t;
704
705 // migration can be turned off with +RTS -qm
706 if (!RtsFlags.ParFlags.migrate) return;
707
708 // Check whether we have more threads on our run queue, or sparks
709 // in our pool, that we could hand to another Capability.
710 if (emptyRunQueue(cap)) {
711 if (sparkPoolSizeCap(cap) < 2) return;
712 } else {
713 if (singletonRunQueue(cap) &&
714 sparkPoolSizeCap(cap) < 1) return;
715 }
716
717 // Figure out how many capabilities we want to wake up. We need at least
718 // sparkPoolSize(cap) plus the number of spare threads we have.
719 t = cap->run_queue_hd;
720 n_wanted_caps = sparkPoolSizeCap(cap);
721 if (t != END_TSO_QUEUE) {
722 do {
723 t = t->_link;
724 if (t == END_TSO_QUEUE) break;
725 n_wanted_caps++;
726 } while (n_wanted_caps < n_capabilities-1);
727 }
728
729 // Grab free capabilities, starting from cap->no+1.
730 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
731 n_free_caps < n_wanted_caps && i != cap->no;
732 i = (i + 1) % n_capabilities) {
733 cap0 = capabilities[i];
734 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
735 if (!emptyRunQueue(cap0)
736 || cap0->returning_tasks_hd != NULL
737 || cap0->inbox != (Message*)END_TSO_QUEUE) {
738 // it already has some work, we just grabbed it at
739 // the wrong moment. Or maybe it's deadlocked!
740 releaseCapability(cap0);
741 } else {
742 free_caps[n_free_caps++] = cap0;
743 }
744 }
745 }
746
747 // we now have n_free_caps free capabilities stashed in
748 // free_caps[]. Share our run queue equally with them. This is
749 // probably the simplest thing we could do; improvements we might
750 // want to do include:
751 //
752 // - giving high priority to moving relatively new threads, on
753 // the grounds that they haven't had time to build up a
754 // working set in the cache on this CPU/Capability.
755 //
756 // - giving low priority to moving long-lived threads
757
758 if (n_free_caps > 0) {
759 StgTSO *prev, *t, *next;
760 #ifdef SPARK_PUSHING
761 rtsBool pushed_to_all;
762 #endif
763
764 debugTrace(DEBUG_sched,
765 "cap %d: %s and %d free capabilities, sharing...",
766 cap->no,
767 (!emptyRunQueue(cap) && !singletonRunQueue(cap))?
768 "excess threads on run queue":"sparks to share (>=2)",
769 n_free_caps);
770
771 i = 0;
772 #ifdef SPARK_PUSHING
773 pushed_to_all = rtsFalse;
774 #endif
775
776 if (cap->run_queue_hd != END_TSO_QUEUE) {
777 prev = cap->run_queue_hd;
778 t = prev->_link;
779 prev->_link = END_TSO_QUEUE;
780 for (; t != END_TSO_QUEUE; t = next) {
781 next = t->_link;
782 t->_link = END_TSO_QUEUE;
783 if (t->bound == task->incall // don't move my bound thread
784 || tsoLocked(t)) { // don't move a locked thread
785 setTSOLink(cap, prev, t);
786 setTSOPrev(cap, t, prev);
787 prev = t;
788 } else if (i == n_free_caps) {
789 #ifdef SPARK_PUSHING
790 pushed_to_all = rtsTrue;
791 #endif
792 i = 0;
793 // keep one for us
794 setTSOLink(cap, prev, t);
795 setTSOPrev(cap, t, prev);
796 prev = t;
797 } else {
798 appendToRunQueue(free_caps[i],t);
799
800 traceEventMigrateThread (cap, t, free_caps[i]->no);
801
802 if (t->bound) { t->bound->task->cap = free_caps[i]; }
803 t->cap = free_caps[i];
804 i++;
805 }
806 }
807 cap->run_queue_tl = prev;
808
809 IF_DEBUG(sanity, checkRunQueue(cap));
810 }
811
812 #ifdef SPARK_PUSHING
813 /* JB I left this code in place, it would work but is not necessary */
814
815 // If there are some free capabilities that we didn't push any
816 // threads to, then try to push a spark to each one.
817 if (!pushed_to_all) {
818 StgClosure *spark;
819 // i is the next free capability to push to
820 for (; i < n_free_caps; i++) {
821 if (emptySparkPoolCap(free_caps[i])) {
822 spark = tryStealSpark(cap->sparks);
823 if (spark != NULL) {
824 /* TODO: if anyone wants to re-enable this code then
825 * they must consider the fizzledSpark(spark) case
826 * and update the per-cap spark statistics.
827 */
828 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
829
830 traceEventStealSpark(free_caps[i], t, cap->no);
831
832 newSpark(&(free_caps[i]->r), spark);
833 }
834 }
835 }
836 }
837 #endif /* SPARK_PUSHING */
838
839 // release the capabilities
840 for (i = 0; i < n_free_caps; i++) {
841 task->cap = free_caps[i];
842 if (sparkPoolSizeCap(cap) > 0) {
843 // If we have sparks to steal, wake up a worker on the
844 // capability, even if it has no threads to run.
845 releaseAndWakeupCapability(free_caps[i]);
846 } else {
847 releaseCapability(free_caps[i]);
848 }
849 }
850 }
851 task->cap = cap; // reset to point to our Capability.
852
853 #endif /* THREADED_RTS */
854
855 }
856
857 /* ----------------------------------------------------------------------------
858 * Start any pending signal handlers
859 * ------------------------------------------------------------------------- */
860
861 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
862 static void
863 scheduleStartSignalHandlers(Capability *cap)
864 {
865 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
866 // safe outside the lock
867 startSignalHandlers(cap);
868 }
869 }
870 #else
871 static void
872 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
873 {
874 }
875 #endif
876
877 /* ----------------------------------------------------------------------------
878 * Check for blocked threads that can be woken up.
879 * ------------------------------------------------------------------------- */
880
881 static void
882 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
883 {
884 #if !defined(THREADED_RTS)
885 //
886 // Check whether any waiting threads need to be woken up. If the
887 // run queue is empty, and there are no other tasks running, we
888 // can wait indefinitely for something to happen.
889 //
890 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
891 {
892 awaitEvent (emptyRunQueue(cap));
893 }
894 #endif
895 }
896
897 /* ----------------------------------------------------------------------------
898 * Detect deadlock conditions and attempt to resolve them.
899 * ------------------------------------------------------------------------- */
900
901 static void
902 scheduleDetectDeadlock (Capability **pcap, Task *task)
903 {
904 Capability *cap = *pcap;
905 /*
906 * Detect deadlock: when we have no threads to run, there are no
907 * threads blocked, waiting for I/O, or sleeping, and all the
908 * other tasks are waiting for work, we must have a deadlock of
909 * some description.
910 */
911 if ( emptyThreadQueues(cap) )
912 {
913 #if defined(THREADED_RTS)
914 /*
915 * In the threaded RTS, we only check for deadlock if there
916 * has been no activity in a complete timeslice. This means
917 * we won't eagerly start a full GC just because we don't have
918 * any threads to run currently.
919 */
920 if (recent_activity != ACTIVITY_INACTIVE) return;
921 #endif
922
923 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
924
925 // Garbage collection can release some new threads due to
926 // either (a) finalizers or (b) threads resurrected because
927 // they are unreachable and will therefore be sent an
928 // exception. Any threads thus released will be immediately
929 // runnable.
930 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
931 cap = *pcap;
932 // when force_major == rtsTrue, scheduleDoGC() sets
933 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
934 // signal.
935
936 if ( !emptyRunQueue(cap) ) return;
937
938 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
939 /* If we have user-installed signal handlers, then wait
940 * for signals to arrive rather than bombing out with a
941 * deadlock.
942 */
943 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
944 debugTrace(DEBUG_sched,
945 "still deadlocked, waiting for signals...");
946
947 awaitUserSignals();
948
949 if (signals_pending()) {
950 startSignalHandlers(cap);
951 }
952
953 // either we have threads to run, or we were interrupted:
954 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
955
956 return;
957 }
958 #endif
959
960 #if !defined(THREADED_RTS)
961 /* Probably a real deadlock. Send the current main thread the
962 * Deadlock exception.
963 */
964 if (task->incall->tso) {
965 switch (task->incall->tso->why_blocked) {
966 case BlockedOnSTM:
967 case BlockedOnBlackHole:
968 case BlockedOnMsgThrowTo:
969 case BlockedOnMVar:
970 case BlockedOnMVarRead:
971 throwToSingleThreaded(cap, task->incall->tso,
972 (StgClosure *)nonTermination_closure);
973 return;
974 default:
975 barf("deadlock: main thread blocked in a strange way");
976 }
977 }
978 return;
979 #endif
980 }
981 }
982
983
984 /* ----------------------------------------------------------------------------
985 * Process messages in the current Capability's inbox
986 * ------------------------------------------------------------------------- */
987
988 static void
989 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
990 {
991 #if defined(THREADED_RTS)
992 Message *m, *next;
993 int r;
994 Capability *cap = *pcap;
995
996 while (!emptyInbox(cap)) {
997 if (cap->r.rCurrentNursery->link == NULL ||
998 g0->n_new_large_words >= large_alloc_lim) {
999 scheduleDoGC(pcap, cap->running_task, rtsFalse);
1000 cap = *pcap;
1001 }
1002
1003 // don't use a blocking acquire; if the lock is held by
1004 // another thread then just carry on. This seems to avoid
1005 // getting stuck in a message ping-pong situation with other
1006 // processors. We'll check the inbox again later anyway.
1007 //
1008 // We should really use a more efficient queue data structure
1009 // here. The trickiness is that we must ensure a Capability
1010 // never goes idle if the inbox is non-empty, which is why we
1011 // use cap->lock (cap->lock is released as the last thing
1012 // before going idle; see Capability.c:releaseCapability()).
1013 r = TRY_ACQUIRE_LOCK(&cap->lock);
1014 if (r != 0) return;
1015
1016 m = cap->inbox;
1017 cap->inbox = (Message*)END_TSO_QUEUE;
1018
1019 RELEASE_LOCK(&cap->lock);
1020
1021 while (m != (Message*)END_TSO_QUEUE) {
1022 next = m->link;
1023 executeMessage(cap, m);
1024 m = next;
1025 }
1026 }
1027 #endif
1028 }
1029
1030 /* ----------------------------------------------------------------------------
1031 * Activate spark threads (THREADED_RTS)
1032 * ------------------------------------------------------------------------- */
1033
1034 #if defined(THREADED_RTS)
1035 static void
1036 scheduleActivateSpark(Capability *cap)
1037 {
1038 if (anySparks() && !cap->disabled)
1039 {
1040 createSparkThread(cap);
1041 debugTrace(DEBUG_sched, "creating a spark thread");
1042 }
1043 }
1044 #endif // THREADED_RTS
1045
1046 /* ----------------------------------------------------------------------------
1047 * After running a thread...
1048 * ------------------------------------------------------------------------- */
1049
1050 static void
1051 schedulePostRunThread (Capability *cap, StgTSO *t)
1052 {
1053 // We have to be able to catch transactions that are in an
1054 // infinite loop as a result of seeing an inconsistent view of
1055 // memory, e.g.
1056 //
1057 // atomically $ do
1058 // [a,b] <- mapM readTVar [ta,tb]
1059 // when (a == b) loop
1060 //
1061 // and a is never equal to b given a consistent view of memory.
1062 //
1063 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1064 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1065 debugTrace(DEBUG_sched | DEBUG_stm,
1066 "trec %p found wasting its time", t);
1067
1068 // strip the stack back to the
1069 // ATOMICALLY_FRAME, aborting the (nested)
1070 // transaction, and saving the stack of any
1071 // partially-evaluated thunks on the heap.
1072 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1073
1074 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1075 }
1076 }
1077
1078 //
1079 // If the current thread's allocation limit has run out, send it
1080 // the AllocationLimitExceeded exception.
1081
1082 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1083 // Use a throwToSelf rather than a throwToSingleThreaded, because
1084 // it correctly handles the case where the thread is currently
1085 // inside mask. Also the thread might be blocked (e.g. on an
1086 // MVar), and throwToSingleThreaded doesn't unblock it
1087 // correctly in that case.
1088 throwToSelf(cap, t, allocationLimitExceeded_closure);
1089 ASSIGN_Int64((W_*)&(t->alloc_limit),
1090 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1091 }
1092
1093 /* some statistics gathering in the parallel case */
1094 }
1095
1096 /* -----------------------------------------------------------------------------
1097 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1098 * -------------------------------------------------------------------------- */
1099
1100 static rtsBool
1101 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1102 {
1103 if (cap->r.rHpLim == NULL || cap->context_switch) {
1104 // Sometimes we miss a context switch, e.g. when calling
1105 // primitives in a tight loop, MAYBE_GC() doesn't check the
1106 // context switch flag, and we end up waiting for a GC.
1107 // See #1984, and concurrent/should_run/1984
1108 cap->context_switch = 0;
1109 appendToRunQueue(cap,t);
1110 } else {
1111 pushOnRunQueue(cap,t);
1112 }
1113
1114 // did the task ask for a large block?
1115 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1116 // if so, get one and push it on the front of the nursery.
1117 bdescr *bd;
1118 W_ blocks;
1119
1120 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1121
1122 if (blocks > BLOCKS_PER_MBLOCK) {
1123 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1124 }
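// Worked example (assuming the usual 4096-byte BLOCK_SIZE): a request of
// rHpAlloc = 40000 bytes is rounded up by BLOCK_ROUND_UP to 40960, so
// blocks = 40960 / 4096 = 10 and a 10-block group is linked into the
// nursery below.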
1125
1126 debugTrace(DEBUG_sched,
1127 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1128 (long)t->id, what_next_strs[t->what_next], blocks);
1129
1130 // don't do this if the nursery is (nearly) full, we'll GC first.
1131 if (cap->r.rCurrentNursery->link != NULL ||
1132 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1133 // infinite loop if the
1134 // nursery has only one
1135 // block.
1136
1137 bd = allocGroup_lock(blocks);
1138 cap->r.rNursery->n_blocks += blocks;
1139
1140 // link the new group after CurrentNursery
1141 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1142
1143 // initialise it as a nursery block. We initialise the
1144 // step, gen_no, and flags field of *every* sub-block in
1145 // this large block, because this is easier than making
1146 // sure that we always find the block head of a large
1147 // block whenever we call Bdescr() (eg. evacuate() and
1148 // isAlive() in the GC would both have to do this, at
1149 // least).
1150 {
1151 bdescr *x;
1152 for (x = bd; x < bd + blocks; x++) {
1153 initBdescr(x,g0,g0);
1154 x->free = x->start;
1155 x->flags = 0;
1156 }
1157 }
1158
1159 // This assert can be a killer if the app is doing lots
1160 // of large block allocations.
1161 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1162
1163 // now update the nursery to point to the new block
1164 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1165 cap->r.rCurrentNursery = bd;
1166
1167 // we might be unlucky and have another thread get on the
1168 // run queue before us and steal the large block, but in that
1169 // case the thread will just end up requesting another large
1170 // block.
1171 return rtsFalse; /* not actually GC'ing */
1172 }
1173 }
1174
1175 // if we got here because we exceeded large_alloc_lim, then
1176 // proceed straight to GC.
1177 if (g0->n_new_large_words >= large_alloc_lim) {
1178 return rtsTrue;
1179 }
1180
1181 // Otherwise, we just ran out of space in the current nursery.
1182 // Grab another nursery if we can.
1183 if (getNewNursery(cap)) {
1184 debugTrace(DEBUG_sched, "thread %ld got a new nursery", t->id);
1185 return rtsFalse;
1186 }
1187
1188 return rtsTrue;
1189 /* actual GC is done at the end of the while loop in schedule() */
1190 }
1191
1192 /* -----------------------------------------------------------------------------
1193 * Handle a thread that returned to the scheduler with ThreadYielding
1194 * -------------------------------------------------------------------------- */
1195
1196 static rtsBool
1197 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1198 {
1199 /* put the thread back on the run queue. Then, if we're ready to
1200 * GC, check whether this is the last task to stop. If so, wake
1201 * up the GC thread. getThread will block during a GC until the
1202 * GC is finished.
1203 */
1204
1205 ASSERT(t->_link == END_TSO_QUEUE);
1206
1207 // Shortcut if we're just switching evaluators: don't bother
1208 // doing stack squeezing (which can be expensive), just run the
1209 // thread.
1210 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1211 debugTrace(DEBUG_sched,
1212 "--<< thread %ld (%s) stopped to switch evaluators",
1213 (long)t->id, what_next_strs[t->what_next]);
1214 return rtsTrue;
1215 }
1216
1217 // Reset the context switch flag. We don't do this just before
1218 // running the thread, because that would mean we would lose ticks
1219 // during GC, which can lead to unfair scheduling (a thread hogs
1220 // the CPU because the tick always arrives during GC). This way
1221 // penalises threads that do a lot of allocation, but that seems
1222 // better than the alternative.
1223 if (cap->context_switch != 0) {
1224 cap->context_switch = 0;
1225 appendToRunQueue(cap,t);
1226 } else {
1227 pushOnRunQueue(cap,t);
1228 }
1229
1230 IF_DEBUG(sanity,
1231 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1232 checkTSO(t));
1233
1234 return rtsFalse;
1235 }
1236
1237 /* -----------------------------------------------------------------------------
1238 * Handle a thread that returned to the scheduler with ThreadBlocked
1239 * -------------------------------------------------------------------------- */
1240
1241 static void
1242 scheduleHandleThreadBlocked( StgTSO *t
1243 #if !defined(DEBUG)
1244 STG_UNUSED
1245 #endif
1246 )
1247 {
1248
1249 // We don't need to do anything. The thread is blocked, and it
1250 // has tidied up its stack and placed itself on whatever queue
1251 // it needs to be on.
1252
1253 // ASSERT(t->why_blocked != NotBlocked);
1254 // Not true: for example,
1255 // - the thread may have woken itself up already, because
1256 // threadPaused() might have raised a blocked throwTo
1257 // exception, see maybePerformBlockedException().
1258
1259 #ifdef DEBUG
1260 traceThreadStatus(DEBUG_sched, t);
1261 #endif
1262 }
1263
1264 /* -----------------------------------------------------------------------------
1265 * Handle a thread that returned to the scheduler with ThreadFinished
1266 * -------------------------------------------------------------------------- */
1267
1268 static rtsBool
1269 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1270 {
1271 /* Need to check whether this was a main thread, and if so,
1272 * return with the return value.
1273 *
1274 * We also end up here if the thread kills itself with an
1275 * uncaught exception, see Exception.cmm.
1276 */
1277
1278 // blocked exceptions can now complete, even if the thread was in
1279 // blocked mode (see #2910).
1280 awakenBlockedExceptionQueue (cap, t);
1281
1282 //
1283 // Check whether the thread that just completed was a bound
1284 // thread, and if so return with the result.
1285 //
1286 // There is an assumption here that all thread completion goes
1287 // through this point; we need to make sure that if a thread
1288 // ends up in the ThreadKilled state, that it stays on the run
1289 // queue so it can be dealt with here.
1290 //
1291
1292 if (t->bound) {
1293
1294 if (t->bound != task->incall) {
1295 #if !defined(THREADED_RTS)
1296 // Must be a bound thread that is not the topmost one. Leave
1297 // it on the run queue until the stack has unwound to the
1298 // point where we can deal with this. Leaving it on the run
1299 // queue also ensures that the garbage collector knows about
1300 // this thread and its return value (it gets dropped from the
1301 // step->threads list so there's no other way to find it).
1302 appendToRunQueue(cap,t);
1303 return rtsFalse;
1304 #else
1305 // this cannot happen in the threaded RTS, because a
1306 // bound thread can only be run by the appropriate Task.
1307 barf("finished bound thread that isn't mine");
1308 #endif
1309 }
1310
1311 ASSERT(task->incall->tso == t);
1312
1313 if (t->what_next == ThreadComplete) {
1314 if (task->incall->ret) {
1315 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1316 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1317 }
1318 task->incall->rstat = Success;
1319 } else {
1320 if (task->incall->ret) {
1321 *(task->incall->ret) = NULL;
1322 }
1323 if (sched_state >= SCHED_INTERRUPTING) {
1324 if (heap_overflow) {
1325 task->incall->rstat = HeapExhausted;
1326 } else {
1327 task->incall->rstat = Interrupted;
1328 }
1329 } else {
1330 task->incall->rstat = Killed;
1331 }
1332 }
1333 #ifdef DEBUG
1334 removeThreadLabel((StgWord)task->incall->tso->id);
1335 #endif
1336
1337 // We no longer consider this thread and task to be bound to
1338 // each other. The TSO lives on until it is GC'd, but the
1339 // task is about to be released by the caller, and we don't
1340 // want anyone following the pointer from the TSO to the
1341 // defunct task (which might have already been
1342 // re-used). This was a real bug: the GC updated
1343 // tso->bound->tso which led to a deadlock.
1344 t->bound = NULL;
1345 task->incall->tso = NULL;
1346
1347 return rtsTrue; // tells schedule() to return
1348 }
1349
1350 return rtsFalse;
1351 }
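// For a bound main thread, the value stored into task->incall->ret above is
// what the C caller eventually receives via the RtsAPI. A hedged sketch of
// that caller side (foo_closure stands for a hypothetical closure obtained
// from a 'foreign export'; the stubs GHC actually generates are a little more
// involved):
//
//   extern StgClosure foo_closure;      // hypothetical exported IO action
//
//   Capability *cap = rts_lock();
//   HaskellObj result;
//   rts_evalIO(&cap, &foo_closure, &result);
//   rts_checkSchedStatus("foo", cap);   // reports Killed / Interrupted /
//                                       // HeapExhausted (see rstat above)
//   rts_unlock(cap);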
1352
1353 /* -----------------------------------------------------------------------------
1354 * Perform a heap census
1355 * -------------------------------------------------------------------------- */
1356
1357 static rtsBool
1358 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1359 {
1360 // When we have +RTS -i0 and we're heap profiling, do a census at
1361 // every GC. This lets us get repeatable runs for debugging.
1362 if (performHeapProfile ||
1363 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1364 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1365 return rtsTrue;
1366 } else {
1367 return rtsFalse;
1368 }
1369 }
1370
1371 /* -----------------------------------------------------------------------------
1372 * stopAllCapabilities()
1373 *
1374 * Stop all Haskell execution. This is used when we need to make some global
1375 * change to the system, such as altering the number of capabilities, or
1376 * forking.
1377 *
1378 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1379 * -------------------------------------------------------------------------- */
1380
1381 #if defined(THREADED_RTS)
1382 static void stopAllCapabilities (Capability **pCap, Task *task)
1383 {
1384 rtsBool was_syncing;
1385 SyncType prev_sync_type;
1386
1387 PendingSync sync = {
1388 .type = SYNC_OTHER,
1389 .idle = NULL,
1390 .task = task
1391 };
1392
1393 do {
1394 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1395 } while (was_syncing);
1396
1397 acquireAllCapabilities(*pCap,task);
1398
1399 pending_sync = 0;
1400 }
1401 #endif
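// A minimal sketch (not real RTS code) of how callers are expected to pair
// these two operations when making a global change, e.g. altering the number
// of capabilities as mentioned above:
//
//   stopAllCapabilities(&cap, task);      // no Haskell code running now
//   /* ... mutate global scheduler/GC state safely here ... */
//   releaseAllCapabilities(n_capabilities, cap, task);
//
// Note that stopAllCapabilities() may change which Capability this Task
// holds (requestSync() can yield), which is why it takes a Capability**.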
1402
1403 /* -----------------------------------------------------------------------------
1404 * requestSync()
1405 *
1406 * Commence a synchronisation between all capabilities. Normally not called
1407 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1408 * has some special synchronisation requirements.
1409 *
1410 * Returns:
1411 * rtsFalse if we successfully got a sync
1412 * rtsTrue if there was another sync request in progress,
1413 * and we yielded to it. The value returned is the
1414 * type of the other sync request.
1415 * -------------------------------------------------------------------------- */
1416
1417 #if defined(THREADED_RTS)
1418 static rtsBool requestSync (
1419 Capability **pcap, Task *task, PendingSync *new_sync,
1420 SyncType *prev_sync_type)
1421 {
1422 PendingSync *sync;
1423
1424 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1425 (StgWord)NULL,
1426 (StgWord)new_sync);
1427
1428 if (sync != NULL)
1429 {
1430 // sync is valid until we have called yieldCapability().
1431 // After the sync is completed, we cannot read that struct any
1432 // more because it has been freed.
1433 *prev_sync_type = sync->type;
1434 do {
1435 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1436 sync->type);
1437 ASSERT(*pcap);
1438 yieldCapability(pcap,task,rtsTrue);
1439 sync = pending_sync;
1440 } while (sync != NULL);
1441
1442 // NOTE: task->cap might have changed now
1443 return rtsTrue;
1444 }
1445 else
1446 {
1447 return rtsFalse;
1448 }
1449 }
1450 #endif
1451
1452 /* -----------------------------------------------------------------------------
1453 * acquireAllCapabilities()
1454 *
1455 * Grab all the capabilities except the one we already hold. Used
1456 * when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1457 * before a fork (SYNC_OTHER).
1458 *
1459 * Only call this after requestSync(), otherwise a deadlock might
1460 * ensue if another thread is trying to synchronise.
1461 * -------------------------------------------------------------------------- */
1462
1463 #ifdef THREADED_RTS
1464 static void acquireAllCapabilities(Capability *cap, Task *task)
1465 {
1466 Capability *tmpcap;
1467 uint32_t i;
1468
1469 ASSERT(pending_sync != NULL);
1470 for (i=0; i < n_capabilities; i++) {
1471 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1472 i, n_capabilities);
1473 tmpcap = capabilities[i];
1474 if (tmpcap != cap) {
1475 // we better hope this task doesn't get migrated to
1476 // another Capability while we're waiting for this one.
1477 // It won't, because load balancing happens while we have
1478 // all the Capabilities, but even so it's a slightly
1479 // unsavoury invariant.
1480 task->cap = tmpcap;
1481 waitForCapability(&tmpcap, task);
1482 if (tmpcap->no != i) {
1483 barf("acquireAllCapabilities: got the wrong capability");
1484 }
1485 }
1486 }
1487 task->cap = cap;
1488 }
1489 #endif
1490
1491 /* -----------------------------------------------------------------------------
1492 * releaseAllCapabilities()
1493 *
1494 * Assuming this thread holds all the capabilities, release them all except for
1495 * the one passed in as cap.
1496 * -------------------------------------------------------------------------- */
1497
1498 #ifdef THREADED_RTS
1499 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task)
1500 {
1501 uint32_t i;
1502
1503 for (i = 0; i < n; i++) {
1504 if (cap->no != i) {
1505 task->cap = capabilities[i];
1506 releaseCapability(capabilities[i]);
1507 }
1508 }
1509 task->cap = cap;
1510 }
1511 #endif
1512
1513 /* -----------------------------------------------------------------------------
1514 * Perform a garbage collection if necessary
1515 * -------------------------------------------------------------------------- */
1516
1517 static void
1518 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1519 rtsBool force_major)
1520 {
1521 Capability *cap = *pcap;
1522 rtsBool heap_census;
1523 uint32_t collect_gen;
1524 rtsBool major_gc;
1525 #ifdef THREADED_RTS
1526 uint32_t gc_type;
1527 uint32_t i;
1528 uint32_t need_idle;
1529 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1530 StgTSO *tso;
1531 rtsBool *idle_cap;
1532 #endif
1533
1534 if (sched_state == SCHED_SHUTTING_DOWN) {
1535 // The final GC has already been done, and the system is
1536 // shutting down. We'll probably deadlock if we try to GC
1537 // now.
1538 return;
1539 }
1540
1541 heap_census = scheduleNeedHeapProfile(rtsTrue);
1542
1543 // Figure out which generation we are collecting, so that we can
1544 // decide whether this is a parallel GC or not.
1545 collect_gen = calcNeeded(force_major || heap_census, NULL);
1546 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1547
1548 #ifdef THREADED_RTS
1549 if (sched_state < SCHED_INTERRUPTING
1550 && RtsFlags.ParFlags.parGcEnabled
1551 && collect_gen >= RtsFlags.ParFlags.parGcGen
1552 && ! oldest_gen->mark)
1553 {
1554 gc_type = SYNC_GC_PAR;
1555 } else {
1556 gc_type = SYNC_GC_SEQ;
1557 }
1558
1559 if (gc_type == SYNC_GC_PAR && RtsFlags.ParFlags.parGcThreads > 0) {
1560 need_idle = stg_max(0, enabled_capabilities -
1561 RtsFlags.ParFlags.parGcThreads);
1562 } else {
1563 need_idle = 0;
1564 }
1565
1566 // In order to GC, there must be no threads running Haskell code.
1567 // Therefore, the GC thread needs to hold *all* the capabilities,
1568 // and release them after the GC has completed.
1569 //
1570 // This seems to be the simplest way: previous attempts involved
1571 // making all the threads with capabilities give up their
1572 // capabilities and sleep except for the *last* one, which
1573 // actually did the GC. But it's quite hard to arrange for all
1574 // the other tasks to sleep and stay asleep.
1575 //
1576
1577 /* Other capabilities are prevented from running yet more Haskell
1578 threads if pending_sync is set. Tested inside
1579 yieldCapability() and releaseCapability() in Capability.c */
1580
1581 PendingSync sync = {
1582 .type = gc_type,
1583 .idle = NULL,
1584 .task = task
1585 };
1586
1587 {
1588 SyncType prev_sync = 0;
1589 rtsBool was_syncing;
1590 do {
1591 // We need an array of size n_capabilities, but since this may
1592 // change each time around the loop we must allocate it afresh.
1593 idle_cap = (rtsBool *)stgMallocBytes(n_capabilities *
1594 sizeof(rtsBool),
1595 "scheduleDoGC");
1596 sync.idle = idle_cap;
1597
1598 // When using +RTS -qn, we need some capabilities to be idle during
1599 // GC. The best bet is to choose some inactive ones, so we look for
1600 // those first:
1601 uint32_t n_idle = need_idle;
1602 for (i=0; i < n_capabilities; i++) {
1603 if (capabilities[i]->disabled) {
1604 idle_cap[i] = rtsTrue;
1605 } else if (n_idle > 0 &&
1606 capabilities[i]->running_task == NULL) {
1607 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1608 n_idle--;
1609 idle_cap[i] = rtsTrue;
1610 } else {
1611 idle_cap[i] = rtsFalse;
1612 }
1613 }
1614 // If we didn't find enough inactive capabilities, just pick some
1615 // more to be idle.
1616 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1617 if (!idle_cap[i] && i != cap->no) {
1618 idle_cap[i] = rtsTrue;
1619 n_idle--;
1620 }
1621 }
1622 ASSERT(n_idle == 0);
1623
1624 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1625 cap = *pcap;
1626 if (was_syncing) {
1627 stgFree(idle_cap);
1628 }
1629 if (was_syncing &&
1630 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1631 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1632 // someone else had a pending sync request for a GC, so
1633 // let's assume GC has been done and we don't need to GC
1634 // again.
1635 // Exception to this: if SCHED_INTERRUPTING, then we still
1636 // need to do the final GC.
1637 return;
1638 }
1639 if (sched_state == SCHED_SHUTTING_DOWN) {
1640 // The scheduler might now be shutting down. We tested
1641 // this above, but it might have become true since then as
1642 // we yielded the capability in requestSync().
1643 return;
1644 }
1645 } while (was_syncing);
1646 }
1647
1648 #ifdef DEBUG
1649 unsigned int old_n_capabilities = n_capabilities;
1650 #endif
1651
1652 interruptAllCapabilities();
1653
1654 // The final shutdown GC is always single-threaded, because it's
1655 // possible that some of the Capabilities have no worker threads.
1656
1657 if (gc_type == SYNC_GC_SEQ) {
1658 traceEventRequestSeqGc(cap);
1659 } else {
1660 traceEventRequestParGc(cap);
1661 }
1662
1663 if (gc_type == SYNC_GC_SEQ) {
1664 // single-threaded GC: grab all the capabilities
1665 acquireAllCapabilities(cap,task);
1666 }
1667 else
1668 {
1669 // If we are load-balancing collections in this
1670 // generation, then we require all GC threads to participate
1671 // in the collection. Otherwise, we only require active
1672 // threads to participate, and we set gc_threads[i]->idle for
1673 // any idle capabilities. The rationale here is that waking
1674 // up an idle Capability takes much longer than just doing any
1675 // GC work on its behalf.
1676
1677 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1678 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1679 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1680 {
1681 for (i=0; i < n_capabilities; i++) {
1682 if (capabilities[i]->disabled) {
1683 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1684 if (idle_cap[i]) {
1685 n_idle_caps++;
1686 }
1687 } else {
1688 if (i != cap->no && idle_cap[i]) {
1689 Capability *tmpcap = capabilities[i];
1690 task->cap = tmpcap;
1691 waitForCapability(&tmpcap, task);
1692 n_idle_caps++;
1693 }
1694 }
1695 }
1696 }
1697 else
1698 {
1699 for (i=0; i < n_capabilities; i++) {
1700 if (capabilities[i]->disabled) {
1701 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1702 if (idle_cap[i]) {
1703 n_idle_caps++;
1704 }
1705 } else if (i != cap->no &&
1706 capabilities[i]->idle >=
1707 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1708 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1709 if (idle_cap[i]) {
1710 n_idle_caps++;
1711 } else {
1712 n_failed_trygrab_idles++;
1713 }
1714 }
1715 }
1716 }
1717 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1718
1719 // We set the gc_thread[i]->idle flag if that
1720 // capability/thread is not participating in this collection.
1721 // We also keep a local record of which capabilities are idle
1722 // in idle_cap[], because scheduleDoGC() is re-entrant:
1723 // another thread might start a GC as soon as we've finished
1724 // this one, and thus the gc_thread[]->idle flags are invalid
1725 // as soon as we release any threads after GC. Getting this
1726 // wrong leads to a rare and hard to debug deadlock!
1727
1728 for (i=0; i < n_capabilities; i++) {
1729 gc_threads[i]->idle = idle_cap[i];
1730 capabilities[i]->idle++;
1731 }
1732
1733 // For all capabilities participating in this GC, wait until
1734 // they have stopped mutating and are standing by for GC.
1735 waitForGcThreads(cap);
1736
1737 #if defined(THREADED_RTS)
1738 // Stable point where we can do a global check on our spark counters
1739 ASSERT(checkSparkCountInvariant());
1740 #endif
1741 }
1742
1743 #endif
1744
1745 IF_DEBUG(scheduler, printAllThreads());
1746
1747 delete_threads_and_gc:
1748 /*
1749 * We now have all the capabilities; if we're in an interrupting
1750 * state, then we should take the opportunity to delete all the
1751 * threads in the system.
1752 * Checking for major_gc ensures that the last GC is major.
1753 */
1754 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1755 deleteAllThreads(cap);
1756 #if defined(THREADED_RTS)
1757 // Discard all the sparks from every Capability. Why?
1758 // They'll probably be GC'd anyway since we've killed all the
1759 // threads. It just avoids the GC having to do any work to
1760 // figure out that any remaining sparks are garbage.
1761 for (i = 0; i < n_capabilities; i++) {
1762 capabilities[i]->spark_stats.gcd +=
1763 sparkPoolSize(capabilities[i]->sparks);
1764 // No race here since all Caps are stopped.
1765 discardSparksCap(capabilities[i]);
1766 }
1767 #endif
1768 sched_state = SCHED_SHUTTING_DOWN;
1769 }
1770
1771 /*
1772 * When there are disabled capabilities, we want to migrate any
1773 * threads away from them. Normally this happens in the
1774 * scheduler's loop, but only for unbound threads - it's really
1775 * hard for a bound thread to migrate itself. So we have another
1776 * go here.
1777 */
1778 #if defined(THREADED_RTS)
1779 for (i = enabled_capabilities; i < n_capabilities; i++) {
1780 Capability *tmp_cap, *dest_cap;
1781 tmp_cap = capabilities[i];
1782 ASSERT(tmp_cap->disabled);
1783 if (i != cap->no) {
1784 dest_cap = capabilities[i % enabled_capabilities];
1785 while (!emptyRunQueue(tmp_cap)) {
1786 tso = popRunQueue(tmp_cap);
1787 migrateThread(tmp_cap, tso, dest_cap);
1788 if (tso->bound) {
1789 traceTaskMigrate(tso->bound->task,
1790 tso->bound->task->cap,
1791 dest_cap);
1792 tso->bound->task->cap = dest_cap;
1793 }
1794 }
1795 }
1796 }
1797 #endif
1798
1799 #if defined(THREADED_RTS)
1800 // reset pending_sync *before* GC, so that when the GC threads
1801 // emerge they don't immediately re-enter the GC.
1802 pending_sync = 0;
1803 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1804 #else
1805 GarbageCollect(collect_gen, heap_census, 0, cap);
1806 #endif
1807
1808 traceSparkCounters(cap);
1809
1810 switch (recent_activity) {
1811 case ACTIVITY_INACTIVE:
1812 if (force_major) {
1813 // We are doing a GC because the system has been idle for a
1814 // timeslice and we need to check for deadlock. Record the
1815 // fact that we've done a GC and turn off the timer signal;
1816 // it will get re-enabled if we run any threads after the GC.
1817 recent_activity = ACTIVITY_DONE_GC;
1818 #ifndef PROFILING
1819 stopTimer();
1820 #endif
1821 break;
1822 }
1823 // fall through...
1824
1825 case ACTIVITY_MAYBE_NO:
1826 // the GC might have taken long enough for the timer to set
1827 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1828 // but we aren't necessarily deadlocked:
1829 recent_activity = ACTIVITY_YES;
1830 break;
1831
1832 case ACTIVITY_DONE_GC:
1833 // If we are actually active, the scheduler will reset the
1834 // recent_activity flag and re-enable the timer.
1835 break;
1836 }
1837
1838 #if defined(THREADED_RTS)
1839 // Stable point where we can do a global check on our spark counters
1840 ASSERT(checkSparkCountInvariant());
1841 #endif
1842
1843 // The heap census itself is done during GarbageCollect().
1844 if (heap_census) {
1845 performHeapProfile = rtsFalse;
1846 }
1847
1848 #if defined(THREADED_RTS)
1849
1850 // If n_capabilities has changed during GC, we're in trouble.
1851 ASSERT(n_capabilities == old_n_capabilities);
1852
1853 if (gc_type == SYNC_GC_PAR)
1854 {
1855 releaseGCThreads(cap);
1856 for (i = 0; i < n_capabilities; i++) {
1857 if (i != cap->no) {
1858 if (idle_cap[i]) {
1859 ASSERT(capabilities[i]->running_task == task);
1860 task->cap = capabilities[i];
1861 releaseCapability(capabilities[i]);
1862 } else {
1863 ASSERT(capabilities[i]->running_task != task);
1864 }
1865 }
1866 }
1867 task->cap = cap;
1868 }
1869
1870 stgFree(idle_cap);
1871 #endif
1872
1873 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1874 // GC set the heap_overflow flag, so we should proceed with
1875 // an orderly shutdown now. Ultimately we want the main
1876 // thread to return to its caller with HeapExhausted, at which
1877 // point the caller should call hs_exit(). The first step is
1878 // to delete all the threads.
1879 //
1880 // Another way to do this would be to raise an exception in
1881 // the main thread, which we really should do because it gives
1882 // the program a chance to clean up. But how do we find the
1883 // main thread? It should presumably be the same one that
1884 // gets ^C exceptions, but that's all done on the Haskell side
1885 // (GHC.TopHandler).
1886 sched_state = SCHED_INTERRUPTING;
1887 goto delete_threads_and_gc;
1888 }
1889
1890 #ifdef SPARKBALANCE
1891 /* JB
1892 Once we are all together... this would be the place to balance all
1893 spark pools. No concurrent stealing or adding of new sparks can
1894 occur. Should be defined in Sparks.c. */
1895 balanceSparkPoolsCaps(n_capabilities, capabilities);
1896 #endif
1897
1898 #if defined(THREADED_RTS)
1899 if (gc_type == SYNC_GC_SEQ) {
1900 // release our stash of capabilities.
1901 releaseAllCapabilities(n_capabilities, cap, task);
1902 }
1903 #endif
1904
1905 return;
1906 }
1907
1908 /* ---------------------------------------------------------------------------
1909 * Singleton fork(). Do not copy any running threads.
1910 * ------------------------------------------------------------------------- */
1911
1912 pid_t
1913 forkProcess(HsStablePtr *entry
1914 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1915 STG_UNUSED
1916 #endif
1917 )
1918 {
1919 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1920 pid_t pid;
1921 StgTSO* t,*next;
1922 Capability *cap;
1923 uint32_t g;
1924 Task *task = NULL;
1925 uint32_t i;
1926
1927 debugTrace(DEBUG_sched, "forking!");
1928
1929 task = newBoundTask();
1930
1931 cap = NULL;
1932 waitForCapability(&cap, task);
1933
1934 #ifdef THREADED_RTS
1935 stopAllCapabilities(&cap, task);
1936 #endif
1937
1938 // no funny business: hold locks while we fork, otherwise if some
1939 // other thread is holding a lock when the fork happens, the data
1940 // structure protected by the lock will forever be in an
1941 // inconsistent state in the child. See also #1391.
1942 ACQUIRE_LOCK(&sched_mutex);
1943 ACQUIRE_LOCK(&sm_mutex);
1944 ACQUIRE_LOCK(&stable_mutex);
1945 ACQUIRE_LOCK(&task->lock);
1946
1947 for (i=0; i < n_capabilities; i++) {
1948 ACQUIRE_LOCK(&capabilities[i]->lock);
1949 }
1950
1951 #ifdef THREADED_RTS
1952 ACQUIRE_LOCK(&all_tasks_mutex);
1953 #endif
1954
1955 stopTimer(); // See #4074
1956
1957 #if defined(TRACING)
1958 flushEventLog(); // so that child won't inherit dirty file buffers
1959 #endif
1960
1961 pid = fork();
1962
1963 if (pid) { // parent
1964
1965 startTimer(); // #4074
1966
1967 RELEASE_LOCK(&sched_mutex);
1968 RELEASE_LOCK(&sm_mutex);
1969 RELEASE_LOCK(&stable_mutex);
1970 RELEASE_LOCK(&task->lock);
1971
1972 for (i=0; i < n_capabilities; i++) {
1973 releaseCapability_(capabilities[i],rtsFalse);
1974 RELEASE_LOCK(&capabilities[i]->lock);
1975 }
1976
1977 #ifdef THREADED_RTS
1978 RELEASE_LOCK(&all_tasks_mutex);
1979 #endif
1980
1981 boundTaskExiting(task);
1982
1983 // just return the pid
1984 return pid;
1985
1986 } else { // child
1987
1988 #if defined(THREADED_RTS)
1989 initMutex(&sched_mutex);
1990 initMutex(&sm_mutex);
1991 initMutex(&stable_mutex);
1992 initMutex(&task->lock);
1993
1994 for (i=0; i < n_capabilities; i++) {
1995 initMutex(&capabilities[i]->lock);
1996 }
1997
1998 initMutex(&all_tasks_mutex);
1999 #endif
2000
2001 #ifdef TRACING
2002 resetTracing();
2003 #endif
2004
2005 // Now, all OS threads except the thread that forked are
2006 // stopped. We need to stop all Haskell threads, including
2007 // those involved in foreign calls. Also we need to delete
2008 // all Tasks, because they correspond to OS threads that are
2009 // now gone.
2010
2011 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2012 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2013 next = t->global_link;
2014 // don't allow threads to catch the ThreadKilled
2015 // exception, but we do want to raiseAsync() because these
2016 // threads may be evaluating thunks that we need later.
2017 deleteThread_(t->cap,t);
2018
2019 // stop the GC from updating the InCall to point to
2020 // the TSO. This is only necessary because the
2021 // OSThread bound to the TSO has been killed, and
2022 // won't get a chance to exit in the usual way (see
2023 // also scheduleHandleThreadFinished).
2024 t->bound = NULL;
2025 }
2026 }
2027
2028 discardTasksExcept(task);
2029
2030 for (i=0; i < n_capabilities; i++) {
2031 cap = capabilities[i];
2032
2033 // Empty the run queue. It seems tempting to let all the
2034 // killed threads stay on the run queue as zombies to be
2035 // cleaned up later, but some of them may correspond to
2036 // bound threads for which the corresponding Task does not
2037 // exist.
2038 truncateRunQueue(cap);
2039
2040 // Any suspended C-calling Tasks are no more, their OS threads
2041 // don't exist now:
2042 cap->suspended_ccalls = NULL;
2043
2044 #if defined(THREADED_RTS)
2045 // Wipe our spare workers list, they no longer exist. New
2046 // workers will be created if necessary.
2047 cap->spare_workers = NULL;
2048 cap->n_spare_workers = 0;
2049 cap->returning_tasks_hd = NULL;
2050 cap->returning_tasks_tl = NULL;
2051 #endif
2052
2053 // Release all caps except 0, we'll use that for starting
2054 // the IO manager and running the client action below.
2055 if (cap->no != 0) {
2056 task->cap = cap;
2057 releaseCapability(cap);
2058 }
2059 }
2060 cap = capabilities[0];
2061 task->cap = cap;
2062
2063 // Empty the threads lists. Otherwise, the garbage
2064 // collector may attempt to resurrect some of these threads.
2065 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2066 generations[g].threads = END_TSO_QUEUE;
2067 }
2068
2069 // On Unix, all timers are reset in the child, so we need to start
2070 // the timer again.
2071 initTimer();
2072 startTimer();
2073
2074 // TODO: need to trace various other things in the child
2075 // like startup event, capabilities, process info etc
2076 traceTaskCreate(task, cap);
2077
2078 #if defined(THREADED_RTS)
2079 ioManagerStartCap(&cap);
2080 #endif
2081
2082 rts_evalStableIO(&cap, entry, NULL); // run the action
2083 rts_checkSchedStatus("forkProcess",cap);
2084
2085 rts_unlock(cap);
2086 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2087 }
2088 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2089 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2090 #endif
2091 }
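/* The lock discipline in forkProcess() above (take every lock before fork(),
 * then unlock in the parent but re-initialise in the child) is easy to get
 * wrong in isolation.  A minimal, self-contained sketch of the same pattern
 * outside the RTS -- illustration only, excluded from the build; the mutex
 * and child_work() are hypothetical, not RTS entities:
 */
#if 0
#include <pthread.h>
#include <unistd.h>

static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

static pid_t forkHoldingLock (void (*child_work)(void))
{
    pid_t pid;

    pthread_mutex_lock(&m);        /* nobody else can hold it at fork time   */
    pid = fork();
    if (pid != 0) {                /* parent: simply release and carry on    */
        pthread_mutex_unlock(&m);
    } else {                       /* child: re-initialise rather than unlock,
                                      mirroring the initMutex() calls above  */
        pthread_mutex_init(&m, NULL);
        child_work();
    }
    return pid;
}
#endif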
2092
2093 /* ---------------------------------------------------------------------------
2094 * Changing the number of Capabilities
2095 *
2096 * Changing the number of Capabilities is very tricky! We can only do
2097 * it with the system fully stopped, so we do a full sync with
2098 * requestSync(SYNC_OTHER) and grab all the capabilities.
2099 *
2100 * Then we resize the appropriate data structures, and update all
2101 * references to the old data structures which have now moved.
2102 * Finally we release the Capabilities we are holding, and start
2103 * worker Tasks on the new Capabilities we created.
2104 *
2105 * ------------------------------------------------------------------------- */
2106
2107 void
2108 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2109 {
2110 #if !defined(THREADED_RTS)
2111 if (new_n_capabilities != 1) {
2112 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2113 }
2114 return;
2115 #elif defined(NOSMP)
2116 if (new_n_capabilities != 1) {
2117 errorBelch("setNumCapabilities: not supported on this platform");
2118 }
2119 return;
2120 #else
2121 Task *task;
2122 Capability *cap;
2123 uint32_t n;
2124 Capability *old_capabilities = NULL;
2125 uint32_t old_n_capabilities = n_capabilities;
2126
2127 if (new_n_capabilities == enabled_capabilities) return;
2128
2129 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2130 enabled_capabilities, new_n_capabilities);
2131
2132 cap = rts_lock();
2133 task = cap->running_task;
2134
2135 stopAllCapabilities(&cap, task);
2136
2137 if (new_n_capabilities < enabled_capabilities)
2138 {
2139 // Reducing the number of capabilities: we do not actually
2140 // remove the extra capabilities, we just mark them as
2141 // "disabled". This has the following effects:
2142 //
2143 // - threads on a disabled capability are migrated away by the
2144 // scheduler loop
2145 //
2146 // - disabled capabilities do not participate in GC
2147 // (see scheduleDoGC())
2148 //
2149 // - No spark threads are created on this capability
2150 // (see scheduleActivateSpark())
2151 //
2152 // - We do not attempt to migrate threads *to* a disabled
2153 // capability (see schedulePushWork()).
2154 //
2155 // but in other respects, a disabled capability remains
2156 // alive. Threads may be woken up on a disabled capability,
2157 // but they will be immediately migrated away.
2158 //
2159 // This approach is much easier than trying to actually remove
2160 // the capability; we don't have to worry about GC data
2161 // structures, the nursery, etc.
2162 //
2163 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2164 capabilities[n]->disabled = rtsTrue;
2165 traceCapDisable(capabilities[n]);
2166 }
2167 enabled_capabilities = new_n_capabilities;
2168 }
2169 else
2170 {
2171 // Increasing the number of enabled capabilities.
2172 //
2173 // enable any disabled capabilities, up to the required number
2174 for (n = enabled_capabilities;
2175 n < new_n_capabilities && n < n_capabilities; n++) {
2176 capabilities[n]->disabled = rtsFalse;
2177 traceCapEnable(capabilities[n]);
2178 }
2179 enabled_capabilities = n;
2180
2181 if (new_n_capabilities > n_capabilities) {
2182 #if defined(TRACING)
2183 // Allocate eventlog buffers for the new capabilities. Note this
2184 // must be done before calling moreCapabilities(), because that
2185 // will emit events about creating the new capabilities and adding
2186 // them to existing capsets.
2187 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2188 #endif
2189
2190 // Resize the capabilities array
2191 // NB. after this, capabilities points somewhere new. Any pointers
2192 // of type (Capability *) are now invalid.
2193 moreCapabilities(n_capabilities, new_n_capabilities);
2194
2195 // Resize and update storage manager data structures
2196 storageAddCapabilities(n_capabilities, new_n_capabilities);
2197 }
2198 }
2199
2200 // update n_capabilities before things start running
2201 if (new_n_capabilities > n_capabilities) {
2202 n_capabilities = enabled_capabilities = new_n_capabilities;
2203 }
2204
2205 // Start worker tasks on the new Capabilities
2206 startWorkerTasks(old_n_capabilities, new_n_capabilities);
2207
2208 // We're done: release the original Capabilities
2209 releaseAllCapabilities(old_n_capabilities, cap,task);
2210
2211 // We can't free the old array until now, because we access it
2212 // while updating pointers in updateCapabilityRefs().
2213 if (old_capabilities) {
2214 stgFree(old_capabilities);
2215 }
2216
2217 // Notify IO manager that the number of capabilities has changed.
2218 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2219
2220 rts_unlock(cap);
2221
2222 #endif // THREADED_RTS
2223 }
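/* A hedged sketch of how an embedding program might drive setNumCapabilities()
 * through the RTS API.  Illustration only, excluded from the build; it assumes
 * the prototype is visible via the installed RTS headers ("Rts.h") and that
 * the program is linked against the threaded RTS:
 */
#if 0
#include "HsFFI.h"
#include "Rts.h"                   /* assumed to declare setNumCapabilities() */

int main (int argc, char *argv[])
{
    hs_init(&argc, &argv);         /* start the RTS                           */
    setNumCapabilities(4);         /* grow to 4 capabilities at runtime       */
    /* ... call exported Haskell functions here ... */
    setNumCapabilities(1);         /* shrink: caps 1..3 become "disabled"     */
    hs_exit();
    return 0;
}
#endif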
2224
2225
2226
2227 /* ---------------------------------------------------------------------------
2228 * Delete all the threads in the system
2229 * ------------------------------------------------------------------------- */
2230
2231 static void
2232 deleteAllThreads ( Capability *cap )
2233 {
2234 // NOTE: only safe to call if we own all capabilities.
2235
2236 StgTSO* t, *next;
2237 uint32_t g;
2238
2239 debugTrace(DEBUG_sched,"deleting all threads");
2240 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2241 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2242 next = t->global_link;
2243 deleteThread(cap,t);
2244 }
2245 }
2246
2247 // The run queue now contains a bunch of ThreadKilled threads. We
2248 // must not throw these away: the main thread(s) will be in there
2249 // somewhere, and the main scheduler loop has to deal with it.
2250 // Also, the run queue is the only thing keeping these threads from
2251 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2252
2253 #if !defined(THREADED_RTS)
2254 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2255 ASSERT(sleeping_queue == END_TSO_QUEUE);
2256 #endif
2257 }
2258
2259 /* -----------------------------------------------------------------------------
2260 Managing the suspended_ccalls list.
2261 Locks required: sched_mutex
2262 -------------------------------------------------------------------------- */
2263
2264 STATIC_INLINE void
2265 suspendTask (Capability *cap, Task *task)
2266 {
2267 InCall *incall;
2268
2269 incall = task->incall;
2270 ASSERT(incall->next == NULL && incall->prev == NULL);
2271 incall->next = cap->suspended_ccalls;
2272 incall->prev = NULL;
2273 if (cap->suspended_ccalls) {
2274 cap->suspended_ccalls->prev = incall;
2275 }
2276 cap->suspended_ccalls = incall;
2277 }
2278
2279 STATIC_INLINE void
2280 recoverSuspendedTask (Capability *cap, Task *task)
2281 {
2282 InCall *incall;
2283
2284 incall = task->incall;
2285 if (incall->prev) {
2286 incall->prev->next = incall->next;
2287 } else {
2288 ASSERT(cap->suspended_ccalls == incall);
2289 cap->suspended_ccalls = incall->next;
2290 }
2291 if (incall->next) {
2292 incall->next->prev = incall->prev;
2293 }
2294 incall->next = incall->prev = NULL;
2295 }
2296
2297 /* ---------------------------------------------------------------------------
2298 * Suspending & resuming Haskell threads.
2299 *
2300 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2301 * its capability before calling the C function. This allows another
2302 * task to pick up the capability and carry on running Haskell
2303 * threads. It also means that if the C call blocks, it won't lock
2304 * the whole system.
2305 *
2306 * The Haskell thread making the C call is put to sleep for the
2307 * duration of the call, on the suspended_ccalling_threads queue. We
2308 * give out a token to the task, which it can use to resume the thread
2309 * on return from the C function.
2310 *
2311 * If this is an interruptible C call, this means that the FFI call may be
2312 * unceremoniously terminated and should be scheduled on an
2313 * unbound worker thread.
2314 * ------------------------------------------------------------------------- */
2315
2316 void *
2317 suspendThread (StgRegTable *reg, rtsBool interruptible)
2318 {
2319 Capability *cap;
2320 int saved_errno;
2321 StgTSO *tso;
2322 Task *task;
2323 #if mingw32_HOST_OS
2324 StgWord32 saved_winerror;
2325 #endif
2326
2327 saved_errno = errno;
2328 #if mingw32_HOST_OS
2329 saved_winerror = GetLastError();
2330 #endif
2331
2332 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2333 */
2334 cap = regTableToCapability(reg);
2335
2336 task = cap->running_task;
2337 tso = cap->r.rCurrentTSO;
2338
2339 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2340
2341 // XXX this might not be necessary --SDM
2342 tso->what_next = ThreadRunGHC;
2343
2344 threadPaused(cap,tso);
2345
2346 if (interruptible) {
2347 tso->why_blocked = BlockedOnCCall_Interruptible;
2348 } else {
2349 tso->why_blocked = BlockedOnCCall;
2350 }
2351
2352 // Hand back capability
2353 task->incall->suspended_tso = tso;
2354 task->incall->suspended_cap = cap;
2355
2356 // Otherwise allocate() will write to invalid memory.
2357 cap->r.rCurrentTSO = NULL;
2358
2359 ACQUIRE_LOCK(&cap->lock);
2360
2361 suspendTask(cap,task);
2362 cap->in_haskell = rtsFalse;
2363 releaseCapability_(cap,rtsFalse);
2364
2365 RELEASE_LOCK(&cap->lock);
2366
2367 errno = saved_errno;
2368 #if mingw32_HOST_OS
2369 SetLastError(saved_winerror);
2370 #endif
2371 return task;
2372 }
2373
2374 StgRegTable *
2375 resumeThread (void *task_)
2376 {
2377 StgTSO *tso;
2378 InCall *incall;
2379 Capability *cap;
2380 Task *task = task_;
2381 int saved_errno;
2382 #if mingw32_HOST_OS
2383 StgWord32 saved_winerror;
2384 #endif
2385
2386 saved_errno = errno;
2387 #if mingw32_HOST_OS
2388 saved_winerror = GetLastError();
2389 #endif
2390
2391 incall = task->incall;
2392 cap = incall->suspended_cap;
2393 task->cap = cap;
2394
2395 // Wait for permission to re-enter the RTS with the result.
2396 waitForCapability(&cap,task);
2397 // we might be on a different capability now... but if so, our
2398 // entry on the suspended_ccalls list will also have been
2399 // migrated.
2400
2401 // Remove the thread from the suspended list
2402 recoverSuspendedTask(cap,task);
2403
2404 tso = incall->suspended_tso;
2405 incall->suspended_tso = NULL;
2406 incall->suspended_cap = NULL;
2407 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2408
2409 traceEventRunThread(cap, tso);
2410
2411 /* Reset blocking status */
2412 tso->why_blocked = NotBlocked;
2413
2414 if ((tso->flags & TSO_BLOCKEX) == 0) {
2415 // avoid locking the TSO if we don't have to
2416 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2417 maybePerformBlockedException(cap,tso);
2418 }
2419 }
2420
2421 cap->r.rCurrentTSO = tso;
2422 cap->in_haskell = rtsTrue;
2423 errno = saved_errno;
2424 #if mingw32_HOST_OS
2425 SetLastError(saved_winerror);
2426 #endif
2427
2428 /* We might have GC'd, mark the TSO dirty again */
2429 dirty_TSO(cap,tso);
2430 dirty_STACK(cap,tso->stackobj);
2431
2432 IF_DEBUG(sanity, checkTSO(tso));
2433
2434 return &cap->r;
2435 }
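/* Taken together, suspendThread() and resumeThread() implement the token
 * protocol described in the comment before suspendThread().  A sketch of the
 * wrapper that a "safe" foreign call boils down to -- illustration only,
 * excluded from the build; blocking_c_call() is hypothetical and the real
 * wrapper is emitted by the code generator:
 */
#if 0
static int safeCallWrapper (StgRegTable *reg, int arg)
{
    void *token;
    int r;

    token = suspendThread(reg, rtsFalse); /* hand the Capability back          */
    r = blocking_c_call(arg);             /* may block; no Capability is held  */
    reg = resumeThread(token);            /* wait to re-enter the RTS          */
    return r;
}
#endif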
2436
2437 /* ---------------------------------------------------------------------------
2438 * scheduleThread()
2439 *
2440 * scheduleThread puts a thread on the end of the runnable queue.
2441 * This will usually be done immediately after a thread is created.
2442 * The caller of scheduleThread must create the thread using e.g.
2443 * createThread and push an appropriate closure
2444 * on this thread's stack before the scheduler is invoked.
2445 * ------------------------------------------------------------------------ */
2446
2447 void
2448 scheduleThread(Capability *cap, StgTSO *tso)
2449 {
2450 // The thread goes at the *end* of the run-queue, to avoid possible
2451 // starvation of any threads already on the queue.
2452 appendToRunQueue(cap,tso);
2453 }
2454
2455 void
2456 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2457 {
2458 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2459 // move this thread from now on.
2460 #if defined(THREADED_RTS)
2461 cpu %= enabled_capabilities;
2462 if (cpu == cap->no) {
2463 appendToRunQueue(cap,tso);
2464 } else {
2465 migrateThread(cap, tso, capabilities[cpu]);
2466 }
2467 #else
2468 appendToRunQueue(cap,tso);
2469 #endif
2470 }
2471
2472 void
2473 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2474 {
2475 Task *task;
2476 DEBUG_ONLY( StgThreadID id );
2477 Capability *cap;
2478
2479 cap = *pcap;
2480
2481 // We already created/initialised the Task
2482 task = cap->running_task;
2483
2484 // This TSO is now a bound thread; make the Task and TSO
2485 // point to each other.
2486 tso->bound = task->incall;
2487 tso->cap = cap;
2488
2489 task->incall->tso = tso;
2490 task->incall->ret = ret;
2491 task->incall->rstat = NoStatus;
2492
2493 appendToRunQueue(cap,tso);
2494
2495 DEBUG_ONLY( id = tso->id );
2496 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2497
2498 cap = schedule(cap,task);
2499
2500 ASSERT(task->incall->rstat != NoStatus);
2501 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2502
2503 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2504 *pcap = cap;
2505 }
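/* A hedged sketch of the call sequence that builds on scheduleWaitThread():
 * allocate a TSO with a closure already pushed on its stack, then run the
 * scheduler until it finishes.  This is roughly what the rts_eval*() entry
 * points in RtsAPI.c do; createIOThread() and the stack-size flag are assumed
 * from the RTS API.  Illustration only, excluded from the build:
 */
#if 0
static void runIOActionOnCap (Capability **pcap, StgClosure *ioAction,
                              /*[out]*/ HaskellObj *ret)
{
    StgTSO *tso;

    /* allocate a TSO and push 'ioAction' on its stack */
    tso = createIOThread(*pcap, RtsFlags.GcFlags.initialStkSize, ioAction);

    /* bind it to the calling Task and run the scheduler until it completes */
    scheduleWaitThread(tso, ret, pcap);
}
#endif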
2506
2507 /* ----------------------------------------------------------------------------
2508 * Starting Tasks
2509 * ------------------------------------------------------------------------- */
2510
2511 #if defined(THREADED_RTS)
2512 void scheduleWorker (Capability *cap, Task *task)
2513 {
2514 // schedule() runs without a lock.
2515 cap = schedule(cap,task);
2516
2517 // On exit from schedule(), we have a Capability, but possibly not
2518 // the same one we started with.
2519
2520 // During shutdown, the requirement is that after all the
2521 // Capabilities are shut down, all workers that are shutting down
2522 // have finished workerTaskStop(). This is why we hold on to
2523 // cap->lock until we've finished workerTaskStop() below.
2524 //
2525 // There may be workers still involved in foreign calls; those
2526 // will just block in waitForCapability() because the
2527 // Capability has been shut down.
2528 //
2529 ACQUIRE_LOCK(&cap->lock);
2530 releaseCapability_(cap,rtsFalse);
2531 workerTaskStop(task);
2532 RELEASE_LOCK(&cap->lock);
2533 }
2534 #endif
2535
2536 /* ---------------------------------------------------------------------------
2537 * Start new worker tasks on Capabilities from--to
2538 * -------------------------------------------------------------------------- */
2539
2540 static void
2541 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2542 {
2543 #if defined(THREADED_RTS)
2544 uint32_t i;
2545 Capability *cap;
2546
2547 for (i = from; i < to; i++) {
2548 cap = capabilities[i];
2549 ACQUIRE_LOCK(&cap->lock);
2550 startWorkerTask(cap);
2551 RELEASE_LOCK(&cap->lock);
2552 }
2553 #endif
2554 }
2555
2556 /* ---------------------------------------------------------------------------
2557 * initScheduler()
2558 *
2559 * Initialise the scheduler. This resets all the queues - if the
2560 * queues contained any threads, they'll be garbage collected at the
2561 * next pass.
2562 *
2563 * ------------------------------------------------------------------------ */
2564
2565 void
2566 initScheduler(void)
2567 {
2568 #if !defined(THREADED_RTS)
2569 blocked_queue_hd = END_TSO_QUEUE;
2570 blocked_queue_tl = END_TSO_QUEUE;
2571 sleeping_queue = END_TSO_QUEUE;
2572 #endif
2573
2574 sched_state = SCHED_RUNNING;
2575 recent_activity = ACTIVITY_YES;
2576
2577 #if defined(THREADED_RTS)
2578 /* Initialise the mutex and condition variables used by
2579 * the scheduler. */
2580 initMutex(&sched_mutex);
2581 #endif
2582
2583 ACQUIRE_LOCK(&sched_mutex);
2584
2585 /* A capability holds the state a native thread needs in
2586 * order to execute STG code. At least one capability is
2587 * floating around (only THREADED_RTS builds have more than one).
2588 */
2589 initCapabilities();
2590
2591 initTaskManager();
2592
2593 /*
2594 * Eagerly start one worker to run each Capability, except for
2595 * Capability 0. The idea is that we're probably going to start a
2596 * bound thread on Capability 0 pretty soon, so we don't want a
2597 * worker task hogging it.
2598 */
2599 startWorkerTasks(1, n_capabilities);
2600
2601 RELEASE_LOCK(&sched_mutex);
2602
2603 }
2604
2605 void
2606 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2607 /* see Capability.c, shutdownCapability() */
2608 {
2609 Task *task = NULL;
2610
2611 task = newBoundTask();
2612
2613 // If we haven't killed all the threads yet, do it now.
2614 if (sched_state < SCHED_SHUTTING_DOWN) {
2615 sched_state = SCHED_INTERRUPTING;
2616 Capability *cap = task->cap;
2617 waitForCapability(&cap,task);
2618 scheduleDoGC(&cap,task,rtsTrue);
2619 ASSERT(task->incall->tso == NULL);
2620 releaseCapability(cap);
2621 }
2622 sched_state = SCHED_SHUTTING_DOWN;
2623
2624 shutdownCapabilities(task, wait_foreign);
2625
2626 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2627 // n_failed_trygrab_idles, n_idle_caps);
2628
2629 boundTaskExiting(task);
2630 }
2631
2632 void
2633 freeScheduler( void )
2634 {
2635 uint32_t still_running;
2636
2637 ACQUIRE_LOCK(&sched_mutex);
2638 still_running = freeTaskManager();
2639 // We can only free the Capabilities if there are no Tasks still
2640 // running. We might have a Task about to return from a foreign
2641 // call into waitForCapability(), for example (actually,
2642 // this should be the *only* thing that a still-running Task can
2643 // do at this point, and it will block waiting for the
2644 // Capability).
2645 if (still_running == 0) {
2646 freeCapabilities();
2647 }
2648 RELEASE_LOCK(&sched_mutex);
2649 #if defined(THREADED_RTS)
2650 closeMutex(&sched_mutex);
2651 #endif
2652 }
2653
2654 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2655 void *user USED_IF_NOT_THREADS)
2656 {
2657 #if !defined(THREADED_RTS)
2658 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2659 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2660 evac(user, (StgClosure **)(void *)&sleeping_queue);
2661 #endif
2662 }
2663
2664 /* -----------------------------------------------------------------------------
2665 performGC
2666
2667 This is the interface to the garbage collector from Haskell land.
2668 We provide this so that external C code can allocate and garbage
2669 collect when called from Haskell via _ccall_GC.
2670 -------------------------------------------------------------------------- */
2671
2672 static void
2673 performGC_(rtsBool force_major)
2674 {
2675 Task *task;
2676 Capability *cap = NULL;
2677
2678 // We must grab a new Task here, because the existing Task may be
2679 // associated with a particular Capability, and chained onto the
2680 // suspended_ccalls queue.
2681 task = newBoundTask();
2682
2683 // TODO: do we need to traceTask*() here?
2684
2685 waitForCapability(&cap,task);
2686 scheduleDoGC(&cap,task,force_major);
2687 releaseCapability(cap);
2688 boundTaskExiting(task);
2689 }
2690
2691 void
2692 performGC(void)
2693 {
2694 performGC_(rtsFalse);
2695 }
2696
2697 void
2698 performMajorGC(void)
2699 {
2700 performGC_(rtsTrue);
2701 }
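/* A sketch of the use case described in the comment above: a C function,
 * reached from Haskell through a safe foreign import, that forces a major
 * collection before doing something memory-hungry.  The function name is
 * hypothetical; illustration only, excluded from the build:
 */
#if 0
/* Haskell side (hypothetical):
 *   foreign import ccall safe "reportAfterMajorGC" reportAfterMajorGC :: IO ()
 */
void reportAfterMajorGC (void)
{
    performMajorGC();      /* full collection, i.e. performGC_(rtsTrue) above */
    /* ... inspect RTS statistics, dump a report, etc. ... */
}
#endif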
2702
2703 /* ---------------------------------------------------------------------------
2704 Interrupt execution.
2705 Might be called inside a signal handler so it mustn't do anything fancy.
2706 ------------------------------------------------------------------------ */
2707
2708 void
2709 interruptStgRts(void)
2710 {
2711 sched_state = SCHED_INTERRUPTING;
2712 interruptAllCapabilities();
2713 #if defined(THREADED_RTS)
2714 wakeUpRts();
2715 #endif
2716 }
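/* A minimal sketch of the kind of caller the comment above has in mind: a
 * signal handler that asks the RTS to wind down.  Everything here apart from
 * interruptStgRts() itself is illustrative; the real console/signal handlers
 * live elsewhere in the RTS.  Excluded from the build:
 */
#if 0
#include <signal.h>
#include <string.h>

static void ctrlCHandler (int sig)
{
    (void)sig;
    interruptStgRts();     /* only sets flags and pokes the capabilities */
}

static void installCtrlCHandler (void)
{
    struct sigaction sa;
    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = ctrlCHandler;
    sigaction(SIGINT, &sa, NULL);
}
#endif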
2717
2718 /* -----------------------------------------------------------------------------
2719 Wake up the RTS
2720
2721 This function causes at least one OS thread to wake up and run the
2722 scheduler loop. It is invoked when the RTS might be deadlocked, or
2723 an external event has arrived that may need servicing (eg. a
2724 keyboard interrupt).
2725
2726 In the single-threaded RTS we don't do anything here; we only have
2727 one thread anyway, and the event that caused us to want to wake up
2728 will have interrupted any blocking system call in progress anyway.
2729 -------------------------------------------------------------------------- */
2730
2731 #if defined(THREADED_RTS)
2732 void wakeUpRts(void)
2733 {
2734 // This forces the IO Manager thread to wakeup, which will
2735 // in turn ensure that some OS thread wakes up and runs the
2736 // scheduler loop, which will cause a GC and deadlock check.
2737 ioManagerWakeup();
2738 }
2739 #endif
2740
2741 /* -----------------------------------------------------------------------------
2742 Deleting threads
2743
2744 This is used for interruption (^C) and forking, and corresponds to
2745 raising an exception but without letting the thread catch the
2746 exception.
2747 -------------------------------------------------------------------------- */
2748
2749 static void
2750 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2751 {
2752 // NOTE: must only be called on a TSO that we have exclusive
2753 // access to, because we will call throwToSingleThreaded() below.
2754 // The TSO must be on the run queue of the Capability we own, or
2755 // we must own all Capabilities.
2756
2757 if (tso->why_blocked != BlockedOnCCall &&
2758 tso->why_blocked != BlockedOnCCall_Interruptible) {
2759 throwToSingleThreaded(tso->cap,tso,NULL);
2760 }
2761 }
2762
2763 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2764 static void
2765 deleteThread_(Capability *cap, StgTSO *tso)
2766 { // for forkProcess only:
2767 // like deleteThread(), but we delete threads in foreign calls, too.
2768
2769 if (tso->why_blocked == BlockedOnCCall ||
2770 tso->why_blocked == BlockedOnCCall_Interruptible) {
2771 tso->what_next = ThreadKilled;
2772 appendToRunQueue(tso->cap, tso);
2773 } else {
2774 deleteThread(cap,tso);
2775 }
2776 }
2777 #endif
2778
2779 /* -----------------------------------------------------------------------------
2780 raiseExceptionHelper
2781
2782   This function is called by the raise# primitive, just so that we can
2783   move some of the tricky bits of raising an exception from C-- into
2784   C. Who knows, it might be a useful re-usable thing here too.
2785 -------------------------------------------------------------------------- */
2786
2787 StgWord
2788 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2789 {
2790 Capability *cap = regTableToCapability(reg);
2791 StgThunk *raise_closure = NULL;
2792 StgPtr p, next;
2793 const StgRetInfoTable *info;
2794 //
2795 // This closure represents the expression 'raise# E' where E
2796 // is the exception raise. It is used to overwrite all the
2797     // is the exception raised. It is used to overwrite all the
2798     // thunks which are currently under evaluation.
2799
2800 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2801 // LDV profiling: stg_raise_info has THUNK as its closure
2802 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2803 // payload, MIN_UPD_SIZE is more approprate than 1. It seems that
2804     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2805 // However, when LDV profiling goes on, we need to linearly scan
2806 // small object pool, where raise_closure is stored, so we should
2807 // use MIN_UPD_SIZE.
2808 //
2809 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2810 // sizeofW(StgClosure)+1);
2811 //
2812
2813 //
2814 // Walk up the stack, looking for the catch frame. On the way,
2815 // we update any closures pointed to from update frames with the
2816 // raise closure that we just built.
2817 //
2818 p = tso->stackobj->sp;
2819 while(1) {
2820 info = get_ret_itbl((StgClosure *)p);
2821 next = p + stack_frame_sizeW((StgClosure *)p);
2822 switch (info->i.type) {
2823
2824 case UPDATE_FRAME:
2825 // Only create raise_closure if we need to.
2826 if (raise_closure == NULL) {
2827 raise_closure =
2828 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2829 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2830 raise_closure->payload[0] = exception;
2831 }
2832 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2833 (StgClosure *)raise_closure);
2834 p = next;
2835 continue;
2836
2837 case ATOMICALLY_FRAME:
2838 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2839 tso->stackobj->sp = p;
2840 return ATOMICALLY_FRAME;
2841
2842 case CATCH_FRAME:
2843 tso->stackobj->sp = p;
2844 return CATCH_FRAME;
2845
2846 case CATCH_STM_FRAME:
2847 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2848 tso->stackobj->sp = p;
2849 return CATCH_STM_FRAME;
2850
2851 case UNDERFLOW_FRAME:
2852 tso->stackobj->sp = p;
2853 threadStackUnderflow(cap,tso);
2854 p = tso->stackobj->sp;
2855 continue;
2856
2857 case STOP_FRAME:
2858 tso->stackobj->sp = p;
2859 return STOP_FRAME;
2860
2861 case CATCH_RETRY_FRAME: {
2862 StgTRecHeader *trec = tso -> trec;
2863 StgTRecHeader *outer = trec -> enclosing_trec;
2864 debugTrace(DEBUG_stm,
2865 "found CATCH_RETRY_FRAME at %p during raise", p);
2866 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2867 stmAbortTransaction(cap, trec);
2868 stmFreeAbortedTRec(cap, trec);
2869 tso -> trec = outer;
2870 p = next;
2871 continue;
2872 }
2873
2874 default:
2875 p = next;
2876 continue;
2877 }
2878 }
2879 }
2880
2881
2882 /* -----------------------------------------------------------------------------
2883 findRetryFrameHelper
2884
2885 This function is called by the retry# primitive. It traverses the stack
2886 leaving tso->sp referring to the frame which should handle the retry.
2887
2888 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2889 or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).
2890   or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2891 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2892 create) because retries are not considered to be exceptions, despite the
2893 similar implementation.
2894
2895 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2896 not be created within memory transactions.
2897 -------------------------------------------------------------------------- */
2898
2899 StgWord
2900 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2901 {
2902 const StgRetInfoTable *info;
2903 StgPtr p, next;
2904
2905 p = tso->stackobj->sp;
2906 while (1) {
2907 info = get_ret_itbl((const StgClosure *)p);
2908 next = p + stack_frame_sizeW((StgClosure *)p);
2909 switch (info->i.type) {
2910
2911 case ATOMICALLY_FRAME:
2912 debugTrace(DEBUG_stm,
2913 "found ATOMICALLY_FRAME at %p during retry", p);
2914 tso->stackobj->sp = p;
2915 return ATOMICALLY_FRAME;
2916
2917 case CATCH_RETRY_FRAME:
2918 debugTrace(DEBUG_stm,
2919 "found CATCH_RETRY_FRAME at %p during retry", p);
2920 tso->stackobj->sp = p;
2921 return CATCH_RETRY_FRAME;
2922
2923 case CATCH_STM_FRAME: {
2924 StgTRecHeader *trec = tso -> trec;
2925 StgTRecHeader *outer = trec -> enclosing_trec;
2926 debugTrace(DEBUG_stm,
2927 "found CATCH_STM_FRAME at %p during retry", p);
2928 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2929 stmAbortTransaction(cap, trec);
2930 stmFreeAbortedTRec(cap, trec);
2931 tso -> trec = outer;
2932 p = next;
2933 continue;
2934 }
2935
2936 case UNDERFLOW_FRAME:
2937 tso->stackobj->sp = p;
2938 threadStackUnderflow(cap,tso);
2939 p = tso->stackobj->sp;
2940 continue;
2941
2942 default:
2943 ASSERT(info->i.type != CATCH_FRAME);
2944 ASSERT(info->i.type != STOP_FRAME);
2945 p = next;
2946 continue;
2947 }
2948 }
2949 }
2950
2951 /* -----------------------------------------------------------------------------
2952 resurrectThreads is called after garbage collection on the list of
2953 threads found to be garbage. Each of these threads will be woken
2954   up and sent an asynchronous exception: BlockedIndefinitelyOnMVar if the
2955   thread was blocked on an MVar, or NonTermination if the thread was
2956   blocked on a Black Hole.
2957
2958 Locks: assumes we hold *all* the capabilities.
2959 -------------------------------------------------------------------------- */
2960
2961 void
2962 resurrectThreads (StgTSO *threads)
2963 {
2964 StgTSO *tso, *next;
2965 Capability *cap;
2966 generation *gen;
2967
2968 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2969 next = tso->global_link;
2970
2971 gen = Bdescr((P_)tso)->gen;
2972 tso->global_link = gen->threads;
2973 gen->threads = tso;
2974
2975 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2976
2977 // Wake up the thread on the Capability it was last on
2978 cap = tso->cap;
2979
2980 switch (tso->why_blocked) {
2981 case BlockedOnMVar:
2982 case BlockedOnMVarRead:
2983 /* Called by GC - sched_mutex lock is currently held. */
2984 throwToSingleThreaded(cap, tso,
2985 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2986 break;
2987 case BlockedOnBlackHole:
2988 throwToSingleThreaded(cap, tso,
2989 (StgClosure *)nonTermination_closure);
2990 break;
2991 case BlockedOnSTM:
2992 throwToSingleThreaded(cap, tso,
2993 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2994 break;
2995 case NotBlocked:
2996 /* This might happen if the thread was blocked on a black hole
2997 * belonging to a thread that we've just woken up (raiseAsync
2998 * can wake up threads, remember...).
2999 */
3000 continue;
3001 case BlockedOnMsgThrowTo:
3002 // This can happen if the target is masking, blocks on a
3003 // black hole, and then is found to be unreachable. In
3004 // this case, we want to let the target wake up and carry
3005 // on, and do nothing to this thread.
3006 continue;
3007 default:
3008 barf("resurrectThreads: thread blocked in a strange way: %d",
3009 tso->why_blocked);
3010 }
3011 }
3012 }