Fix double-free in T5644 (#12208)
[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
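/* A rough sketch of how recent_activity moves between states (the precise
 * transitions live in Timer.c:handle_tick() and in scheduleDoGC(); this is
 * only a summary):
 *
 *   ACTIVITY_YES -> ACTIVITY_MAYBE_NO   (a timer tick with no activity)
 *                -> ACTIVITY_INACTIVE   (still idle after the idle-GC delay)
 *                -> ACTIVITY_DONE_GC    (the idle GC has run, timer stopped)
 *
 * It is reset to ACTIVITY_YES when the scheduler runs a thread again,
 * except straight after reaching ACTIVITY_INACTIVE; see the switch on
 * recent_activity in schedule() below.
 */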
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /*
95 * This mutex protects most of the global scheduler data in
96 * the THREADED_RTS runtime.
97 */
98 #if defined(THREADED_RTS)
99 Mutex sched_mutex;
100 #endif
101
102 #if !defined(mingw32_HOST_OS)
103 #define FORKPROCESS_PRIMOP_SUPPORTED
104 #endif
105
106 /* -----------------------------------------------------------------------------
107 * static function prototypes
108 * -------------------------------------------------------------------------- */
109
110 static Capability *schedule (Capability *initialCapability, Task *task);
111
112 //
113 // These functions all encapsulate parts of the scheduler loop, and are
114 // abstracted only to make the structure and control flow of the
115 // scheduler clearer.
116 //
117 static void schedulePreLoop (void);
118 static void scheduleFindWork (Capability **pcap);
119 #if defined(THREADED_RTS)
120 static void scheduleYield (Capability **pcap, Task *task);
121 #endif
122 #if defined(THREADED_RTS)
123 static rtsBool requestSync (Capability **pcap, Task *task,
124 PendingSync *sync_type, SyncType *prev_sync_type);
125 static void acquireAllCapabilities(Capability *cap, Task *task);
126 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
127 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
128 uint32_t to USED_IF_THREADS);
129 #endif
130 static void scheduleStartSignalHandlers (Capability *cap);
131 static void scheduleCheckBlockedThreads (Capability *cap);
132 static void scheduleProcessInbox(Capability **cap);
133 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
134 static void schedulePushWork(Capability *cap, Task *task);
135 #if defined(THREADED_RTS)
136 static void scheduleActivateSpark(Capability *cap);
137 #endif
138 static void schedulePostRunThread(Capability *cap, StgTSO *t);
139 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
140 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
141 uint32_t prev_what_next );
142 static void scheduleHandleThreadBlocked( StgTSO *t );
143 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
144 StgTSO *t );
145 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
146 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
147
148 static void deleteThread (Capability *cap, StgTSO *tso);
149 static void deleteAllThreads (Capability *cap);
150
151 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
152 static void deleteThread_(Capability *cap, StgTSO *tso);
153 #endif
154
155 /* ---------------------------------------------------------------------------
156 Main scheduling loop.
157
158 We use round-robin scheduling, each thread returning to the
159 scheduler loop when one of these conditions is detected:
160
161 * out of heap space
162 * timer expires (thread yields)
163 * thread blocks
164 * thread ends
165 * stack overflow
166
167 ------------------------------------------------------------------------ */
168
169 static Capability *
170 schedule (Capability *initialCapability, Task *task)
171 {
172 StgTSO *t;
173 Capability *cap;
174 StgThreadReturnCode ret;
175 uint32_t prev_what_next;
176 rtsBool ready_to_gc;
177 #if defined(THREADED_RTS)
178 rtsBool first = rtsTrue;
179 #endif
180
181 cap = initialCapability;
182
183 // Pre-condition: this task owns initialCapability.
184 // The sched_mutex is *NOT* held
185 // NB. on return, we still hold a capability.
186
187 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
188
189 schedulePreLoop();
190
191 // -----------------------------------------------------------
192 // Scheduler loop starts here:
193
194 while (1) {
195
196 // Check whether we have re-entered the RTS from Haskell without
197 // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
198 // call).
199 if (cap->in_haskell) {
200 errorBelch("schedule: re-entered unsafely.\n"
201 " Perhaps a 'foreign import unsafe' should be 'safe'?");
202 stg_exit(EXIT_FAILURE);
203 }
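
// For illustration (the names below are made up, not from this file): a C
// function that calls back into Haskell must itself have been entered via
// a 'safe' foreign import, roughly
//
//     foreign import ccall safe "runCallbacks" runCallbacks :: IO ()
//
// because only a safe call goes through suspendThread()/resumeThread() and
// gives up the Capability before running C code; an 'unsafe' import that
// re-enters Haskell ends up on the error path above.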
204
205 // Note [shutdown]: The interruption / shutdown sequence.
206 //
207 // In order to cleanly shut down the runtime, we want to:
208 // * make sure that all main threads return to their callers
209 // with the state 'Interrupted'.
210 // * clean up all OS threads associated with the runtime
211 // * free all memory etc.
212 //
213 // So the sequence goes like this:
214 //
215 // * The shutdown sequence is initiated by calling hs_exit(),
216 // interruptStgRts(), or running out of memory in the GC.
217 //
218 // * Set sched_state = SCHED_INTERRUPTING
219 //
220 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
221 // scheduleDoGC(), which halts the whole runtime by acquiring all the
222 // capabilities, does a GC and then calls deleteAllThreads() to kill all
223 // the remaining threads. The zombies are left on the run queue for
224 // cleaning up. We can't kill threads involved in foreign calls.
225 //
226 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
227 //
228 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
229 // enforced by the scheduler, which won't run any Haskell code when
230 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
231 // other capabilities by doing the GC earlier.
232 //
233 // * all workers exit when the run queue on their capability
234 // drains. All main threads will also exit when their TSO
235 // reaches the head of the run queue and they can return.
236 //
237 // * eventually all Capabilities will shut down, and the RTS can
238 // exit.
239 //
240 // * We might be left with threads blocked in foreign calls,
241 // we should really attempt to kill these somehow (TODO).
242
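// In short, sched_state only ever advances
//
//     SCHED_RUNNING -> SCHED_INTERRUPTING -> SCHED_SHUTTING_DOWN
//
// (it "changes monotonically", as noted at its definition), which is why
// the code here and below can test it with >= comparisons.
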
243 switch (sched_state) {
244 case SCHED_RUNNING:
245 break;
246 case SCHED_INTERRUPTING:
247 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
248 /* scheduleDoGC() deletes all the threads */
249 scheduleDoGC(&cap,task,rtsTrue);
250
251 // after scheduleDoGC(), we must be shutting down. Either some
252 // other Capability did the final GC, or we did it above,
253 // either way we can fall through to the SCHED_SHUTTING_DOWN
254 // case now.
255 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
256 // fall through
257
258 case SCHED_SHUTTING_DOWN:
259 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
260 // If we are a worker, just exit. If we're a bound thread
261 // then we will exit below when we've removed our TSO from
262 // the run queue.
263 if (!isBoundTask(task) && emptyRunQueue(cap)) {
264 return cap;
265 }
266 break;
267 default:
268 barf("sched_state: %d", sched_state);
269 }
270
271 scheduleFindWork(&cap);
272
273 /* work pushing, currently relevant only for THREADED_RTS:
274 (pushes threads, wakes up idle capabilities for stealing) */
275 schedulePushWork(cap,task);
276
277 scheduleDetectDeadlock(&cap,task);
278
279 // Normally, the only way we can get here with no threads to
280 // run is if a keyboard interrupt was received during
281 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
282 // Additionally, it is not fatal for the
283 // threaded RTS to reach here with no threads to run.
284 //
285 // win32: might be here due to awaitEvent() being abandoned
286 // as a result of a console event having been delivered.
287
288 #if defined(THREADED_RTS)
289 if (first)
290 {
291 // XXX: ToDo
292 // // don't yield the first time, we want a chance to run this
293 // // thread for a bit, even if there are others banging at the
294 // // door.
295 // first = rtsFalse;
296 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
297 }
298
299 scheduleYield(&cap,task);
300
301 if (emptyRunQueue(cap)) continue; // look for work again
302 #endif
303
304 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
305 if ( emptyRunQueue(cap) ) {
306 ASSERT(sched_state >= SCHED_INTERRUPTING);
307 }
308 #endif
309
310 //
311 // Get a thread to run
312 //
313 t = popRunQueue(cap);
314
315 // Sanity check the thread we're about to run. This can be
316 // expensive if there is lots of thread switching going on...
317 IF_DEBUG(sanity,checkTSO(t));
318
319 #if defined(THREADED_RTS)
320 // Check whether we can run this thread in the current task.
321 // If not, we have to pass our capability to the right task.
322 {
323 InCall *bound = t->bound;
324
325 if (bound) {
326 if (bound->task == task) {
327 // yes, the Haskell thread is bound to the current native thread
328 } else {
329 debugTrace(DEBUG_sched,
330 "thread %lu bound to another OS thread",
331 (unsigned long)t->id);
332 // no, bound to a different Haskell thread: pass to that thread
333 pushOnRunQueue(cap,t);
334 continue;
335 }
336 } else {
337 // The thread we want to run is unbound.
338 if (task->incall->tso) {
339 debugTrace(DEBUG_sched,
340 "this OS thread cannot run thread %lu",
341 (unsigned long)t->id);
342 // no, the current native thread is bound to a different
343 // Haskell thread, so pass it to any worker thread
344 pushOnRunQueue(cap,t);
345 continue;
346 }
347 }
348 }
349 #endif
350
351 // If we're shutting down, and this thread has not yet been
352 // killed, kill it now. This sometimes happens when a finalizer
353 // thread is created by the final GC, or a thread previously
354 // in a foreign call returns.
355 if (sched_state >= SCHED_INTERRUPTING &&
356 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
357 deleteThread(cap,t);
358 }
359
360 // If this capability is disabled, migrate the thread away rather
361 // than running it. NB. but not if the thread is bound: it is
362 // really hard for a bound thread to migrate itself. Believe me,
363 // I tried several ways and couldn't find a way to do it.
364 // Instead, when everything is stopped for GC, we migrate all the
365 // threads on the run queue then (see scheduleDoGC()).
366 //
367 // ToDo: what about TSO_LOCKED? Currently we're migrating those
368 // when the number of capabilities drops, but we never migrate
369 // them back if it rises again. Presumably we should, but after
370 // the thread has been migrated we no longer know what capability
371 // it was originally on.
372 #ifdef THREADED_RTS
373 if (cap->disabled && !t->bound) {
374 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
375 migrateThread(cap, t, dest_cap);
376 continue;
377 }
378 #endif
379
380 /* context switches are initiated by the timer signal, unless
381 * the user specified "context switch as often as possible", with
382 * +RTS -C0
383 */
384 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
385 && !emptyThreadQueues(cap)) {
386 cap->context_switch = 1;
387 }
388
389 run_thread:
390
391 // CurrentTSO is the thread to run. It might be different if we
392 // loop back to run_thread, so make sure to set CurrentTSO after
393 // that.
394 cap->r.rCurrentTSO = t;
395
396 startHeapProfTimer();
397
398 // ----------------------------------------------------------------------
399 // Run the current thread
400
401 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
402 ASSERT(t->cap == cap);
403 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
404
405 prev_what_next = t->what_next;
406
407 errno = t->saved_errno;
408 #if mingw32_HOST_OS
409 SetLastError(t->saved_winerror);
410 #endif
411
412 // reset the interrupt flag before running Haskell code
413 cap->interrupt = 0;
414
415 cap->in_haskell = rtsTrue;
416 cap->idle = 0;
417
418 dirty_TSO(cap,t);
419 dirty_STACK(cap,t->stackobj);
420
421 switch (recent_activity)
422 {
423 case ACTIVITY_DONE_GC: {
424 // ACTIVITY_DONE_GC means we turned off the timer signal to
425 // conserve power (see #1623). Re-enable it here.
426 uint32_t prev;
427 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
428 if (prev == ACTIVITY_DONE_GC) {
429 #ifndef PROFILING
430 startTimer();
431 #endif
432 }
433 break;
434 }
435 case ACTIVITY_INACTIVE:
436 // If we reached ACTIVITY_INACTIVE, then don't reset it until
437 // we've done the GC. The thread running here might just be
438 // the IO manager thread that handle_tick() woke up via
439 // wakeUpRts().
440 break;
441 default:
442 recent_activity = ACTIVITY_YES;
443 }
444
445 traceEventRunThread(cap, t);
446
447 switch (prev_what_next) {
448
449 case ThreadKilled:
450 case ThreadComplete:
451 /* Thread already finished, return to scheduler. */
452 ret = ThreadFinished;
453 break;
454
455 case ThreadRunGHC:
456 {
457 StgRegTable *r;
458 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
459 cap = regTableToCapability(r);
460 ret = r->rRet;
461 break;
462 }
463
464 case ThreadInterpret:
465 cap = interpretBCO(cap);
466 ret = cap->r.rRet;
467 break;
468
469 default:
470 barf("schedule: invalid what_next field");
471 }
472
473 cap->in_haskell = rtsFalse;
474
475 // The TSO might have moved, eg. if it re-entered the RTS and a GC
476 // happened. So find the new location:
477 t = cap->r.rCurrentTSO;
478
479 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
480 // don't want it set when not running a Haskell thread.
481 cap->r.rCurrentTSO = NULL;
482
483 // And save the current errno in this thread.
484 // XXX: possibly bogus for SMP because this thread might already
485 // be running again, see code below.
486 t->saved_errno = errno;
487 #if mingw32_HOST_OS
488 // Similarly for Windows error code
489 t->saved_winerror = GetLastError();
490 #endif
491
492 if (ret == ThreadBlocked) {
493 if (t->why_blocked == BlockedOnBlackHole) {
494 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
495 traceEventStopThread(cap, t, t->why_blocked + 6,
496 owner != NULL ? owner->id : 0);
497 } else {
498 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
499 }
500 } else {
501 traceEventStopThread(cap, t, ret, 0);
502 }
503
504 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
505 ASSERT(t->cap == cap);
506
507 // ----------------------------------------------------------------------
508
509 // Costs for the scheduler are assigned to CCS_SYSTEM
510 stopHeapProfTimer();
511 #if defined(PROFILING)
512 cap->r.rCCCS = CCS_SYSTEM;
513 #endif
514
515 schedulePostRunThread(cap,t);
516
517 ready_to_gc = rtsFalse;
518
519 switch (ret) {
520 case HeapOverflow:
521 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
522 break;
523
524 case StackOverflow:
525 // just adjust the stack for this thread, then pop it back
526 // on the run queue.
527 threadStackOverflow(cap, t);
528 pushOnRunQueue(cap,t);
529 break;
530
531 case ThreadYielding:
532 if (scheduleHandleYield(cap, t, prev_what_next)) {
533 // shortcut for switching between compiler/interpreter:
534 goto run_thread;
535 }
536 break;
537
538 case ThreadBlocked:
539 scheduleHandleThreadBlocked(t);
540 break;
541
542 case ThreadFinished:
543 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
544 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
545 break;
546
547 default:
548 barf("schedule: invalid thread return code %d", (int)ret);
549 }
550
551 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
552 scheduleDoGC(&cap,task,rtsFalse);
553 }
554 } /* end of while() */
555 }
556
557 /* -----------------------------------------------------------------------------
558 * Run queue operations
559 * -------------------------------------------------------------------------- */
560
561 static void
562 removeFromRunQueue (Capability *cap, StgTSO *tso)
563 {
564 if (tso->block_info.prev == END_TSO_QUEUE) {
565 ASSERT(cap->run_queue_hd == tso);
566 cap->run_queue_hd = tso->_link;
567 } else {
568 setTSOLink(cap, tso->block_info.prev, tso->_link);
569 }
570 if (tso->_link == END_TSO_QUEUE) {
571 ASSERT(cap->run_queue_tl == tso);
572 cap->run_queue_tl = tso->block_info.prev;
573 } else {
574 setTSOPrev(cap, tso->_link, tso->block_info.prev);
575 }
576 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
577
578 IF_DEBUG(sanity, checkRunQueue(cap));
579 }
580
581 void
582 promoteInRunQueue (Capability *cap, StgTSO *tso)
583 {
584 removeFromRunQueue(cap, tso);
585 pushOnRunQueue(cap, tso);
586 }
587
588 /* ----------------------------------------------------------------------------
589 * Setting up the scheduler loop
590 * ------------------------------------------------------------------------- */
591
592 static void
593 schedulePreLoop(void)
594 {
595 // initialisation for scheduler - what cannot go into initScheduler()
596
597 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
598 win32AllocStack();
599 #endif
600 }
601
602 /* -----------------------------------------------------------------------------
603 * scheduleFindWork()
604 *
605 * Search for work to do, and handle messages from elsewhere.
606 * -------------------------------------------------------------------------- */
607
608 static void
609 scheduleFindWork (Capability **pcap)
610 {
611 scheduleStartSignalHandlers(*pcap);
612
613 scheduleProcessInbox(pcap);
614
615 scheduleCheckBlockedThreads(*pcap);
616
617 #if defined(THREADED_RTS)
618 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
619 #endif
620 }
621
622 #if defined(THREADED_RTS)
623 STATIC_INLINE rtsBool
624 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
625 {
626 // we need to yield this capability to someone else if..
627 // - another thread is initiating a GC, and we didn't just do a GC
628 // (see Note [GC livelock])
629 // - another Task is returning from a foreign call
630 // - the thread at the head of the run queue cannot be run
631 // by this Task (it is bound to another Task, or it is unbound
632 // and this Task is bound).
633 //
634 // Note [GC livelock]
635 //
636 // If we are interrupted to do a GC, then we do not immediately do
637 // another one. This avoids a starvation situation where one
638 // Capability keeps forcing a GC and the other Capabilities make no
639 // progress at all.
640
641 return ((pending_sync && !didGcLast) ||
642 cap->returning_tasks_hd != NULL ||
643 (!emptyRunQueue(cap) && (task->incall->tso == NULL
644 ? peekRunQueue(cap)->bound != NULL
645 : peekRunQueue(cap)->bound != task->incall)));
646 }
647
648 // This is the single place where a Task goes to sleep. There are
649 // two reasons it might need to sleep:
650 // - there are no threads to run
651 // - we need to yield this Capability to someone else
652 // (see shouldYieldCapability())
653 //
654 // Careful: the scheduler loop is quite delicate. Make sure you run
655 // the tests in testsuite/concurrent (all ways) after modifying this,
656 // and also check the benchmarks in nofib/parallel for regressions.
657
658 static void
659 scheduleYield (Capability **pcap, Task *task)
660 {
661 Capability *cap = *pcap;
662 int didGcLast = rtsFalse;
663
664 // if we have work, and we don't need to give up the Capability, continue.
665 //
666 if (!shouldYieldCapability(cap,task,rtsFalse) &&
667 (!emptyRunQueue(cap) ||
668 !emptyInbox(cap) ||
669 sched_state >= SCHED_INTERRUPTING)) {
670 return;
671 }
672
673 // otherwise yield (sleep), and keep yielding if necessary.
674 do {
675 didGcLast = yieldCapability(&cap,task, !didGcLast);
676 }
677 while (shouldYieldCapability(cap,task,didGcLast));
678
679 // note there may still be no threads on the run queue at this
680 // point, the caller has to check.
681
682 *pcap = cap;
683 return;
684 }
685 #endif
686
687 /* -----------------------------------------------------------------------------
688 * schedulePushWork()
689 *
690 * Push work to other Capabilities if we have some.
691 * -------------------------------------------------------------------------- */
692
693 static void
694 schedulePushWork(Capability *cap USED_IF_THREADS,
695 Task *task USED_IF_THREADS)
696 {
697 /* The following code is not for PARALLEL_HASKELL. The call is kept general;
698 future GUM versions might use pushing in a distributed setup. */
699 #if defined(THREADED_RTS)
700
701 Capability *free_caps[n_capabilities], *cap0;
702 uint32_t i, n_wanted_caps, n_free_caps;
703 StgTSO *t;
704
705 // migration can be turned off with +RTS -qm
706 if (!RtsFlags.ParFlags.migrate) return;
707
708 // Check whether we have more threads on our run queue, or sparks
709 // in our pool, that we could hand to another Capability.
710 if (emptyRunQueue(cap)) {
711 if (sparkPoolSizeCap(cap) < 2) return;
712 } else {
713 if (singletonRunQueue(cap) &&
714 sparkPoolSizeCap(cap) < 1) return;
715 }
716
717 // Figure out how many capabilities we want to wake up. We need at least
718 // sparkPoolSize(cap) plus the number of spare threads we have.
719 t = cap->run_queue_hd;
720 n_wanted_caps = sparkPoolSizeCap(cap);
721 if (t != END_TSO_QUEUE) {
722 do {
723 t = t->_link;
724 if (t == END_TSO_QUEUE) break;
725 n_wanted_caps++;
726 } while (n_wanted_caps < n_capabilities-1);
727 }
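
// Note that the loop above caps n_wanted_caps at n_capabilities-1, and by
// starting the count from the second thread on the queue it leaves the
// thread at the head of the run queue for this Capability.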
728
729 // First grab as many free Capabilities as we can. ToDo: we should use
730 // capabilities on the same NUMA node preferably, but not exclusively.
731 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
732 n_free_caps < n_wanted_caps && i != cap->no;
733 i = (i + 1) % n_capabilities) {
734 cap0 = capabilities[i];
735 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
736 if (!emptyRunQueue(cap0)
737 || cap0->returning_tasks_hd != NULL
738 || cap0->inbox != (Message*)END_TSO_QUEUE) {
739 // it already has some work, we just grabbed it at
740 // the wrong moment. Or maybe it's deadlocked!
741 releaseCapability(cap0);
742 } else {
743 free_caps[n_free_caps++] = cap0;
744 }
745 }
746 }
747
748 // we now have n_free_caps free capabilities stashed in
749 // free_caps[]. Share our run queue equally with them. This is
750 // probably the simplest thing we could do; improvements we might
751 // want to do include:
752 //
753 // - giving high priority to moving relatively new threads, on
754 // the grounds that they haven't had time to build up a
755 // working set in the cache on this CPU/Capability.
756 //
757 // - giving low priority to moving long-lived threads
758
759 if (n_free_caps > 0) {
760 StgTSO *prev, *t, *next;
761 #ifdef SPARK_PUSHING
762 rtsBool pushed_to_all;
763 #endif
764
765 debugTrace(DEBUG_sched,
766 "cap %d: %s and %d free capabilities, sharing...",
767 cap->no,
768 (!emptyRunQueue(cap) && !singletonRunQueue(cap))?
769 "excess threads on run queue":"sparks to share (>=2)",
770 n_free_caps);
771
772 i = 0;
773 #ifdef SPARK_PUSHING
774 pushed_to_all = rtsFalse;
775 #endif
776
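// Walk our run queue, keeping the bound/locked threads for ourselves and
// dealing the rest out to free_caps[0..n_free_caps-1] round-robin: when i
// reaches n_free_caps we keep one thread here and wrap i back to 0.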
777 if (cap->run_queue_hd != END_TSO_QUEUE) {
778 prev = cap->run_queue_hd;
779 t = prev->_link;
780 prev->_link = END_TSO_QUEUE;
781 for (; t != END_TSO_QUEUE; t = next) {
782 next = t->_link;
783 t->_link = END_TSO_QUEUE;
784 if (t->bound == task->incall // don't move my bound thread
785 || tsoLocked(t)) { // don't move a locked thread
786 setTSOLink(cap, prev, t);
787 setTSOPrev(cap, t, prev);
788 prev = t;
789 } else if (i == n_free_caps) {
790 #ifdef SPARK_PUSHING
791 pushed_to_all = rtsTrue;
792 #endif
793 i = 0;
794 // keep one for us
795 setTSOLink(cap, prev, t);
796 setTSOPrev(cap, t, prev);
797 prev = t;
798 } else {
799 appendToRunQueue(free_caps[i],t);
800
801 traceEventMigrateThread (cap, t, free_caps[i]->no);
802
803 if (t->bound) { t->bound->task->cap = free_caps[i]; }
804 t->cap = free_caps[i];
805 i++;
806 }
807 }
808 cap->run_queue_tl = prev;
809
810 IF_DEBUG(sanity, checkRunQueue(cap));
811 }
812
813 #ifdef SPARK_PUSHING
814 /* JB I left this code in place, it would work but is not necessary */
815
816 // If there are some free capabilities that we didn't push any
817 // threads to, then try to push a spark to each one.
818 if (!pushed_to_all) {
819 StgClosure *spark;
820 // i is the next free capability to push to
821 for (; i < n_free_caps; i++) {
822 if (emptySparkPoolCap(free_caps[i])) {
823 spark = tryStealSpark(cap->sparks);
824 if (spark != NULL) {
825 /* TODO: if anyone wants to re-enable this code then
826 * they must consider the fizzledSpark(spark) case
827 * and update the per-cap spark statistics.
828 */
829 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
830
831 traceEventStealSpark(free_caps[i], t, cap->no);
832
833 newSpark(&(free_caps[i]->r), spark);
834 }
835 }
836 }
837 }
838 #endif /* SPARK_PUSHING */
839
840 // release the capabilities
841 for (i = 0; i < n_free_caps; i++) {
842 task->cap = free_caps[i];
843 if (sparkPoolSizeCap(cap) > 0) {
844 // If we have sparks to steal, wake up a worker on the
845 // capability, even if it has no threads to run.
846 releaseAndWakeupCapability(free_caps[i]);
847 } else {
848 releaseCapability(free_caps[i]);
849 }
850 }
851 }
852 task->cap = cap; // reset to point to our Capability.
853
854 #endif /* THREADED_RTS */
855
856 }
857
858 /* ----------------------------------------------------------------------------
859 * Start any pending signal handlers
860 * ------------------------------------------------------------------------- */
861
862 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
863 static void
864 scheduleStartSignalHandlers(Capability *cap)
865 {
866 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
867 // safe outside the lock
868 startSignalHandlers(cap);
869 }
870 }
871 #else
872 static void
873 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
874 {
875 }
876 #endif
877
878 /* ----------------------------------------------------------------------------
879 * Check for blocked threads that can be woken up.
880 * ------------------------------------------------------------------------- */
881
882 static void
883 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
884 {
885 #if !defined(THREADED_RTS)
886 //
887 // Check whether any waiting threads need to be woken up. If the
888 // run queue is empty, and there are no other tasks running, we
889 // can wait indefinitely for something to happen.
890 //
891 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
892 {
893 awaitEvent (emptyRunQueue(cap));
894 }
895 #endif
896 }
897
898 /* ----------------------------------------------------------------------------
899 * Detect deadlock conditions and attempt to resolve them.
900 * ------------------------------------------------------------------------- */
901
902 static void
903 scheduleDetectDeadlock (Capability **pcap, Task *task)
904 {
905 Capability *cap = *pcap;
906 /*
907 * Detect deadlock: when we have no threads to run, there are no
908 * threads blocked, waiting for I/O, or sleeping, and all the
909 * other tasks are waiting for work, we must have a deadlock of
910 * some description.
911 */
912 if ( emptyThreadQueues(cap) )
913 {
914 #if defined(THREADED_RTS)
915 /*
916 * In the threaded RTS, we only check for deadlock if there
917 * has been no activity in a complete timeslice. This means
918 * we won't eagerly start a full GC just because we don't have
919 * any threads to run currently.
920 */
921 if (recent_activity != ACTIVITY_INACTIVE) return;
922 #endif
923
924 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
925
926 // Garbage collection can release some new threads due to
927 // either (a) finalizers or (b) threads resurrected because
928 // they are unreachable and will therefore be sent an
929 // exception. Any threads thus released will be immediately
930 // runnable.
931 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
932 cap = *pcap;
933 // when force_major == rtsTrue, scheduleDoGC sets
934 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
935 // signal.
936
937 if ( !emptyRunQueue(cap) ) return;
938
939 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
940 /* If we have user-installed signal handlers, then wait
941 * for signals to arrive rather then bombing out with a
942 * deadlock.
943 */
944 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
945 debugTrace(DEBUG_sched,
946 "still deadlocked, waiting for signals...");
947
948 awaitUserSignals();
949
950 if (signals_pending()) {
951 startSignalHandlers(cap);
952 }
953
954 // either we have threads to run, or we were interrupted:
955 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
956
957 return;
958 }
959 #endif
960
961 #if !defined(THREADED_RTS)
962 /* Probably a real deadlock. Send the current main thread the
963 * Deadlock exception.
964 */
965 if (task->incall->tso) {
966 switch (task->incall->tso->why_blocked) {
967 case BlockedOnSTM:
968 case BlockedOnBlackHole:
969 case BlockedOnMsgThrowTo:
970 case BlockedOnMVar:
971 case BlockedOnMVarRead:
972 throwToSingleThreaded(cap, task->incall->tso,
973 (StgClosure *)nonTermination_closure);
974 return;
975 default:
976 barf("deadlock: main thread blocked in a strange way");
977 }
978 }
979 return;
980 #endif
981 }
982 }
983
984
985 /* ----------------------------------------------------------------------------
986 * Process messages in the current Capability's inbox
987 * ------------------------------------------------------------------------- */
988
989 static void
990 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
991 {
992 #if defined(THREADED_RTS)
993 Message *m, *next;
994 int r;
995 Capability *cap = *pcap;
996
997 while (!emptyInbox(cap)) {
998 if (cap->r.rCurrentNursery->link == NULL ||
999 g0->n_new_large_words >= large_alloc_lim) {
1000 scheduleDoGC(pcap, cap->running_task, rtsFalse);
1001 cap = *pcap;
1002 }
1003
1004 // don't use a blocking acquire; if the lock is held by
1005 // another thread then just carry on. This seems to avoid
1006 // getting stuck in a message ping-pong situation with other
1007 // processors. We'll check the inbox again later anyway.
1008 //
1009 // We should really use a more efficient queue data structure
1010 // here. The trickiness is that we must ensure a Capability
1011 // never goes idle if the inbox is non-empty, which is why we
1012 // use cap->lock (cap->lock is released as the last thing
1013 // before going idle; see Capability.c:releaseCapability()).
1014 r = TRY_ACQUIRE_LOCK(&cap->lock);
1015 if (r != 0) return;
1016
1017 m = cap->inbox;
1018 cap->inbox = (Message*)END_TSO_QUEUE;
1019
1020 RELEASE_LOCK(&cap->lock);
1021
1022 while (m != (Message*)END_TSO_QUEUE) {
1023 next = m->link;
1024 executeMessage(cap, m);
1025 m = next;
1026 }
1027 }
1028 #endif
1029 }
1030
1031 /* ----------------------------------------------------------------------------
1032 * Activate spark threads (THREADED_RTS)
1033 * ------------------------------------------------------------------------- */
1034
1035 #if defined(THREADED_RTS)
1036 static void
1037 scheduleActivateSpark(Capability *cap)
1038 {
1039 if (anySparks() && !cap->disabled)
1040 {
1041 createSparkThread(cap);
1042 debugTrace(DEBUG_sched, "creating a spark thread");
1043 }
1044 }
1045 #endif // THREADED_RTS
1046
1047 /* ----------------------------------------------------------------------------
1048 * After running a thread...
1049 * ------------------------------------------------------------------------- */
1050
1051 static void
1052 schedulePostRunThread (Capability *cap, StgTSO *t)
1053 {
1054 // We have to be able to catch transactions that are in an
1055 // infinite loop as a result of seeing an inconsistent view of
1056 // memory, e.g.
1057 //
1058 // atomically $ do
1059 // [a,b] <- mapM readTVar [ta,tb]
1060 // when (a == b) loop
1061 //
1062 // and a is never equal to b given a consistent view of memory.
1063 //
1064 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1065 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1066 debugTrace(DEBUG_sched | DEBUG_stm,
1067 "trec %p found wasting its time", t);
1068
1069 // strip the stack back to the
1070 // ATOMICALLY_FRAME, aborting the (nested)
1071 // transaction, and saving the stack of any
1072 // partially-evaluated thunks on the heap.
1073 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1074
1075 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1076 }
1077 }
1078
1079 //
1080 // If the current thread's allocation limit has run out, send it
1081 // the AllocationLimitExceeded exception.
1082
1083 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1084 // Use a throwToSelf rather than a throwToSingleThreaded, because
1085 // it correctly handles the case where the thread is currently
1086 // inside mask. Also the thread might be blocked (e.g. on an
1087 // MVar), and throwToSingleThreaded doesn't unblock it
1088 // correctly in that case.
1089 throwToSelf(cap, t, allocationLimitExceeded_closure);
1090 ASSIGN_Int64((W_*)&(t->alloc_limit),
1091 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1092 }
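
// (The Haskell-level interface to this machinery is in GHC.Conc:
// setAllocationCounter, enableAllocationLimit and friends. The assignment
// above re-arms the counter with allocLimitGrace blocks of allowance so
// that the exception handler itself has room to allocate.)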
1093
1094 /* some statistics gathering in the parallel case */
1095 }
1096
1097 /* -----------------------------------------------------------------------------
1098 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1099 * -------------------------------------------------------------------------- */
1100
1101 static rtsBool
1102 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1103 {
1104 if (cap->r.rHpLim == NULL || cap->context_switch) {
1105 // Sometimes we miss a context switch, e.g. when calling
1106 // primitives in a tight loop, MAYBE_GC() doesn't check the
1107 // context switch flag, and we end up waiting for a GC.
1108 // See #1984, and concurrent/should_run/1984
1109 cap->context_switch = 0;
1110 appendToRunQueue(cap,t);
1111 } else {
1112 pushOnRunQueue(cap,t);
1113 }
1114
1115 // did the task ask for a large block?
1116 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1117 // if so, get one and push it on the front of the nursery.
1118 bdescr *bd;
1119 W_ blocks;
1120
1121 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
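
// For example, with the default 4KB block size a request of
// rHpAlloc = 10000 bytes rounds up to blocks = 3.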
1122
1123 if (blocks > BLOCKS_PER_MBLOCK) {
1124 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1125 }
1126
1127 debugTrace(DEBUG_sched,
1128 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1129 (long)t->id, what_next_strs[t->what_next], blocks);
1130
1131 // don't do this if the nursery is (nearly) full, we'll GC first.
1132 if (cap->r.rCurrentNursery->link != NULL ||
1133 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1134 // infinite loop if the
1135 // nursery has only one
1136 // block.
1137
1138 bd = allocGroupOnNode_lock(cap->node,blocks);
1139 cap->r.rNursery->n_blocks += blocks;
1140
1141 // link the new group after CurrentNursery
1142 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1143
1144 // initialise it as a nursery block. We initialise the
1145 // step, gen_no, and flags field of *every* sub-block in
1146 // this large block, because this is easier than making
1147 // sure that we always find the block head of a large
1148 // block whenever we call Bdescr() (eg. evacuate() and
1149 // isAlive() in the GC would both have to do this, at
1150 // least).
1151 {
1152 bdescr *x;
1153 for (x = bd; x < bd + blocks; x++) {
1154 initBdescr(x,g0,g0);
1155 x->free = x->start;
1156 x->flags = 0;
1157 }
1158 }
1159
1160 // This assert can be a killer if the app is doing lots
1161 // of large block allocations.
1162 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1163
1164 // now update the nursery to point to the new block
1165 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1166 cap->r.rCurrentNursery = bd;
1167
1168 // we might be unlucky and have another thread get on the
1169 // run queue before us and steal the large block, but in that
1170 // case the thread will just end up requesting another large
1171 // block.
1172 return rtsFalse; /* not actually GC'ing */
1173 }
1174 }
1175
1176 // if we got here because we exceeded large_alloc_lim, then
1177 // proceed straight to GC.
1178 if (g0->n_new_large_words >= large_alloc_lim) {
1179 return rtsTrue;
1180 }
1181
1182 // Otherwise, we just ran out of space in the current nursery.
1183 // Grab another nursery if we can.
1184 if (getNewNursery(cap)) {
1185 debugTrace(DEBUG_sched, "thread %ld got a new nursery", t->id);
1186 return rtsFalse;
1187 }
1188
1189 return rtsTrue;
1190 /* actual GC is done at the end of the while loop in schedule() */
1191 }
1192
1193 /* -----------------------------------------------------------------------------
1194 * Handle a thread that returned to the scheduler with ThreadYielding
1195 * -------------------------------------------------------------------------- */
1196
1197 static rtsBool
1198 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1199 {
1200 /* put the thread back on the run queue. Then, if we're ready to
1201 * GC, check whether this is the last task to stop. If so, wake
1202 * up the GC thread. getThread will block during a GC until the
1203 * GC is finished.
1204 */
1205
1206 ASSERT(t->_link == END_TSO_QUEUE);
1207
1208 // Shortcut if we're just switching evaluators: don't bother
1209 // doing stack squeezing (which can be expensive), just run the
1210 // thread.
1211 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1212 debugTrace(DEBUG_sched,
1213 "--<< thread %ld (%s) stopped to switch evaluators",
1214 (long)t->id, what_next_strs[t->what_next]);
1215 return rtsTrue;
1216 }
1217
1218 // Reset the context switch flag. We don't do this just before
1219 // running the thread, because that would mean we would lose ticks
1220 // during GC, which can lead to unfair scheduling (a thread hogs
1221 // the CPU because the tick always arrives during GC). This way
1222 // penalises threads that do a lot of allocation, but that seems
1223 // better than the alternative.
1224 if (cap->context_switch != 0) {
1225 cap->context_switch = 0;
1226 appendToRunQueue(cap,t);
1227 } else {
1228 pushOnRunQueue(cap,t);
1229 }
1230
1231 IF_DEBUG(sanity,
1232 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1233 checkTSO(t));
1234
1235 return rtsFalse;
1236 }
1237
1238 /* -----------------------------------------------------------------------------
1239 * Handle a thread that returned to the scheduler with ThreadBlocked
1240 * -------------------------------------------------------------------------- */
1241
1242 static void
1243 scheduleHandleThreadBlocked( StgTSO *t
1244 #if !defined(DEBUG)
1245 STG_UNUSED
1246 #endif
1247 )
1248 {
1249
1250 // We don't need to do anything. The thread is blocked, and it
1251 // has tidied up its stack and placed itself on whatever queue
1252 // it needs to be on.
1253
1254 // ASSERT(t->why_blocked != NotBlocked);
1255 // Not true: for example,
1256 // - the thread may have woken itself up already, because
1257 // threadPaused() might have raised a blocked throwTo
1258 // exception, see maybePerformBlockedException().
1259
1260 #ifdef DEBUG
1261 traceThreadStatus(DEBUG_sched, t);
1262 #endif
1263 }
1264
1265 /* -----------------------------------------------------------------------------
1266 * Handle a thread that returned to the scheduler with ThreadFinished
1267 * -------------------------------------------------------------------------- */
1268
1269 static rtsBool
1270 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1271 {
1272 /* Need to check whether this was a main thread, and if so,
1273 * return with the return value.
1274 *
1275 * We also end up here if the thread kills itself with an
1276 * uncaught exception, see Exception.cmm.
1277 */
1278
1279 // blocked exceptions can now complete, even if the thread was in
1280 // blocked mode (see #2910).
1281 awakenBlockedExceptionQueue (cap, t);
1282
1283 //
1284 // Check whether the thread that just completed was a bound
1285 // thread, and if so return with the result.
1286 //
1287 // There is an assumption here that all thread completion goes
1288 // through this point; we need to make sure that if a thread
1289 // ends up in the ThreadKilled state, that it stays on the run
1290 // queue so it can be dealt with here.
1291 //
1292
1293 if (t->bound) {
1294
1295 if (t->bound != task->incall) {
1296 #if !defined(THREADED_RTS)
1297 // Must be a bound thread that is not the topmost one. Leave
1298 // it on the run queue until the stack has unwound to the
1299 // point where we can deal with this. Leaving it on the run
1300 // queue also ensures that the garbage collector knows about
1301 // this thread and its return value (it gets dropped from the
1302 // step->threads list so there's no other way to find it).
1303 appendToRunQueue(cap,t);
1304 return rtsFalse;
1305 #else
1306 // this cannot happen in the threaded RTS, because a
1307 // bound thread can only be run by the appropriate Task.
1308 barf("finished bound thread that isn't mine");
1309 #endif
1310 }
1311
1312 ASSERT(task->incall->tso == t);
1313
1314 if (t->what_next == ThreadComplete) {
1315 if (task->incall->ret) {
1316 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1317 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1318 }
1319 task->incall->rstat = Success;
1320 } else {
1321 if (task->incall->ret) {
1322 *(task->incall->ret) = NULL;
1323 }
1324 if (sched_state >= SCHED_INTERRUPTING) {
1325 if (heap_overflow) {
1326 task->incall->rstat = HeapExhausted;
1327 } else {
1328 task->incall->rstat = Interrupted;
1329 }
1330 } else {
1331 task->incall->rstat = Killed;
1332 }
1333 }
1334 #ifdef DEBUG
1335 removeThreadLabel((StgWord)task->incall->tso->id);
1336 #endif
1337
1338 // We no longer consider this thread and task to be bound to
1339 // each other. The TSO lives on until it is GC'd, but the
1340 // task is about to be released by the caller, and we don't
1341 // want anyone following the pointer from the TSO to the
1342 // defunct task (which might have already been
1343 // re-used). This was a real bug: the GC updated
1344 // tso->bound->tso, which led to a deadlock.
1345 t->bound = NULL;
1346 task->incall->tso = NULL;
1347
1348 return rtsTrue; // tells schedule() to return
1349 }
1350
1351 return rtsFalse;
1352 }
1353
1354 /* -----------------------------------------------------------------------------
1355 * Perform a heap census
1356 * -------------------------------------------------------------------------- */
1357
1358 static rtsBool
1359 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1360 {
1361 // When we have +RTS -i0 and we're heap profiling, do a census at
1362 // every GC. This lets us get repeatable runs for debugging.
1363 if (performHeapProfile ||
1364 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1365 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1366 return rtsTrue;
1367 } else {
1368 return rtsFalse;
1369 }
1370 }
1371
1372 /* -----------------------------------------------------------------------------
1373 * stopAllCapabilities()
1374 *
1375 * Stop all Haskell execution. This is used when we need to make some global
1376 * change to the system, such as altering the number of capabilities, or
1377 * forking.
1378 *
1379 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1380 * -------------------------------------------------------------------------- */
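
// A typical caller (sketch only, based on the uses elsewhere in the RTS,
// e.g. setNumCapabilities and forkProcess):
//
//     stopAllCapabilities(&cap, task);
//     ... make the global change ...
//     releaseAllCapabilities(n_capabilities, cap, task);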
1381
1382 #if defined(THREADED_RTS)
1383 static void stopAllCapabilities (Capability **pCap, Task *task)
1384 {
1385 rtsBool was_syncing;
1386 SyncType prev_sync_type;
1387
1388 PendingSync sync = {
1389 .type = SYNC_OTHER,
1390 .idle = NULL,
1391 .task = task
1392 };
1393
1394 do {
1395 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1396 } while (was_syncing);
1397
1398 acquireAllCapabilities(*pCap,task);
1399
1400 pending_sync = 0;
1401 }
1402 #endif
1403
1404 /* -----------------------------------------------------------------------------
1405 * requestSync()
1406 *
1407 * Commence a synchronisation between all capabilities. Normally not called
1408 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1409 * has some special synchronisation requirements.
1410 *
1411 * Returns:
1412 * rtsFalse if we successfully got a sync
1413 * rtsTrue if there was another sync request in progress,
1414 * and we yielded to it. The value returned is the
1415 * type of the other sync request.
1416 * -------------------------------------------------------------------------- */
1417
1418 #if defined(THREADED_RTS)
1419 static rtsBool requestSync (
1420 Capability **pcap, Task *task, PendingSync *new_sync,
1421 SyncType *prev_sync_type)
1422 {
1423 PendingSync *sync;
1424
1425 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1426 (StgWord)NULL,
1427 (StgWord)new_sync);
1428
1429 if (sync != NULL)
1430 {
1431 // sync is valid until we have called yieldCapability().
1432 // After the sync is completed, we cannot read that struct any
1433 // more because it has been freed.
1434 *prev_sync_type = sync->type;
1435 do {
1436 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1437 sync->type);
1438 ASSERT(*pcap);
1439 yieldCapability(pcap,task,rtsTrue);
1440 sync = pending_sync;
1441 } while (sync != NULL);
1442
1443 // NOTE: task->cap might have changed now
1444 return rtsTrue;
1445 }
1446 else
1447 {
1448 return rtsFalse;
1449 }
1450 }
1451 #endif
1452
1453 /* -----------------------------------------------------------------------------
1454 * acquireAllCapabilities()
1455 *
1456 * Grab all the capabilities except the one we already hold. Used
1457 * when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1458 * before a fork (SYNC_OTHER).
1459 *
1460 * Only call this after requestSync(), otherwise a deadlock might
1461 * ensue if another thread is trying to synchronise.
1462 * -------------------------------------------------------------------------- */
1463
1464 #ifdef THREADED_RTS
1465 static void acquireAllCapabilities(Capability *cap, Task *task)
1466 {
1467 Capability *tmpcap;
1468 uint32_t i;
1469
1470 ASSERT(pending_sync != NULL);
1471 for (i=0; i < n_capabilities; i++) {
1472 debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)",
1473 i, n_capabilities);
1474 tmpcap = capabilities[i];
1475 if (tmpcap != cap) {
1476 // we better hope this task doesn't get migrated to
1477 // another Capability while we're waiting for this one.
1478 // It won't, because load balancing happens while we have
1479 // all the Capabilities, but even so it's a slightly
1480 // unsavoury invariant.
1481 task->cap = tmpcap;
1482 waitForCapability(&tmpcap, task);
1483 if (tmpcap->no != i) {
1484 barf("acquireAllCapabilities: got the wrong capability");
1485 }
1486 }
1487 }
1488 task->cap = cap;
1489 }
1490 #endif
1491
1492 /* -----------------------------------------------------------------------------
1493 * releaseAllCapabilities()
1494 *
1495 * Assuming this thread holds all the capabilities, release them all except for
1496 * the one passed in as cap.
1497 * -------------------------------------------------------------------------- */
1498
1499 #ifdef THREADED_RTS
1500 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task)
1501 {
1502 uint32_t i;
1503
1504 for (i = 0; i < n; i++) {
1505 if (cap->no != i) {
1506 task->cap = capabilities[i];
1507 releaseCapability(capabilities[i]);
1508 }
1509 }
1510 task->cap = cap;
1511 }
1512 #endif
1513
1514 /* -----------------------------------------------------------------------------
1515 * Perform a garbage collection if necessary
1516 * -------------------------------------------------------------------------- */
1517
1518 static void
1519 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1520 rtsBool force_major)
1521 {
1522 Capability *cap = *pcap;
1523 rtsBool heap_census;
1524 uint32_t collect_gen;
1525 rtsBool major_gc;
1526 #ifdef THREADED_RTS
1527 uint32_t gc_type;
1528 uint32_t i;
1529 uint32_t need_idle;
1530 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1531 StgTSO *tso;
1532 rtsBool *idle_cap;
1533 #endif
1534
1535 if (sched_state == SCHED_SHUTTING_DOWN) {
1536 // The final GC has already been done, and the system is
1537 // shutting down. We'll probably deadlock if we try to GC
1538 // now.
1539 return;
1540 }
1541
1542 heap_census = scheduleNeedHeapProfile(rtsTrue);
1543
1544 // Figure out which generation we are collecting, so that we can
1545 // decide whether this is a parallel GC or not.
1546 collect_gen = calcNeeded(force_major || heap_census, NULL);
1547 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1548
1549 #ifdef THREADED_RTS
1550 if (sched_state < SCHED_INTERRUPTING
1551 && RtsFlags.ParFlags.parGcEnabled
1552 && collect_gen >= RtsFlags.ParFlags.parGcGen
1553 && ! oldest_gen->mark)
1554 {
1555 gc_type = SYNC_GC_PAR;
1556 } else {
1557 gc_type = SYNC_GC_SEQ;
1558 }
1559
1560 if (gc_type == SYNC_GC_PAR && RtsFlags.ParFlags.parGcThreads > 0) {
1561 need_idle = stg_max(0, enabled_capabilities -
1562 RtsFlags.ParFlags.parGcThreads);
1563 } else {
1564 need_idle = 0;
1565 }
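
// For example, with +RTS -N8 -qn6 we get need_idle = 2: two capabilities
// will be asked to sit this parallel collection out.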
1566
1567 // In order to GC, there must be no threads running Haskell code.
1568 // Therefore, the GC thread needs to hold *all* the capabilities,
1569 // and release them after the GC has completed.
1570 //
1571 // This seems to be the simplest way: previous attempts involved
1572 // making all the threads with capabilities give up their
1573 // capabilities and sleep except for the *last* one, which
1574 // actually did the GC. But it's quite hard to arrange for all
1575 // the other tasks to sleep and stay asleep.
1576 //
1577
1578 /* Other capabilities are prevented from running yet more Haskell
1579 threads if pending_sync is set. Tested inside
1580 yieldCapability() and releaseCapability() in Capability.c */
1581
1582 PendingSync sync = {
1583 .type = gc_type,
1584 .idle = NULL,
1585 .task = task
1586 };
1587
1588 {
1589 SyncType prev_sync = 0;
1590 rtsBool was_syncing;
1591 do {
1592 // We need an array of size n_capabilities, but since this may
1593 // change each time around the loop we must allocate it afresh.
1594 idle_cap = (rtsBool *)stgMallocBytes(n_capabilities *
1595 sizeof(rtsBool),
1596 "scheduleDoGC");
1597 sync.idle = idle_cap;
1598
1599 // When using +RTS -qn, we need some capabilities to be idle during
1600 // GC. The best bet is to choose some inactive ones, so we look for
1601 // those first:
1602 uint32_t n_idle = need_idle;
1603 for (i=0; i < n_capabilities; i++) {
1604 if (capabilities[i]->disabled) {
1605 idle_cap[i] = rtsTrue;
1606 } else if (n_idle > 0 &&
1607 capabilities[i]->running_task == NULL) {
1608 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1609 n_idle--;
1610 idle_cap[i] = rtsTrue;
1611 } else {
1612 idle_cap[i] = rtsFalse;
1613 }
1614 }
1615 // If we didn't find enough inactive capabilities, just pick some
1616 // more to be idle.
1617 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1618 if (!idle_cap[i] && i != cap->no) {
1619 idle_cap[i] = rtsTrue;
1620 n_idle--;
1621 }
1622 }
1623 ASSERT(n_idle == 0);
1624
1625 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1626 cap = *pcap;
1627 if (was_syncing) {
1628 stgFree(idle_cap);
1629 }
1630 if (was_syncing &&
1631 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1632 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1633 // someone else had a pending sync request for a GC, so
1634 // let's assume GC has been done and we don't need to GC
1635 // again.
1636 // Exception to this: if SCHED_INTERRUPTING, then we still
1637 // need to do the final GC.
1638 return;
1639 }
1640 if (sched_state == SCHED_SHUTTING_DOWN) {
1641 // The scheduler might now be shutting down. We tested
1642 // this above, but it might have become true since then as
1643 // we yielded the capability in requestSync().
1644 return;
1645 }
1646 } while (was_syncing);
1647 }
1648
1649 #ifdef DEBUG
1650 unsigned int old_n_capabilities = n_capabilities;
1651 #endif
1652
1653 interruptAllCapabilities();
1654
1655 // The final shutdown GC is always single-threaded, because it's
1656 // possible that some of the Capabilities have no worker threads.
1657
1658 if (gc_type == SYNC_GC_SEQ) {
1659 traceEventRequestSeqGc(cap);
1660 } else {
1661 traceEventRequestParGc(cap);
1662 }
1663
1664 if (gc_type == SYNC_GC_SEQ) {
1665 // single-threaded GC: grab all the capabilities
1666 acquireAllCapabilities(cap,task);
1667 }
1668 else
1669 {
1670 // If we are load-balancing collections in this
1671 // generation, then we require all GC threads to participate
1672 // in the collection. Otherwise, we only require active
1673 // threads to participate, and we set gc_threads[i]->idle for
1674 // any idle capabilities. The rationale here is that waking
1675 // up an idle Capability takes much longer than just doing any
1676 // GC work on its behalf.
1677
1678 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1679 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1680 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1681 {
1682 for (i=0; i < n_capabilities; i++) {
1683 if (capabilities[i]->disabled) {
1684 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1685 if (idle_cap[i]) {
1686 n_idle_caps++;
1687 }
1688 } else {
1689 if (i != cap->no && idle_cap[i]) {
1690 Capability *tmpcap = capabilities[i];
1691 task->cap = tmpcap;
1692 waitForCapability(&tmpcap, task);
1693 n_idle_caps++;
1694 }
1695 }
1696 }
1697 }
1698 else
1699 {
1700 for (i=0; i < n_capabilities; i++) {
1701 if (capabilities[i]->disabled) {
1702 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1703 if (idle_cap[i]) {
1704 n_idle_caps++;
1705 }
1706 } else if (i != cap->no &&
1707 capabilities[i]->idle >=
1708 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1709 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1710 if (idle_cap[i]) {
1711 n_idle_caps++;
1712 } else {
1713 n_failed_trygrab_idles++;
1714 }
1715 }
1716 }
1717 }
1718 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1719
1720 // We set the gc_thread[i]->idle flag if that
1721 // capability/thread is not participating in this collection.
1722 // We also keep a local record of which capabilities are idle
1723 // in idle_cap[], because scheduleDoGC() is re-entrant:
1724 // another thread might start a GC as soon as we've finished
1725 // this one, and thus the gc_thread[]->idle flags are invalid
1726 // as soon as we release any threads after GC. Getting this
1727 // wrong leads to a rare and hard to debug deadlock!
1728
1729 for (i=0; i < n_capabilities; i++) {
1730 gc_threads[i]->idle = idle_cap[i];
1731 capabilities[i]->idle++;
1732 }
1733
1734 // For all capabilities participating in this GC, wait until
1735 // they have stopped mutating and are standing by for GC.
1736 waitForGcThreads(cap);
1737
1738 #if defined(THREADED_RTS)
1739 // Stable point where we can do a global check on our spark counters
1740 ASSERT(checkSparkCountInvariant());
1741 #endif
1742 }
1743
1744 #endif
1745
1746 IF_DEBUG(scheduler, printAllThreads());
1747
1748 delete_threads_and_gc:
1749 /*
1750 * We now have all the capabilities; if we're in an interrupting
1751 * state, then we should take the opportunity to delete all the
1752 * threads in the system.
1753 * Checking for major_gc ensures that the last GC is major.
1754 */
1755 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1756 deleteAllThreads(cap);
1757 #if defined(THREADED_RTS)
1758 // Discard all the sparks from every Capability. Why?
1759 // They'll probably be GC'd anyway since we've killed all the
1760 // threads. It just avoids the GC having to do any work to
1761 // figure out that any remaining sparks are garbage.
1762 for (i = 0; i < n_capabilities; i++) {
1763 capabilities[i]->spark_stats.gcd +=
1764 sparkPoolSize(capabilities[i]->sparks);
1765 // No race here since all Caps are stopped.
1766 discardSparksCap(capabilities[i]);
1767 }
1768 #endif
1769 sched_state = SCHED_SHUTTING_DOWN;
1770 }
1771
1772 /*
1773 * When there are disabled capabilities, we want to migrate any
1774 * threads away from them. Normally this happens in the
1775 * scheduler's loop, but only for unbound threads - it's really
1776 * hard for a bound thread to migrate itself. So we have another
1777 * go here.
1778 */
1779 #if defined(THREADED_RTS)
1780 for (i = enabled_capabilities; i < n_capabilities; i++) {
1781 Capability *tmp_cap, *dest_cap;
1782 tmp_cap = capabilities[i];
1783 ASSERT(tmp_cap->disabled);
1784 if (i != cap->no) {
1785 dest_cap = capabilities[i % enabled_capabilities];
1786 while (!emptyRunQueue(tmp_cap)) {
1787 tso = popRunQueue(tmp_cap);
1788 migrateThread(tmp_cap, tso, dest_cap);
1789 if (tso->bound) {
1790 traceTaskMigrate(tso->bound->task,
1791 tso->bound->task->cap,
1792 dest_cap);
1793 tso->bound->task->cap = dest_cap;
1794 }
1795 }
1796 }
1797 }
1798 #endif
1799
1800 #if defined(THREADED_RTS)
1801 // reset pending_sync *before* GC, so that when the GC threads
1802 // emerge they don't immediately re-enter the GC.
1803 pending_sync = 0;
1804 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1805 #else
1806 GarbageCollect(collect_gen, heap_census, 0, cap);
1807 #endif
1808
1809 traceSparkCounters(cap);
1810
1811 switch (recent_activity) {
1812 case ACTIVITY_INACTIVE:
1813 if (force_major) {
1814 // We are doing a GC because the system has been idle for a
1815 // timeslice and we need to check for deadlock. Record the
1816 // fact that we've done a GC and turn off the timer signal;
1817 // it will get re-enabled if we run any threads after the GC.
1818 recent_activity = ACTIVITY_DONE_GC;
1819 #ifndef PROFILING
1820 stopTimer();
1821 #endif
1822 break;
1823 }
1824 // fall through...
1825
1826 case ACTIVITY_MAYBE_NO:
1827 // the GC might have taken long enough for the timer to set
1828 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1829 // but we aren't necessarily deadlocked:
1830 recent_activity = ACTIVITY_YES;
1831 break;
1832
1833 case ACTIVITY_DONE_GC:
1834 // If we are actually active, the scheduler will reset the
1835 // recent_activity flag and re-enable the timer.
1836 break;
1837 }
1838
1839 #if defined(THREADED_RTS)
1840 // Stable point where we can do a global check on our spark counters
1841 ASSERT(checkSparkCountInvariant());
1842 #endif
1843
1844 // The heap census itself is done during GarbageCollect().
1845 if (heap_census) {
1846 performHeapProfile = rtsFalse;
1847 }
1848
1849 #if defined(THREADED_RTS)
1850
1851 // If n_capabilities has changed during GC, we're in trouble.
1852 ASSERT(n_capabilities == old_n_capabilities);
1853
1854 if (gc_type == SYNC_GC_PAR)
1855 {
1856 releaseGCThreads(cap);
1857 for (i = 0; i < n_capabilities; i++) {
1858 if (i != cap->no) {
1859 if (idle_cap[i]) {
1860 ASSERT(capabilities[i]->running_task == task);
1861 task->cap = capabilities[i];
1862 releaseCapability(capabilities[i]);
1863 } else {
1864 ASSERT(capabilities[i]->running_task != task);
1865 }
1866 }
1867 }
1868 task->cap = cap;
1869 }
1870 #endif
1871
1872 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1873 // GC set the heap_overflow flag, so we should proceed with
1874 // an orderly shutdown now. Ultimately we want the main
1875 // thread to return to its caller with HeapExhausted, at which
1876 // point the caller should call hs_exit(). The first step is
1877 // to delete all the threads.
1878 //
1879 // Another way to do this would be to raise an exception in
1880 // the main thread, which we really should do because it gives
1881 // the program a chance to clean up. But how do we find the
1882 // main thread? It should presumably be the same one that
1883 // gets ^C exceptions, but that's all done on the Haskell side
1884 // (GHC.TopHandler).
1885 sched_state = SCHED_INTERRUPTING;
1886 goto delete_threads_and_gc;
1887 }
1888
1889 #ifdef SPARKBALANCE
1890 /* JB
1891 Once we are all together... this would be the place to balance all
1892 spark pools. No concurrent stealing or adding of new sparks can
1893 occur. Should be defined in Sparks.c. */
1894 balanceSparkPoolsCaps(n_capabilities, capabilities);
1895 #endif
1896
1897 #if defined(THREADED_RTS)
1898 stgFree(idle_cap);
1899
1900 if (gc_type == SYNC_GC_SEQ) {
1901 // release our stash of capabilities.
1902 releaseAllCapabilities(n_capabilities, cap, task);
1903 }
1904 #endif
1905
1906 return;
1907 }
1908
1909 /* ---------------------------------------------------------------------------
1910 * Singleton fork(). Do not copy any running threads.
1911 * ------------------------------------------------------------------------- */
1912
1913 pid_t
1914 forkProcess(HsStablePtr *entry
1915 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1916 STG_UNUSED
1917 #endif
1918 )
1919 {
1920 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1921 pid_t pid;
1922 StgTSO* t,*next;
1923 Capability *cap;
1924 uint32_t g;
1925 Task *task = NULL;
1926 uint32_t i;
1927
1928 debugTrace(DEBUG_sched, "forking!");
1929
1930 task = newBoundTask();
1931
1932 cap = NULL;
1933 waitForCapability(&cap, task);
1934
1935 #ifdef THREADED_RTS
1936 stopAllCapabilities(&cap, task);
1937 #endif
1938
1939 // no funny business: hold locks while we fork, otherwise if some
1940 // other thread is holding a lock when the fork happens, the data
1941 // structure protected by the lock will forever be in an
1942 // inconsistent state in the child. See also #1391.
1943 ACQUIRE_LOCK(&sched_mutex);
1944 ACQUIRE_LOCK(&sm_mutex);
1945 ACQUIRE_LOCK(&stable_mutex);
1946 ACQUIRE_LOCK(&task->lock);
1947
1948 for (i=0; i < n_capabilities; i++) {
1949 ACQUIRE_LOCK(&capabilities[i]->lock);
1950 }
1951
1952 #ifdef THREADED_RTS
1953 ACQUIRE_LOCK(&all_tasks_mutex);
1954 #endif
1955
1956 stopTimer(); // See #4074
1957
1958 #if defined(TRACING)
1959 flushEventLog(); // so that child won't inherit dirty file buffers
1960 #endif
1961
1962 pid = fork();
1963
1964 if (pid) { // parent
1965
1966 startTimer(); // #4074
1967
1968 RELEASE_LOCK(&sched_mutex);
1969 RELEASE_LOCK(&sm_mutex);
1970 RELEASE_LOCK(&stable_mutex);
1971 RELEASE_LOCK(&task->lock);
1972
1973 for (i=0; i < n_capabilities; i++) {
1974 releaseCapability_(capabilities[i],rtsFalse);
1975 RELEASE_LOCK(&capabilities[i]->lock);
1976 }
1977
1978 #ifdef THREADED_RTS
1979 RELEASE_LOCK(&all_tasks_mutex);
1980 #endif
1981
1982 boundTaskExiting(task);
1983
1984 // just return the pid
1985 return pid;
1986
1987 } else { // child
1988
1989 #if defined(THREADED_RTS)
1990 initMutex(&sched_mutex);
1991 initMutex(&sm_mutex);
1992 initMutex(&stable_mutex);
1993 initMutex(&task->lock);
1994
1995 for (i=0; i < n_capabilities; i++) {
1996 initMutex(&capabilities[i]->lock);
1997 }
1998
1999 initMutex(&all_tasks_mutex);
2000 #endif
2001
2002 #ifdef TRACING
2003 resetTracing();
2004 #endif
2005
2006 // Now, all OS threads except the thread that forked are
2007 // stopped. We need to stop all Haskell threads, including
2008 // those involved in foreign calls. Also we need to delete
2009 // all Tasks, because they correspond to OS threads that are
2010 // now gone.
2011
2012 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2013 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2014 next = t->global_link;
2015 // don't allow threads to catch the ThreadKilled
2016 // exception, but we do want to raiseAsync() because these
2017 // threads may be evaluating thunks that we need later.
2018 deleteThread_(t->cap,t);
2019
2020 // stop the GC from updating the InCall to point to
2021 // the TSO. This is only necessary because the
2022 // OSThread bound to the TSO has been killed, and
2023 // won't get a chance to exit in the usual way (see
2024 // also scheduleHandleThreadFinished).
2025 t->bound = NULL;
2026 }
2027 }
2028
2029 discardTasksExcept(task);
2030
2031 for (i=0; i < n_capabilities; i++) {
2032 cap = capabilities[i];
2033
2034 // Empty the run queue. It seems tempting to let all the
2035 // killed threads stay on the run queue as zombies to be
2036 // cleaned up later, but some of them may correspond to
2037 // bound threads for which the corresponding Task does not
2038 // exist.
2039 truncateRunQueue(cap);
2040
2041 // Any suspended C-calling Tasks are no more, their OS threads
2042 // don't exist now:
2043 cap->suspended_ccalls = NULL;
2044
2045 #if defined(THREADED_RTS)
2046 // Wipe our spare workers list, they no longer exist. New
2047 // workers will be created if necessary.
2048 cap->spare_workers = NULL;
2049 cap->n_spare_workers = 0;
2050 cap->returning_tasks_hd = NULL;
2051 cap->returning_tasks_tl = NULL;
2052 #endif
2053
2054             // Release all caps except 0; we'll use capability 0 for starting
2055             // the IO manager and running the client action below.
2056 if (cap->no != 0) {
2057 task->cap = cap;
2058 releaseCapability(cap);
2059 }
2060 }
2061 cap = capabilities[0];
2062 task->cap = cap;
2063
2064 // Empty the threads lists. Otherwise, the garbage
2065 // collector may attempt to resurrect some of these threads.
2066 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2067 generations[g].threads = END_TSO_QUEUE;
2068 }
2069
2070 // On Unix, all timers are reset in the child, so we need to start
2071 // the timer again.
2072 initTimer();
2073 startTimer();
2074
2075 // TODO: need to trace various other things in the child
2076 // like startup event, capabilities, process info etc
2077 traceTaskCreate(task, cap);
2078
2079 #if defined(THREADED_RTS)
2080 ioManagerStartCap(&cap);
2081 #endif
2082
2083 rts_evalStableIO(&cap, entry, NULL); // run the action
2084 rts_checkSchedStatus("forkProcess",cap);
2085
2086 rts_unlock(cap);
2087 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2088 }
2089 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2090 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2091 #endif
2092 }
2093
2094 /* ---------------------------------------------------------------------------
2095 * Changing the number of Capabilities
2096 *
2097 * Changing the number of Capabilities is very tricky! We can only do
2098 * it with the system fully stopped, so we do a full sync with
2099 * requestSync(SYNC_OTHER) and grab all the capabilities.
2100 *
2101 * Then we resize the appropriate data structures, and update all
2102 * references to the old data structures which have now moved.
2103 * Finally we release the Capabilities we are holding, and start
2104 * worker Tasks on the new Capabilities we created.
2105 *
2106 * ------------------------------------------------------------------------- */
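
/* Illustrative sketch (not part of the RTS): on the Haskell side,
 * Control.Concurrent.setNumCapabilities is a thin wrapper that ends up
 * calling this function through a foreign import, so from C the whole
 * dance above is driven by a single call such as
 *
 *     setNumCapabilities(4);   // grow or shrink to 4 enabled capabilities
 */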
2107
2108 void
2109 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2110 {
2111 #if !defined(THREADED_RTS)
2112 if (new_n_capabilities != 1) {
2113 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2114 }
2115 return;
2116 #elif defined(NOSMP)
2117 if (new_n_capabilities != 1) {
2118 errorBelch("setNumCapabilities: not supported on this platform");
2119 }
2120 return;
2121 #else
2122 Task *task;
2123 Capability *cap;
2124 uint32_t n;
2125 Capability *old_capabilities = NULL;
2126 uint32_t old_n_capabilities = n_capabilities;
2127
2128 if (new_n_capabilities == enabled_capabilities) return;
2129
2130 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2131 enabled_capabilities, new_n_capabilities);
2132
2133 cap = rts_lock();
2134 task = cap->running_task;
2135
2136 stopAllCapabilities(&cap, task);
2137
2138 if (new_n_capabilities < enabled_capabilities)
2139 {
2140 // Reducing the number of capabilities: we do not actually
2141 // remove the extra capabilities, we just mark them as
2142 // "disabled". This has the following effects:
2143 //
2144 // - threads on a disabled capability are migrated away by the
2145 // scheduler loop
2146 //
2147 // - disabled capabilities do not participate in GC
2148 // (see scheduleDoGC())
2149 //
2150 // - No spark threads are created on this capability
2151 // (see scheduleActivateSpark())
2152 //
2153 // - We do not attempt to migrate threads *to* a disabled
2154 // capability (see schedulePushWork()).
2155 //
2156 // but in other respects, a disabled capability remains
2157 // alive. Threads may be woken up on a disabled capability,
2158 // but they will be immediately migrated away.
2159 //
2160 // This approach is much easier than trying to actually remove
2161 // the capability; we don't have to worry about GC data
2162 // structures, the nursery, etc.
2163 //
2164 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2165 capabilities[n]->disabled = rtsTrue;
2166 traceCapDisable(capabilities[n]);
2167 }
2168 enabled_capabilities = new_n_capabilities;
2169 }
2170 else
2171 {
2172 // Increasing the number of enabled capabilities.
2173 //
2174 // enable any disabled capabilities, up to the required number
2175 for (n = enabled_capabilities;
2176 n < new_n_capabilities && n < n_capabilities; n++) {
2177 capabilities[n]->disabled = rtsFalse;
2178 traceCapEnable(capabilities[n]);
2179 }
2180 enabled_capabilities = n;
2181
2182 if (new_n_capabilities > n_capabilities) {
2183 #if defined(TRACING)
2184 // Allocate eventlog buffers for the new capabilities. Note this
2185 // must be done before calling moreCapabilities(), because that
2186 // will emit events about creating the new capabilities and adding
2187 // them to existing capsets.
2188 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2189 #endif
2190
2191 // Resize the capabilities array
2192 // NB. after this, capabilities points somewhere new. Any pointers
2193 // of type (Capability *) are now invalid.
2194 moreCapabilities(n_capabilities, new_n_capabilities);
2195
2196 // Resize and update storage manager data structures
2197 storageAddCapabilities(n_capabilities, new_n_capabilities);
2198 }
2199 }
2200
2201 // update n_capabilities before things start running
2202 if (new_n_capabilities > n_capabilities) {
2203 n_capabilities = enabled_capabilities = new_n_capabilities;
2204 }
2205
2206 // Start worker tasks on the new Capabilities
2207 startWorkerTasks(old_n_capabilities, new_n_capabilities);
2208
2209 // We're done: release the original Capabilities
2210 releaseAllCapabilities(old_n_capabilities, cap,task);
2211
2212 // We can't free the old array until now, because we access it
2213 // while updating pointers in updateCapabilityRefs().
2214 if (old_capabilities) {
2215 stgFree(old_capabilities);
2216 }
2217
2218 // Notify IO manager that the number of capabilities has changed.
2219 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2220
2221 rts_unlock(cap);
2222
2223 #endif // THREADED_RTS
2224 }
2225
2226
2227
2228 /* ---------------------------------------------------------------------------
2229 * Delete all the threads in the system
2230 * ------------------------------------------------------------------------- */
2231
2232 static void
2233 deleteAllThreads ( Capability *cap )
2234 {
2235 // NOTE: only safe to call if we own all capabilities.
2236
2237 StgTSO* t, *next;
2238 uint32_t g;
2239
2240 debugTrace(DEBUG_sched,"deleting all threads");
2241 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2242 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2243 next = t->global_link;
2244 deleteThread(cap,t);
2245 }
2246 }
2247
2248 // The run queue now contains a bunch of ThreadKilled threads. We
2249 // must not throw these away: the main thread(s) will be in there
2250 // somewhere, and the main scheduler loop has to deal with it.
2251 // Also, the run queue is the only thing keeping these threads from
2252 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2253
2254 #if !defined(THREADED_RTS)
2255 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2256 ASSERT(sleeping_queue == END_TSO_QUEUE);
2257 #endif
2258 }
2259
2260 /* -----------------------------------------------------------------------------
2261 Managing the suspended_ccalls list.
2262 Locks required: sched_mutex
2263 -------------------------------------------------------------------------- */
2264
2265 STATIC_INLINE void
2266 suspendTask (Capability *cap, Task *task)
2267 {
2268 InCall *incall;
2269
2270 incall = task->incall;
2271 ASSERT(incall->next == NULL && incall->prev == NULL);
2272 incall->next = cap->suspended_ccalls;
2273 incall->prev = NULL;
2274 if (cap->suspended_ccalls) {
2275 cap->suspended_ccalls->prev = incall;
2276 }
2277 cap->suspended_ccalls = incall;
2278 }
2279
2280 STATIC_INLINE void
2281 recoverSuspendedTask (Capability *cap, Task *task)
2282 {
2283 InCall *incall;
2284
2285 incall = task->incall;
2286 if (incall->prev) {
2287 incall->prev->next = incall->next;
2288 } else {
2289 ASSERT(cap->suspended_ccalls == incall);
2290 cap->suspended_ccalls = incall->next;
2291 }
2292 if (incall->next) {
2293 incall->next->prev = incall->prev;
2294 }
2295 incall->next = incall->prev = NULL;
2296 }
2297
2298 /* ---------------------------------------------------------------------------
2299 * Suspending & resuming Haskell threads.
2300 *
2301 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2302 * its capability before calling the C function. This allows another
2303 * task to pick up the capability and carry on running Haskell
2304 * threads. It also means that if the C call blocks, it won't lock
2305 * the whole system.
2306 *
2307 * The Haskell thread making the C call is put to sleep for the
2308  * duration of the call, on the cap->suspended_ccalls list. We
2309 * give out a token to the task, which it can use to resume the thread
2310 * on return from the C function.
2311 *
2312  * If this is an interruptible C call, the FFI call may be
2313  * unceremoniously terminated, and so it should be scheduled on an
2314  * unbound worker thread.
2315 * ------------------------------------------------------------------------- */
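
/* Illustrative sketch (not part of the RTS): the code the compiler
 * generates around a "safe" foreign call uses this pair of functions
 * roughly as follows (shown here in C rather than Cmm):
 *
 *     void *token = suspendThread(BaseReg, interruptible);
 *     result = the_foreign_function(args);   // may block or re-enter Haskell
 *     BaseReg = resumeThread(token);
 *
 * The token is in fact the Task (see the return below), but callers must
 * treat it as opaque.
 */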
2316
2317 void *
2318 suspendThread (StgRegTable *reg, rtsBool interruptible)
2319 {
2320 Capability *cap;
2321 int saved_errno;
2322 StgTSO *tso;
2323 Task *task;
2324 #if mingw32_HOST_OS
2325 StgWord32 saved_winerror;
2326 #endif
2327
2328 saved_errno = errno;
2329 #if mingw32_HOST_OS
2330 saved_winerror = GetLastError();
2331 #endif
2332
2333 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2334 */
2335 cap = regTableToCapability(reg);
2336
2337 task = cap->running_task;
2338 tso = cap->r.rCurrentTSO;
2339
2340 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2341
2342 // XXX this might not be necessary --SDM
2343 tso->what_next = ThreadRunGHC;
2344
2345 threadPaused(cap,tso);
2346
2347 if (interruptible) {
2348 tso->why_blocked = BlockedOnCCall_Interruptible;
2349 } else {
2350 tso->why_blocked = BlockedOnCCall;
2351 }
2352
2353 // Hand back capability
2354 task->incall->suspended_tso = tso;
2355 task->incall->suspended_cap = cap;
2356
2357 // Otherwise allocate() will write to invalid memory.
2358 cap->r.rCurrentTSO = NULL;
2359
2360 ACQUIRE_LOCK(&cap->lock);
2361
2362 suspendTask(cap,task);
2363 cap->in_haskell = rtsFalse;
2364 releaseCapability_(cap,rtsFalse);
2365
2366 RELEASE_LOCK(&cap->lock);
2367
2368 errno = saved_errno;
2369 #if mingw32_HOST_OS
2370 SetLastError(saved_winerror);
2371 #endif
2372 return task;
2373 }
2374
2375 StgRegTable *
2376 resumeThread (void *task_)
2377 {
2378 StgTSO *tso;
2379 InCall *incall;
2380 Capability *cap;
2381 Task *task = task_;
2382 int saved_errno;
2383 #if mingw32_HOST_OS
2384 StgWord32 saved_winerror;
2385 #endif
2386
2387 saved_errno = errno;
2388 #if mingw32_HOST_OS
2389 saved_winerror = GetLastError();
2390 #endif
2391
2392 incall = task->incall;
2393 cap = incall->suspended_cap;
2394 task->cap = cap;
2395
2396 // Wait for permission to re-enter the RTS with the result.
2397 waitForCapability(&cap,task);
2398 // we might be on a different capability now... but if so, our
2399 // entry on the suspended_ccalls list will also have been
2400 // migrated.
2401
2402 // Remove the thread from the suspended list
2403 recoverSuspendedTask(cap,task);
2404
2405 tso = incall->suspended_tso;
2406 incall->suspended_tso = NULL;
2407 incall->suspended_cap = NULL;
2408 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2409
2410 traceEventRunThread(cap, tso);
2411
2412 /* Reset blocking status */
2413 tso->why_blocked = NotBlocked;
2414
2415 if ((tso->flags & TSO_BLOCKEX) == 0) {
2416 // avoid locking the TSO if we don't have to
2417 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2418 maybePerformBlockedException(cap,tso);
2419 }
2420 }
2421
2422 cap->r.rCurrentTSO = tso;
2423 cap->in_haskell = rtsTrue;
2424 errno = saved_errno;
2425 #if mingw32_HOST_OS
2426 SetLastError(saved_winerror);
2427 #endif
2428
2429 /* We might have GC'd, mark the TSO dirty again */
2430 dirty_TSO(cap,tso);
2431 dirty_STACK(cap,tso->stackobj);
2432
2433 IF_DEBUG(sanity, checkTSO(tso));
2434
2435 return &cap->r;
2436 }
2437
2438 /* ---------------------------------------------------------------------------
2439 * scheduleThread()
2440 *
2441 * scheduleThread puts a thread on the end of the runnable queue.
2442 * This will usually be done immediately after a thread is created.
2443 * The caller of scheduleThread must create the thread using e.g.
2444 * createThread and push an appropriate closure
2445 * on this thread's stack before the scheduler is invoked.
2446 * ------------------------------------------------------------------------ */
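
/* Illustrative sketch (not part of the RTS proper): a caller might do
 * roughly the following; createIOThread() and the initialStkSize flag
 * are assumed to be available as they are to rts_evalIO():
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
 *     scheduleThread(cap, tso);
 *
 * (rts_evalIO() itself pairs createIOThread() with scheduleWaitThread()
 * below, because it needs to wait for the result.)
 */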
2447
2448 void
2449 scheduleThread(Capability *cap, StgTSO *tso)
2450 {
2451 // The thread goes at the *end* of the run-queue, to avoid possible
2452 // starvation of any threads already on the queue.
2453 appendToRunQueue(cap,tso);
2454 }
2455
2456 void
2457 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2458 {
2459 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2460 // move this thread from now on.
2461 #if defined(THREADED_RTS)
2462 cpu %= enabled_capabilities;
2463 if (cpu == cap->no) {
2464 appendToRunQueue(cap,tso);
2465 } else {
2466 migrateThread(cap, tso, capabilities[cpu]);
2467 }
2468 #else
2469 appendToRunQueue(cap,tso);
2470 #endif
2471 }
2472
2473 void
2474 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2475 {
2476 Task *task;
2477 DEBUG_ONLY( StgThreadID id );
2478 Capability *cap;
2479
2480 cap = *pcap;
2481
2482 // We already created/initialised the Task
2483 task = cap->running_task;
2484
2485 // This TSO is now a bound thread; make the Task and TSO
2486 // point to each other.
2487 tso->bound = task->incall;
2488 tso->cap = cap;
2489
2490 task->incall->tso = tso;
2491 task->incall->ret = ret;
2492 task->incall->rstat = NoStatus;
2493
2494 appendToRunQueue(cap,tso);
2495
2496 DEBUG_ONLY( id = tso->id );
2497 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2498
2499 cap = schedule(cap,task);
2500
2501 ASSERT(task->incall->rstat != NoStatus);
2502 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2503
2504 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2505 *pcap = cap;
2506 }
2507
2508 /* ----------------------------------------------------------------------------
2509 * Starting Tasks
2510 * ------------------------------------------------------------------------- */
2511
2512 #if defined(THREADED_RTS)
2513 void scheduleWorker (Capability *cap, Task *task)
2514 {
2515 // schedule() runs without a lock.
2516 cap = schedule(cap,task);
2517
2518 // On exit from schedule(), we have a Capability, but possibly not
2519 // the same one we started with.
2520
2521 // During shutdown, the requirement is that after all the
2522 // Capabilities are shut down, all workers that are shutting down
2523 // have finished workerTaskStop(). This is why we hold on to
2524 // cap->lock until we've finished workerTaskStop() below.
2525 //
2526 // There may be workers still involved in foreign calls; those
2527 // will just block in waitForCapability() because the
2528 // Capability has been shut down.
2529 //
2530 ACQUIRE_LOCK(&cap->lock);
2531 releaseCapability_(cap,rtsFalse);
2532 workerTaskStop(task);
2533 RELEASE_LOCK(&cap->lock);
2534 }
2535 #endif
2536
2537 /* ---------------------------------------------------------------------------
2538 * Start new worker tasks on Capabilities from--to
2539 * -------------------------------------------------------------------------- */
2540
2541 static void
2542 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2543 {
2544 #if defined(THREADED_RTS)
2545 uint32_t i;
2546 Capability *cap;
2547
2548 for (i = from; i < to; i++) {
2549 cap = capabilities[i];
2550 ACQUIRE_LOCK(&cap->lock);
2551 startWorkerTask(cap);
2552 RELEASE_LOCK(&cap->lock);
2553 }
2554 #endif
2555 }
2556
2557 /* ---------------------------------------------------------------------------
2558 * initScheduler()
2559 *
2560 * Initialise the scheduler. This resets all the queues - if the
2561 * queues contained any threads, they'll be garbage collected at the
2562 * next pass.
2563 *
2564 * ------------------------------------------------------------------------ */
2565
2566 void
2567 initScheduler(void)
2568 {
2569 #if !defined(THREADED_RTS)
2570 blocked_queue_hd = END_TSO_QUEUE;
2571 blocked_queue_tl = END_TSO_QUEUE;
2572 sleeping_queue = END_TSO_QUEUE;
2573 #endif
2574
2575 sched_state = SCHED_RUNNING;
2576 recent_activity = ACTIVITY_YES;
2577
2578 #if defined(THREADED_RTS)
2579 /* Initialise the mutex and condition variables used by
2580 * the scheduler. */
2581 initMutex(&sched_mutex);
2582 #endif
2583
2584 ACQUIRE_LOCK(&sched_mutex);
2585
2586 /* A capability holds the state a native thread needs in
2587 * order to execute STG code. At least one capability is
2588 * floating around (only THREADED_RTS builds have more than one).
2589 */
2590 initCapabilities();
2591
2592 initTaskManager();
2593
2594 /*
2595 * Eagerly start one worker to run each Capability, except for
2596 * Capability 0. The idea is that we're probably going to start a
2597 * bound thread on Capability 0 pretty soon, so we don't want a
2598 * worker task hogging it.
2599 */
2600 startWorkerTasks(1, n_capabilities);
2601
2602 RELEASE_LOCK(&sched_mutex);
2603
2604 }
2605
2606 void
2607 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2608 /* see Capability.c, shutdownCapability() */
2609 {
2610 Task *task = NULL;
2611
2612 task = newBoundTask();
2613
2614 // If we haven't killed all the threads yet, do it now.
2615 if (sched_state < SCHED_SHUTTING_DOWN) {
2616 sched_state = SCHED_INTERRUPTING;
2617 Capability *cap = task->cap;
2618 waitForCapability(&cap,task);
2619 scheduleDoGC(&cap,task,rtsTrue);
2620 ASSERT(task->incall->tso == NULL);
2621 releaseCapability(cap);
2622 }
2623 sched_state = SCHED_SHUTTING_DOWN;
2624
2625 shutdownCapabilities(task, wait_foreign);
2626
2627 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2628 // n_failed_trygrab_idles, n_idle_caps);
2629
2630 boundTaskExiting(task);
2631 }
2632
2633 void
2634 freeScheduler( void )
2635 {
2636 uint32_t still_running;
2637
2638 ACQUIRE_LOCK(&sched_mutex);
2639 still_running = freeTaskManager();
2640 // We can only free the Capabilities if there are no Tasks still
2641 // running. We might have a Task about to return from a foreign
2642 // call into waitForCapability(), for example (actually,
2643 // this should be the *only* thing that a still-running Task can
2644 // do at this point, and it will block waiting for the
2645 // Capability).
2646 if (still_running == 0) {
2647 freeCapabilities();
2648 }
2649 RELEASE_LOCK(&sched_mutex);
2650 #if defined(THREADED_RTS)
2651 closeMutex(&sched_mutex);
2652 #endif
2653 }
2654
2655 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2656 void *user USED_IF_NOT_THREADS)
2657 {
2658 #if !defined(THREADED_RTS)
2659 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2660 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2661 evac(user, (StgClosure **)(void *)&sleeping_queue);
2662 #endif
2663 }
2664
2665 /* -----------------------------------------------------------------------------
2666 performGC
2667
2668 This is the interface to the garbage collector from Haskell land.
2669 We provide this so that external C code can allocate and garbage
2670 collect when called from Haskell via _ccall_GC.
2671 -------------------------------------------------------------------------- */
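
/* Illustrative sketch (not part of the RTS): C code entered via a safe
 * foreign call can trigger a collection directly, e.g.
 *
 *     void my_c_helper(void)      // hypothetical user function
 *     {
 *         performMajorGC();       // or performGC() to let the RTS choose
 *     }
 */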
2672
2673 static void
2674 performGC_(rtsBool force_major)
2675 {
2676 Task *task;
2677 Capability *cap = NULL;
2678
2679 // We must grab a new Task here, because the existing Task may be
2680 // associated with a particular Capability, and chained onto the
2681 // suspended_ccalls queue.
2682 task = newBoundTask();
2683
2684 // TODO: do we need to traceTask*() here?
2685
2686 waitForCapability(&cap,task);
2687 scheduleDoGC(&cap,task,force_major);
2688 releaseCapability(cap);
2689 boundTaskExiting(task);
2690 }
2691
2692 void
2693 performGC(void)
2694 {
2695 performGC_(rtsFalse);
2696 }
2697
2698 void
2699 performMajorGC(void)
2700 {
2701 performGC_(rtsTrue);
2702 }
2703
2704 /* ---------------------------------------------------------------------------
2705 Interrupt execution.
2706 Might be called inside a signal handler so it mustn't do anything fancy.
2707 ------------------------------------------------------------------------ */
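
/* Illustrative sketch (not part of the RTS): a signal handler along the
 * lines of the ^C handler in posix/Signals.c can call this directly,
 * e.g. (simplified)
 *
 *     static void my_sigint_handler(int sig STG_UNUSED)   // hypothetical
 *     {
 *         interruptStgRts();
 *     }
 */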
2708
2709 void
2710 interruptStgRts(void)
2711 {
2712 sched_state = SCHED_INTERRUPTING;
2713 interruptAllCapabilities();
2714 #if defined(THREADED_RTS)
2715 wakeUpRts();
2716 #endif
2717 }
2718
2719 /* -----------------------------------------------------------------------------
2720 Wake up the RTS
2721
2722 This function causes at least one OS thread to wake up and run the
2723 scheduler loop. It is invoked when the RTS might be deadlocked, or
2724 an external event has arrived that may need servicing (eg. a
2725 keyboard interrupt).
2726
2727 In the single-threaded RTS we don't do anything here; we only have
2728 one thread anyway, and the event that caused us to want to wake up
2729 will have interrupted any blocking system call in progress anyway.
2730 -------------------------------------------------------------------------- */
2731
2732 #if defined(THREADED_RTS)
2733 void wakeUpRts(void)
2734 {
2735 // This forces the IO Manager thread to wakeup, which will
2736 // in turn ensure that some OS thread wakes up and runs the
2737 // scheduler loop, which will cause a GC and deadlock check.
2738 ioManagerWakeup();
2739 }
2740 #endif
2741
2742 /* -----------------------------------------------------------------------------
2743 Deleting threads
2744
2745    This is used for interruption (^C) and forking, and corresponds to
2746    raising an exception in the thread without letting the thread catch
2747    it.
2748 -------------------------------------------------------------------------- */
2749
2750 static void
2751 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2752 {
2753 // NOTE: must only be called on a TSO that we have exclusive
2754 // access to, because we will call throwToSingleThreaded() below.
2755 // The TSO must be on the run queue of the Capability we own, or
2756 // we must own all Capabilities.
2757
2758 if (tso->why_blocked != BlockedOnCCall &&
2759 tso->why_blocked != BlockedOnCCall_Interruptible) {
2760 throwToSingleThreaded(tso->cap,tso,NULL);
2761 }
2762 }
2763
2764 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2765 static void
2766 deleteThread_(Capability *cap, StgTSO *tso)
2767 { // for forkProcess only:
2768 // like deleteThread(), but we delete threads in foreign calls, too.
2769
2770 if (tso->why_blocked == BlockedOnCCall ||
2771 tso->why_blocked == BlockedOnCCall_Interruptible) {
2772 tso->what_next = ThreadKilled;
2773 appendToRunQueue(tso->cap, tso);
2774 } else {
2775 deleteThread(cap,tso);
2776 }
2777 }
2778 #endif
2779
2780 /* -----------------------------------------------------------------------------
2781 raiseExceptionHelper
2782
2783    This function is called by the raise# primitive, just so that we can
2784    move some of the tricky bits of raising an exception from C-- into
2785    C. Who knows, it might be a useful reusable thing here too.
2786 -------------------------------------------------------------------------- */
2787
2788 StgWord
2789 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2790 {
2791 Capability *cap = regTableToCapability(reg);
2792 StgThunk *raise_closure = NULL;
2793 StgPtr p, next;
2794 const StgRetInfoTable *info;
2795 //
2796     // This closure represents the expression 'raise# E' where E
2797     // is the exception raised. It is used to overwrite all the
2798     // thunks which are currently under evaluation.
2799 //
2800
2801 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2802 // LDV profiling: stg_raise_info has THUNK as its closure
2803 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2804     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2805 // 1 does not cause any problem unless profiling is performed.
2806 // However, when LDV profiling goes on, we need to linearly scan
2807 // small object pool, where raise_closure is stored, so we should
2808 // use MIN_UPD_SIZE.
2809 //
2810 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2811 // sizeofW(StgClosure)+1);
2812 //
2813
2814 //
2815 // Walk up the stack, looking for the catch frame. On the way,
2816 // we update any closures pointed to from update frames with the
2817 // raise closure that we just built.
2818 //
2819 p = tso->stackobj->sp;
2820 while(1) {
2821 info = get_ret_itbl((StgClosure *)p);
2822 next = p + stack_frame_sizeW((StgClosure *)p);
2823 switch (info->i.type) {
2824
2825 case UPDATE_FRAME:
2826 // Only create raise_closure if we need to.
2827 if (raise_closure == NULL) {
2828 raise_closure =
2829 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2830 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2831 raise_closure->payload[0] = exception;
2832 }
2833 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2834 (StgClosure *)raise_closure);
2835 p = next;
2836 continue;
2837
2838 case ATOMICALLY_FRAME:
2839 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2840 tso->stackobj->sp = p;
2841 return ATOMICALLY_FRAME;
2842
2843 case CATCH_FRAME:
2844 tso->stackobj->sp = p;
2845 return CATCH_FRAME;
2846
2847 case CATCH_STM_FRAME:
2848 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2849 tso->stackobj->sp = p;
2850 return CATCH_STM_FRAME;
2851
2852 case UNDERFLOW_FRAME:
2853 tso->stackobj->sp = p;
2854 threadStackUnderflow(cap,tso);
2855 p = tso->stackobj->sp;
2856 continue;
2857
2858 case STOP_FRAME:
2859 tso->stackobj->sp = p;
2860 return STOP_FRAME;
2861
2862 case CATCH_RETRY_FRAME: {
2863 StgTRecHeader *trec = tso -> trec;
2864 StgTRecHeader *outer = trec -> enclosing_trec;
2865 debugTrace(DEBUG_stm,
2866 "found CATCH_RETRY_FRAME at %p during raise", p);
2867 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2868 stmAbortTransaction(cap, trec);
2869 stmFreeAbortedTRec(cap, trec);
2870 tso -> trec = outer;
2871 p = next;
2872 continue;
2873 }
2874
2875 default:
2876 p = next;
2877 continue;
2878 }
2879 }
2880 }
2881
2882
2883 /* -----------------------------------------------------------------------------
2884 findRetryFrameHelper
2885
2886 This function is called by the retry# primitive. It traverses the stack
2887 leaving tso->sp referring to the frame which should handle the retry.
2888
2889    This should be either a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2890    or an ATOMICALLY_FRAME (if the retry# reaches the top level).
2891
2892 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2893 create) because retries are not considered to be exceptions, despite the
2894 similar implementation.
2895
2896 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2897 not be created within memory transactions.
2898 -------------------------------------------------------------------------- */
2899
2900 StgWord
2901 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2902 {
2903 const StgRetInfoTable *info;
2904 StgPtr p, next;
2905
2906 p = tso->stackobj->sp;
2907 while (1) {
2908 info = get_ret_itbl((const StgClosure *)p);
2909 next = p + stack_frame_sizeW((StgClosure *)p);
2910 switch (info->i.type) {
2911
2912 case ATOMICALLY_FRAME:
2913 debugTrace(DEBUG_stm,
2914 "found ATOMICALLY_FRAME at %p during retry", p);
2915 tso->stackobj->sp = p;
2916 return ATOMICALLY_FRAME;
2917
2918 case CATCH_RETRY_FRAME:
2919 debugTrace(DEBUG_stm,
2920 "found CATCH_RETRY_FRAME at %p during retry", p);
2921 tso->stackobj->sp = p;
2922 return CATCH_RETRY_FRAME;
2923
2924 case CATCH_STM_FRAME: {
2925 StgTRecHeader *trec = tso -> trec;
2926 StgTRecHeader *outer = trec -> enclosing_trec;
2927 debugTrace(DEBUG_stm,
2928 "found CATCH_STM_FRAME at %p during retry", p);
2929 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2930 stmAbortTransaction(cap, trec);
2931 stmFreeAbortedTRec(cap, trec);
2932 tso -> trec = outer;
2933 p = next;
2934 continue;
2935 }
2936
2937 case UNDERFLOW_FRAME:
2938 tso->stackobj->sp = p;
2939 threadStackUnderflow(cap,tso);
2940 p = tso->stackobj->sp;
2941 continue;
2942
2943 default:
2944 ASSERT(info->i.type != CATCH_FRAME);
2945 ASSERT(info->i.type != STOP_FRAME);
2946 p = next;
2947 continue;
2948 }
2949 }
2950 }
2951
2952 /* -----------------------------------------------------------------------------
2953    resurrectThreads is called after garbage collection on the list of
2954    threads found to be garbage. Each of these threads will be woken
2955    up and sent an exception: BlockedIndefinitelyOnMVar if it was blocked
2956    on an MVar, BlockedIndefinitelyOnSTM if it was blocked on STM, or
2957    NonTermination if it was blocked on a Black Hole.
2958
2959 Locks: assumes we hold *all* the capabilities.
2960 -------------------------------------------------------------------------- */
2961
2962 void
2963 resurrectThreads (StgTSO *threads)
2964 {
2965 StgTSO *tso, *next;
2966 Capability *cap;
2967 generation *gen;
2968
2969 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2970 next = tso->global_link;
2971
2972 gen = Bdescr((P_)tso)->gen;
2973 tso->global_link = gen->threads;
2974 gen->threads = tso;
2975
2976 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2977
2978 // Wake up the thread on the Capability it was last on
2979 cap = tso->cap;
2980
2981 switch (tso->why_blocked) {
2982 case BlockedOnMVar:
2983 case BlockedOnMVarRead:
2984 /* Called by GC - sched_mutex lock is currently held. */
2985 throwToSingleThreaded(cap, tso,
2986 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2987 break;
2988 case BlockedOnBlackHole:
2989 throwToSingleThreaded(cap, tso,
2990 (StgClosure *)nonTermination_closure);
2991 break;
2992 case BlockedOnSTM:
2993 throwToSingleThreaded(cap, tso,
2994 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2995 break;
2996 case NotBlocked:
2997 /* This might happen if the thread was blocked on a black hole
2998 * belonging to a thread that we've just woken up (raiseAsync
2999 * can wake up threads, remember...).
3000 */
3001 continue;
3002 case BlockedOnMsgThrowTo:
3003 // This can happen if the target is masking, blocks on a
3004 // black hole, and then is found to be unreachable. In
3005 // this case, we want to let the target wake up and carry
3006 // on, and do nothing to this thread.
3007 continue;
3008 default:
3009 barf("resurrectThreads: thread blocked in a strange way: %d",
3010 tso->why_blocked);
3011 }
3012 }
3013 }