Fix a crash in requestSync()
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /*
95 * This mutex protects most of the global scheduler data in
96 * the THREADED_RTS runtime.
97 */
98 #if defined(THREADED_RTS)
99 Mutex sched_mutex;
100 #endif
101
102 #if !defined(mingw32_HOST_OS)
103 #define FORKPROCESS_PRIMOP_SUPPORTED
104 #endif
105
106 /* -----------------------------------------------------------------------------
107 * static function prototypes
108 * -------------------------------------------------------------------------- */
109
110 static Capability *schedule (Capability *initialCapability, Task *task);
111
112 //
113 // These functions all encapsulate parts of the scheduler loop, and are
114 // abstracted only to make the structure and control flow of the
115 // scheduler clearer.
116 //
117 static void schedulePreLoop (void);
118 static void scheduleFindWork (Capability **pcap);
119 #if defined(THREADED_RTS)
120 static void scheduleYield (Capability **pcap, Task *task);
121 #endif
122 #if defined(THREADED_RTS)
123 static rtsBool requestSync (Capability **pcap, Task *task,
124 PendingSync *sync_type, SyncType *prev_sync_type);
125 static void acquireAllCapabilities(Capability *cap, Task *task);
126 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
127 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
128 uint32_t to USED_IF_THREADS);
129 #endif
130 static void scheduleStartSignalHandlers (Capability *cap);
131 static void scheduleCheckBlockedThreads (Capability *cap);
132 static void scheduleProcessInbox(Capability **cap);
133 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
134 static void schedulePushWork(Capability *cap, Task *task);
135 #if defined(THREADED_RTS)
136 static void scheduleActivateSpark(Capability *cap);
137 #endif
138 static void schedulePostRunThread(Capability *cap, StgTSO *t);
139 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
140 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
141 uint32_t prev_what_next );
142 static void scheduleHandleThreadBlocked( StgTSO *t );
143 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
144 StgTSO *t );
145 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
146 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
147
148 static void deleteThread (Capability *cap, StgTSO *tso);
149 static void deleteAllThreads (Capability *cap);
150
151 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
152 static void deleteThread_(Capability *cap, StgTSO *tso);
153 #endif
154
155 /* ---------------------------------------------------------------------------
156 Main scheduling loop.
157
158 We use round-robin scheduling, each thread returning to the
159 scheduler loop when one of these conditions is detected:
160
161 * out of heap space
162 * timer expires (thread yields)
163 * thread blocks
164 * thread ends
165 * stack overflow
166
167 ------------------------------------------------------------------------ */
168
169 static Capability *
170 schedule (Capability *initialCapability, Task *task)
171 {
172 StgTSO *t;
173 Capability *cap;
174 StgThreadReturnCode ret;
175 uint32_t prev_what_next;
176 rtsBool ready_to_gc;
177 #if defined(THREADED_RTS)
178 rtsBool first = rtsTrue;
179 #endif
180
181 cap = initialCapability;
182
183 // Pre-condition: this task owns initialCapability.
184 // The sched_mutex is *NOT* held
185 // NB. on return, we still hold a capability.
186
187 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
188
189 schedulePreLoop();
190
191 // -----------------------------------------------------------
192 // Scheduler loop starts here:
193
194 while (1) {
195
196 // Check whether we have re-entered the RTS from Haskell without
197 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
198 // call).
199 if (cap->in_haskell) {
200 errorBelch("schedule: re-entered unsafely.\n"
201 " Perhaps a 'foreign import unsafe' should be 'safe'?");
202 stg_exit(EXIT_FAILURE);
203 }
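// Illustration (not RTS code): the user-level distinction the message above
// refers to. A minimal Haskell sketch, assuming some C function "compute"
// that may call back into Haskell:
//
//   foreign import ccall unsafe "compute" computeUnsafe :: CInt -> IO CInt
//   foreign import ccall safe   "compute" computeSafe   :: CInt -> IO CInt
//
// Only the 'safe' variant goes through suspendThread()/resumeThread(), so a
// callback into Haskell made during computeUnsafe re-enters the RTS with
// cap->in_haskell still set and hits the error above.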
204
205 // Note [shutdown]: The interruption / shutdown sequence.
206 //
207 // In order to cleanly shut down the runtime, we want to:
208 // * make sure that all main threads return to their callers
209 // with the state 'Interrupted'.
210 // * clean up all OS threads associated with the runtime
211 // * free all memory etc.
212 //
213 // So the sequence goes like this:
214 //
215 // * The shutdown sequence is initiated by calling hs_exit(),
216 // interruptStgRts(), or running out of memory in the GC.
217 //
218 // * Set sched_state = SCHED_INTERRUPTING
219 //
220 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
221 // scheduleDoGC(), which halts the whole runtime by acquiring all the
222 // capabilities, does a GC and then calls deleteAllThreads() to kill all
223 // the remaining threads. The zombies are left on the run queue for
224 // cleaning up. We can't kill threads involved in foreign calls.
225 //
226 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
227 //
228 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
229 // enforced by the scheduler, which won't run any Haskell code when
230 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
231 // other capabilities by doing the GC earlier.
232 //
233 // * all workers exit when the run queue on their capability
234 // drains. All main threads will also exit when their TSO
235 // reaches the head of the run queue and they can return.
236 //
237 // * eventually all Capabilities will shut down, and the RTS can
238 // exit.
239 //
240 // * We might be left with threads blocked in foreign calls,
241 // we should really attempt to kill these somehow (TODO).
242
243 switch (sched_state) {
244 case SCHED_RUNNING:
245 break;
246 case SCHED_INTERRUPTING:
247 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
248 /* scheduleDoGC() deletes all the threads */
249 scheduleDoGC(&cap,task,rtsTrue);
250
251 // after scheduleDoGC(), we must be shutting down. Either some
252 // other Capability did the final GC, or we did it above,
253 // either way we can fall through to the SCHED_SHUTTING_DOWN
254 // case now.
255 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
256 // fall through
257
258 case SCHED_SHUTTING_DOWN:
259 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
260 // If we are a worker, just exit. If we're a bound thread
261 // then we will exit below when we've removed our TSO from
262 // the run queue.
263 if (!isBoundTask(task) && emptyRunQueue(cap)) {
264 return cap;
265 }
266 break;
267 default:
268 barf("sched_state: %d", sched_state);
269 }
270
271 scheduleFindWork(&cap);
272
273 /* work pushing, currently relevant only for THREADED_RTS:
274 (pushes threads, wakes up idle capabilities for stealing) */
275 schedulePushWork(cap,task);
276
277 scheduleDetectDeadlock(&cap,task);
278
279 // Normally, the only way we can get here with no threads to
280 // run is if a keyboard interrupt was received during
281 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
282 // Additionally, it is not fatal for the
283 // threaded RTS to reach here with no threads to run.
284 //
285 // win32: might be here due to awaitEvent() being abandoned
286 // as a result of a console event having been delivered.
287
288 #if defined(THREADED_RTS)
289 if (first)
290 {
291 // XXX: ToDo
292 // // don't yield the first time, we want a chance to run this
293 // // thread for a bit, even if there are others banging at the
294 // // door.
295 // first = rtsFalse;
296 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
297 }
298
299 scheduleYield(&cap,task);
300
301 if (emptyRunQueue(cap)) continue; // look for work again
302 #endif
303
304 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
305 if ( emptyRunQueue(cap) ) {
306 ASSERT(sched_state >= SCHED_INTERRUPTING);
307 }
308 #endif
309
310 //
311 // Get a thread to run
312 //
313 t = popRunQueue(cap);
314
315 // Sanity check the thread we're about to run. This can be
316 // expensive if there is lots of thread switching going on...
317 IF_DEBUG(sanity,checkTSO(t));
318
319 #if defined(THREADED_RTS)
320 // Check whether we can run this thread in the current task.
321 // If not, we have to pass our capability to the right task.
322 {
323 InCall *bound = t->bound;
324
325 if (bound) {
326 if (bound->task == task) {
327 // yes, the Haskell thread is bound to the current native thread
328 } else {
329 debugTrace(DEBUG_sched,
330 "thread %lu bound to another OS thread",
331 (unsigned long)t->id);
332 // no, bound to a different Haskell thread: pass to that thread
333 pushOnRunQueue(cap,t);
334 continue;
335 }
336 } else {
337 // The thread we want to run is unbound.
338 if (task->incall->tso) {
339 debugTrace(DEBUG_sched,
340 "this OS thread cannot run thread %lu",
341 (unsigned long)t->id);
342 // no, the current native thread is bound to a different
343 // Haskell thread, so pass it to any worker thread
344 pushOnRunQueue(cap,t);
345 continue;
346 }
347 }
348 }
349 #endif
350
351 // If we're shutting down, and this thread has not yet been
352 // killed, kill it now. This sometimes happens when a finalizer
353 // thread is created by the final GC, or a thread previously
354 // in a foreign call returns.
355 if (sched_state >= SCHED_INTERRUPTING &&
356 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
357 deleteThread(cap,t);
358 }
359
360 // If this capability is disabled, migrate the thread away rather
361 // than running it. NB. but not if the thread is bound: it is
362 // really hard for a bound thread to migrate itself. Believe me,
363 // I tried several ways and couldn't find a way to do it.
364 // Instead, when everything is stopped for GC, we migrate all the
365 // threads on the run queue then (see scheduleDoGC()).
366 //
367 // ToDo: what about TSO_LOCKED? Currently we're migrating those
368 // when the number of capabilities drops, but we never migrate
369 // them back if it rises again. Presumably we should, but after
370 // the thread has been migrated we no longer know what capability
371 // it was originally on.
372 #ifdef THREADED_RTS
373 if (cap->disabled && !t->bound) {
374 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
375 migrateThread(cap, t, dest_cap);
376 continue;
377 }
378 #endif
379
380 /* context switches are initiated by the timer signal, unless
381 * the user specified "context switch as often as possible", with
382 * +RTS -C0
383 */
384 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
385 && !emptyThreadQueues(cap)) {
386 cap->context_switch = 1;
387 }
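// Usage sketch of the flag mentioned above (assuming a program linked with
// the threaded RTS):
//
//   ./prog +RTS -C0 -RTS      -- request a context switch at every heap check
//   ./prog +RTS -C0.02 -RTS   -- the default: roughly every 20ms, timer-driven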
388
389 run_thread:
390
391 // CurrentTSO is the thread to run. It might be different if we
392 // loop back to run_thread, so make sure to set CurrentTSO after
393 // that.
394 cap->r.rCurrentTSO = t;
395
396 startHeapProfTimer();
397
398 // ----------------------------------------------------------------------
399 // Run the current thread
400
401 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
402 ASSERT(t->cap == cap);
403 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
404
405 prev_what_next = t->what_next;
406
407 errno = t->saved_errno;
408 #if mingw32_HOST_OS
409 SetLastError(t->saved_winerror);
410 #endif
411
412 // reset the interrupt flag before running Haskell code
413 cap->interrupt = 0;
414
415 cap->in_haskell = rtsTrue;
416 cap->idle = 0;
417
418 dirty_TSO(cap,t);
419 dirty_STACK(cap,t->stackobj);
420
421 switch (recent_activity)
422 {
423 case ACTIVITY_DONE_GC: {
424 // ACTIVITY_DONE_GC means we turned off the timer signal to
425 // conserve power (see #1623). Re-enable it here.
426 uint32_t prev;
427 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
428 if (prev == ACTIVITY_DONE_GC) {
429 #ifndef PROFILING
430 startTimer();
431 #endif
432 }
433 break;
434 }
435 case ACTIVITY_INACTIVE:
436 // If we reached ACTIVITY_INACTIVE, then don't reset it until
437 // we've done the GC. The thread running here might just be
438 // the IO manager thread that handle_tick() woke up via
439 // wakeUpRts().
440 break;
441 default:
442 recent_activity = ACTIVITY_YES;
443 }
444
445 traceEventRunThread(cap, t);
446
447 switch (prev_what_next) {
448
449 case ThreadKilled:
450 case ThreadComplete:
451 /* Thread already finished, return to scheduler. */
452 ret = ThreadFinished;
453 break;
454
455 case ThreadRunGHC:
456 {
457 StgRegTable *r;
458 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
459 cap = regTableToCapability(r);
460 ret = r->rRet;
461 break;
462 }
463
464 case ThreadInterpret:
465 cap = interpretBCO(cap);
466 ret = cap->r.rRet;
467 break;
468
469 default:
470 barf("schedule: invalid what_next field");
471 }
472
473 cap->in_haskell = rtsFalse;
474
475 // The TSO might have moved, eg. if it re-entered the RTS and a GC
476 // happened. So find the new location:
477 t = cap->r.rCurrentTSO;
478
479 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
480 // don't want it set when not running a Haskell thread.
481 cap->r.rCurrentTSO = NULL;
482
483 // And save the current errno in this thread.
484 // XXX: possibly bogus for SMP because this thread might already
485 // be running again, see code below.
486 t->saved_errno = errno;
487 #if mingw32_HOST_OS
488 // Similarly for Windows error code
489 t->saved_winerror = GetLastError();
490 #endif
491
492 if (ret == ThreadBlocked) {
493 if (t->why_blocked == BlockedOnBlackHole) {
494 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
495 traceEventStopThread(cap, t, t->why_blocked + 6,
496 owner != NULL ? owner->id : 0);
497 } else {
498 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
499 }
500 } else {
501 traceEventStopThread(cap, t, ret, 0);
502 }
503
504 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
505 ASSERT(t->cap == cap);
506
507 // ----------------------------------------------------------------------
508
509 // Costs for the scheduler are assigned to CCS_SYSTEM
510 stopHeapProfTimer();
511 #if defined(PROFILING)
512 cap->r.rCCCS = CCS_SYSTEM;
513 #endif
514
515 schedulePostRunThread(cap,t);
516
517 ready_to_gc = rtsFalse;
518
519 switch (ret) {
520 case HeapOverflow:
521 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
522 break;
523
524 case StackOverflow:
525 // just adjust the stack for this thread, then pop it back
526 // on the run queue.
527 threadStackOverflow(cap, t);
528 pushOnRunQueue(cap,t);
529 break;
530
531 case ThreadYielding:
532 if (scheduleHandleYield(cap, t, prev_what_next)) {
533 // shortcut for switching between compiler/interpreter:
534 goto run_thread;
535 }
536 break;
537
538 case ThreadBlocked:
539 scheduleHandleThreadBlocked(t);
540 break;
541
542 case ThreadFinished:
543 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
544 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
545 break;
546
547 default:
548 barf("schedule: invalid thread return code %d", (int)ret);
549 }
550
551 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
552 scheduleDoGC(&cap,task,rtsFalse);
553 }
554 } /* end of while() */
555 }
556
557 /* -----------------------------------------------------------------------------
558 * Run queue operations
559 * -------------------------------------------------------------------------- */
560
561 static void
562 removeFromRunQueue (Capability *cap, StgTSO *tso)
563 {
564 if (tso->block_info.prev == END_TSO_QUEUE) {
565 ASSERT(cap->run_queue_hd == tso);
566 cap->run_queue_hd = tso->_link;
567 } else {
568 setTSOLink(cap, tso->block_info.prev, tso->_link);
569 }
570 if (tso->_link == END_TSO_QUEUE) {
571 ASSERT(cap->run_queue_tl == tso);
572 cap->run_queue_tl = tso->block_info.prev;
573 } else {
574 setTSOPrev(cap, tso->_link, tso->block_info.prev);
575 }
576 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
577
578 IF_DEBUG(sanity, checkRunQueue(cap));
579 }
580
581 void
582 promoteInRunQueue (Capability *cap, StgTSO *tso)
583 {
584 removeFromRunQueue(cap, tso);
585 pushOnRunQueue(cap, tso);
586 }
587
588 /* ----------------------------------------------------------------------------
589 * Setting up the scheduler loop
590 * ------------------------------------------------------------------------- */
591
592 static void
593 schedulePreLoop(void)
594 {
595 // initialisation for scheduler - what cannot go into initScheduler()
596
597 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
598 win32AllocStack();
599 #endif
600 }
601
602 /* -----------------------------------------------------------------------------
603 * scheduleFindWork()
604 *
605 * Search for work to do, and handle messages from elsewhere.
606 * -------------------------------------------------------------------------- */
607
608 static void
609 scheduleFindWork (Capability **pcap)
610 {
611 scheduleStartSignalHandlers(*pcap);
612
613 scheduleProcessInbox(pcap);
614
615 scheduleCheckBlockedThreads(*pcap);
616
617 #if defined(THREADED_RTS)
618 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
619 #endif
620 }
621
622 #if defined(THREADED_RTS)
623 STATIC_INLINE rtsBool
624 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
625 {
626 // we need to yield this capability to someone else if..
627 // - another thread is initiating a GC, and we didn't just do a GC
628 // (see Note [GC livelock])
629 // - another Task is returning from a foreign call
630 // - the thread at the head of the run queue cannot be run
631 // by this Task (it is bound to another Task, or it is unbound
632 // and this Task is bound).
633 //
634 // Note [GC livelock]
635 //
636 // If we are interrupted to do a GC, then we do not immediately do
637 // another one. This avoids a starvation situation where one
638 // Capability keeps forcing a GC and the other Capabilities make no
639 // progress at all.
640
641 return ((pending_sync && !didGcLast) ||
642 cap->returning_tasks_hd != NULL ||
643 (!emptyRunQueue(cap) && (task->incall->tso == NULL
644 ? peekRunQueue(cap)->bound != NULL
645 : peekRunQueue(cap)->bound != task->incall)));
646 }
647
648 // This is the single place where a Task goes to sleep. There are
649 // two reasons it might need to sleep:
650 // - there are no threads to run
651 // - we need to yield this Capability to someone else
652 // (see shouldYieldCapability())
653 //
654 // Careful: the scheduler loop is quite delicate. Make sure you run
655 // the tests in testsuite/concurrent (all ways) after modifying this,
656 // and also check the benchmarks in nofib/parallel for regressions.
657
658 static void
659 scheduleYield (Capability **pcap, Task *task)
660 {
661 Capability *cap = *pcap;
662 int didGcLast = rtsFalse;
663
664 // if we have work, and we don't need to give up the Capability, continue.
665 //
666 if (!shouldYieldCapability(cap,task,rtsFalse) &&
667 (!emptyRunQueue(cap) ||
668 !emptyInbox(cap) ||
669 sched_state >= SCHED_INTERRUPTING)) {
670 return;
671 }
672
673 // otherwise yield (sleep), and keep yielding if necessary.
674 do {
675 didGcLast = yieldCapability(&cap,task, !didGcLast);
676 }
677 while (shouldYieldCapability(cap,task,didGcLast));
678
679 // note there may still be no threads on the run queue at this
680 // point, the caller has to check.
681
682 *pcap = cap;
683 return;
684 }
685 #endif
686
687 /* -----------------------------------------------------------------------------
688 * schedulePushWork()
689 *
690 * Push work to other Capabilities if we have some.
691 * -------------------------------------------------------------------------- */
692
693 static void
694 schedulePushWork(Capability *cap USED_IF_THREADS,
695 Task *task USED_IF_THREADS)
696 {
697 /* following code not for PARALLEL_HASKELL. I kept the call general,
698 future GUM versions might use pushing in a distributed setup */
699 #if defined(THREADED_RTS)
700
701 Capability *free_caps[n_capabilities], *cap0;
702 uint32_t i, n_wanted_caps, n_free_caps;
703 StgTSO *t;
704
705 // migration can be turned off with +RTS -qm
706 if (!RtsFlags.ParFlags.migrate) return;
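// For example (a sketch, not RTS code): starting the program with
//
//   ./prog +RTS -N4 -qm -RTS
//
// turns migration off, so this function returns here and threads stay on
// the Capability they are already on.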
707
708 // Check whether we have more threads on our run queue, or sparks
709 // in our pool, that we could hand to another Capability.
710 if (emptyRunQueue(cap)) {
711 if (sparkPoolSizeCap(cap) < 2) return;
712 } else {
713 if (singletonRunQueue(cap) &&
714 sparkPoolSizeCap(cap) < 1) return;
715 }
716
717 // Figure out how many capabilities we want to wake up. We need at least
718 // sparkPoolSize(cap) plus the number of spare threads we have.
719 t = cap->run_queue_hd;
720 n_wanted_caps = sparkPoolSizeCap(cap);
721 if (t != END_TSO_QUEUE) {
722 do {
723 t = t->_link;
724 if (t == END_TSO_QUEUE) break;
725 n_wanted_caps++;
726 } while (n_wanted_caps < n_capabilities-1);
727 }
728
729 // Grab free capabilities, starting from cap->no+1.
730 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
731 n_free_caps < n_wanted_caps && i != cap->no;
732 i = (i + 1) % n_capabilities) {
733 cap0 = capabilities[i];
734 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
735 if (!emptyRunQueue(cap0)
736 || cap0->returning_tasks_hd != NULL
737 || cap0->inbox != (Message*)END_TSO_QUEUE) {
738 // it already has some work, we just grabbed it at
739 // the wrong moment. Or maybe it's deadlocked!
740 releaseCapability(cap0);
741 } else {
742 free_caps[n_free_caps++] = cap0;
743 }
744 }
745 }
746
747 // we now have n_free_caps free capabilities stashed in
748 // free_caps[]. Share our run queue equally with them. This is
749 // probably the simplest thing we could do; improvements we might
750 // want to do include:
751 //
752 // - giving high priority to moving relatively new threads, on
753 // the grounds that they haven't had time to build up a
754 // working set in the cache on this CPU/Capability.
755 //
756 // - giving low priority to moving long-lived threads
757
758 if (n_free_caps > 0) {
759 StgTSO *prev, *t, *next;
760 #ifdef SPARK_PUSHING
761 rtsBool pushed_to_all;
762 #endif
763
764 debugTrace(DEBUG_sched,
765 "cap %d: %s and %d free capabilities, sharing...",
766 cap->no,
767 (!emptyRunQueue(cap) && !singletonRunQueue(cap))?
768 "excess threads on run queue":"sparks to share (>=2)",
769 n_free_caps);
770
771 i = 0;
772 #ifdef SPARK_PUSHING
773 pushed_to_all = rtsFalse;
774 #endif
775
776 if (cap->run_queue_hd != END_TSO_QUEUE) {
777 prev = cap->run_queue_hd;
778 t = prev->_link;
779 prev->_link = END_TSO_QUEUE;
780 for (; t != END_TSO_QUEUE; t = next) {
781 next = t->_link;
782 t->_link = END_TSO_QUEUE;
783 if (t->bound == task->incall // don't move my bound thread
784 || tsoLocked(t)) { // don't move a locked thread
785 setTSOLink(cap, prev, t);
786 setTSOPrev(cap, t, prev);
787 prev = t;
788 } else if (i == n_free_caps) {
789 #ifdef SPARK_PUSHING
790 pushed_to_all = rtsTrue;
791 #endif
792 i = 0;
793 // keep one for us
794 setTSOLink(cap, prev, t);
795 setTSOPrev(cap, t, prev);
796 prev = t;
797 } else {
798 appendToRunQueue(free_caps[i],t);
799
800 traceEventMigrateThread (cap, t, free_caps[i]->no);
801
802 if (t->bound) { t->bound->task->cap = free_caps[i]; }
803 t->cap = free_caps[i];
804 i++;
805 }
806 }
807 cap->run_queue_tl = prev;
808
809 IF_DEBUG(sanity, checkRunQueue(cap));
810 }
811
812 #ifdef SPARK_PUSHING
813 /* JB I left this code in place, it would work but is not necessary */
814
815 // If there are some free capabilities that we didn't push any
816 // threads to, then try to push a spark to each one.
817 if (!pushed_to_all) {
818 StgClosure *spark;
819 // i is the next free capability to push to
820 for (; i < n_free_caps; i++) {
821 if (emptySparkPoolCap(free_caps[i])) {
822 spark = tryStealSpark(cap->sparks);
823 if (spark != NULL) {
824 /* TODO: if anyone wants to re-enable this code then
825 * they must consider the fizzledSpark(spark) case
826 * and update the per-cap spark statistics.
827 */
828 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
829
830 traceEventStealSpark(free_caps[i], t, cap->no);
831
832 newSpark(&(free_caps[i]->r), spark);
833 }
834 }
835 }
836 }
837 #endif /* SPARK_PUSHING */
838
839 // release the capabilities
840 for (i = 0; i < n_free_caps; i++) {
841 task->cap = free_caps[i];
842 if (sparkPoolSizeCap(cap) > 0) {
843 // If we have sparks to steal, wake up a worker on the
844 // capability, even if it has no threads to run.
845 releaseAndWakeupCapability(free_caps[i]);
846 } else {
847 releaseCapability(free_caps[i]);
848 }
849 }
850 }
851 task->cap = cap; // reset to point to our Capability.
852
853 #endif /* THREADED_RTS */
854
855 }
856
857 /* ----------------------------------------------------------------------------
858 * Start any pending signal handlers
859 * ------------------------------------------------------------------------- */
860
861 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
862 static void
863 scheduleStartSignalHandlers(Capability *cap)
864 {
865 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
866 // safe outside the lock
867 startSignalHandlers(cap);
868 }
869 }
870 #else
871 static void
872 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
873 {
874 }
875 #endif
876
877 /* ----------------------------------------------------------------------------
878 * Check for blocked threads that can be woken up.
879 * ------------------------------------------------------------------------- */
880
881 static void
882 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
883 {
884 #if !defined(THREADED_RTS)
885 //
886 // Check whether any waiting threads need to be woken up. If the
887 // run queue is empty, and there are no other tasks running, we
888 // can wait indefinitely for something to happen.
889 //
890 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
891 {
892 awaitEvent (emptyRunQueue(cap));
893 }
894 #endif
895 }
896
897 /* ----------------------------------------------------------------------------
898 * Detect deadlock conditions and attempt to resolve them.
899 * ------------------------------------------------------------------------- */
900
901 static void
902 scheduleDetectDeadlock (Capability **pcap, Task *task)
903 {
904 Capability *cap = *pcap;
905 /*
906 * Detect deadlock: when we have no threads to run, there are no
907 * threads blocked, waiting for I/O, or sleeping, and all the
908 * other tasks are waiting for work, we must have a deadlock of
909 * some description.
910 */
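// Illustration (not RTS code): a minimal Haskell program that reaches this
// path in the non-threaded RTS:
//
//   import Control.Concurrent.MVar
//   main :: IO ()
//   main = newEmptyMVar >>= takeMVar
//
// With no runnable, blocked-on-I/O, or sleeping threads left, the forced
// major GC below will normally resurrect the blocked thread and deliver the
// usual "blocked indefinitely" exception; if the main thread is still
// blocked afterwards, the switch at the end throws nonTermination_closure
// at it.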
911 if ( emptyThreadQueues(cap) )
912 {
913 #if defined(THREADED_RTS)
914 /*
915 * In the threaded RTS, we only check for deadlock if there
916 * has been no activity in a complete timeslice. This means
917 * we won't eagerly start a full GC just because we don't have
918 * any threads to run currently.
919 */
920 if (recent_activity != ACTIVITY_INACTIVE) return;
921 #endif
922
923 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
924
925 // Garbage collection can release some new threads due to
926 // either (a) finalizers or (b) threads resurrected because
927 // they are unreachable and will therefore be sent an
928 // exception. Any threads thus released will be immediately
929 // runnable.
930 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
931 cap = *pcap;
932 // when force_major == rtsTrue, scheduleDoGC sets
933 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
934 // signal.
935
936 if ( !emptyRunQueue(cap) ) return;
937
938 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
939 /* If we have user-installed signal handlers, then wait
940 * for signals to arrive rather than bombing out with a
941 * deadlock.
942 */
943 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
944 debugTrace(DEBUG_sched,
945 "still deadlocked, waiting for signals...");
946
947 awaitUserSignals();
948
949 if (signals_pending()) {
950 startSignalHandlers(cap);
951 }
952
953 // either we have threads to run, or we were interrupted:
954 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
955
956 return;
957 }
958 #endif
959
960 #if !defined(THREADED_RTS)
961 /* Probably a real deadlock. Send the current main thread the
962 * Deadlock exception.
963 */
964 if (task->incall->tso) {
965 switch (task->incall->tso->why_blocked) {
966 case BlockedOnSTM:
967 case BlockedOnBlackHole:
968 case BlockedOnMsgThrowTo:
969 case BlockedOnMVar:
970 case BlockedOnMVarRead:
971 throwToSingleThreaded(cap, task->incall->tso,
972 (StgClosure *)nonTermination_closure);
973 return;
974 default:
975 barf("deadlock: main thread blocked in a strange way");
976 }
977 }
978 return;
979 #endif
980 }
981 }
982
983
984 /* ----------------------------------------------------------------------------
985 * Process message in the current Capability's inbox
986 * ------------------------------------------------------------------------- */
987
988 static void
989 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
990 {
991 #if defined(THREADED_RTS)
992 Message *m, *next;
993 int r;
994 Capability *cap = *pcap;
995
996 while (!emptyInbox(cap)) {
997 if (cap->r.rCurrentNursery->link == NULL ||
998 g0->n_new_large_words >= large_alloc_lim) {
999 scheduleDoGC(pcap, cap->running_task, rtsFalse);
1000 cap = *pcap;
1001 }
1002
1003 // don't use a blocking acquire; if the lock is held by
1004 // another thread then just carry on. This seems to avoid
1005 // getting stuck in a message ping-pong situation with other
1006 // processors. We'll check the inbox again later anyway.
1007 //
1008 // We should really use a more efficient queue data structure
1009 // here. The trickiness is that we must ensure a Capability
1010 // never goes idle if the inbox is non-empty, which is why we
1011 // use cap->lock (cap->lock is released as the last thing
1012 // before going idle; see Capability.c:releaseCapability()).
1013 r = TRY_ACQUIRE_LOCK(&cap->lock);
1014 if (r != 0) return;
1015
1016 m = cap->inbox;
1017 cap->inbox = (Message*)END_TSO_QUEUE;
1018
1019 RELEASE_LOCK(&cap->lock);
1020
1021 while (m != (Message*)END_TSO_QUEUE) {
1022 next = m->link;
1023 executeMessage(cap, m);
1024 m = next;
1025 }
1026 }
1027 #endif
1028 }
1029
1030 /* ----------------------------------------------------------------------------
1031 * Activate spark threads (THREADED_RTS)
1032 * ------------------------------------------------------------------------- */
1033
1034 #if defined(THREADED_RTS)
1035 static void
1036 scheduleActivateSpark(Capability *cap)
1037 {
1038 if (anySparks() && !cap->disabled)
1039 {
1040 createSparkThread(cap);
1041 debugTrace(DEBUG_sched, "creating a spark thread");
1042 }
1043 }
1044 #endif // THREADED_RTS
1045
1046 /* ----------------------------------------------------------------------------
1047 * After running a thread...
1048 * ------------------------------------------------------------------------- */
1049
1050 static void
1051 schedulePostRunThread (Capability *cap, StgTSO *t)
1052 {
1053 // We have to be able to catch transactions that are in an
1054 // infinite loop as a result of seeing an inconsistent view of
1055 // memory, e.g.
1056 //
1057 // atomically $ do
1058 // [a,b] <- mapM readTVar [ta,tb]
1059 // when (a == b) loop
1060 //
1061 // and a is never equal to b given a consistent view of memory.
1062 //
1063 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1064 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1065 debugTrace(DEBUG_sched | DEBUG_stm,
1066 "trec %p found wasting its time", t);
1067
1068 // strip the stack back to the
1069 // ATOMICALLY_FRAME, aborting the (nested)
1070 // transaction, and saving the stack of any
1071 // partially-evaluated thunks on the heap.
1072 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1073
1074 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1075 }
1076 }
1077
1078 //
1079 // If the current thread's allocation limit has run out, send it
1080 // the AllocationLimitExceeded exception.
1081
1082 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1083 // Use a throwToSelf rather than a throwToSingleThreaded, because
1084 // it correctly handles the case where the thread is currently
1085 // inside mask. Also the thread might be blocked (e.g. on an
1086 // MVar), and throwToSingleThreaded doesn't unblock it
1087 // correctly in that case.
1088 throwToSelf(cap, t, allocationLimitExceeded_closure);
1089 ASSIGN_Int64((W_*)&(t->alloc_limit),
1090 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1091 }
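// User-level sketch of the limit being enforced above (assuming the
// GHC.Conc API available in recent base versions):
//
//   import GHC.Conc (setAllocationCounter, enableAllocationLimit)
//
//   main :: IO ()
//   main = do
//     setAllocationCounter (100 * 1024 * 1024)  -- allow ~100MB of allocation
//     enableAllocationLimit
//     print (sum [1 :: Integer .. 10^8])        -- allocation-heavy work;
//                                               -- AllocationLimitExceeded is
//                                               -- raised once the counter
//                                               -- goes negative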
1092
1093 /* some statistics gathering in the parallel case */
1094 }
1095
1096 /* -----------------------------------------------------------------------------
1097 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1098 * -------------------------------------------------------------------------- */
1099
1100 static rtsBool
1101 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1102 {
1103 if (cap->r.rHpLim == NULL || cap->context_switch) {
1104 // Sometimes we miss a context switch, e.g. when calling
1105 // primitives in a tight loop, MAYBE_GC() doesn't check the
1106 // context switch flag, and we end up waiting for a GC.
1107 // See #1984, and concurrent/should_run/1984
1108 cap->context_switch = 0;
1109 appendToRunQueue(cap,t);
1110 } else {
1111 pushOnRunQueue(cap,t);
1112 }
1113
1114 // did the task ask for a large block?
1115 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1116 // if so, get one and push it on the front of the nursery.
1117 bdescr *bd;
1118 W_ blocks;
1119
1120 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1121
1122 if (blocks > BLOCKS_PER_MBLOCK) {
1123 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1124 }
1125
1126 debugTrace(DEBUG_sched,
1127 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1128 (long)t->id, what_next_strs[t->what_next], blocks);
1129
1130 // don't do this if the nursery is (nearly) full, we'll GC first.
1131 if (cap->r.rCurrentNursery->link != NULL ||
1132 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1133 // infinite loop if the
1134 // nursery has only one
1135 // block.
1136
1137 bd = allocGroup_lock(blocks);
1138 cap->r.rNursery->n_blocks += blocks;
1139
1140 // link the new group after CurrentNursery
1141 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1142
1143 // initialise it as a nursery block. We initialise the
1144 // step, gen_no, and flags field of *every* sub-block in
1145 // this large block, because this is easier than making
1146 // sure that we always find the block head of a large
1147 // block whenever we call Bdescr() (eg. evacuate() and
1148 // isAlive() in the GC would both have to do this, at
1149 // least).
1150 {
1151 bdescr *x;
1152 for (x = bd; x < bd + blocks; x++) {
1153 initBdescr(x,g0,g0);
1154 x->free = x->start;
1155 x->flags = 0;
1156 }
1157 }
1158
1159 // This assert can be a killer if the app is doing lots
1160 // of large block allocations.
1161 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1162
1163 // now update the nursery to point to the new block
1164 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1165 cap->r.rCurrentNursery = bd;
1166
1167 // we might be unlucky and have another thread get on the
1168 // run queue before us and steal the large block, but in that
1169 // case the thread will just end up requesting another large
1170 // block.
1171 return rtsFalse; /* not actually GC'ing */
1172 }
1173 }
1174
1175 // if we got here because we exceeded large_alloc_lim, then
1176 // proceed straight to GC.
1177 if (g0->n_new_large_words >= large_alloc_lim) {
1178 return rtsTrue;
1179 }
1180
1181 // Otherwise, we just ran out of space in the current nursery.
1182 // Grab another nursery if we can.
1183 if (getNewNursery(cap)) {
1184 debugTrace(DEBUG_sched, "thread %ld got a new nursery", t->id);
1185 return rtsFalse;
1186 }
1187
1188 return rtsTrue;
1189 /* actual GC is done at the end of the while loop in schedule() */
1190 }
1191
1192 /* -----------------------------------------------------------------------------
1193 * Handle a thread that returned to the scheduler with ThreadYielding
1194 * -------------------------------------------------------------------------- */
1195
1196 static rtsBool
1197 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1198 {
1199 /* put the thread back on the run queue. Then, if we're ready to
1200 * GC, check whether this is the last task to stop. If so, wake
1201 * up the GC thread. getThread will block during a GC until the
1202 * GC is finished.
1203 */
1204
1205 ASSERT(t->_link == END_TSO_QUEUE);
1206
1207 // Shortcut if we're just switching evaluators: don't bother
1208 // doing stack squeezing (which can be expensive), just run the
1209 // thread.
1210 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1211 debugTrace(DEBUG_sched,
1212 "--<< thread %ld (%s) stopped to switch evaluators",
1213 (long)t->id, what_next_strs[t->what_next]);
1214 return rtsTrue;
1215 }
1216
1217 // Reset the context switch flag. We don't do this just before
1218 // running the thread, because that would mean we would lose ticks
1219 // during GC, which can lead to unfair scheduling (a thread hogs
1220 // the CPU because the tick always arrives during GC). This way
1221 // penalises threads that do a lot of allocation, but that seems
1222 // better than the alternative.
1223 if (cap->context_switch != 0) {
1224 cap->context_switch = 0;
1225 appendToRunQueue(cap,t);
1226 } else {
1227 pushOnRunQueue(cap,t);
1228 }
1229
1230 IF_DEBUG(sanity,
1231 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1232 checkTSO(t));
1233
1234 return rtsFalse;
1235 }
1236
1237 /* -----------------------------------------------------------------------------
1238 * Handle a thread that returned to the scheduler with ThreadBlocked
1239 * -------------------------------------------------------------------------- */
1240
1241 static void
1242 scheduleHandleThreadBlocked( StgTSO *t
1243 #if !defined(DEBUG)
1244 STG_UNUSED
1245 #endif
1246 )
1247 {
1248
1249 // We don't need to do anything. The thread is blocked, and it
1250 // has tidied up its stack and placed itself on whatever queue
1251 // it needs to be on.
1252
1253 // ASSERT(t->why_blocked != NotBlocked);
1254 // Not true: for example,
1255 // - the thread may have woken itself up already, because
1256 // threadPaused() might have raised a blocked throwTo
1257 // exception, see maybePerformBlockedException().
1258
1259 #ifdef DEBUG
1260 traceThreadStatus(DEBUG_sched, t);
1261 #endif
1262 }
1263
1264 /* -----------------------------------------------------------------------------
1265 * Handle a thread that returned to the scheduler with ThreadFinished
1266 * -------------------------------------------------------------------------- */
1267
1268 static rtsBool
1269 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1270 {
1271 /* Need to check whether this was a main thread, and if so,
1272 * return with the return value.
1273 *
1274 * We also end up here if the thread kills itself with an
1275 * uncaught exception, see Exception.cmm.
1276 */
1277
1278 // blocked exceptions can now complete, even if the thread was in
1279 // blocked mode (see #2910).
1280 awakenBlockedExceptionQueue (cap, t);
1281
1282 //
1283 // Check whether the thread that just completed was a bound
1284 // thread, and if so return with the result.
1285 //
1286 // There is an assumption here that all thread completion goes
1287 // through this point; we need to make sure that if a thread
1288 // ends up in the ThreadKilled state, that it stays on the run
1289 // queue so it can be dealt with here.
1290 //
1291
1292 if (t->bound) {
1293
1294 if (t->bound != task->incall) {
1295 #if !defined(THREADED_RTS)
1296 // Must be a bound thread that is not the topmost one. Leave
1297 // it on the run queue until the stack has unwound to the
1298 // point where we can deal with this. Leaving it on the run
1299 // queue also ensures that the garbage collector knows about
1300 // this thread and its return value (it gets dropped from the
1301 // step->threads list so there's no other way to find it).
1302 appendToRunQueue(cap,t);
1303 return rtsFalse;
1304 #else
1305 // this cannot happen in the threaded RTS, because a
1306 // bound thread can only be run by the appropriate Task.
1307 barf("finished bound thread that isn't mine");
1308 #endif
1309 }
1310
1311 ASSERT(task->incall->tso == t);
1312
1313 if (t->what_next == ThreadComplete) {
1314 if (task->incall->ret) {
1315 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1316 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1317 }
1318 task->incall->rstat = Success;
1319 } else {
1320 if (task->incall->ret) {
1321 *(task->incall->ret) = NULL;
1322 }
1323 if (sched_state >= SCHED_INTERRUPTING) {
1324 if (heap_overflow) {
1325 task->incall->rstat = HeapExhausted;
1326 } else {
1327 task->incall->rstat = Interrupted;
1328 }
1329 } else {
1330 task->incall->rstat = Killed;
1331 }
1332 }
1333 #ifdef DEBUG
1334 removeThreadLabel((StgWord)task->incall->tso->id);
1335 #endif
1336
1337 // We no longer consider this thread and task to be bound to
1338 // each other. The TSO lives on until it is GC'd, but the
1339 // task is about to be released by the caller, and we don't
1340 // want anyone following the pointer from the TSO to the
1341 // defunct task (which might have already been
1342 // re-used). This was a real bug: the GC updated
1343 // tso->bound->tso which led to a deadlock.
1344 t->bound = NULL;
1345 task->incall->tso = NULL;
1346
1347 return rtsTrue; // tells schedule() to return
1348 }
1349
1350 return rtsFalse;
1351 }
1352
1353 /* -----------------------------------------------------------------------------
1354 * Perform a heap census
1355 * -------------------------------------------------------------------------- */
1356
1357 static rtsBool
1358 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1359 {
1360 // When we have +RTS -i0 and we're heap profiling, do a census at
1361 // every GC. This lets us get repeatable runs for debugging.
1362 if (performHeapProfile ||
1363 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1364 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1365 return rtsTrue;
1366 } else {
1367 return rtsFalse;
1368 }
1369 }
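// Usage sketch for the +RTS -i0 case mentioned above (assumes the program
// was built with profiling, e.g. "ghc -prof -fprof-auto"):
//
//   ./prog +RTS -hc -i0 -RTS
//
// takes a heap census at every GC rather than on a timer, which makes the
// profile repeatable from run to run.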
1370
1371 /* -----------------------------------------------------------------------------
1372 * stopAllCapabilities()
1373 *
1374 * Stop all Haskell execution. This is used when we need to make some global
1375 * change to the system, such as altering the number of capabilities, or
1376 * forking.
1377 *
1378 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1379 * -------------------------------------------------------------------------- */
1380
1381 #if defined(THREADED_RTS)
1382 static void stopAllCapabilities (Capability **pCap, Task *task)
1383 {
1384 rtsBool was_syncing;
1385 SyncType prev_sync_type;
1386
1387 PendingSync sync = {
1388 .type = SYNC_OTHER,
1389 .idle = NULL,
1390 .task = task
1391 };
1392
1393 do {
1394 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1395 } while (was_syncing);
1396
1397 acquireAllCapabilities(*pCap,task);
1398
1399 pending_sync = 0;
1400 }
1401 #endif
1402
1403 /* -----------------------------------------------------------------------------
1404 * requestSync()
1405 *
1406 * Commence a synchronisation between all capabilities. Normally not called
1407 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1408 * has some special synchronisation requirements.
1409 *
1410 * Returns:
1411 * rtsFalse if we successfully got a sync
1412 * rtsTrue if there was another sync request in progress,
1413 * and we yielded to it. The value returned is the
1414 * type of the other sync request.
1415 * -------------------------------------------------------------------------- */
1416
1417 #if defined(THREADED_RTS)
1418 static rtsBool requestSync (
1419 Capability **pcap, Task *task, PendingSync *new_sync,
1420 SyncType *prev_sync_type)
1421 {
1422 PendingSync *sync;
1423
1424 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1425 (StgWord)NULL,
1426 (StgWord)new_sync);
1427
1428 if (sync != NULL)
1429 {
1430 // sync is valid until we have called yieldCapability().
1431 // After the sync is completed, we cannot read that struct any
1432 // more because it has been freed.
1433 *prev_sync_type = sync->type;
1434 do {
1435 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1436 sync->type);
1437 ASSERT(*pcap);
1438 yieldCapability(pcap,task,rtsTrue);
1439 sync = pending_sync;
1440 } while (sync != NULL);
1441
1442 // NOTE: task->cap might have changed now
1443 return rtsTrue;
1444 }
1445 else
1446 {
1447 return rtsFalse;
1448 }
1449 }
1450 #endif
1451
1452 /* -----------------------------------------------------------------------------
1453 * acquireAllCapabilities()
1454 *
1455 * Grab all the capabilities except the one we already hold. Used
1456 * when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1457 * before a fork (SYNC_OTHER).
1458 *
1459 * Only call this after requestSync(), otherwise a deadlock might
1460 * ensue if another thread is trying to synchronise.
1461 * -------------------------------------------------------------------------- */
1462
1463 #ifdef THREADED_RTS
1464 static void acquireAllCapabilities(Capability *cap, Task *task)
1465 {
1466 Capability *tmpcap;
1467 uint32_t i;
1468
1469 ASSERT(pending_sync != NULL);
1470 for (i=0; i < n_capabilities; i++) {
1471 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1472 i, n_capabilities);
1473 tmpcap = capabilities[i];
1474 if (tmpcap != cap) {
1475 // we better hope this task doesn't get migrated to
1476 // another Capability while we're waiting for this one.
1477 // It won't, because load balancing happens while we have
1478 // all the Capabilities, but even so it's a slightly
1479 // unsavoury invariant.
1480 task->cap = tmpcap;
1481 waitForCapability(&tmpcap, task);
1482 if (tmpcap->no != i) {
1483 barf("acquireAllCapabilities: got the wrong capability");
1484 }
1485 }
1486 }
1487 task->cap = cap;
1488 }
1489 #endif
1490
1491 /* -----------------------------------------------------------------------------
1492 * releaseAllCapabilities()
1493 *
1494 * Assuming this thread holds all the capabilities, release them all except for
1495 * the one passed in as cap.
1496 * -------------------------------------------------------------------------- */
1497
1498 #ifdef THREADED_RTS
1499 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task)
1500 {
1501 uint32_t i;
1502
1503 for (i = 0; i < n; i++) {
1504 if (cap->no != i) {
1505 task->cap = capabilities[i];
1506 releaseCapability(capabilities[i]);
1507 }
1508 }
1509 task->cap = cap;
1510 }
1511 #endif
1512
1513 /* -----------------------------------------------------------------------------
1514 * Perform a garbage collection if necessary
1515 * -------------------------------------------------------------------------- */
1516
1517 static void
1518 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1519 rtsBool force_major)
1520 {
1521 Capability *cap = *pcap;
1522 rtsBool heap_census;
1523 uint32_t collect_gen;
1524 rtsBool major_gc;
1525 #ifdef THREADED_RTS
1526 uint32_t gc_type;
1527 uint32_t i;
1528 uint32_t need_idle;
1529 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1530 StgTSO *tso;
1531 rtsBool *idle_cap;
1532 #endif
1533
1534 if (sched_state == SCHED_SHUTTING_DOWN) {
1535 // The final GC has already been done, and the system is
1536 // shutting down. We'll probably deadlock if we try to GC
1537 // now.
1538 return;
1539 }
1540
1541 heap_census = scheduleNeedHeapProfile(rtsTrue);
1542
1543 // Figure out which generation we are collecting, so that we can
1544 // decide whether this is a parallel GC or not.
1545 collect_gen = calcNeeded(force_major || heap_census, NULL);
1546 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1547
1548 #ifdef THREADED_RTS
1549 if (sched_state < SCHED_INTERRUPTING
1550 && RtsFlags.ParFlags.parGcEnabled
1551 && collect_gen >= RtsFlags.ParFlags.parGcGen
1552 && ! oldest_gen->mark)
1553 {
1554 gc_type = SYNC_GC_PAR;
1555 } else {
1556 gc_type = SYNC_GC_SEQ;
1557 }
1558
1559 if (gc_type == SYNC_GC_PAR && RtsFlags.ParFlags.parGcThreads > 0) {
1560 need_idle = stg_max(0, enabled_capabilities -
1561 RtsFlags.ParFlags.parGcThreads);
1562 } else {
1563 need_idle = 0;
1564 }
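// Example (a sketch): with ./prog +RTS -N8 -qn2 -RTS, parGcThreads is 2, so
// need_idle becomes 6 here and six capabilities will be asked to sit out the
// parallel GC via the idle_cap[] selection below.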
1565
1566 // In order to GC, there must be no threads running Haskell code.
1567 // Therefore, the GC thread needs to hold *all* the capabilities,
1568 // and release them after the GC has completed.
1569 //
1570 // This seems to be the simplest way: previous attempts involved
1571 // making all the threads with capabilities give up their
1572 // capabilities and sleep except for the *last* one, which
1573 // actually did the GC. But it's quite hard to arrange for all
1574 // the other tasks to sleep and stay asleep.
1575 //
1576
1577 /* Other capabilities are prevented from running yet more Haskell
1578 threads if pending_sync is set. Tested inside
1579 yieldCapability() and releaseCapability() in Capability.c */
1580
1581 PendingSync sync = {
1582 .type = gc_type,
1583 .idle = NULL,
1584 .task = task
1585 };
1586
1587 {
1588 SyncType prev_sync = 0;
1589 rtsBool was_syncing;
1590 do {
1591 // We need an array of size n_capabilities, but since this may
1592 // change each time around the loop we must allocate it afresh.
1593 idle_cap = (rtsBool *)stgMallocBytes(n_capabilities *
1594 sizeof(rtsBool),
1595 "scheduleDoGC");
1596 sync.idle = idle_cap;
1597
1598 // When using +RTS -qn, we need some capabilities to be idle during
1599 // GC. The best bet is to choose some inactive ones, so we look for
1600 // those first:
1601 uint32_t n_idle = need_idle;
1602 for (i=0; i < n_capabilities; i++) {
1603 if (capabilities[i]->disabled) {
1604 idle_cap[i] = rtsTrue;
1605 } else if (n_idle > 0 &&
1606 capabilities[i]->running_task == NULL) {
1607 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1608 n_idle--;
1609 idle_cap[i] = rtsTrue;
1610 } else {
1611 idle_cap[i] = rtsFalse;
1612 }
1613 }
1614 // If we didn't find enough inactive capabilities, just pick some
1615 // more to be idle.
1616 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1617 if (!idle_cap[i] && i != cap->no) {
1618 idle_cap[i] = rtsTrue;
1619 n_idle--;
1620 }
1621 }
1622 ASSERT(n_idle == 0);
1623
1624 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1625 cap = *pcap;
1626 if (was_syncing) {
1627 stgFree(idle_cap);
1628 }
1629 if (was_syncing && (prev_sync == SYNC_GC_SEQ ||
1630 prev_sync == SYNC_GC_PAR)) {
1631 // someone else had a pending sync request for a GC, so
1632 // let's assume GC has been done and we don't need to GC
1633 // again.
1634 return;
1635 }
1636 if (sched_state == SCHED_SHUTTING_DOWN) {
1637 // The scheduler might now be shutting down. We tested
1638 // this above, but it might have become true since then as
1639 // we yielded the capability in requestSync().
1640 return;
1641 }
1642 } while (was_syncing);
1643 }
1644
1645 #ifdef DEBUG
1646 unsigned int old_n_capabilities = n_capabilities;
1647 #endif
1648
1649 interruptAllCapabilities();
1650
1651 // The final shutdown GC is always single-threaded, because it's
1652 // possible that some of the Capabilities have no worker threads.
1653
1654 if (gc_type == SYNC_GC_SEQ) {
1655 traceEventRequestSeqGc(cap);
1656 } else {
1657 traceEventRequestParGc(cap);
1658 }
1659
1660 if (gc_type == SYNC_GC_SEQ) {
1661 // single-threaded GC: grab all the capabilities
1662 acquireAllCapabilities(cap,task);
1663 }
1664 else
1665 {
1666 // If we are load-balancing collections in this
1667 // generation, then we require all GC threads to participate
1668 // in the collection. Otherwise, we only require active
1669 // threads to participate, and we set gc_threads[i]->idle for
1670 // any idle capabilities. The rationale here is that waking
1671 // up an idle Capability takes much longer than just doing any
1672 // GC work on its behalf.
1673
1674 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1675 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1676 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1677 {
1678 for (i=0; i < n_capabilities; i++) {
1679 if (capabilities[i]->disabled) {
1680 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1681 if (idle_cap[i]) {
1682 n_idle_caps++;
1683 }
1684 } else {
1685 if (i != cap->no && idle_cap[i]) {
1686 Capability *tmpcap = capabilities[i];
1687 task->cap = tmpcap;
1688 waitForCapability(&tmpcap, task);
1689 n_idle_caps++;
1690 }
1691 }
1692 }
1693 }
1694 else
1695 {
1696 for (i=0; i < n_capabilities; i++) {
1697 if (capabilities[i]->disabled) {
1698 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1699 if (idle_cap[i]) {
1700 n_idle_caps++;
1701 }
1702 } else if (i != cap->no &&
1703 capabilities[i]->idle >=
1704 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1705 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1706 if (idle_cap[i]) {
1707 n_idle_caps++;
1708 } else {
1709 n_failed_trygrab_idles++;
1710 }
1711 }
1712 }
1713 }
1714 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1715
1716 // We set the gc_thread[i]->idle flag if that
1717 // capability/thread is not participating in this collection.
1718 // We also keep a local record of which capabilities are idle
1719 // in idle_cap[], because scheduleDoGC() is re-entrant:
1720 // another thread might start a GC as soon as we've finished
1721 // this one, and thus the gc_thread[]->idle flags are invalid
1722 // as soon as we release any threads after GC. Getting this
1723 // wrong leads to a rare and hard to debug deadlock!
1724
1725 for (i=0; i < n_capabilities; i++) {
1726 gc_threads[i]->idle = idle_cap[i];
1727 capabilities[i]->idle++;
1728 }
1729
1730 // For all capabilities participating in this GC, wait until
1731 // they have stopped mutating and are standing by for GC.
1732 waitForGcThreads(cap);
1733
1734 #if defined(THREADED_RTS)
1735 // Stable point where we can do a global check on our spark counters
1736 ASSERT(checkSparkCountInvariant());
1737 #endif
1738 }
1739
1740 #endif
1741
1742 IF_DEBUG(scheduler, printAllThreads());
1743
1744 delete_threads_and_gc:
1745 /*
1746 * We now have all the capabilities; if we're in an interrupting
1747 * state, then we should take the opportunity to delete all the
1748 * threads in the system.
1749 * Checking for major_gc ensures that the last GC is major.
1750 */
1751 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1752 deleteAllThreads(cap);
1753 #if defined(THREADED_RTS)
1754 // Discard all the sparks from every Capability. Why?
1755 // They'll probably be GC'd anyway since we've killed all the
1756 // threads. It just avoids the GC having to do any work to
1757 // figure out that any remaining sparks are garbage.
1758 for (i = 0; i < n_capabilities; i++) {
1759 capabilities[i]->spark_stats.gcd +=
1760 sparkPoolSize(capabilities[i]->sparks);
1761 // No race here since all Caps are stopped.
1762 discardSparksCap(capabilities[i]);
1763 }
1764 #endif
1765 sched_state = SCHED_SHUTTING_DOWN;
1766 }
1767
1768 /*
1769 * When there are disabled capabilities, we want to migrate any
1770 * threads away from them. Normally this happens in the
1771 * scheduler's loop, but only for unbound threads - it's really
1772 * hard for a bound thread to migrate itself. So we have another
1773 * go here.
1774 */
1775 #if defined(THREADED_RTS)
1776 for (i = enabled_capabilities; i < n_capabilities; i++) {
1777 Capability *tmp_cap, *dest_cap;
1778 tmp_cap = capabilities[i];
1779 ASSERT(tmp_cap->disabled);
1780 if (i != cap->no) {
1781 dest_cap = capabilities[i % enabled_capabilities];
1782 while (!emptyRunQueue(tmp_cap)) {
1783 tso = popRunQueue(tmp_cap);
1784 migrateThread(tmp_cap, tso, dest_cap);
1785 if (tso->bound) {
1786 traceTaskMigrate(tso->bound->task,
1787 tso->bound->task->cap,
1788 dest_cap);
1789 tso->bound->task->cap = dest_cap;
1790 }
1791 }
1792 }
1793 }
1794 #endif
1795
1796 #if defined(THREADED_RTS)
1797 // reset pending_sync *before* GC, so that when the GC threads
1798 // emerge they don't immediately re-enter the GC.
1799 pending_sync = 0;
1800 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1801 #else
1802 GarbageCollect(collect_gen, heap_census, 0, cap);
1803 #endif
1804
1805 traceSparkCounters(cap);
1806
1807 switch (recent_activity) {
1808 case ACTIVITY_INACTIVE:
1809 if (force_major) {
1810 // We are doing a GC because the system has been idle for a
1811 // timeslice and we need to check for deadlock. Record the
1812 // fact that we've done a GC and turn off the timer signal;
1813 // it will get re-enabled if we run any threads after the GC.
1814 recent_activity = ACTIVITY_DONE_GC;
1815 #ifndef PROFILING
1816 stopTimer();
1817 #endif
1818 break;
1819 }
1820 // fall through...
1821
1822 case ACTIVITY_MAYBE_NO:
1823 // the GC might have taken long enough for the timer to set
1824 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1825 // but we aren't necessarily deadlocked:
1826 recent_activity = ACTIVITY_YES;
1827 break;
1828
1829 case ACTIVITY_DONE_GC:
1830 // If we are actually active, the scheduler will reset the
1831 // recent_activity flag and re-enable the timer.
1832 break;
1833 }
1834
1835 #if defined(THREADED_RTS)
1836 // Stable point where we can do a global check on our spark counters
1837 ASSERT(checkSparkCountInvariant());
1838 #endif
1839
1840 // The heap census itself is done during GarbageCollect().
1841 if (heap_census) {
1842 performHeapProfile = rtsFalse;
1843 }
1844
1845 #if defined(THREADED_RTS)
1846
1847 // If n_capabilities has changed during GC, we're in trouble.
1848 ASSERT(n_capabilities == old_n_capabilities);
1849
1850 if (gc_type == SYNC_GC_PAR)
1851 {
1852 releaseGCThreads(cap);
1853 for (i = 0; i < n_capabilities; i++) {
1854 if (i != cap->no) {
1855 if (idle_cap[i]) {
1856 ASSERT(capabilities[i]->running_task == task);
1857 task->cap = capabilities[i];
1858 releaseCapability(capabilities[i]);
1859 } else {
1860 ASSERT(capabilities[i]->running_task != task);
1861 }
1862 }
1863 }
1864 task->cap = cap;
1865 }
1866
1867 stgFree(idle_cap);
1868 #endif
1869
1870 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1871 // GC set the heap_overflow flag, so we should proceed with
1872 // an orderly shutdown now. Ultimately we want the main
1873 // thread to return to its caller with HeapExhausted, at which
1874 // point the caller should call hs_exit(). The first step is
1875 // to delete all the threads.
1876 //
1877 // Another way to do this would be to raise an exception in
1878 // the main thread, which we really should do because it gives
1879 // the program a chance to clean up. But how do we find the
1880 // main thread? It should presumably be the same one that
1881 // gets ^C exceptions, but that's all done on the Haskell side
1882 // (GHC.TopHandler).
1883 sched_state = SCHED_INTERRUPTING;
1884 goto delete_threads_and_gc;
1885 }
1886
1887 #ifdef SPARKBALANCE
1888 /* JB
1889 Once we are all together... this would be the place to balance all
1890 spark pools. No concurrent stealing or adding of new sparks can
1891 occur. Should be defined in Sparks.c. */
1892 balanceSparkPoolsCaps(n_capabilities, capabilities);
1893 #endif
1894
1895 #if defined(THREADED_RTS)
1896 if (gc_type == SYNC_GC_SEQ) {
1897 // release our stash of capabilities.
1898 releaseAllCapabilities(n_capabilities, cap, task);
1899 }
1900 #endif
1901
1902 return;
1903 }
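/* Illustrative sketch (assumption, not verbatim from any one caller): the
 * callers of scheduleDoGC() elsewhere in this file follow the pattern of
 * performGC_() below, roughly:
 *
 *     Task *task = newBoundTask();
 *     Capability *cap = NULL;
 *     waitForCapability(&cap, task);          // acquire a Capability to run on
 *     scheduleDoGC(&cap, task, force_major);  // may hand back a different cap
 *     releaseCapability(cap);
 *     boundTaskExiting(task);
 *
 * scheduleDoGC() takes a Capability** precisely because the sync can leave
 * the caller holding a different Capability from the one it started with.
 */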
1904
1905 /* ---------------------------------------------------------------------------
1906 * Singleton fork(). Do not copy any running threads.
1907 * ------------------------------------------------------------------------- */
1908
1909 pid_t
1910 forkProcess(HsStablePtr *entry
1911 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1912 STG_UNUSED
1913 #endif
1914 )
1915 {
1916 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1917 pid_t pid;
1918 StgTSO* t,*next;
1919 Capability *cap;
1920 uint32_t g;
1921 Task *task = NULL;
1922 uint32_t i;
1923
1924 debugTrace(DEBUG_sched, "forking!");
1925
1926 task = newBoundTask();
1927
1928 cap = NULL;
1929 waitForCapability(&cap, task);
1930
1931 #ifdef THREADED_RTS
1932 stopAllCapabilities(&cap, task);
1933 #endif
1934
1935 // no funny business: hold locks while we fork, otherwise if some
1936 // other thread is holding a lock when the fork happens, the data
1937 // structure protected by the lock will forever be in an
1938 // inconsistent state in the child. See also #1391.
1939 ACQUIRE_LOCK(&sched_mutex);
1940 ACQUIRE_LOCK(&sm_mutex);
1941 ACQUIRE_LOCK(&stable_mutex);
1942 ACQUIRE_LOCK(&task->lock);
1943
1944 for (i=0; i < n_capabilities; i++) {
1945 ACQUIRE_LOCK(&capabilities[i]->lock);
1946 }
1947
1948 #ifdef THREADED_RTS
1949 ACQUIRE_LOCK(&all_tasks_mutex);
1950 #endif
1951
1952 stopTimer(); // See #4074
1953
1954 #if defined(TRACING)
1955 flushEventLog(); // so that child won't inherit dirty file buffers
1956 #endif
1957
1958 pid = fork();
1959
1960 if (pid) { // parent
1961
1962 startTimer(); // #4074
1963
1964 RELEASE_LOCK(&sched_mutex);
1965 RELEASE_LOCK(&sm_mutex);
1966 RELEASE_LOCK(&stable_mutex);
1967 RELEASE_LOCK(&task->lock);
1968
1969 for (i=0; i < n_capabilities; i++) {
1970 releaseCapability_(capabilities[i],rtsFalse);
1971 RELEASE_LOCK(&capabilities[i]->lock);
1972 }
1973
1974 #ifdef THREADED_RTS
1975 RELEASE_LOCK(&all_tasks_mutex);
1976 #endif
1977
1978 boundTaskExiting(task);
1979
1980 // just return the pid
1981 return pid;
1982
1983 } else { // child
1984
1985 #if defined(THREADED_RTS)
1986 initMutex(&sched_mutex);
1987 initMutex(&sm_mutex);
1988 initMutex(&stable_mutex);
1989 initMutex(&task->lock);
1990
1991 for (i=0; i < n_capabilities; i++) {
1992 initMutex(&capabilities[i]->lock);
1993 }
1994
1995 initMutex(&all_tasks_mutex);
1996 #endif
1997
1998 #ifdef TRACING
1999 resetTracing();
2000 #endif
2001
2002 // Now, all OS threads except the thread that forked are
2003 // stopped. We need to stop all Haskell threads, including
2004 // those involved in foreign calls. Also we need to delete
2005 // all Tasks, because they correspond to OS threads that are
2006 // now gone.
2007
2008 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2009 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2010 next = t->global_link;
2011 // don't allow threads to catch the ThreadKilled
2012 // exception, but we do want to raiseAsync() because these
2013 // threads may be evaluating thunks that we need later.
2014 deleteThread_(t->cap,t);
2015
2016 // stop the GC from updating the InCall to point to
2017 // the TSO. This is only necessary because the
2018 // OSThread bound to the TSO has been killed, and
2019 // won't get a chance to exit in the usual way (see
2020 // also scheduleHandleThreadFinished).
2021 t->bound = NULL;
2022 }
2023 }
2024
2025 discardTasksExcept(task);
2026
2027 for (i=0; i < n_capabilities; i++) {
2028 cap = capabilities[i];
2029
2030 // Empty the run queue. It seems tempting to let all the
2031 // killed threads stay on the run queue as zombies to be
2032 // cleaned up later, but some of them may correspond to
2033 // bound threads for which the corresponding Task does not
2034 // exist.
2035 truncateRunQueue(cap);
2036
2037             // Any suspended C-calling Tasks are no more; their OS threads
2038 // don't exist now:
2039 cap->suspended_ccalls = NULL;
2040
2041 #if defined(THREADED_RTS)
2042             // Wipe our spare workers list; they no longer exist. New
2043 // workers will be created if necessary.
2044 cap->spare_workers = NULL;
2045 cap->n_spare_workers = 0;
2046 cap->returning_tasks_hd = NULL;
2047 cap->returning_tasks_tl = NULL;
2048 #endif
2049
2050 // Release all caps except 0, we'll use that for starting
2051 // the IO manager and running the client action below.
2052 if (cap->no != 0) {
2053 task->cap = cap;
2054 releaseCapability(cap);
2055 }
2056 }
2057 cap = capabilities[0];
2058 task->cap = cap;
2059
2060 // Empty the threads lists. Otherwise, the garbage
2061 // collector may attempt to resurrect some of these threads.
2062 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2063 generations[g].threads = END_TSO_QUEUE;
2064 }
2065
2066 // On Unix, all timers are reset in the child, so we need to start
2067 // the timer again.
2068 initTimer();
2069 startTimer();
2070
2071 // TODO: need to trace various other things in the child
2072 // like startup event, capabilities, process info etc
2073 traceTaskCreate(task, cap);
2074
2075 #if defined(THREADED_RTS)
2076 ioManagerStartCap(&cap);
2077 #endif
2078
2079 rts_evalStableIO(&cap, entry, NULL); // run the action
2080 rts_checkSchedStatus("forkProcess",cap);
2081
2082 rts_unlock(cap);
2083 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2084 }
2085 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2086 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2087 #endif
2088 }
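/* Illustrative sketch (assumption, not verbatim from any caller): this is
 * reached by a foreign call from the Haskell side, passing a StablePtr to
 * the IO action the child process should run:
 *
 *     HsStablePtr entry = ...;        // stable pointer to the child's IO action
 *     pid_t pid = forkProcess(&entry);
 *     // parent: pid is the child's process id and execution continues here;
 *     // child:  does not return; it runs the action via rts_evalStableIO()
 *     //         above and then calls shutdownHaskellAndExit().
 */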
2089
2090 /* ---------------------------------------------------------------------------
2091 * Changing the number of Capabilities
2092 *
2093 * Changing the number of Capabilities is very tricky! We can only do
2094 * it with the system fully stopped, so we do a full sync with
2095 * requestSync(SYNC_OTHER) and grab all the capabilities.
2096 *
2097 * Then we resize the appropriate data structures, and update all
2098 * references to the old data structures which have now moved.
2099 * Finally we release the Capabilities we are holding, and start
2100 * worker Tasks on the new Capabilities we created.
2101 *
2102 * ------------------------------------------------------------------------- */
2103
2104 void
2105 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2106 {
2107 #if !defined(THREADED_RTS)
2108 if (new_n_capabilities != 1) {
2109 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2110 }
2111 return;
2112 #elif defined(NOSMP)
2113 if (new_n_capabilities != 1) {
2114 errorBelch("setNumCapabilities: not supported on this platform");
2115 }
2116 return;
2117 #else
2118 Task *task;
2119 Capability *cap;
2120 uint32_t n;
2121 Capability *old_capabilities = NULL;
2122 uint32_t old_n_capabilities = n_capabilities;
2123
2124 if (new_n_capabilities == enabled_capabilities) return;
2125
2126 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2127 enabled_capabilities, new_n_capabilities);
2128
2129 cap = rts_lock();
2130 task = cap->running_task;
2131
2132 stopAllCapabilities(&cap, task);
2133
2134 if (new_n_capabilities < enabled_capabilities)
2135 {
2136 // Reducing the number of capabilities: we do not actually
2137 // remove the extra capabilities, we just mark them as
2138 // "disabled". This has the following effects:
2139 //
2140 // - threads on a disabled capability are migrated away by the
2141 // scheduler loop
2142 //
2143 // - disabled capabilities do not participate in GC
2144 // (see scheduleDoGC())
2145 //
2146 // - No spark threads are created on this capability
2147 // (see scheduleActivateSpark())
2148 //
2149 // - We do not attempt to migrate threads *to* a disabled
2150 // capability (see schedulePushWork()).
2151 //
2152 // but in other respects, a disabled capability remains
2153 // alive. Threads may be woken up on a disabled capability,
2154 // but they will be immediately migrated away.
2155 //
2156 // This approach is much easier than trying to actually remove
2157 // the capability; we don't have to worry about GC data
2158 // structures, the nursery, etc.
2159 //
2160 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2161 capabilities[n]->disabled = rtsTrue;
2162 traceCapDisable(capabilities[n]);
2163 }
2164 enabled_capabilities = new_n_capabilities;
2165 }
2166 else
2167 {
2168 // Increasing the number of enabled capabilities.
2169 //
2170 // enable any disabled capabilities, up to the required number
2171 for (n = enabled_capabilities;
2172 n < new_n_capabilities && n < n_capabilities; n++) {
2173 capabilities[n]->disabled = rtsFalse;
2174 traceCapEnable(capabilities[n]);
2175 }
2176 enabled_capabilities = n;
2177
2178 if (new_n_capabilities > n_capabilities) {
2179 #if defined(TRACING)
2180 // Allocate eventlog buffers for the new capabilities. Note this
2181 // must be done before calling moreCapabilities(), because that
2182 // will emit events about creating the new capabilities and adding
2183 // them to existing capsets.
2184 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2185 #endif
2186
2187 // Resize the capabilities array
2188 // NB. after this, capabilities points somewhere new. Any pointers
2189 // of type (Capability *) are now invalid.
2190 moreCapabilities(n_capabilities, new_n_capabilities);
2191
2192 // Resize and update storage manager data structures
2193 storageAddCapabilities(n_capabilities, new_n_capabilities);
2194 }
2195 }
2196
2197 // update n_capabilities before things start running
2198 if (new_n_capabilities > n_capabilities) {
2199 n_capabilities = enabled_capabilities = new_n_capabilities;
2200 }
2201
2202 // Start worker tasks on the new Capabilities
2203 startWorkerTasks(old_n_capabilities, new_n_capabilities);
2204
2205 // We're done: release the original Capabilities
2206 releaseAllCapabilities(old_n_capabilities, cap,task);
2207
2208 // We can't free the old array until now, because we access it
2209 // while updating pointers in updateCapabilityRefs().
2210 if (old_capabilities) {
2211 stgFree(old_capabilities);
2212 }
2213
2214 // Notify IO manager that the number of capabilities has changed.
2215 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2216
2217 rts_unlock(cap);
2218
2219 #endif // THREADED_RTS
2220 }
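/* Illustrative sketch (assumption): on the Haskell side this is reached via
 * Control.Concurrent.setNumCapabilities; external C code could equally call
 * it directly once the RTS is up:
 *
 *     setNumCapabilities(4);   // grow (or logically shrink) to 4 capabilities
 *
 * Note that shrinking only marks the surplus capabilities as disabled;
 * n_capabilities itself never decreases.
 */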
2221
2222
2223
2224 /* ---------------------------------------------------------------------------
2225 * Delete all the threads in the system
2226 * ------------------------------------------------------------------------- */
2227
2228 static void
2229 deleteAllThreads ( Capability *cap )
2230 {
2231 // NOTE: only safe to call if we own all capabilities.
2232
2233 StgTSO* t, *next;
2234 uint32_t g;
2235
2236 debugTrace(DEBUG_sched,"deleting all threads");
2237 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2238 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2239 next = t->global_link;
2240 deleteThread(cap,t);
2241 }
2242 }
2243
2244 // The run queue now contains a bunch of ThreadKilled threads. We
2245 // must not throw these away: the main thread(s) will be in there
2246 // somewhere, and the main scheduler loop has to deal with it.
2247 // Also, the run queue is the only thing keeping these threads from
2248 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2249
2250 #if !defined(THREADED_RTS)
2251 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2252 ASSERT(sleeping_queue == END_TSO_QUEUE);
2253 #endif
2254 }
2255
2256 /* -----------------------------------------------------------------------------
2257 Managing the suspended_ccalls list.
2258 Locks required: sched_mutex
2259 -------------------------------------------------------------------------- */
2260
2261 STATIC_INLINE void
2262 suspendTask (Capability *cap, Task *task)
2263 {
2264 InCall *incall;
2265
2266 incall = task->incall;
2267 ASSERT(incall->next == NULL && incall->prev == NULL);
2268 incall->next = cap->suspended_ccalls;
2269 incall->prev = NULL;
2270 if (cap->suspended_ccalls) {
2271 cap->suspended_ccalls->prev = incall;
2272 }
2273 cap->suspended_ccalls = incall;
2274 }
2275
2276 STATIC_INLINE void
2277 recoverSuspendedTask (Capability *cap, Task *task)
2278 {
2279 InCall *incall;
2280
2281 incall = task->incall;
2282 if (incall->prev) {
2283 incall->prev->next = incall->next;
2284 } else {
2285 ASSERT(cap->suspended_ccalls == incall);
2286 cap->suspended_ccalls = incall->next;
2287 }
2288 if (incall->next) {
2289 incall->next->prev = incall->prev;
2290 }
2291 incall->next = incall->prev = NULL;
2292 }
2293
2294 /* ---------------------------------------------------------------------------
2295 * Suspending & resuming Haskell threads.
2296 *
2297 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2298 * its capability before calling the C function. This allows another
2299 * task to pick up the capability and carry on running Haskell
2300 * threads. It also means that if the C call blocks, it won't lock
2301 * the whole system.
2302 *
2303 * The Haskell thread making the C call is put to sleep for the
2304 * duration of the call, on the suspended_ccalling_threads queue. We
2305  * duration of the call, on the suspended_ccalls list. We
2306 * on return from the C function.
2307 *
2308 * If this is an interruptible C call, this means that the FFI call may be
2309 * unceremoniously terminated and should be scheduled on an
2310 * unbound worker thread.
2311 * ------------------------------------------------------------------------- */
2312
2313 void *
2314 suspendThread (StgRegTable *reg, rtsBool interruptible)
2315 {
2316 Capability *cap;
2317 int saved_errno;
2318 StgTSO *tso;
2319 Task *task;
2320 #if mingw32_HOST_OS
2321 StgWord32 saved_winerror;
2322 #endif
2323
2324 saved_errno = errno;
2325 #if mingw32_HOST_OS
2326 saved_winerror = GetLastError();
2327 #endif
2328
2329 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2330 */
2331 cap = regTableToCapability(reg);
2332
2333 task = cap->running_task;
2334 tso = cap->r.rCurrentTSO;
2335
2336 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2337
2338 // XXX this might not be necessary --SDM
2339 tso->what_next = ThreadRunGHC;
2340
2341 threadPaused(cap,tso);
2342
2343 if (interruptible) {
2344 tso->why_blocked = BlockedOnCCall_Interruptible;
2345 } else {
2346 tso->why_blocked = BlockedOnCCall;
2347 }
2348
2349 // Hand back capability
2350 task->incall->suspended_tso = tso;
2351 task->incall->suspended_cap = cap;
2352
2353 // Otherwise allocate() will write to invalid memory.
2354 cap->r.rCurrentTSO = NULL;
2355
2356 ACQUIRE_LOCK(&cap->lock);
2357
2358 suspendTask(cap,task);
2359 cap->in_haskell = rtsFalse;
2360 releaseCapability_(cap,rtsFalse);
2361
2362 RELEASE_LOCK(&cap->lock);
2363
2364 errno = saved_errno;
2365 #if mingw32_HOST_OS
2366 SetLastError(saved_winerror);
2367 #endif
2368 return task;
2369 }
2370
2371 StgRegTable *
2372 resumeThread (void *task_)
2373 {
2374 StgTSO *tso;
2375 InCall *incall;
2376 Capability *cap;
2377 Task *task = task_;
2378 int saved_errno;
2379 #if mingw32_HOST_OS
2380 StgWord32 saved_winerror;
2381 #endif
2382
2383 saved_errno = errno;
2384 #if mingw32_HOST_OS
2385 saved_winerror = GetLastError();
2386 #endif
2387
2388 incall = task->incall;
2389 cap = incall->suspended_cap;
2390 task->cap = cap;
2391
2392 // Wait for permission to re-enter the RTS with the result.
2393 waitForCapability(&cap,task);
2394 // we might be on a different capability now... but if so, our
2395 // entry on the suspended_ccalls list will also have been
2396 // migrated.
2397
2398 // Remove the thread from the suspended list
2399 recoverSuspendedTask(cap,task);
2400
2401 tso = incall->suspended_tso;
2402 incall->suspended_tso = NULL;
2403 incall->suspended_cap = NULL;
2404 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2405
2406 traceEventRunThread(cap, tso);
2407
2408 /* Reset blocking status */
2409 tso->why_blocked = NotBlocked;
2410
2411 if ((tso->flags & TSO_BLOCKEX) == 0) {
2412 // avoid locking the TSO if we don't have to
2413 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2414 maybePerformBlockedException(cap,tso);
2415 }
2416 }
2417
2418 cap->r.rCurrentTSO = tso;
2419 cap->in_haskell = rtsTrue;
2420 errno = saved_errno;
2421 #if mingw32_HOST_OS
2422 SetLastError(saved_winerror);
2423 #endif
2424
2425 /* We might have GC'd, mark the TSO dirty again */
2426 dirty_TSO(cap,tso);
2427 dirty_STACK(cap,tso->stackobj);
2428
2429 IF_DEBUG(sanity, checkTSO(tso));
2430
2431 return &cap->r;
2432 }
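/* Illustrative sketch (assumption, not verbatim compiler output): a "safe"
 * foreign call is bracketed by the two functions above, roughly like this
 * (some_c_function and its arguments are hypothetical):
 *
 *     void *token = suspendThread(BaseReg, interruptible);
 *     result = some_c_function(arg1, arg2);  // may block or re-enter Haskell
 *     BaseReg = resumeThread(token);
 *
 * The token is really the Task; its incall records which TSO and Capability
 * to pick up again when the call returns.
 */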
2433
2434 /* ---------------------------------------------------------------------------
2435 * scheduleThread()
2436 *
2437 * scheduleThread puts a thread on the end of the runnable queue.
2438 * This will usually be done immediately after a thread is created.
2439 * The caller of scheduleThread must create the thread using e.g.
2440 * createThread and push an appropriate closure
2441 * on this thread's stack before the scheduler is invoked.
2442 * ------------------------------------------------------------------------ */
2443
2444 void
2445 scheduleThread(Capability *cap, StgTSO *tso)
2446 {
2447 // The thread goes at the *end* of the run-queue, to avoid possible
2448 // starvation of any threads already on the queue.
2449 appendToRunQueue(cap,tso);
2450 }
2451
2452 void
2453 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2454 {
2455 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2456 // move this thread from now on.
2457 #if defined(THREADED_RTS)
2458 cpu %= enabled_capabilities;
2459 if (cpu == cap->no) {
2460 appendToRunQueue(cap,tso);
2461 } else {
2462 migrateThread(cap, tso, capabilities[cpu]);
2463 }
2464 #else
2465 appendToRunQueue(cap,tso);
2466 #endif
2467 }
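/* Illustrative sketch (assumption): client code creating and scheduling a
 * thread by hand, as the comment above scheduleThread() describes, looks
 * roughly like this (createIOThread and io_closure are assumed names):
 *
 *     Capability *cap = rts_lock();
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  io_closure);
 *     scheduleThread(cap, tso);
 *     rts_unlock(cap);
 */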
2468
2469 void
2470 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2471 {
2472 Task *task;
2473 DEBUG_ONLY( StgThreadID id );
2474 Capability *cap;
2475
2476 cap = *pcap;
2477
2478 // We already created/initialised the Task
2479 task = cap->running_task;
2480
2481 // This TSO is now a bound thread; make the Task and TSO
2482 // point to each other.
2483 tso->bound = task->incall;
2484 tso->cap = cap;
2485
2486 task->incall->tso = tso;
2487 task->incall->ret = ret;
2488 task->incall->rstat = NoStatus;
2489
2490 appendToRunQueue(cap,tso);
2491
2492 DEBUG_ONLY( id = tso->id );
2493 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2494
2495 cap = schedule(cap,task);
2496
2497 ASSERT(task->incall->rstat != NoStatus);
2498 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2499
2500 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2501 *pcap = cap;
2502 }
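/* Illustrative sketch (assumption): scheduleWaitThread() is the synchronous
 * counterpart of scheduleThread(), used by the rts_eval*() entry points,
 * roughly (createStrictIOThread and stack_size are assumed names):
 *
 *     HaskellObj ret;
 *     StgTSO *tso = createStrictIOThread(cap, stack_size, closure);
 *     scheduleWaitThread(tso, &ret, &cap);
 *     // returns only when the bound thread has finished; *ret holds the
 *     // result and task->incall->rstat the completion status.
 */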
2503
2504 /* ----------------------------------------------------------------------------
2505 * Starting Tasks
2506 * ------------------------------------------------------------------------- */
2507
2508 #if defined(THREADED_RTS)
2509 void scheduleWorker (Capability *cap, Task *task)
2510 {
2511 // schedule() runs without a lock.
2512 cap = schedule(cap,task);
2513
2514 // On exit from schedule(), we have a Capability, but possibly not
2515 // the same one we started with.
2516
2517 // During shutdown, the requirement is that after all the
2518 // Capabilities are shut down, all workers that are shutting down
2519 // have finished workerTaskStop(). This is why we hold on to
2520 // cap->lock until we've finished workerTaskStop() below.
2521 //
2522 // There may be workers still involved in foreign calls; those
2523 // will just block in waitForCapability() because the
2524 // Capability has been shut down.
2525 //
2526 ACQUIRE_LOCK(&cap->lock);
2527 releaseCapability_(cap,rtsFalse);
2528 workerTaskStop(task);
2529 RELEASE_LOCK(&cap->lock);
2530 }
2531 #endif
2532
2533 /* ---------------------------------------------------------------------------
2534 * Start new worker tasks on Capabilities from--to
2535 * -------------------------------------------------------------------------- */
2536
2537 static void
2538 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2539 {
2540 #if defined(THREADED_RTS)
2541 uint32_t i;
2542 Capability *cap;
2543
2544 for (i = from; i < to; i++) {
2545 cap = capabilities[i];
2546 ACQUIRE_LOCK(&cap->lock);
2547 startWorkerTask(cap);
2548 RELEASE_LOCK(&cap->lock);
2549 }
2550 #endif
2551 }
2552
2553 /* ---------------------------------------------------------------------------
2554 * initScheduler()
2555 *
2556 * Initialise the scheduler. This resets all the queues - if the
2557 * queues contained any threads, they'll be garbage collected at the
2558 * next pass.
2559 *
2560 * ------------------------------------------------------------------------ */
2561
2562 void
2563 initScheduler(void)
2564 {
2565 #if !defined(THREADED_RTS)
2566 blocked_queue_hd = END_TSO_QUEUE;
2567 blocked_queue_tl = END_TSO_QUEUE;
2568 sleeping_queue = END_TSO_QUEUE;
2569 #endif
2570
2571 sched_state = SCHED_RUNNING;
2572 recent_activity = ACTIVITY_YES;
2573
2574 #if defined(THREADED_RTS)
2575 /* Initialise the mutex and condition variables used by
2576 * the scheduler. */
2577 initMutex(&sched_mutex);
2578 #endif
2579
2580 ACQUIRE_LOCK(&sched_mutex);
2581
2582 /* A capability holds the state a native thread needs in
2583 * order to execute STG code. At least one capability is
2584 * floating around (only THREADED_RTS builds have more than one).
2585 */
2586 initCapabilities();
2587
2588 initTaskManager();
2589
2590 /*
2591 * Eagerly start one worker to run each Capability, except for
2592 * Capability 0. The idea is that we're probably going to start a
2593 * bound thread on Capability 0 pretty soon, so we don't want a
2594 * worker task hogging it.
2595 */
2596 startWorkerTasks(1, n_capabilities);
2597
2598 RELEASE_LOCK(&sched_mutex);
2599
2600 }
2601
2602 void
2603 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2604 /* see Capability.c, shutdownCapability() */
2605 {
2606 Task *task = NULL;
2607
2608 task = newBoundTask();
2609
2610 // If we haven't killed all the threads yet, do it now.
2611 if (sched_state < SCHED_SHUTTING_DOWN) {
2612 sched_state = SCHED_INTERRUPTING;
2613 Capability *cap = task->cap;
2614 waitForCapability(&cap,task);
2615 scheduleDoGC(&cap,task,rtsTrue);
2616 ASSERT(task->incall->tso == NULL);
2617 releaseCapability(cap);
2618 }
2619 sched_state = SCHED_SHUTTING_DOWN;
2620
2621 shutdownCapabilities(task, wait_foreign);
2622
2623 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2624 // n_failed_trygrab_idles, n_idle_caps);
2625
2626 boundTaskExiting(task);
2627 }
2628
2629 void
2630 freeScheduler( void )
2631 {
2632 uint32_t still_running;
2633
2634 ACQUIRE_LOCK(&sched_mutex);
2635 still_running = freeTaskManager();
2636 // We can only free the Capabilities if there are no Tasks still
2637 // running. We might have a Task about to return from a foreign
2638 // call into waitForCapability(), for example (actually,
2639 // this should be the *only* thing that a still-running Task can
2640 // do at this point, and it will block waiting for the
2641 // Capability).
2642 if (still_running == 0) {
2643 freeCapabilities();
2644 }
2645 RELEASE_LOCK(&sched_mutex);
2646 #if defined(THREADED_RTS)
2647 closeMutex(&sched_mutex);
2648 #endif
2649 }
2650
2651 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2652 void *user USED_IF_NOT_THREADS)
2653 {
2654 #if !defined(THREADED_RTS)
2655 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2656 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2657 evac(user, (StgClosure **)(void *)&sleeping_queue);
2658 #endif
2659 }
2660
2661 /* -----------------------------------------------------------------------------
2662 performGC
2663
2664 This is the interface to the garbage collector from Haskell land.
2665 We provide this so that external C code can allocate and garbage
2666 collect when called from Haskell via _ccall_GC.
2667 -------------------------------------------------------------------------- */
2668
2669 static void
2670 performGC_(rtsBool force_major)
2671 {
2672 Task *task;
2673 Capability *cap = NULL;
2674
2675 // We must grab a new Task here, because the existing Task may be
2676 // associated with a particular Capability, and chained onto the
2677 // suspended_ccalls queue.
2678 task = newBoundTask();
2679
2680 // TODO: do we need to traceTask*() here?
2681
2682 waitForCapability(&cap,task);
2683 scheduleDoGC(&cap,task,force_major);
2684 releaseCapability(cap);
2685 boundTaskExiting(task);
2686 }
2687
2688 void
2689 performGC(void)
2690 {
2691 performGC_(rtsFalse);
2692 }
2693
2694 void
2695 performMajorGC(void)
2696 {
2697 performGC_(rtsTrue);
2698 }
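/* Illustrative sketch (assumption): a foreign C function called from
 * Haskell, or any external C code with the RTS running, can force a
 * collection before doing something memory-sensitive:
 *
 *     void my_c_helper(void)      // hypothetical client function
 *     {
 *         performMajorGC();       // major collection of all generations
 *     }
 *
 * The Haskell-side System.Mem functions are thin foreign-call wrappers
 * around these entry points.
 */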
2699
2700 /* ---------------------------------------------------------------------------
2701 Interrupt execution.
2702 Might be called inside a signal handler so it mustn't do anything fancy.
2703 ------------------------------------------------------------------------ */
2704
2705 void
2706 interruptStgRts(void)
2707 {
2708 sched_state = SCHED_INTERRUPTING;
2709 interruptAllCapabilities();
2710 #if defined(THREADED_RTS)
2711 wakeUpRts();
2712 #endif
2713 }
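/* Illustrative sketch (assumption): a console/signal handler for ^C need do
 * no more than this, which is why interruptStgRts() must remain safe to
 * call from a signal handler:
 *
 *     static void sigint_handler(int sig STG_UNUSED)   // hypothetical handler
 *     {
 *         interruptStgRts();
 *     }
 */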
2714
2715 /* -----------------------------------------------------------------------------
2716 Wake up the RTS
2717
2718 This function causes at least one OS thread to wake up and run the
2719 scheduler loop. It is invoked when the RTS might be deadlocked, or
2720 an external event has arrived that may need servicing (eg. a
2721 keyboard interrupt).
2722
2723 In the single-threaded RTS we don't do anything here; we only have
2724    one thread anyway, and the event that caused us to want to wake up
2725    will have interrupted any blocking system call in progress.
2726 -------------------------------------------------------------------------- */
2727
2728 #if defined(THREADED_RTS)
2729 void wakeUpRts(void)
2730 {
2731 // This forces the IO Manager thread to wakeup, which will
2732 // in turn ensure that some OS thread wakes up and runs the
2733 // scheduler loop, which will cause a GC and deadlock check.
2734 ioManagerWakeup();
2735 }
2736 #endif
2737
2738 /* -----------------------------------------------------------------------------
2739 Deleting threads
2740
2741 This is used for interruption (^C) and forking, and corresponds to
2742 raising an exception but without letting the thread catch the
2743 exception.
2744 -------------------------------------------------------------------------- */
2745
2746 static void
2747 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2748 {
2749 // NOTE: must only be called on a TSO that we have exclusive
2750 // access to, because we will call throwToSingleThreaded() below.
2751 // The TSO must be on the run queue of the Capability we own, or
2752 // we must own all Capabilities.
2753
2754 if (tso->why_blocked != BlockedOnCCall &&
2755 tso->why_blocked != BlockedOnCCall_Interruptible) {
2756 throwToSingleThreaded(tso->cap,tso,NULL);
2757 }
2758 }
2759
2760 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2761 static void
2762 deleteThread_(Capability *cap, StgTSO *tso)
2763 { // for forkProcess only:
2764 // like deleteThread(), but we delete threads in foreign calls, too.
2765
2766 if (tso->why_blocked == BlockedOnCCall ||
2767 tso->why_blocked == BlockedOnCCall_Interruptible) {
2768 tso->what_next = ThreadKilled;
2769 appendToRunQueue(tso->cap, tso);
2770 } else {
2771 deleteThread(cap,tso);
2772 }
2773 }
2774 #endif
2775
2776 /* -----------------------------------------------------------------------------
2777 raiseExceptionHelper
2778
2779    This function is called by the raise# primitive, just so that we can
2780    move some of the tricky bits of raising an exception from C-- into
2781    C. Who knows, it might be a useful reusable thing here too.
2782 -------------------------------------------------------------------------- */
2783
2784 StgWord
2785 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2786 {
2787 Capability *cap = regTableToCapability(reg);
2788 StgThunk *raise_closure = NULL;
2789 StgPtr p, next;
2790 StgRetInfoTable *info;
2791 //
2792     // This closure represents the expression 'raise# E' where E
2793     // is the exception being raised. It is used to overwrite all the
2794     // thunks which are currently under evaluation.
2795 //
2796
2797 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2798 // LDV profiling: stg_raise_info has THUNK as its closure
2799 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2800 // payload, MIN_UPD_SIZE is more approprate than 1. It seems that
2801     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2802 // However, when LDV profiling goes on, we need to linearly scan
2803 // small object pool, where raise_closure is stored, so we should
2804 // use MIN_UPD_SIZE.
2805 //
2806 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2807 // sizeofW(StgClosure)+1);
2808 //
2809
2810 //
2811 // Walk up the stack, looking for the catch frame. On the way,
2812 // we update any closures pointed to from update frames with the
2813 // raise closure that we just built.
2814 //
2815 p = tso->stackobj->sp;
2816 while(1) {
2817 info = get_ret_itbl((StgClosure *)p);
2818 next = p + stack_frame_sizeW((StgClosure *)p);
2819 switch (info->i.type) {
2820
2821 case UPDATE_FRAME:
2822 // Only create raise_closure if we need to.
2823 if (raise_closure == NULL) {
2824 raise_closure =
2825 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2826 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2827 raise_closure->payload[0] = exception;
2828 }
2829 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2830 (StgClosure *)raise_closure);
2831 p = next;
2832 continue;
2833
2834 case ATOMICALLY_FRAME:
2835 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2836 tso->stackobj->sp = p;
2837 return ATOMICALLY_FRAME;
2838
2839 case CATCH_FRAME:
2840 tso->stackobj->sp = p;
2841 return CATCH_FRAME;
2842
2843 case CATCH_STM_FRAME:
2844 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2845 tso->stackobj->sp = p;
2846 return CATCH_STM_FRAME;
2847
2848 case UNDERFLOW_FRAME:
2849 tso->stackobj->sp = p;
2850 threadStackUnderflow(cap,tso);
2851 p = tso->stackobj->sp;
2852 continue;
2853
2854 case STOP_FRAME:
2855 tso->stackobj->sp = p;
2856 return STOP_FRAME;
2857
2858 case CATCH_RETRY_FRAME: {
2859 StgTRecHeader *trec = tso -> trec;
2860 StgTRecHeader *outer = trec -> enclosing_trec;
2861 debugTrace(DEBUG_stm,
2862 "found CATCH_RETRY_FRAME at %p during raise", p);
2863 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2864 stmAbortTransaction(cap, trec);
2865 stmFreeAbortedTRec(cap, trec);
2866 tso -> trec = outer;
2867 p = next;
2868 continue;
2869 }
2870
2871 default:
2872 p = next;
2873 continue;
2874 }
2875 }
2876 }
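/* Illustrative sketch (assumption, not the actual C-- code): the raise#
 * implementation dispatches on the frame type returned above, roughly:
 *
 *     switch (raiseExceptionHelper(BaseReg, CurrentTSO, exception)) {
 *     case CATCH_FRAME:      // a handler was found: enter it
 *     case CATCH_STM_FRAME:  // likewise, inside a transaction
 *     case ATOMICALLY_FRAME: // exception reached an enclosing atomically#
 *     case STOP_FRAME:       // no handler on the stack: the thread dies
 *         ...
 *     }
 */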
2877
2878
2879 /* -----------------------------------------------------------------------------
2880 findRetryFrameHelper
2881
2882 This function is called by the retry# primitive. It traverses the stack
2883 leaving tso->sp referring to the frame which should handle the retry.
2884
2885 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2886    or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2887
2888 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2889 create) because retries are not considered to be exceptions, despite the
2890 similar implementation.
2891
2892 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2893 not be created within memory transactions.
2894 -------------------------------------------------------------------------- */
2895
2896 StgWord
2897 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2898 {
2899 StgPtr p, next;
2900 StgRetInfoTable *info;
2901
2902 p = tso->stackobj->sp;
2903 while (1) {
2904 info = get_ret_itbl((StgClosure *)p);
2905 next = p + stack_frame_sizeW((StgClosure *)p);
2906 switch (info->i.type) {
2907
2908 case ATOMICALLY_FRAME:
2909 debugTrace(DEBUG_stm,
2910 "found ATOMICALLY_FRAME at %p during retry", p);
2911 tso->stackobj->sp = p;
2912 return ATOMICALLY_FRAME;
2913
2914 case CATCH_RETRY_FRAME:
2915 debugTrace(DEBUG_stm,
2916 "found CATCH_RETRY_FRAME at %p during retry", p);
2917 tso->stackobj->sp = p;
2918 return CATCH_RETRY_FRAME;
2919
2920 case CATCH_STM_FRAME: {
2921 StgTRecHeader *trec = tso -> trec;
2922 StgTRecHeader *outer = trec -> enclosing_trec;
2923 debugTrace(DEBUG_stm,
2924 "found CATCH_STM_FRAME at %p during retry", p);
2925 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2926 stmAbortTransaction(cap, trec);
2927 stmFreeAbortedTRec(cap, trec);
2928 tso -> trec = outer;
2929 p = next;
2930 continue;
2931 }
2932
2933 case UNDERFLOW_FRAME:
2934 tso->stackobj->sp = p;
2935 threadStackUnderflow(cap,tso);
2936 p = tso->stackobj->sp;
2937 continue;
2938
2939 default:
2940 ASSERT(info->i.type != CATCH_FRAME);
2941 ASSERT(info->i.type != STOP_FRAME);
2942 p = next;
2943 continue;
2944 }
2945 }
2946 }
2947
2948 /* -----------------------------------------------------------------------------
2949    resurrectThreads is called after garbage collection on the list of
2950    threads found to be garbage. Each of these threads will be woken up
2951    and sent an exception: BlockedIndefinitelyOnMVar if the thread was
2952    blocked on an MVar, BlockedIndefinitelyOnSTM if blocked on STM, or
2953    NonTermination if the thread was blocked on a Black Hole.
2954
2955 Locks: assumes we hold *all* the capabilities.
2956 -------------------------------------------------------------------------- */
2957
2958 void
2959 resurrectThreads (StgTSO *threads)
2960 {
2961 StgTSO *tso, *next;
2962 Capability *cap;
2963 generation *gen;
2964
2965 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2966 next = tso->global_link;
2967
2968 gen = Bdescr((P_)tso)->gen;
2969 tso->global_link = gen->threads;
2970 gen->threads = tso;
2971
2972 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2973
2974 // Wake up the thread on the Capability it was last on
2975 cap = tso->cap;
2976
2977 switch (tso->why_blocked) {
2978 case BlockedOnMVar:
2979 case BlockedOnMVarRead:
2980 /* Called by GC - sched_mutex lock is currently held. */
2981 throwToSingleThreaded(cap, tso,
2982 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2983 break;
2984 case BlockedOnBlackHole:
2985 throwToSingleThreaded(cap, tso,
2986 (StgClosure *)nonTermination_closure);
2987 break;
2988 case BlockedOnSTM:
2989 throwToSingleThreaded(cap, tso,
2990 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2991 break;
2992 case NotBlocked:
2993 /* This might happen if the thread was blocked on a black hole
2994 * belonging to a thread that we've just woken up (raiseAsync
2995 * can wake up threads, remember...).
2996 */
2997 continue;
2998 case BlockedOnMsgThrowTo:
2999 // This can happen if the target is masking, blocks on a
3000 // black hole, and then is found to be unreachable. In
3001 // this case, we want to let the target wake up and carry
3002 // on, and do nothing to this thread.
3003 continue;
3004 default:
3005 barf("resurrectThreads: thread blocked in a strange way: %d",
3006 tso->why_blocked);
3007 }
3008 }
3009 }