1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45 #include "TopHandler.h"
46
47 #if defined(HAVE_SYS_TYPES_H)
48 #include <sys/types.h>
49 #endif
50 #if defined(HAVE_UNISTD_H)
51 #include <unistd.h>
52 #endif
53
54 #include <string.h>
55 #include <stdlib.h>
56 #include <stdarg.h>
57
58 #if defined(HAVE_ERRNO_H)
59 #include <errno.h>
60 #endif
61
62 #if defined(TRACING)
63 #include "eventlog/EventLog.h"
64 #endif
65 /* -----------------------------------------------------------------------------
66 * Global variables
67 * -------------------------------------------------------------------------- */
68
69 #if !defined(THREADED_RTS)
70 // Blocked/sleeping threads
71 StgTSO *blocked_queue_hd = NULL;
72 StgTSO *blocked_queue_tl = NULL;
73 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
74 #endif
75
76 // Bytes allocated since the last time a HeapOverflow exception was thrown by
77 // the RTS
78 uint64_t allocated_bytes_at_heapoverflow = 0;
79
80 /* Set to true when the latest garbage collection failed to reclaim enough
81 * space, and the runtime should proceed to shut itself down in an orderly
82 * fashion (emitting profiling info etc.), OR throw an exception to the main
83 * thread, if it is still alive.
84 */
85 bool heap_overflow = false;
86
87 /* flag that tracks whether we have done any execution in this time slice.
88 * LOCK: currently none, perhaps we should lock (but needs to be
89 * updated in the fast path of the scheduler).
90 *
91 * NB. must be StgWord, we do xchg() on it.
92 */
93 volatile StgWord recent_activity = ACTIVITY_YES;
94
95 /* if this flag is set as well, give up execution
96 * LOCK: none (changes monotonically)
97 */
98 volatile StgWord sched_state = SCHED_RUNNING;
99
100 /*
101 * This mutex protects most of the global scheduler data in
102 * the THREADED_RTS runtime.
103 */
104 #if defined(THREADED_RTS)
105 Mutex sched_mutex;
106 #endif
107
108 #if !defined(mingw32_HOST_OS)
109 #define FORKPROCESS_PRIMOP_SUPPORTED
110 #endif
111
112 /* -----------------------------------------------------------------------------
113 * static function prototypes
114 * -------------------------------------------------------------------------- */
115
116 static Capability *schedule (Capability *initialCapability, Task *task);
117
118 //
119 // These functions all encapsulate parts of the scheduler loop, and are
120 // abstracted only to make the structure and control flow of the
121 // scheduler clearer.
122 //
123 static void schedulePreLoop (void);
124 static void scheduleFindWork (Capability **pcap);
125 #if defined(THREADED_RTS)
126 static void scheduleYield (Capability **pcap, Task *task);
127 #endif
128 #if defined(THREADED_RTS)
129 static bool requestSync (Capability **pcap, Task *task,
130 PendingSync *sync_type, SyncType *prev_sync_type);
131 static void acquireAllCapabilities(Capability *cap, Task *task);
132 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
133 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
134 uint32_t to USED_IF_THREADS);
135 #endif
136 static void scheduleStartSignalHandlers (Capability *cap);
137 static void scheduleCheckBlockedThreads (Capability *cap);
138 static void scheduleProcessInbox(Capability **cap);
139 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
140 static void schedulePushWork(Capability *cap, Task *task);
141 #if defined(THREADED_RTS)
142 static void scheduleActivateSpark(Capability *cap);
143 #endif
144 static void schedulePostRunThread(Capability *cap, StgTSO *t);
145 static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
146 static bool scheduleHandleYield( Capability *cap, StgTSO *t,
147 uint32_t prev_what_next );
148 static void scheduleHandleThreadBlocked( StgTSO *t );
149 static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
150 StgTSO *t );
151 static bool scheduleNeedHeapProfile(bool ready_to_gc);
152 static void scheduleDoGC(Capability **pcap, Task *task, bool force_major);
153
154 static void deleteThread (Capability *cap, StgTSO *tso);
155 static void deleteAllThreads (Capability *cap);
156
157 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
158 static void deleteThread_(Capability *cap, StgTSO *tso);
159 #endif
160
161 /* ---------------------------------------------------------------------------
162 Main scheduling loop.
163
164 We use round-robin scheduling, each thread returning to the
165 scheduler loop when one of these conditions is detected:
166
167 * out of heap space
168 * timer expires (thread yields)
169 * thread blocks
170 * thread ends
171 * stack overflow
172
173 ------------------------------------------------------------------------ */
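/* A summary for orientation (added note, not a change in behaviour): the
 * conditions above correspond roughly to the StgThreadReturnCode values
 * handled in the switch (ret) further down in schedule(): HeapOverflow,
 * ThreadYielding, ThreadBlocked, ThreadFinished and StackOverflow.
 */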
174
175 static Capability *
176 schedule (Capability *initialCapability, Task *task)
177 {
178 StgTSO *t;
179 Capability *cap;
180 StgThreadReturnCode ret;
181 uint32_t prev_what_next;
182 bool ready_to_gc;
183 #if defined(THREADED_RTS)
184 bool first = true;
185 #endif
186
187 cap = initialCapability;
188
189 // Pre-condition: this task owns initialCapability.
190 // The sched_mutex is *NOT* held
191 // NB. on return, we still hold a capability.
192
193 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
194
195 schedulePreLoop();
196
197 // -----------------------------------------------------------
198 // Scheduler loop starts here:
199
200 while (1) {
201
202 // Check whether we have re-entered the RTS from Haskell without
203 // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
204 // call).
205 if (cap->in_haskell) {
206 errorBelch("schedule: re-entered unsafely.\n"
207 " Perhaps a 'foreign import unsafe' should be 'safe'?");
208 stg_exit(EXIT_FAILURE);
209 }
210
211 // Note [shutdown]: The interruption / shutdown sequence.
212 //
213 // In order to cleanly shut down the runtime, we want to:
214 // * make sure that all main threads return to their callers
215 // with the state 'Interrupted'.
216 // * clean up all OS threads associated with the runtime
217 // * free all memory etc.
218 //
219 // So the sequence goes like this:
220 //
221 // * The shutdown sequence is initiated by calling hs_exit(),
222 // interruptStgRts(), or running out of memory in the GC.
223 //
224 // * Set sched_state = SCHED_INTERRUPTING
225 //
226 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
227 // scheduleDoGC(), which halts the whole runtime by acquiring all the
228 // capabilities, does a GC and then calls deleteAllThreads() to kill all
229 // the remaining threads. The zombies are left on the run queue for
230 // cleaning up. We can't kill threads involved in foreign calls.
231 //
232 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
233 //
234 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
235 // enforced by the scheduler, which won't run any Haskell code when
236 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
237 // other capabilities by doing the GC earlier.
238 //
239 // * all workers exit when the run queue on their capability
240 // drains. All main threads will also exit when their TSO
241 // reaches the head of the run queue and they can return.
242 //
243 // * eventually all Capabilities will shut down, and the RTS can
244 // exit.
245 //
246 // * We might be left with threads blocked in foreign calls,
247 // we should really attempt to kill these somehow (TODO).
248
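// In short (a recap of the note above, not new behaviour): sched_state only
// advances monotonically,
//   SCHED_RUNNING -> SCHED_INTERRUPTING -> SCHED_SHUTTING_DOWN
// with hs_exit()/interruptStgRts() (or running out of memory in the GC)
// triggering the first step and scheduleDoGC() the second, so the switch
// below only has these three states to consider.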
249 switch (sched_state) {
250 case SCHED_RUNNING:
251 break;
252 case SCHED_INTERRUPTING:
253 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
254 /* scheduleDoGC() deletes all the threads */
255 scheduleDoGC(&cap,task,true);
256
257 // after scheduleDoGC(), we must be shutting down. Either some
258 // other Capability did the final GC, or we did it above,
259 // either way we can fall through to the SCHED_SHUTTING_DOWN
260 // case now.
261 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
262 // fall through
263
264 case SCHED_SHUTTING_DOWN:
265 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
266 // If we are a worker, just exit. If we're a bound thread
267 // then we will exit below when we've removed our TSO from
268 // the run queue.
269 if (!isBoundTask(task) && emptyRunQueue(cap)) {
270 return cap;
271 }
272 break;
273 default:
274 barf("sched_state: %" FMT_Word, sched_state);
275 }
276
277 scheduleFindWork(&cap);
278
279 /* work pushing, currently relevant only for THREADED_RTS:
280 (pushes threads, wakes up idle capabilities for stealing) */
281 schedulePushWork(cap,task);
282
283 scheduleDetectDeadlock(&cap,task);
284
285 // Normally, the only way we can get here with no threads to
286 // run is if a keyboard interrupt was received during
287 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
288 // Additionally, it is not fatal for the
289 // threaded RTS to reach here with no threads to run.
290 //
291 // win32: might be here due to awaitEvent() being abandoned
292 // as a result of a console event having been delivered.
293
294 #if defined(THREADED_RTS)
295 if (first)
296 {
297 // XXX: ToDo
298 // // don't yield the first time, we want a chance to run this
299 // // thread for a bit, even if there are others banging at the
300 // // door.
301 // first = false;
302 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
303 }
304
305 scheduleYield(&cap,task);
306
307 if (emptyRunQueue(cap)) continue; // look for work again
308 #endif
309
310 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
311 if ( emptyRunQueue(cap) ) {
312 ASSERT(sched_state >= SCHED_INTERRUPTING);
313 }
314 #endif
315
316 //
317 // Get a thread to run
318 //
319 t = popRunQueue(cap);
320
321 // Sanity check the thread we're about to run. This can be
322 // expensive if there is lots of thread switching going on...
323 IF_DEBUG(sanity,checkTSO(t));
324
325 #if defined(THREADED_RTS)
326 // Check whether we can run this thread in the current task.
327 // If not, we have to pass our capability to the right task.
328 {
329 InCall *bound = t->bound;
330
331 if (bound) {
332 if (bound->task == task) {
333 // yes, the Haskell thread is bound to the current native thread
334 } else {
335 debugTrace(DEBUG_sched,
336 "thread %lu bound to another OS thread",
337 (unsigned long)t->id);
338 // no, bound to a different Haskell thread: pass to that thread
339 pushOnRunQueue(cap,t);
340 continue;
341 }
342 } else {
343 // The thread we want to run is unbound.
344 if (task->incall->tso) {
345 debugTrace(DEBUG_sched,
346 "this OS thread cannot run thread %lu",
347 (unsigned long)t->id);
348 // no, the current native thread is bound to a different
349 // Haskell thread, so pass it to any worker thread
350 pushOnRunQueue(cap,t);
351 continue;
352 }
353 }
354 }
355 #endif
356
357 // If we're shutting down, and this thread has not yet been
358 // killed, kill it now. This sometimes happens when a finalizer
359 // thread is created by the final GC, or a thread previously
360 // in a foreign call returns.
361 if (sched_state >= SCHED_INTERRUPTING &&
362 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
363 deleteThread(cap,t);
364 }
365
366 // If this capability is disabled, migrate the thread away rather
367 // than running it. NB. but not if the thread is bound: it is
368 // really hard for a bound thread to migrate itself. Believe me,
369 // I tried several ways and couldn't find a way to do it.
370 // Instead, when everything is stopped for GC, we migrate all the
371 // threads on the run queue then (see scheduleDoGC()).
372 //
373 // ToDo: what about TSO_LOCKED? Currently we're migrating those
374 // when the number of capabilities drops, but we never migrate
375 // them back if it rises again. Presumably we should, but after
376 // the thread has been migrated we no longer know what capability
377 // it was originally on.
378 #if defined(THREADED_RTS)
379 if (cap->disabled && !t->bound) {
380 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
381 migrateThread(cap, t, dest_cap);
382 continue;
383 }
384 #endif
385
386 /* context switches are initiated by the timer signal, unless
387 * the user specified "context switch as often as possible", with
388 * +RTS -C0
389 */
390 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
391 && !emptyThreadQueues(cap)) {
392 cap->context_switch = 1;
393 }
394
395 run_thread:
396
397 // CurrentTSO is the thread to run. It might be different if we
398 // loop back to run_thread, so make sure to set CurrentTSO after
399 // that.
400 cap->r.rCurrentTSO = t;
401
402 startHeapProfTimer();
403
404 // ----------------------------------------------------------------------
405 // Run the current thread
406
407 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
408 ASSERT(t->cap == cap);
409 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
410
411 prev_what_next = t->what_next;
412
413 errno = t->saved_errno;
414 #if defined(mingw32_HOST_OS)
415 SetLastError(t->saved_winerror);
416 #endif
417
418 // reset the interrupt flag before running Haskell code
419 cap->interrupt = 0;
420
421 cap->in_haskell = true;
422 cap->idle = 0;
423
424 dirty_TSO(cap,t);
425 dirty_STACK(cap,t->stackobj);
426
427 switch (recent_activity)
428 {
429 case ACTIVITY_DONE_GC: {
430 // ACTIVITY_DONE_GC means we turned off the timer signal to
431 // conserve power (see #1623). Re-enable it here.
432 uint32_t prev;
433 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
434 if (prev == ACTIVITY_DONE_GC) {
435 #if !defined(PROFILING)
436 startTimer();
437 #endif
438 }
439 break;
440 }
441 case ACTIVITY_INACTIVE:
442 // If we reached ACTIVITY_INACTIVE, then don't reset it until
443 // we've done the GC. The thread running here might just be
444 // the IO manager thread that handle_tick() woke up via
445 // wakeUpRts().
446 break;
447 default:
448 recent_activity = ACTIVITY_YES;
449 }
450
451 traceEventRunThread(cap, t);
452
453 switch (prev_what_next) {
454
455 case ThreadKilled:
456 case ThreadComplete:
457 /* Thread already finished, return to scheduler. */
458 ret = ThreadFinished;
459 break;
460
461 case ThreadRunGHC:
462 {
463 StgRegTable *r;
464 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
465 cap = regTableToCapability(r);
466 ret = r->rRet;
467 break;
468 }
469
470 case ThreadInterpret:
471 cap = interpretBCO(cap);
472 ret = cap->r.rRet;
473 break;
474
475 default:
476 barf("schedule: invalid prev_what_next=%u field", prev_what_next);
477 }
478
479 cap->in_haskell = false;
480
481 // The TSO might have moved, eg. if it re-entered the RTS and a GC
482 // happened. So find the new location:
483 t = cap->r.rCurrentTSO;
484
485 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
486 // don't want it set when not running a Haskell thread.
487 cap->r.rCurrentTSO = NULL;
488
489 // And save the current errno in this thread.
490 // XXX: possibly bogus for SMP because this thread might already
491 // be running again, see code below.
492 t->saved_errno = errno;
493 #if defined(mingw32_HOST_OS)
494 // Similarly for Windows error code
495 t->saved_winerror = GetLastError();
496 #endif
497
498 if (ret == ThreadBlocked) {
499 if (t->why_blocked == BlockedOnBlackHole) {
500 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
501 traceEventStopThread(cap, t, t->why_blocked + 6,
502 owner != NULL ? owner->id : 0);
503 } else {
504 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
505 }
506 } else {
507 traceEventStopThread(cap, t, ret, 0);
508 }
509
510 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
511 ASSERT(t->cap == cap);
512
513 // ----------------------------------------------------------------------
514
515 // Costs for the scheduler are assigned to CCS_SYSTEM
516 stopHeapProfTimer();
517 #if defined(PROFILING)
518 cap->r.rCCCS = CCS_SYSTEM;
519 #endif
520
521 schedulePostRunThread(cap,t);
522
523 ready_to_gc = false;
524
525 switch (ret) {
526 case HeapOverflow:
527 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
528 break;
529
530 case StackOverflow:
531 // just adjust the stack for this thread, then push it back
532 // on the run queue.
533 threadStackOverflow(cap, t);
534 pushOnRunQueue(cap,t);
535 break;
536
537 case ThreadYielding:
538 if (scheduleHandleYield(cap, t, prev_what_next)) {
539 // shortcut for switching between compiler/interpreter:
540 goto run_thread;
541 }
542 break;
543
544 case ThreadBlocked:
545 scheduleHandleThreadBlocked(t);
546 break;
547
548 case ThreadFinished:
549 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
550 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
551 break;
552
553 default:
554 barf("schedule: invalid thread return code %d", (int)ret);
555 }
556
557 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
558 scheduleDoGC(&cap,task,false);
559 }
560 } /* end of while() */
561 }
562
563 /* -----------------------------------------------------------------------------
564 * Run queue operations
565 * -------------------------------------------------------------------------- */
566
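// Note (added for clarity): the run queue is a doubly-linked list;
// tso->_link points to the next TSO and tso->block_info.prev to the
// previous one (maintained with setTSOLink()/setTSOPrev()), which is what
// allows removeFromRunQueue() below to unlink a TSO from the middle of the
// queue without walking it.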
567 static void
568 removeFromRunQueue (Capability *cap, StgTSO *tso)
569 {
570 if (tso->block_info.prev == END_TSO_QUEUE) {
571 ASSERT(cap->run_queue_hd == tso);
572 cap->run_queue_hd = tso->_link;
573 } else {
574 setTSOLink(cap, tso->block_info.prev, tso->_link);
575 }
576 if (tso->_link == END_TSO_QUEUE) {
577 ASSERT(cap->run_queue_tl == tso);
578 cap->run_queue_tl = tso->block_info.prev;
579 } else {
580 setTSOPrev(cap, tso->_link, tso->block_info.prev);
581 }
582 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
583 cap->n_run_queue--;
584
585 IF_DEBUG(sanity, checkRunQueue(cap));
586 }
587
588 void
589 promoteInRunQueue (Capability *cap, StgTSO *tso)
590 {
591 removeFromRunQueue(cap, tso);
592 pushOnRunQueue(cap, tso);
593 }
594
595 /* ----------------------------------------------------------------------------
596 * Setting up the scheduler loop
597 * ------------------------------------------------------------------------- */
598
599 static void
600 schedulePreLoop(void)
601 {
602 // initialisation for scheduler - what cannot go into initScheduler()
603
604 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
605 win32AllocStack();
606 #endif
607 }
608
609 /* -----------------------------------------------------------------------------
610 * scheduleFindWork()
611 *
612 * Search for work to do, and handle messages from elsewhere.
613 * -------------------------------------------------------------------------- */
614
615 static void
616 scheduleFindWork (Capability **pcap)
617 {
618 scheduleStartSignalHandlers(*pcap);
619
620 scheduleProcessInbox(pcap);
621
622 scheduleCheckBlockedThreads(*pcap);
623
624 #if defined(THREADED_RTS)
625 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
626 #endif
627 }
628
629 #if defined(THREADED_RTS)
630 STATIC_INLINE bool
631 shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
632 {
633 // we need to yield this capability to someone else if..
634 // - another thread is initiating a GC, and we didn't just do a GC
635 // (see Note [GC livelock])
636 // - another Task is returning from a foreign call
637 // - the thread at the head of the run queue cannot be run
638 // by this Task (it is bound to another Task, or it is unbound
639 // and this task is bound).
640 //
641 // Note [GC livelock]
642 //
643 // If we are interrupted to do a GC, then we do not immediately do
644 // another one. This avoids a starvation situation where one
645 // Capability keeps forcing a GC and the other Capabilities make no
646 // progress at all.
647
648 return ((pending_sync && !didGcLast) ||
649 cap->n_returning_tasks != 0 ||
650 (!emptyRunQueue(cap) && (task->incall->tso == NULL
651 ? peekRunQueue(cap)->bound != NULL
652 : peekRunQueue(cap)->bound != task->incall)));
653 }
654
655 // This is the single place where a Task goes to sleep. There are
656 // two reasons it might need to sleep:
657 // - there are no threads to run
658 // - we need to yield this Capability to someone else
659 // (see shouldYieldCapability())
660 //
661 // Careful: the scheduler loop is quite delicate. Make sure you run
662 // the tests in testsuite/concurrent (all ways) after modifying this,
663 // and also check the benchmarks in nofib/parallel for regressions.
664
665 static void
666 scheduleYield (Capability **pcap, Task *task)
667 {
668 Capability *cap = *pcap;
669 bool didGcLast = false;
670
671 // if we have work, and we don't need to give up the Capability, continue.
672 //
673 if (!shouldYieldCapability(cap,task,false) &&
674 (!emptyRunQueue(cap) ||
675 !emptyInbox(cap) ||
676 sched_state >= SCHED_INTERRUPTING)) {
677 return;
678 }
679
680 // otherwise yield (sleep), and keep yielding if necessary.
681 do {
682 didGcLast = yieldCapability(&cap,task, !didGcLast);
683 }
684 while (shouldYieldCapability(cap,task,didGcLast));
685
686 // note there may still be no threads on the run queue at this
687 // point, the caller has to check.
688
689 *pcap = cap;
690 return;
691 }
692 #endif
693
694 /* -----------------------------------------------------------------------------
695 * schedulePushWork()
696 *
697 * Push work to other Capabilities if we have some.
698 * -------------------------------------------------------------------------- */
699
700 static void
701 schedulePushWork(Capability *cap USED_IF_THREADS,
702 Task *task USED_IF_THREADS)
703 {
704 /* following code not for PARALLEL_HASKELL. I kept the call general,
705 future GUM versions might use pushing in a distributed setup */
706 #if defined(THREADED_RTS)
707
708 Capability *free_caps[n_capabilities], *cap0;
709 uint32_t i, n_wanted_caps, n_free_caps;
710
711 uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
712
713 // migration can be turned off with +RTS -qm
714 if (!RtsFlags.ParFlags.migrate) {
715 spare_threads = 0;
716 }
717
718 // Figure out how many capabilities we want to wake up. We need at least
719 // sparkPoolSize(cap) plus the number of spare threads we have.
720 n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
721 if (n_wanted_caps == 0) return;
722
723 // First grab as many free Capabilities as we can. ToDo: we should use
724 // capabilities on the same NUMA node preferably, but not exclusively.
725 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
726 n_free_caps < n_wanted_caps && i != cap->no;
727 i = (i + 1) % n_capabilities) {
728 cap0 = capabilities[i];
729 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
730 if (!emptyRunQueue(cap0)
731 || cap0->n_returning_tasks != 0
732 || !emptyInbox(cap0)) {
733 // it already has some work, we just grabbed it at
734 // the wrong moment. Or maybe it's deadlocked!
735 releaseCapability(cap0);
736 } else {
737 free_caps[n_free_caps++] = cap0;
738 }
739 }
740 }
741
742 // We now have n_free_caps free capabilities stashed in
743 // free_caps[]. Attempt to share our run queue equally with them.
744 // This is complicated slightly by the fact that we can't move
745 // some threads:
746 //
747 // - threads that have TSO_LOCKED cannot migrate
748 // - a thread that is bound to the current Task cannot be migrated
749 //
750 // This is about the simplest thing we could do; improvements we
751 // might want to do include:
752 //
753 // - giving high priority to moving relatively new threads, on
754 // the grounds that they haven't had time to build up a
755 // working set in the cache on this CPU/Capability.
756 //
757 // - giving low priority to moving long-lived threads
758
759 if (n_free_caps > 0) {
760 StgTSO *prev, *t, *next;
761
762 debugTrace(DEBUG_sched,
763 "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
764 cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
765 n_free_caps);
766
767 // There are n_free_caps+1 caps in total. We will share the threads
768 // evently between them, *except* that if the run queue does not divide
769 // evenly between them, *except* that if the run queue does not divide
770 // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
771 uint32_t keep_threads =
772 (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
773
774 // This also ensures that we don't give away all our threads, since
775 // (x + y) / (y + 1) >= 1 when x >= 1.
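// A worked example (illustrative only): with n_run_queue = 5 and
// n_free_caps = 2 we get keep_threads = (5 + 2) / (2 + 1) = 2, so two
// threads stay here and the other three are spread over the two free
// capabilities.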
776
777 // The number of threads we have left.
778 uint32_t n = cap->n_run_queue;
779
780 // prev = the previous thread on this cap's run queue
781 prev = END_TSO_QUEUE;
782
783 // We're going to walk through the run queue, migrating threads to other
784 // capabilities until we have only keep_threads left. We might
785 // encounter a thread that cannot be migrated, in which case we add it
786 // to the current run queue and decrement keep_threads.
787 for (t = cap->run_queue_hd, i = 0;
788 t != END_TSO_QUEUE && n > keep_threads;
789 t = next)
790 {
791 next = t->_link;
792 t->_link = END_TSO_QUEUE;
793
794 // Should we keep this thread?
795 if (t->bound == task->incall // don't move my bound thread
796 || tsoLocked(t) // don't move a locked thread
797 ) {
798 if (prev == END_TSO_QUEUE) {
799 cap->run_queue_hd = t;
800 } else {
801 setTSOLink(cap, prev, t);
802 }
803 setTSOPrev(cap, t, prev);
804 prev = t;
805 if (keep_threads > 0) keep_threads--;
806 }
807
808 // Or migrate it?
809 else {
810 appendToRunQueue(free_caps[i],t);
811 traceEventMigrateThread (cap, t, free_caps[i]->no);
812
813 if (t->bound) { t->bound->task->cap = free_caps[i]; }
814 t->cap = free_caps[i];
815 n--; // we have one fewer thread now
816 i++; // move on to the next free_cap
817 if (i == n_free_caps) i = 0;
818 }
819 }
820
821 // Join up the beginning of the queue (prev)
822 // with the rest of the queue (t)
823 if (t == END_TSO_QUEUE) {
824 cap->run_queue_tl = prev;
825 } else {
826 setTSOPrev(cap, t, prev);
827 }
828 if (prev == END_TSO_QUEUE) {
829 cap->run_queue_hd = t;
830 } else {
831 setTSOLink(cap, prev, t);
832 }
833 cap->n_run_queue = n;
834
835 IF_DEBUG(sanity, checkRunQueue(cap));
836
837 // release the capabilities
838 for (i = 0; i < n_free_caps; i++) {
839 task->cap = free_caps[i];
840 if (sparkPoolSizeCap(cap) > 0) {
841 // If we have sparks to steal, wake up a worker on the
842 // capability, even if it has no threads to run.
843 releaseAndWakeupCapability(free_caps[i]);
844 } else {
845 releaseCapability(free_caps[i]);
846 }
847 }
848 }
849 task->cap = cap; // reset to point to our Capability.
850
851 #endif /* THREADED_RTS */
852
853 }
854
855 /* ----------------------------------------------------------------------------
856 * Start any pending signal handlers
857 * ------------------------------------------------------------------------- */
858
859 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
860 static void
861 scheduleStartSignalHandlers(Capability *cap)
862 {
863 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
864 // safe outside the lock
865 startSignalHandlers(cap);
866 }
867 }
868 #else
869 static void
870 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
871 {
872 }
873 #endif
874
875 /* ----------------------------------------------------------------------------
876 * Check for blocked threads that can be woken up.
877 * ------------------------------------------------------------------------- */
878
879 static void
880 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
881 {
882 #if !defined(THREADED_RTS)
883 //
884 // Check whether any waiting threads need to be woken up. If the
885 // run queue is empty, and there are no other tasks running, we
886 // can wait indefinitely for something to happen.
887 //
888 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
889 {
890 awaitEvent (emptyRunQueue(cap));
891 }
892 #endif
893 }
894
895 /* ----------------------------------------------------------------------------
896 * Detect deadlock conditions and attempt to resolve them.
897 * ------------------------------------------------------------------------- */
898
899 static void
900 scheduleDetectDeadlock (Capability **pcap, Task *task)
901 {
902 Capability *cap = *pcap;
903 /*
904 * Detect deadlock: when we have no threads to run, there are no
905 * threads blocked, waiting for I/O, or sleeping, and all the
906 * other tasks are waiting for work, we must have a deadlock of
907 * some description.
908 */
909 if ( emptyThreadQueues(cap) )
910 {
911 #if defined(THREADED_RTS)
912 /*
913 * In the threaded RTS, we only check for deadlock if there
914 * has been no activity in a complete timeslice. This means
915 * we won't eagerly start a full GC just because we don't have
916 * any threads to run currently.
917 */
918 if (recent_activity != ACTIVITY_INACTIVE) return;
919 #endif
920
921 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
922
923 // Garbage collection can release some new threads due to
924 // either (a) finalizers or (b) threads resurrected because
925 // they are unreachable and will therefore be sent an
926 // exception. Any threads thus released will be immediately
927 // runnable.
928 scheduleDoGC (pcap, task, true/*force major GC*/);
929 cap = *pcap;
930 // when force_major == true, scheduleDoGC sets
931 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
932 // signal.
933
934 if ( !emptyRunQueue(cap) ) return;
935
936 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
937 /* If we have user-installed signal handlers, then wait
938 * for signals to arrive rather than bombing out with a
939 * deadlock.
940 */
941 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
942 debugTrace(DEBUG_sched,
943 "still deadlocked, waiting for signals...");
944
945 awaitUserSignals();
946
947 if (signals_pending()) {
948 startSignalHandlers(cap);
949 }
950
951 // either we have threads to run, or we were interrupted:
952 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
953
954 return;
955 }
956 #endif
957
958 #if !defined(THREADED_RTS)
959 /* Probably a real deadlock. Send the current main thread the
960 * Deadlock exception.
961 */
962 if (task->incall->tso) {
963 switch (task->incall->tso->why_blocked) {
964 case BlockedOnSTM:
965 case BlockedOnBlackHole:
966 case BlockedOnMsgThrowTo:
967 case BlockedOnMVar:
968 case BlockedOnMVarRead:
969 throwToSingleThreaded(cap, task->incall->tso,
970 (StgClosure *)nonTermination_closure);
971 return;
972 default:
973 barf("deadlock: main thread blocked in a strange way");
974 }
975 }
976 return;
977 #endif
978 }
979 }
980
981
982 /* ----------------------------------------------------------------------------
983 * Process messages in the current Capability's inbox
984 * ------------------------------------------------------------------------- */
985
986 static void
987 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
988 {
989 #if defined(THREADED_RTS)
990 Message *m, *next;
991 PutMVar *p, *pnext;
992 int r;
993 Capability *cap = *pcap;
994
995 while (!emptyInbox(cap)) {
996 // Executing messages might use heap, so we should check for GC.
997 if (doYouWantToGC(cap)) {
998 scheduleDoGC(pcap, cap->running_task, false);
999 cap = *pcap;
1000 }
1001
1002 // don't use a blocking acquire; if the lock is held by
1003 // another thread then just carry on. This seems to avoid
1004 // getting stuck in a message ping-pong situation with other
1005 // processors. We'll check the inbox again later anyway.
1006 //
1007 // We should really use a more efficient queue data structure
1008 // here. The trickiness is that we must ensure a Capability
1009 // never goes idle if the inbox is non-empty, which is why we
1010 // use cap->lock (cap->lock is released as the last thing
1011 // before going idle; see Capability.c:releaseCapability()).
1012 r = TRY_ACQUIRE_LOCK(&cap->lock);
1013 if (r != 0) return;
1014
1015 m = cap->inbox;
1016 p = cap->putMVars;
1017 cap->inbox = (Message*)END_TSO_QUEUE;
1018 cap->putMVars = NULL;
1019
1020 RELEASE_LOCK(&cap->lock);
1021
1022 while (m != (Message*)END_TSO_QUEUE) {
1023 next = m->link;
1024 executeMessage(cap, m);
1025 m = next;
1026 }
1027
1028 while (p != NULL) {
1029 pnext = p->link;
1030 performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
1031 Unit_closure);
1032 freeStablePtr(p->mvar);
1033 stgFree(p);
1034 p = pnext;
1035 }
1036 }
1037 #endif
1038 }
1039
1040
1041 /* ----------------------------------------------------------------------------
1042 * Activate spark threads (THREADED_RTS)
1043 * ------------------------------------------------------------------------- */
1044
1045 #if defined(THREADED_RTS)
1046 static void
1047 scheduleActivateSpark(Capability *cap)
1048 {
1049 if (anySparks() && !cap->disabled)
1050 {
1051 createSparkThread(cap);
1052 debugTrace(DEBUG_sched, "creating a spark thread");
1053 }
1054 }
1055 #endif // THREADED_RTS
1056
1057 /* ----------------------------------------------------------------------------
1058 * After running a thread...
1059 * ------------------------------------------------------------------------- */
1060
1061 static void
1062 schedulePostRunThread (Capability *cap, StgTSO *t)
1063 {
1064 // We have to be able to catch transactions that are in an
1065 // infinite loop as a result of seeing an inconsistent view of
1066 // memory, e.g.
1067 //
1068 // atomically $ do
1069 // [a,b] <- mapM readTVar [ta,tb]
1070 // when (a == b) loop
1071 //
1072 // and a is never equal to b given a consistent view of memory.
1073 //
1074 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1075 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1076 debugTrace(DEBUG_sched | DEBUG_stm,
1077 "trec %p found wasting its time", t);
1078
1079 // strip the stack back to the
1080 // ATOMICALLY_FRAME, aborting the (nested)
1081 // transaction, and saving the stack of any
1082 // partially-evaluated thunks on the heap.
1083 throwToSingleThreaded_(cap, t, NULL, true);
1084
1085 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1086 }
1087 }
1088
1089 //
1090 // If the current thread's allocation limit has run out, send it
1091 // the AllocationLimitExceeded exception.
1092
1093 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1094 // Use a throwToSelf rather than a throwToSingleThreaded, because
1095 // it correctly handles the case where the thread is currently
1096 // inside mask. Also the thread might be blocked (e.g. on an
1097 // MVar), and throwToSingleThreaded doesn't unblock it
1098 // correctly in that case.
1099 throwToSelf(cap, t, allocationLimitExceeded_closure);
1100 ASSIGN_Int64((W_*)&(t->alloc_limit),
1101 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1102 }
1103
1104 /* some statistics gathering in the parallel case */
1105 }
1106
1107 /* -----------------------------------------------------------------------------
1108 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1109 * -------------------------------------------------------------------------- */
1110
1111 static bool
1112 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1113 {
1114 if (cap->r.rHpLim == NULL || cap->context_switch) {
1115 // Sometimes we miss a context switch, e.g. when calling
1116 // primitives in a tight loop, MAYBE_GC() doesn't check the
1117 // context switch flag, and we end up waiting for a GC.
1118 // See #1984, and concurrent/should_run/1984
1119 cap->context_switch = 0;
1120 appendToRunQueue(cap,t);
1121 } else {
1122 pushOnRunQueue(cap,t);
1123 }
1124
1125 // did the task ask for a large block?
1126 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1127 // if so, get one and push it on the front of the nursery.
1128 bdescr *bd;
1129 W_ blocks;
1130
1131 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1132
1133 if (blocks > BLOCKS_PER_MBLOCK) {
1134 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1135 }
1136
1137 debugTrace(DEBUG_sched,
1138 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1139 (long)t->id, what_next_strs[t->what_next], blocks);
1140
1141 // don't do this if the nursery is (nearly) full, we'll GC first.
1142 if (cap->r.rCurrentNursery->link != NULL ||
1143 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1144 // infinite loop if the
1145 // nursery has only one
1146 // block.
1147
1148 bd = allocGroupOnNode_lock(cap->node,blocks);
1149 cap->r.rNursery->n_blocks += blocks;
1150
1151 // link the new group after CurrentNursery
1152 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1153
1154 // initialise it as a nursery block. We initialise the
1155 // step, gen_no, and flags field of *every* sub-block in
1156 // this large block, because this is easier than making
1157 // sure that we always find the block head of a large
1158 // block whenever we call Bdescr() (eg. evacuate() and
1159 // isAlive() in the GC would both have to do this, at
1160 // least).
1161 {
1162 bdescr *x;
1163 for (x = bd; x < bd + blocks; x++) {
1164 initBdescr(x,g0,g0);
1165 x->free = x->start;
1166 x->flags = 0;
1167 }
1168 }
1169
1170 // This assert can be a killer if the app is doing lots
1171 // of large block allocations.
1172 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1173
1174 // now update the nursery to point to the new block
1175 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1176 cap->r.rCurrentNursery = bd;
1177
1178 // we might be unlucky and have another thread get on the
1179 // run queue before us and steal the large block, but in that
1180 // case the thread will just end up requesting another large
1181 // block.
1182 return false; /* not actually GC'ing */
1183 }
1184 }
1185
1186 return doYouWantToGC(cap);
1187 /* actual GC is done at the end of the while loop in schedule() */
1188 }
1189
1190 /* -----------------------------------------------------------------------------
1191 * Handle a thread that returned to the scheduler with ThreadYielding
1192 * -------------------------------------------------------------------------- */
1193
1194 static bool
1195 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1196 {
1197 /* put the thread back on the run queue. Then, if we're ready to
1198 * GC, check whether this is the last task to stop. If so, wake
1199 * up the GC thread. getThread will block during a GC until the
1200 * GC is finished.
1201 */
1202
1203 ASSERT(t->_link == END_TSO_QUEUE);
1204
1205 // Shortcut if we're just switching evaluators: just run the thread. See
1206 // Note [avoiding threadPaused] in Interpreter.c.
1207 if (t->what_next != prev_what_next) {
1208 debugTrace(DEBUG_sched,
1209 "--<< thread %ld (%s) stopped to switch evaluators",
1210 (long)t->id, what_next_strs[t->what_next]);
1211 return true;
1212 }
1213
1214 // Reset the context switch flag. We don't do this just before
1215 // running the thread, because that would mean we would lose ticks
1216 // during GC, which can lead to unfair scheduling (a thread hogs
1217 // the CPU because the tick always arrives during GC). This way
1218 // penalises threads that do a lot of allocation, but that seems
1219 // better than the alternative.
1220 if (cap->context_switch != 0) {
1221 cap->context_switch = 0;
1222 appendToRunQueue(cap,t);
1223 } else {
1224 pushOnRunQueue(cap,t);
1225 }
1226
1227 IF_DEBUG(sanity,
1228 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1229 checkTSO(t));
1230
1231 return false;
1232 }
1233
1234 /* -----------------------------------------------------------------------------
1235 * Handle a thread that returned to the scheduler with ThreadBlocked
1236 * -------------------------------------------------------------------------- */
1237
1238 static void
1239 scheduleHandleThreadBlocked( StgTSO *t
1240 #if !defined(DEBUG)
1241 STG_UNUSED
1242 #endif
1243 )
1244 {
1245
1246 // We don't need to do anything. The thread is blocked, and it
1247 // has tidied up its stack and placed itself on whatever queue
1248 // it needs to be on.
1249
1250 // ASSERT(t->why_blocked != NotBlocked);
1251 // Not true: for example,
1252 // - the thread may have woken itself up already, because
1253 // threadPaused() might have raised a blocked throwTo
1254 // exception, see maybePerformBlockedException().
1255
1256 #if defined(DEBUG)
1257 traceThreadStatus(DEBUG_sched, t);
1258 #endif
1259 }
1260
1261 /* -----------------------------------------------------------------------------
1262 * Handle a thread that returned to the scheduler with ThreadFinished
1263 * -------------------------------------------------------------------------- */
1264
1265 static bool
1266 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1267 {
1268 /* Need to check whether this was a main thread, and if so,
1269 * return with the return value.
1270 *
1271 * We also end up here if the thread kills itself with an
1272 * uncaught exception, see Exception.cmm.
1273 */
1274
1275 // blocked exceptions can now complete, even if the thread was in
1276 // blocked mode (see #2910).
1277 awakenBlockedExceptionQueue (cap, t);
1278
1279 //
1280 // Check whether the thread that just completed was a bound
1281 // thread, and if so return with the result.
1282 //
1283 // There is an assumption here that all thread completion goes
1284 // through this point; we need to make sure that if a thread
1285 // ends up in the ThreadKilled state, that it stays on the run
1286 // queue so it can be dealt with here.
1287 //
1288
1289 if (t->bound) {
1290
1291 if (t->bound != task->incall) {
1292 #if !defined(THREADED_RTS)
1293 // Must be a bound thread that is not the topmost one. Leave
1294 // it on the run queue until the stack has unwound to the
1295 // point where we can deal with this. Leaving it on the run
1296 // queue also ensures that the garbage collector knows about
1297 // this thread and its return value (it gets dropped from the
1298 // step->threads list so there's no other way to find it).
1299 appendToRunQueue(cap,t);
1300 return false;
1301 #else
1302 // this cannot happen in the threaded RTS, because a
1303 // bound thread can only be run by the appropriate Task.
1304 barf("finished bound thread that isn't mine");
1305 #endif
1306 }
1307
1308 ASSERT(task->incall->tso == t);
1309
1310 if (t->what_next == ThreadComplete) {
1311 if (task->incall->ret) {
1312 // NOTE: return val is stack->sp[1] (see StgStartup.cmm)
1313 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1314 }
1315 task->incall->rstat = Success;
1316 } else {
1317 if (task->incall->ret) {
1318 *(task->incall->ret) = NULL;
1319 }
1320 if (sched_state >= SCHED_INTERRUPTING) {
1321 if (heap_overflow) {
1322 task->incall->rstat = HeapExhausted;
1323 } else {
1324 task->incall->rstat = Interrupted;
1325 }
1326 } else {
1327 task->incall->rstat = Killed;
1328 }
1329 }
1330 #if defined(DEBUG)
1331 removeThreadLabel((StgWord)task->incall->tso->id);
1332 #endif
1333
1334 // We no longer consider this thread and task to be bound to
1335 // each other. The TSO lives on until it is GC'd, but the
1336 // task is about to be released by the caller, and we don't
1337 // want anyone following the pointer from the TSO to the
1338 // defunct task (which might have already been
1339 // re-used). This was a real bug: the GC updated
1340 // tso->bound->tso which led to a deadlock.
1341 t->bound = NULL;
1342 task->incall->tso = NULL;
1343
1344 return true; // tells schedule() to return
1345 }
1346
1347 return false;
1348 }
1349
1350 /* -----------------------------------------------------------------------------
1351 * Perform a heap census
1352 * -------------------------------------------------------------------------- */
1353
1354 static bool
1355 scheduleNeedHeapProfile( bool ready_to_gc STG_UNUSED )
1356 {
1357 // When we have +RTS -i0 and we're heap profiling, do a census at
1358 // every GC. This lets us get repeatable runs for debugging.
1359 if (performHeapProfile ||
1360 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1361 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1362 return true;
1363 } else {
1364 return false;
1365 }
1366 }
1367
1368 /* -----------------------------------------------------------------------------
1369 * stopAllCapabilities()
1370 *
1371 * Stop all Haskell execution. This is used when we need to make some global
1372 * change to the system, such as altering the number of capabilities, or
1373 * forking.
1374 *
1375 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1376 * -------------------------------------------------------------------------- */
1377
1378 #if defined(THREADED_RTS)
1379 static void stopAllCapabilities (Capability **pCap, Task *task)
1380 {
1381 bool was_syncing;
1382 SyncType prev_sync_type;
1383
1384 PendingSync sync = {
1385 .type = SYNC_OTHER,
1386 .idle = NULL,
1387 .task = task
1388 };
1389
1390 do {
1391 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1392 } while (was_syncing);
1393
1394 acquireAllCapabilities(*pCap,task);
1395
1396 pending_sync = 0;
1397 }
1398 #endif
1399
1400 /* -----------------------------------------------------------------------------
1401 * requestSync()
1402 *
1403 * Commence a synchronisation between all capabilities. Normally not called
1404 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1405 * has some special synchronisation requirements.
1406 *
1407 * Returns:
1408 * false if we successfully got a sync
1409 * true if there was another sync request in progress,
1410 * and we yielded to it; the type of the other sync
1411 * request is returned in *prev_sync_type.
1412 * -------------------------------------------------------------------------- */
1413
1414 #if defined(THREADED_RTS)
1415 static bool requestSync (
1416 Capability **pcap, Task *task, PendingSync *new_sync,
1417 SyncType *prev_sync_type)
1418 {
1419 PendingSync *sync;
1420
1421 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1422 (StgWord)NULL,
1423 (StgWord)new_sync);
1424
1425 if (sync != NULL)
1426 {
1427 // sync is valid until we have called yieldCapability().
1428 // After the sync is completed, we cannot read that struct any
1429 // more because it has been freed.
1430 *prev_sync_type = sync->type;
1431 do {
1432 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1433 sync->type);
1434 ASSERT(*pcap);
1435 yieldCapability(pcap,task,true);
1436 sync = pending_sync;
1437 } while (sync != NULL);
1438
1439 // NOTE: task->cap might have changed now
1440 return true;
1441 }
1442 else
1443 {
1444 return false;
1445 }
1446 }
1447 #endif
1448
1449 /* -----------------------------------------------------------------------------
1450 * acquireAllCapabilities()
1451 *
1452 * Grab all the capabilities except the one we already hold. Used
1453 * when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1454 * before a fork (SYNC_OTHER).
1455 *
1456 * Only call this after requestSync(), otherwise a deadlock might
1457 * ensue if another thread is trying to synchronise.
1458 * -------------------------------------------------------------------------- */
1459
1460 #if defined(THREADED_RTS)
1461 static void acquireAllCapabilities(Capability *cap, Task *task)
1462 {
1463 Capability *tmpcap;
1464 uint32_t i;
1465
1466 ASSERT(pending_sync != NULL);
1467 for (i=0; i < n_capabilities; i++) {
1468 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1469 i, n_capabilities);
1470 tmpcap = capabilities[i];
1471 if (tmpcap != cap) {
1472 // we better hope this task doesn't get migrated to
1473 // another Capability while we're waiting for this one.
1474 // It won't, because load balancing happens while we have
1475 // all the Capabilities, but even so it's a slightly
1476 // unsavoury invariant.
1477 task->cap = tmpcap;
1478 waitForCapability(&tmpcap, task);
1479 if (tmpcap->no != i) {
1480 barf("acquireAllCapabilities: got the wrong capability");
1481 }
1482 }
1483 }
1484 task->cap = cap;
1485 }
1486 #endif
1487
1488 /* -----------------------------------------------------------------------------
1489 * releaseAllCapabilities()
1490 *
1491 * Assuming this thread holds all the capabilities, release them all except for
1492 * the one passed in as cap.
1493 * -------------------------------------------------------------------------- */
1494
1495 #if defined(THREADED_RTS)
1496 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task)
1497 {
1498 uint32_t i;
1499
1500 for (i = 0; i < n; i++) {
1501 if (cap->no != i) {
1502 task->cap = capabilities[i];
1503 releaseCapability(capabilities[i]);
1504 }
1505 }
1506 task->cap = cap;
1507 }
1508 #endif
1509
1510 /* -----------------------------------------------------------------------------
1511 * Perform a garbage collection if necessary
1512 * -------------------------------------------------------------------------- */
1513
1514 static void
1515 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1516 bool force_major)
1517 {
1518 Capability *cap = *pcap;
1519 bool heap_census;
1520 uint32_t collect_gen;
1521 bool major_gc;
1522 #if defined(THREADED_RTS)
1523 uint32_t gc_type;
1524 uint32_t i;
1525 uint32_t need_idle;
1526 uint32_t n_gc_threads;
1527 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1528 StgTSO *tso;
1529 bool *idle_cap;
1530 // idle_cap is an array (allocated later) of size n_capabilities, where
1531 // idle_cap[i] is true if capability i will be idle during this GC
1532 // cycle.
1533 #endif
1534
1535 if (sched_state == SCHED_SHUTTING_DOWN) {
1536 // The final GC has already been done, and the system is
1537 // shutting down. We'll probably deadlock if we try to GC
1538 // now.
1539 return;
1540 }
1541
1542 heap_census = scheduleNeedHeapProfile(true);
1543
1544 // Figure out which generation we are collecting, so that we can
1545 // decide whether this is a parallel GC or not.
1546 collect_gen = calcNeeded(force_major || heap_census, NULL);
1547 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1548
1549 #if defined(THREADED_RTS)
1550 if (sched_state < SCHED_INTERRUPTING
1551 && RtsFlags.ParFlags.parGcEnabled
1552 && collect_gen >= RtsFlags.ParFlags.parGcGen
1553 && ! oldest_gen->mark)
1554 {
1555 gc_type = SYNC_GC_PAR;
1556 } else {
1557 gc_type = SYNC_GC_SEQ;
1558 }
1559
1560 // In order to GC, there must be no threads running Haskell code.
1561 // Therefore, for single-threaded GC, the GC thread needs to hold *all* the
1562 // capabilities, and release them after the GC has completed. For parallel
1563 // GC, we synchronise all the running threads using requestSync().
1564 //
1565 // Other capabilities are prevented from running yet more Haskell threads if
1566 // pending_sync is set. Tested inside yieldCapability() and
1567 // releaseCapability() in Capability.c
1568
1569 PendingSync sync = {
1570 .type = gc_type,
1571 .idle = NULL,
1572 .task = task
1573 };
1574
1575 {
1576 SyncType prev_sync = 0;
1577 bool was_syncing;
1578 do {
1579 // If -qn is not set and we have more capabilities than cores, set
1580 // the number of GC threads to #cores. We do this here rather than
1581 // in normaliseRtsOpts() because here it will work if the program
1582 // calls setNumCapabilities.
1583 //
1584 n_gc_threads = RtsFlags.ParFlags.parGcThreads;
1585 if (n_gc_threads == 0 &&
1586 enabled_capabilities > getNumberOfProcessors()) {
1587 n_gc_threads = getNumberOfProcessors();
1588 }
1589
1590 // This calculation must be inside the loop because
1591 // enabled_capabilities may change if requestSync() below fails and
1592 // we retry.
1593 if (gc_type == SYNC_GC_PAR && n_gc_threads > 0) {
1594 if (n_gc_threads >= enabled_capabilities) {
1595 need_idle = 0;
1596 } else {
1597 need_idle = enabled_capabilities - n_gc_threads;
1598 }
1599 } else {
1600 need_idle = 0;
1601 }
1602
1603 // We need an array of size n_capabilities, but since this may
1604 // change each time around the loop we must allocate it afresh.
1605 idle_cap = (bool *)stgMallocBytes(n_capabilities *
1606 sizeof(bool),
1607 "scheduleDoGC");
1608 sync.idle = idle_cap;
1609
1610 // When using +RTS -qn, we need some capabilities to be idle during
1611 // GC. The best bet is to choose some inactive ones, so we look for
1612 // those first:
1613 uint32_t n_idle = need_idle;
1614 for (i=0; i < n_capabilities; i++) {
1615 if (capabilities[i]->disabled) {
1616 idle_cap[i] = true;
1617 } else if (n_idle > 0 &&
1618 capabilities[i]->running_task == NULL) {
1619 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1620 n_idle--;
1621 idle_cap[i] = true;
1622 } else {
1623 idle_cap[i] = false;
1624 }
1625 }
1626 // If we didn't find enough inactive capabilities, just pick some
1627 // more to be idle.
1628 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1629 if (!idle_cap[i] && i != cap->no) {
1630 idle_cap[i] = true;
1631 n_idle--;
1632 }
1633 }
1634 ASSERT(n_idle == 0);
1635
1636 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1637 cap = *pcap;
1638 if (was_syncing) {
1639 stgFree(idle_cap);
1640 }
1641 if (was_syncing &&
1642 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1643 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1644 // someone else had a pending sync request for a GC, so
1645 // let's assume GC has been done and we don't need to GC
1646 // again.
1647 // Exception to this: if SCHED_INTERRUPTING, then we still
1648 // need to do the final GC.
1649 return;
1650 }
1651 if (sched_state == SCHED_SHUTTING_DOWN) {
1652 // The scheduler might now be shutting down. We tested
1653 // this above, but it might have become true since then as
1654 // we yielded the capability in requestSync().
1655 return;
1656 }
1657 } while (was_syncing);
1658 }
1659
1660 stat_startGCSync(gc_threads[cap->no]);
1661
1662 #if defined(DEBUG)
1663 unsigned int old_n_capabilities = n_capabilities;
1664 #endif
1665
1666 interruptAllCapabilities();
1667
1668 // The final shutdown GC is always single-threaded, because it's
1669 // possible that some of the Capabilities have no worker threads.
1670
1671 if (gc_type == SYNC_GC_SEQ) {
1672 traceEventRequestSeqGc(cap);
1673 } else {
1674 traceEventRequestParGc(cap);
1675 }
1676
1677 if (gc_type == SYNC_GC_SEQ) {
1678 // single-threaded GC: grab all the capabilities
1679 acquireAllCapabilities(cap,task);
1680 }
1681 else
1682 {
1683 // If we are load-balancing collections in this
1684 // generation, then we require all GC threads to participate
1685 // in the collection. Otherwise, we only require active
1686 // threads to participate, and we set gc_threads[i]->idle for
1687 // any idle capabilities. The rationale here is that waking
1688 // up an idle Capability takes much longer than just doing any
1689 // GC work on its behalf.
1690
1691 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1692 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1693 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1694 {
1695 for (i=0; i < n_capabilities; i++) {
1696 if (capabilities[i]->disabled) {
1697 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1698 if (idle_cap[i]) {
1699 n_idle_caps++;
1700 }
1701 } else {
1702 if (i != cap->no && idle_cap[i]) {
1703 Capability *tmpcap = capabilities[i];
1704 task->cap = tmpcap;
1705 waitForCapability(&tmpcap, task);
1706 n_idle_caps++;
1707 }
1708 }
1709 }
1710 }
1711 else
1712 {
1713 for (i=0; i < n_capabilities; i++) {
1714 if (capabilities[i]->disabled) {
1715 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1716 if (idle_cap[i]) {
1717 n_idle_caps++;
1718 }
1719 } else if (i != cap->no &&
1720 capabilities[i]->idle >=
1721 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1722 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1723 if (idle_cap[i]) {
1724 n_idle_caps++;
1725 } else {
1726 n_failed_trygrab_idles++;
1727 }
1728 }
1729 }
1730 }
1731 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1732
1733 for (i=0; i < n_capabilities; i++) {
1734 capabilities[i]->idle++;
1735 }
1736
1737 // For all capabilities participating in this GC, wait until
1738 // they have stopped mutating and are standing by for GC.
1739 waitForGcThreads(cap, idle_cap);
1740
1741 #if defined(THREADED_RTS)
1742 // Stable point where we can do a global check on our spark counters
1743 ASSERT(checkSparkCountInvariant());
1744 #endif
1745 }
1746
1747 #endif
1748
1749 IF_DEBUG(scheduler, printAllThreads());
1750
1751 delete_threads_and_gc:
1752 /*
1753 * We now have all the capabilities; if we're in an interrupting
1754 * state, then we should take the opportunity to delete all the
1755 * threads in the system.
1756 * Checking for major_gc ensures that the last GC is major.
1757 */
1758 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1759 deleteAllThreads(cap);
1760 #if defined(THREADED_RTS)
1761 // Discard all the sparks from every Capability. Why?
1762 // They'll probably be GC'd anyway since we've killed all the
1763 // threads. It just avoids the GC having to do any work to
1764 // figure out that any remaining sparks are garbage.
1765 for (i = 0; i < n_capabilities; i++) {
1766 capabilities[i]->spark_stats.gcd +=
1767 sparkPoolSize(capabilities[i]->sparks);
1768 // No race here since all Caps are stopped.
1769 discardSparksCap(capabilities[i]);
1770 }
1771 #endif
1772 sched_state = SCHED_SHUTTING_DOWN;
1773 }
1774
1775 /*
1776 * When there are disabled capabilities, we want to migrate any
1777 * threads away from them. Normally this happens in the
1778 * scheduler's loop, but only for unbound threads - it's really
1779 * hard for a bound thread to migrate itself. So we have another
1780 * go here.
1781 */
1782 #if defined(THREADED_RTS)
1783 for (i = enabled_capabilities; i < n_capabilities; i++) {
1784 Capability *tmp_cap, *dest_cap;
1785 tmp_cap = capabilities[i];
1786 ASSERT(tmp_cap->disabled);
1787 if (i != cap->no) {
1788 dest_cap = capabilities[i % enabled_capabilities];
1789 while (!emptyRunQueue(tmp_cap)) {
1790 tso = popRunQueue(tmp_cap);
1791 migrateThread(tmp_cap, tso, dest_cap);
1792 if (tso->bound) {
1793 traceTaskMigrate(tso->bound->task,
1794 tso->bound->task->cap,
1795 dest_cap);
1796 tso->bound->task->cap = dest_cap;
1797 }
1798 }
1799 }
1800 }
1801 #endif
1802
1803 #if defined(THREADED_RTS)
1804 // reset pending_sync *before* GC, so that when the GC threads
1805 // emerge they don't immediately re-enter the GC.
1806 pending_sync = 0;
1807 GarbageCollect(collect_gen, heap_census, gc_type, cap, idle_cap);
1808 #else
1809 GarbageCollect(collect_gen, heap_census, 0, cap, NULL);
1810 #endif
1811
1812 traceSparkCounters(cap);
1813
1814 switch (recent_activity) {
1815 case ACTIVITY_INACTIVE:
1816 if (force_major) {
1817 // We are doing a GC because the system has been idle for a
1818 // timeslice and we need to check for deadlock. Record the
1819 // fact that we've done a GC and turn off the timer signal;
1820 // it will get re-enabled if we run any threads after the GC.
1821 recent_activity = ACTIVITY_DONE_GC;
1822 #if !defined(PROFILING)
1823 stopTimer();
1824 #endif
1825 break;
1826 }
1827 // fall through...
1828
1829 case ACTIVITY_MAYBE_NO:
1830 // the GC might have taken long enough for the timer to set
1831 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1832 // but we aren't necessarily deadlocked:
1833 recent_activity = ACTIVITY_YES;
1834 break;
1835
1836 case ACTIVITY_DONE_GC:
1837 // If we are actually active, the scheduler will reset the
1838 // recent_activity flag and re-enable the timer.
1839 break;
1840 }
1841
1842 #if defined(THREADED_RTS)
1843 // Stable point where we can do a global check on our spark counters
1844 ASSERT(checkSparkCountInvariant());
1845 #endif
1846
1847 // The heap census itself is done during GarbageCollect().
1848 if (heap_census) {
1849 performHeapProfile = false;
1850 }
1851
1852 #if defined(THREADED_RTS)
1853
1854 // If n_capabilities has changed during GC, we're in trouble.
1855 ASSERT(n_capabilities == old_n_capabilities);
1856
1857 if (gc_type == SYNC_GC_PAR)
1858 {
1859 for (i = 0; i < n_capabilities; i++) {
1860 if (i != cap->no) {
1861 if (idle_cap[i]) {
1862 ASSERT(capabilities[i]->running_task == task);
1863 task->cap = capabilities[i];
1864 releaseCapability(capabilities[i]);
1865 } else {
1866 ASSERT(capabilities[i]->running_task != task);
1867 }
1868 }
1869 }
1870 task->cap = cap;
1871
1872 // releaseGCThreads() happens *after* we have released idle
1873 // capabilities. Otherwise what can happen is one of the released
1874 // threads starts a new GC, and finds that it can't acquire some of
1875 // the disabled capabilities, because the previous GC still holds
1876 // them, so those disabled capabilities will not be idle during the
1877 // next GC round. However, if we release the capabilities first,
1878 // then they will be free (because they're disabled) when the next
1879 // GC cycle happens.
1880 releaseGCThreads(cap, idle_cap);
1881 }
1882 #endif
1883 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1884 // GC set the heap_overflow flag. We should throw an exception if we
1885 // can, or shut down otherwise.
1886
1887 // Get the thread to which Ctrl-C is thrown
1888 StgTSO *main_thread = getTopHandlerThread();
1889 if (main_thread == NULL) {
1890 // GC set the heap_overflow flag, and there is no main thread to
1891 // throw an exception to, so we should proceed with an orderly
1892 // shutdown now. Ultimately we want the main thread to return to
1893 // its caller with HeapExhausted, at which point the caller should
1894 // call hs_exit(). The first step is to delete all the threads.
1895 sched_state = SCHED_INTERRUPTING;
1896 goto delete_threads_and_gc;
1897 }
1898
1899 heap_overflow = false;
1900 const uint64_t allocation_count = getAllocations();
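        // The first HeapOverflow after startup is always thrown
        // (allocated_bytes_at_heapoverflow == 0); after that we only throw
        // again once at least heapLimitGrace further bytes have been
        // allocated since the previous throw, so the handler of the previous
        // exception gets a chance to run before the next one arrives.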
1901 if (RtsFlags.GcFlags.heapLimitGrace <
1902 allocation_count - allocated_bytes_at_heapoverflow ||
1903 allocated_bytes_at_heapoverflow == 0) {
1904 allocated_bytes_at_heapoverflow = allocation_count;
1905 // We used to simply exit, but throwing an exception gives the
1906 // program a chance to clean up. It also lets the exception be
1907 // caught.
1908
1909 // FIXME this is not a good way to tell a program to release
1910 // resources. It is neither reliable (the RTS crashes if it fails
1911 // to allocate memory from the OS) nor very usable (it is always
1912 // thrown to the main thread, which might not be able to do anything
1913 // useful with it). We really should have a more general way to
1914 // release resources in low-memory conditions. Nevertheless, this
1915 // is still a big improvement over just exiting.
1916
1917 // FIXME again: perhaps we should throw a synchronous exception
1918             // instead of an asynchronous one, or have a way for the program to
1919 // register a handler to be called when heap overflow happens.
1920 throwToSelf(cap, main_thread, heapOverflow_closure);
1921 }
1922 }
1923 #if defined(SPARKBALANCE)
1924 /* JB
1925 Once we are all together... this would be the place to balance all
1926 spark pools. No concurrent stealing or adding of new sparks can
1927 occur. Should be defined in Sparks.c. */
1928 balanceSparkPoolsCaps(n_capabilities, capabilities);
1929 #endif
1930
1931 #if defined(THREADED_RTS)
1932 stgFree(idle_cap);
1933
1934 if (gc_type == SYNC_GC_SEQ) {
1935 // release our stash of capabilities.
1936 releaseAllCapabilities(n_capabilities, cap, task);
1937 }
1938 #endif
1939
1940 return;
1941 }
1942
1943 /* ---------------------------------------------------------------------------
1944 * Singleton fork(). Do not copy any running threads.
1945 * ------------------------------------------------------------------------- */
1946
1947 pid_t
1948 forkProcess(HsStablePtr *entry
1949 #if !defined(FORKPROCESS_PRIMOP_SUPPORTED)
1950 STG_UNUSED
1951 #endif
1952 )
1953 {
1954 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
1955 pid_t pid;
1956 StgTSO* t,*next;
1957 Capability *cap;
1958 uint32_t g;
1959 Task *task = NULL;
1960 uint32_t i;
1961
1962 debugTrace(DEBUG_sched, "forking!");
1963
1964 task = newBoundTask();
1965
1966 cap = NULL;
1967 waitForCapability(&cap, task);
1968
1969 #if defined(THREADED_RTS)
1970 stopAllCapabilities(&cap, task);
1971 #endif
1972
1973 // no funny business: hold locks while we fork, otherwise if some
1974 // other thread is holding a lock when the fork happens, the data
1975 // structure protected by the lock will forever be in an
1976 // inconsistent state in the child. See also #1391.
1977 ACQUIRE_LOCK(&sched_mutex);
1978 ACQUIRE_LOCK(&sm_mutex);
1979 ACQUIRE_LOCK(&stable_mutex);
1980 ACQUIRE_LOCK(&task->lock);
1981
1982 for (i=0; i < n_capabilities; i++) {
1983 ACQUIRE_LOCK(&capabilities[i]->lock);
1984 }
1985
1986 #if defined(THREADED_RTS)
1987 ACQUIRE_LOCK(&all_tasks_mutex);
1988 #endif
1989
1990 stopTimer(); // See #4074
1991
1992 #if defined(TRACING)
1993 flushEventLog(); // so that child won't inherit dirty file buffers
1994 #endif
1995
1996 pid = fork();
1997
1998 if (pid) { // parent
1999
2000 startTimer(); // #4074
2001
2002 RELEASE_LOCK(&sched_mutex);
2003 RELEASE_LOCK(&sm_mutex);
2004 RELEASE_LOCK(&stable_mutex);
2005 RELEASE_LOCK(&task->lock);
2006
2007 #if defined(THREADED_RTS)
2008 /* N.B. releaseCapability_ below may need to take all_tasks_mutex */
2009 RELEASE_LOCK(&all_tasks_mutex);
2010 #endif
2011
2012 for (i=0; i < n_capabilities; i++) {
2013 releaseCapability_(capabilities[i],false);
2014 RELEASE_LOCK(&capabilities[i]->lock);
2015 }
2016
2017 boundTaskExiting(task);
2018
2019 // just return the pid
2020 return pid;
2021
2022 } else { // child
2023
2024 #if defined(THREADED_RTS)
2025 initMutex(&sched_mutex);
2026 initMutex(&sm_mutex);
2027 initMutex(&stable_mutex);
2028 initMutex(&task->lock);
2029
2030 for (i=0; i < n_capabilities; i++) {
2031 initMutex(&capabilities[i]->lock);
2032 }
2033
2034 initMutex(&all_tasks_mutex);
2035 #endif
2036
2037 #if defined(TRACING)
2038 resetTracing();
2039 #endif
2040
2041 // Now, all OS threads except the thread that forked are
2042 // stopped. We need to stop all Haskell threads, including
2043 // those involved in foreign calls. Also we need to delete
2044 // all Tasks, because they correspond to OS threads that are
2045 // now gone.
2046
2047 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2048 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2049 next = t->global_link;
2050 // don't allow threads to catch the ThreadKilled
2051 // exception, but we do want to raiseAsync() because these
2052 // threads may be evaluating thunks that we need later.
2053 deleteThread_(t->cap,t);
2054
2055 // stop the GC from updating the InCall to point to
2056 // the TSO. This is only necessary because the
2057 // OSThread bound to the TSO has been killed, and
2058 // won't get a chance to exit in the usual way (see
2059 // also scheduleHandleThreadFinished).
2060 t->bound = NULL;
2061 }
2062 }
2063
2064 discardTasksExcept(task);
2065
2066 for (i=0; i < n_capabilities; i++) {
2067 cap = capabilities[i];
2068
2069 // Empty the run queue. It seems tempting to let all the
2070 // killed threads stay on the run queue as zombies to be
2071 // cleaned up later, but some of them may correspond to
2072 // bound threads for which the corresponding Task does not
2073 // exist.
2074 truncateRunQueue(cap);
2075 cap->n_run_queue = 0;
2076
2077 // Any suspended C-calling Tasks are no more, their OS threads
2078 // don't exist now:
2079 cap->suspended_ccalls = NULL;
2080 cap->n_suspended_ccalls = 0;
2081
2082 #if defined(THREADED_RTS)
2083 // Wipe our spare workers list, they no longer exist. New
2084 // workers will be created if necessary.
2085 cap->spare_workers = NULL;
2086 cap->n_spare_workers = 0;
2087 cap->returning_tasks_hd = NULL;
2088 cap->returning_tasks_tl = NULL;
2089 cap->n_returning_tasks = 0;
2090 #endif
2091
2092 // Release all caps except 0, we'll use that for starting
2093 // the IO manager and running the client action below.
2094 if (cap->no != 0) {
2095 task->cap = cap;
2096 releaseCapability(cap);
2097 }
2098 }
2099 cap = capabilities[0];
2100 task->cap = cap;
2101
2102 // Empty the threads lists. Otherwise, the garbage
2103 // collector may attempt to resurrect some of these threads.
2104 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2105 generations[g].threads = END_TSO_QUEUE;
2106 }
2107
2108 // On Unix, all timers are reset in the child, so we need to start
2109 // the timer again.
2110 initTimer();
2111 startTimer();
2112
2113 // TODO: need to trace various other things in the child
2114 // like startup event, capabilities, process info etc
2115 traceTaskCreate(task, cap);
2116
2117 #if defined(THREADED_RTS)
2118 ioManagerStartCap(&cap);
2119 #endif
2120
2121         // Install toplevel exception handlers, so the interruption
2122         // signal will be sent to the main thread.
2123 // See Trac #12903
2124 rts_evalStableIOMain(&cap, entry, NULL); // run the action
2125 rts_checkSchedStatus("forkProcess",cap);
2126
2127 rts_unlock(cap);
2128 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2129 }
2130 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2131 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2132 #endif
2133 }
2134
2135 /* ---------------------------------------------------------------------------
2136 * Changing the number of Capabilities
2137 *
2138 * Changing the number of Capabilities is very tricky! We can only do
2139 * it with the system fully stopped, so we do a full sync with
2140 * requestSync(SYNC_OTHER) and grab all the capabilities.
2141 *
2142 * Then we resize the appropriate data structures, and update all
2143 * references to the old data structures which have now moved.
2144 * Finally we release the Capabilities we are holding, and start
2145 * worker Tasks on the new Capabilities we created.
2146 *
2147 * ------------------------------------------------------------------------- */
2148
2149 void
2150 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2151 {
2152 #if !defined(THREADED_RTS)
2153 if (new_n_capabilities != 1) {
2154 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2155 }
2156 return;
2157 #elif defined(NOSMP)
2158 if (new_n_capabilities != 1) {
2159 errorBelch("setNumCapabilities: not supported on this platform");
2160 }
2161 return;
2162 #else
2163 Task *task;
2164 Capability *cap;
2165 uint32_t n;
2166 Capability *old_capabilities = NULL;
2167 uint32_t old_n_capabilities = n_capabilities;
2168
2169 if (new_n_capabilities == enabled_capabilities) {
2170 return;
2171 } else if (new_n_capabilities <= 0) {
2172 errorBelch("setNumCapabilities: Capability count must be positive");
2173 return;
2174 }
2175
2176
2177 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2178 enabled_capabilities, new_n_capabilities);
2179
2180 cap = rts_lock();
2181 task = cap->running_task;
2182
2183 stopAllCapabilities(&cap, task);
2184
2185 if (new_n_capabilities < enabled_capabilities)
2186 {
2187 // Reducing the number of capabilities: we do not actually
2188 // remove the extra capabilities, we just mark them as
2189 // "disabled". This has the following effects:
2190 //
2191 // - threads on a disabled capability are migrated away by the
2192 // scheduler loop
2193 //
2194 // - disabled capabilities do not participate in GC
2195 // (see scheduleDoGC())
2196 //
2197 // - No spark threads are created on this capability
2198 // (see scheduleActivateSpark())
2199 //
2200 // - We do not attempt to migrate threads *to* a disabled
2201 // capability (see schedulePushWork()).
2202 //
2203 // but in other respects, a disabled capability remains
2204 // alive. Threads may be woken up on a disabled capability,
2205 // but they will be immediately migrated away.
2206 //
2207 // This approach is much easier than trying to actually remove
2208 // the capability; we don't have to worry about GC data
2209 // structures, the nursery, etc.
2210 //
2211 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2212 capabilities[n]->disabled = true;
2213 traceCapDisable(capabilities[n]);
2214 }
2215 enabled_capabilities = new_n_capabilities;
2216 }
2217 else
2218 {
2219 // Increasing the number of enabled capabilities.
2220 //
2221 // enable any disabled capabilities, up to the required number
2222 for (n = enabled_capabilities;
2223 n < new_n_capabilities && n < n_capabilities; n++) {
2224 capabilities[n]->disabled = false;
2225 traceCapEnable(capabilities[n]);
2226 }
2227 enabled_capabilities = n;
2228
2229 if (new_n_capabilities > n_capabilities) {
2230 #if defined(TRACING)
2231 // Allocate eventlog buffers for the new capabilities. Note this
2232 // must be done before calling moreCapabilities(), because that
2233 // will emit events about creating the new capabilities and adding
2234 // them to existing capsets.
2235 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2236 #endif
2237
2238 // Resize the capabilities array
2239 // NB. after this, capabilities points somewhere new. Any pointers
2240 // of type (Capability *) are now invalid.
2241 moreCapabilities(n_capabilities, new_n_capabilities);
2242
2243 // Resize and update storage manager data structures
2244 storageAddCapabilities(n_capabilities, new_n_capabilities);
2245 }
2246 }
2247
2248 // update n_capabilities before things start running
2249 if (new_n_capabilities > n_capabilities) {
2250 n_capabilities = enabled_capabilities = new_n_capabilities;
2251 }
2252
2253 // We're done: release the original Capabilities
2254 releaseAllCapabilities(old_n_capabilities, cap,task);
2255
2256 // We can't free the old array until now, because we access it
2257 // while updating pointers in updateCapabilityRefs().
2258 if (old_capabilities) {
2259 stgFree(old_capabilities);
2260 }
2261
2262 // Notify IO manager that the number of capabilities has changed.
2263 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2264
2265 rts_unlock(cap);
2266
2267 #endif // THREADED_RTS
2268 }
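/* A usage sketch (illustrative only, not part of this file): C code that has
 * initialised the RTS with hs_init() can change the capability count at run
 * time, which is the same entry point GHC.Conc.setNumCapabilities reaches via
 * its foreign call:
 *
 *     setNumCapabilities(4);   // stop the world, resize, restart workers
 *
 * The call returns only after the stop-the-world sync above has completed and
 * the original Capabilities have been released.
 */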
2269
2270
2271
2272 /* ---------------------------------------------------------------------------
2273 * Delete all the threads in the system
2274 * ------------------------------------------------------------------------- */
2275
2276 static void
2277 deleteAllThreads ( Capability *cap )
2278 {
2279 // NOTE: only safe to call if we own all capabilities.
2280
2281 StgTSO* t, *next;
2282 uint32_t g;
2283
2284 debugTrace(DEBUG_sched,"deleting all threads");
2285 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2286 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2287 next = t->global_link;
2288 deleteThread(cap,t);
2289 }
2290 }
2291
2292 // The run queue now contains a bunch of ThreadKilled threads. We
2293 // must not throw these away: the main thread(s) will be in there
2294 // somewhere, and the main scheduler loop has to deal with it.
2295 // Also, the run queue is the only thing keeping these threads from
2296 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2297
2298 #if !defined(THREADED_RTS)
2299 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2300 ASSERT(sleeping_queue == END_TSO_QUEUE);
2301 #endif
2302 }
2303
2304 /* -----------------------------------------------------------------------------
2305 Managing the suspended_ccalls list.
2306 Locks required: sched_mutex
2307 -------------------------------------------------------------------------- */
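/* The list is doubly linked (InCall->prev/next) so that
 * recoverSuspendedTask() can unlink an entry in O(1), no matter where the
 * returning call sits in the list. */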
2308
2309 STATIC_INLINE void
2310 suspendTask (Capability *cap, Task *task)
2311 {
2312 InCall *incall;
2313
2314 incall = task->incall;
2315 ASSERT(incall->next == NULL && incall->prev == NULL);
2316 incall->next = cap->suspended_ccalls;
2317 incall->prev = NULL;
2318 if (cap->suspended_ccalls) {
2319 cap->suspended_ccalls->prev = incall;
2320 }
2321 cap->suspended_ccalls = incall;
2322 cap->n_suspended_ccalls++;
2323 }
2324
2325 STATIC_INLINE void
2326 recoverSuspendedTask (Capability *cap, Task *task)
2327 {
2328 InCall *incall;
2329
2330 incall = task->incall;
2331 if (incall->prev) {
2332 incall->prev->next = incall->next;
2333 } else {
2334 ASSERT(cap->suspended_ccalls == incall);
2335 cap->suspended_ccalls = incall->next;
2336 }
2337 if (incall->next) {
2338 incall->next->prev = incall->prev;
2339 }
2340 incall->next = incall->prev = NULL;
2341 cap->n_suspended_ccalls--;
2342 }
2343
2344 /* ---------------------------------------------------------------------------
2345 * Suspending & resuming Haskell threads.
2346 *
2347 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2348 * its capability before calling the C function. This allows another
2349 * task to pick up the capability and carry on running Haskell
2350 * threads. It also means that if the C call blocks, it won't lock
2351 * the whole system.
2352 *
2353 * The Haskell thread making the C call is put to sleep for the
2354 * duration of the call, on the suspended_ccalling_threads queue. We
2355 * give out a token to the task, which it can use to resume the thread
2356 * on return from the C function.
2357 *
2358 * If this is an interruptible C call, this means that the FFI call may be
2359 * unceremoniously terminated and should be scheduled on an
2360 * unbound worker thread.
2361 * ------------------------------------------------------------------------- */
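/* A sketch of how the token is used around a safe foreign call (the real
 * stubs are generated by the compiler; 'blocking_c_fn' and the surrounding
 * names are hypothetical):
 *
 *     void *tok = suspendThread(&cap->r, false);  // give back the Capability
 *     r = blocking_c_fn(args);                    // may block freely here
 *     reg = resumeThread(tok);                    // wait for a Capability
 *                                                 // (possibly a different one)
 */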
2362
2363 void *
2364 suspendThread (StgRegTable *reg, bool interruptible)
2365 {
2366 Capability *cap;
2367 int saved_errno;
2368 StgTSO *tso;
2369 Task *task;
2370 #if defined(mingw32_HOST_OS)
2371 StgWord32 saved_winerror;
2372 #endif
2373
2374 saved_errno = errno;
2375 #if defined(mingw32_HOST_OS)
2376 saved_winerror = GetLastError();
2377 #endif
2378
2379 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2380 */
2381 cap = regTableToCapability(reg);
2382
2383 task = cap->running_task;
2384 tso = cap->r.rCurrentTSO;
2385
2386 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2387
2388 // XXX this might not be necessary --SDM
2389 tso->what_next = ThreadRunGHC;
2390
2391 threadPaused(cap,tso);
2392
2393 if (interruptible) {
2394 tso->why_blocked = BlockedOnCCall_Interruptible;
2395 } else {
2396 tso->why_blocked = BlockedOnCCall;
2397 }
2398
2399 // Hand back capability
2400 task->incall->suspended_tso = tso;
2401 task->incall->suspended_cap = cap;
2402
2403 // Otherwise allocate() will write to invalid memory.
2404 cap->r.rCurrentTSO = NULL;
2405
2406 ACQUIRE_LOCK(&cap->lock);
2407
2408 suspendTask(cap,task);
2409 cap->in_haskell = false;
2410 releaseCapability_(cap,false);
2411
2412 RELEASE_LOCK(&cap->lock);
2413
2414 errno = saved_errno;
2415 #if defined(mingw32_HOST_OS)
2416 SetLastError(saved_winerror);
2417 #endif
2418 return task;
2419 }
2420
2421 StgRegTable *
2422 resumeThread (void *task_)
2423 {
2424 StgTSO *tso;
2425 InCall *incall;
2426 Capability *cap;
2427 Task *task = task_;
2428 int saved_errno;
2429 #if defined(mingw32_HOST_OS)
2430 StgWord32 saved_winerror;
2431 #endif
2432
2433 saved_errno = errno;
2434 #if defined(mingw32_HOST_OS)
2435 saved_winerror = GetLastError();
2436 #endif
2437
2438 incall = task->incall;
2439 cap = incall->suspended_cap;
2440 task->cap = cap;
2441
2442 // Wait for permission to re-enter the RTS with the result.
2443 waitForCapability(&cap,task);
2444 // we might be on a different capability now... but if so, our
2445 // entry on the suspended_ccalls list will also have been
2446 // migrated.
2447
2448 // Remove the thread from the suspended list
2449 recoverSuspendedTask(cap,task);
2450
2451 tso = incall->suspended_tso;
2452 incall->suspended_tso = NULL;
2453 incall->suspended_cap = NULL;
2454 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2455
2456 traceEventRunThread(cap, tso);
2457
2458 /* Reset blocking status */
2459 tso->why_blocked = NotBlocked;
2460
2461 if ((tso->flags & TSO_BLOCKEX) == 0) {
2462 // avoid locking the TSO if we don't have to
2463 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2464 maybePerformBlockedException(cap,tso);
2465 }
2466 }
2467
2468 cap->r.rCurrentTSO = tso;
2469 cap->in_haskell = true;
2470 errno = saved_errno;
2471 #if defined(mingw32_HOST_OS)
2472 SetLastError(saved_winerror);
2473 #endif
2474
2475 /* We might have GC'd, mark the TSO dirty again */
2476 dirty_TSO(cap,tso);
2477 dirty_STACK(cap,tso->stackobj);
2478
2479 IF_DEBUG(sanity, checkTSO(tso));
2480
2481 return &cap->r;
2482 }
2483
2484 /* ---------------------------------------------------------------------------
2485 * scheduleThread()
2486 *
2487 * scheduleThread puts a thread on the end of the runnable queue.
2488 * This will usually be done immediately after a thread is created.
2489 * The caller of scheduleThread must create the thread using e.g.
2490 * createThread and push an appropriate closure
2491 * on this thread's stack before the scheduler is invoked.
2492 * ------------------------------------------------------------------------ */
2493
2494 void
2495 scheduleThread(Capability *cap, StgTSO *tso)
2496 {
2497 // The thread goes at the *end* of the run-queue, to avoid possible
2498 // starvation of any threads already on the queue.
2499 appendToRunQueue(cap,tso);
2500 }
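/* A minimal sketch of the calling protocol described above ('action_closure'
 * is a placeholder; createIOThread() lives in RtsAPI.c):
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  action_closure);
 *     scheduleThread(cap, tso);
 *
 * The rts_eval*() entry points in RtsAPI.c follow the same pattern, but use
 * scheduleWaitThread() below so the caller can wait for the result.
 */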
2501
2502 void
2503 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2504 {
2505 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2506 // move this thread from now on.
2507 #if defined(THREADED_RTS)
2508 cpu %= enabled_capabilities;
2509 if (cpu == cap->no) {
2510 appendToRunQueue(cap,tso);
2511 } else {
2512 migrateThread(cap, tso, capabilities[cpu]);
2513 }
2514 #else
2515 appendToRunQueue(cap,tso);
2516 #endif
2517 }
2518
2519 void
2520 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2521 {
2522 Task *task;
2523 DEBUG_ONLY( StgThreadID id );
2524 Capability *cap;
2525
2526 cap = *pcap;
2527
2528 // We already created/initialised the Task
2529 task = cap->running_task;
2530
2531 // This TSO is now a bound thread; make the Task and TSO
2532 // point to each other.
2533 tso->bound = task->incall;
2534 tso->cap = cap;
2535
2536 task->incall->tso = tso;
2537 task->incall->ret = ret;
2538 task->incall->rstat = NoStatus;
2539
2540 appendToRunQueue(cap,tso);
2541
2542 DEBUG_ONLY( id = tso->id );
2543 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2544
2545 cap = schedule(cap,task);
2546
2547 ASSERT(task->incall->rstat != NoStatus);
2548 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2549
2550 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2551 *pcap = cap;
2552 }
2553
2554 /* ----------------------------------------------------------------------------
2555 * Starting Tasks
2556 * ------------------------------------------------------------------------- */
2557
2558 #if defined(THREADED_RTS)
2559 void scheduleWorker (Capability *cap, Task *task)
2560 {
2561 // schedule() runs without a lock.
2562 cap = schedule(cap,task);
2563
2564 // On exit from schedule(), we have a Capability, but possibly not
2565 // the same one we started with.
2566
2567 // During shutdown, the requirement is that after all the
2568 // Capabilities are shut down, all workers that are shutting down
2569 // have finished workerTaskStop(). This is why we hold on to
2570 // cap->lock until we've finished workerTaskStop() below.
2571 //
2572 // There may be workers still involved in foreign calls; those
2573 // will just block in waitForCapability() because the
2574 // Capability has been shut down.
2575 //
2576 ACQUIRE_LOCK(&cap->lock);
2577 releaseCapability_(cap,false);
2578 workerTaskStop(task);
2579 RELEASE_LOCK(&cap->lock);
2580 }
2581 #endif
2582
2583 /* ---------------------------------------------------------------------------
2584 * Start new worker tasks on Capabilities from--to
2585 * -------------------------------------------------------------------------- */
2586
2587 static void
2588 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2589 {
2590 #if defined(THREADED_RTS)
2591 uint32_t i;
2592 Capability *cap;
2593
2594 for (i = from; i < to; i++) {
2595 cap = capabilities[i];
2596 ACQUIRE_LOCK(&cap->lock);
2597 startWorkerTask(cap);
2598 RELEASE_LOCK(&cap->lock);
2599 }
2600 #endif
2601 }
2602
2603 /* ---------------------------------------------------------------------------
2604 * initScheduler()
2605 *
2606 * Initialise the scheduler. This resets all the queues - if the
2607 * queues contained any threads, they'll be garbage collected at the
2608 * next pass.
2609 *
2610 * ------------------------------------------------------------------------ */
2611
2612 void
2613 initScheduler(void)
2614 {
2615 #if !defined(THREADED_RTS)
2616 blocked_queue_hd = END_TSO_QUEUE;
2617 blocked_queue_tl = END_TSO_QUEUE;
2618 sleeping_queue = END_TSO_QUEUE;
2619 #endif
2620
2621 sched_state = SCHED_RUNNING;
2622 recent_activity = ACTIVITY_YES;
2623
2624 #if defined(THREADED_RTS)
2625 /* Initialise the mutex and condition variables used by
2626 * the scheduler. */
2627 initMutex(&sched_mutex);
2628 #endif
2629
2630 ACQUIRE_LOCK(&sched_mutex);
2631
2632 allocated_bytes_at_heapoverflow = 0;
2633
2634 /* A capability holds the state a native thread needs in
2635 * order to execute STG code. At least one capability is
2636 * floating around (only THREADED_RTS builds have more than one).
2637 */
2638 initCapabilities();
2639
2640 initTaskManager();
2641
2642 /*
2643 * Eagerly start one worker to run each Capability, except for
2644 * Capability 0. The idea is that we're probably going to start a
2645 * bound thread on Capability 0 pretty soon, so we don't want a
2646 * worker task hogging it.
2647 */
2648 startWorkerTasks(1, n_capabilities);
2649
2650 RELEASE_LOCK(&sched_mutex);
2651
2652 }
2653
2654 void
2655 exitScheduler (bool wait_foreign USED_IF_THREADS)
2656 /* see Capability.c, shutdownCapability() */
2657 {
2658 Task *task = NULL;
2659
2660 task = newBoundTask();
2661
2662 // If we haven't killed all the threads yet, do it now.
2663 if (sched_state < SCHED_SHUTTING_DOWN) {
2664 sched_state = SCHED_INTERRUPTING;
2665 Capability *cap = task->cap;
2666 waitForCapability(&cap,task);
2667 scheduleDoGC(&cap,task,true);
2668 ASSERT(task->incall->tso == NULL);
2669 releaseCapability(cap);
2670 }
2671 sched_state = SCHED_SHUTTING_DOWN;
2672
2673 shutdownCapabilities(task, wait_foreign);
2674
2675 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2676 // n_failed_trygrab_idles, n_idle_caps);
2677
2678 boundTaskExiting(task);
2679 }
2680
2681 void
2682 freeScheduler( void )
2683 {
2684 uint32_t still_running;
2685
2686 ACQUIRE_LOCK(&sched_mutex);
2687 still_running = freeTaskManager();
2688 // We can only free the Capabilities if there are no Tasks still
2689 // running. We might have a Task about to return from a foreign
2690 // call into waitForCapability(), for example (actually,
2691 // this should be the *only* thing that a still-running Task can
2692 // do at this point, and it will block waiting for the
2693 // Capability).
2694 if (still_running == 0) {
2695 freeCapabilities();
2696 }
2697 RELEASE_LOCK(&sched_mutex);
2698 #if defined(THREADED_RTS)
2699 closeMutex(&sched_mutex);
2700 #endif
2701 }
2702
2703 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2704 void *user USED_IF_NOT_THREADS)
2705 {
2706 #if !defined(THREADED_RTS)
2707 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2708 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2709 evac(user, (StgClosure **)(void *)&sleeping_queue);
2710 #endif
2711 }
2712
2713 /* -----------------------------------------------------------------------------
2714 performGC
2715
2716 This is the interface to the garbage collector from Haskell land.
2717 We provide this so that external C code can allocate and garbage
2718 collect when called from Haskell via _ccall_GC.
2719 -------------------------------------------------------------------------- */
2720
2721 static void
2722 performGC_(bool force_major)
2723 {
2724 Task *task;
2725 Capability *cap = NULL;
2726
2727 // We must grab a new Task here, because the existing Task may be
2728 // associated with a particular Capability, and chained onto the
2729 // suspended_ccalls queue.
2730 task = newBoundTask();
2731
2732 // TODO: do we need to traceTask*() here?
2733
2734 waitForCapability(&cap,task);
2735 scheduleDoGC(&cap,task,force_major);
2736 releaseCapability(cap);
2737 boundTaskExiting(task);
2738 }
2739
2740 void
2741 performGC(void)
2742 {
2743 performGC_(false);
2744 }
2745
2746 void
2747 performMajorGC(void)
2748 {
2749 performGC_(true);
2750 }
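/* Usage sketch (illustrative only): C code entered via a safe foreign call
 * can force a collection directly ('my_c_callback' is a hypothetical
 * function, not part of the RTS):
 *
 *     void my_c_callback (void)
 *     {
 *         performMajorGC();   // grabs a fresh bound Task, GCs, releases it
 *     }
 *
 * These are also the C entry points reached from Haskell by
 * System.Mem.performGC and friends via foreign calls.
 */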
2751
2752 /* ---------------------------------------------------------------------------
2753 Interrupt execution.
2754 Might be called inside a signal handler so it mustn't do anything fancy.
2755 ------------------------------------------------------------------------ */
2756
2757 void
2758 interruptStgRts(void)
2759 {
2760 sched_state = SCHED_INTERRUPTING;
2761 interruptAllCapabilities();
2762 #if defined(THREADED_RTS)
2763 wakeUpRts();
2764 #endif
2765 }
2766
2767 /* -----------------------------------------------------------------------------
2768 Wake up the RTS
2769
2770 This function causes at least one OS thread to wake up and run the
2771 scheduler loop. It is invoked when the RTS might be deadlocked, or
2772 an external event has arrived that may need servicing (eg. a
2773 keyboard interrupt).
2774
2775 In the single-threaded RTS we don't do anything here; we only have
2776 one thread anyway, and the event that caused us to want to wake up
2777 will have interrupted any blocking system call in progress anyway.
2778 -------------------------------------------------------------------------- */
2779
2780 #if defined(THREADED_RTS)
2781 void wakeUpRts(void)
2782 {
2783 // This forces the IO Manager thread to wakeup, which will
2784 // in turn ensure that some OS thread wakes up and runs the
2785 // scheduler loop, which will cause a GC and deadlock check.
2786 ioManagerWakeup();
2787 }
2788 #endif
2789
2790 /* -----------------------------------------------------------------------------
2791 Deleting threads
2792
2793 This is used for interruption (^C) and forking, and corresponds to
2794 raising an exception but without letting the thread catch the
2795 exception.
2796 -------------------------------------------------------------------------- */
2797
2798 static void
2799 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2800 {
2801 // NOTE: must only be called on a TSO that we have exclusive
2802 // access to, because we will call throwToSingleThreaded() below.
2803 // The TSO must be on the run queue of the Capability we own, or
2804 // we must own all Capabilities.
2805
2806 if (tso->why_blocked != BlockedOnCCall &&
2807 tso->why_blocked != BlockedOnCCall_Interruptible) {
2808 throwToSingleThreaded(tso->cap,tso,NULL);
2809 }
2810 }
2811
2812 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
2813 static void
2814 deleteThread_(Capability *cap, StgTSO *tso)
2815 { // for forkProcess only:
2816 // like deleteThread(), but we delete threads in foreign calls, too.
2817
2818 if (tso->why_blocked == BlockedOnCCall ||
2819 tso->why_blocked == BlockedOnCCall_Interruptible) {
2820 tso->what_next = ThreadKilled;
2821 appendToRunQueue(tso->cap, tso);
2822 } else {
2823 deleteThread(cap,tso);
2824 }
2825 }
2826 #endif
2827
2828 /* -----------------------------------------------------------------------------
2829 raiseExceptionHelper
2830
2831 This function is called by the raise# primitive, just so that we can
2832 move some of the tricky bits of raising an exception from C-- into
2833    C. Who knows, it might be a useful, reusable thing here too.
2834 -------------------------------------------------------------------------- */
2835
2836 StgWord
2837 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2838 {
2839 Capability *cap = regTableToCapability(reg);
2840 StgThunk *raise_closure = NULL;
2841 StgPtr p, next;
2842 const StgRetInfoTable *info;
2843 //
2844 // This closure represents the expression 'raise# E' where E
2845     // is the exception raised. It is used to overwrite all the
2846 // thunks which are currently under evaluation.
2847 //
2848
2849 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2850 // LDV profiling: stg_raise_info has THUNK as its closure
2851 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2852 // payload, MIN_UPD_SIZE is more approprate than 1. It seems that
2853     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2854 // However, when LDV profiling goes on, we need to linearly scan
2855     // the small object pool, where raise_closure is stored, so we should
2856 // use MIN_UPD_SIZE.
2857 //
2858 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2859 // sizeofW(StgClosure)+1);
2860 //
2861
2862 //
2863 // Walk up the stack, looking for the catch frame. On the way,
2864 // we update any closures pointed to from update frames with the
2865 // raise closure that we just built.
2866 //
2867 p = tso->stackobj->sp;
2868 while(1) {
2869 info = get_ret_itbl((StgClosure *)p);
2870 next = p + stack_frame_sizeW((StgClosure *)p);
2871 switch (info->i.type) {
2872
2873 case UPDATE_FRAME:
2874 // Only create raise_closure if we need to.
2875 if (raise_closure == NULL) {
2876 raise_closure =
2877 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2878 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2879 raise_closure->payload[0] = exception;
2880 }
2881 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2882 (StgClosure *)raise_closure);
2883 p = next;
2884 continue;
2885
2886 case ATOMICALLY_FRAME:
2887 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2888 tso->stackobj->sp = p;
2889 return ATOMICALLY_FRAME;
2890
2891 case CATCH_FRAME:
2892 tso->stackobj->sp = p;
2893 return CATCH_FRAME;
2894
2895 case CATCH_STM_FRAME:
2896 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2897 tso->stackobj->sp = p;
2898 return CATCH_STM_FRAME;
2899
2900 case UNDERFLOW_FRAME:
2901 tso->stackobj->sp = p;
2902 threadStackUnderflow(cap,tso);
2903 p = tso->stackobj->sp;
2904 continue;
2905
2906 case STOP_FRAME:
2907 tso->stackobj->sp = p;
2908 return STOP_FRAME;
2909
2910 case CATCH_RETRY_FRAME: {
2911 StgTRecHeader *trec = tso -> trec;
2912 StgTRecHeader *outer = trec -> enclosing_trec;
2913 debugTrace(DEBUG_stm,
2914 "found CATCH_RETRY_FRAME at %p during raise", p);
2915 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2916 stmAbortTransaction(cap, trec);
2917 stmFreeAbortedTRec(cap, trec);
2918 tso -> trec = outer;
2919 p = next;
2920 continue;
2921 }
2922
2923 default:
2924 p = next;
2925 continue;
2926 }
2927 }
2928 }
2929
2930
2931 /* -----------------------------------------------------------------------------
2932 findRetryFrameHelper
2933
2934 This function is called by the retry# primitive. It traverses the stack
2935 leaving tso->sp referring to the frame which should handle the retry.
2936
2937 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2938    or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2939
2940 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2941 create) because retries are not considered to be exceptions, despite the
2942 similar implementation.
2943
2944 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2945 not be created within memory transactions.
2946 -------------------------------------------------------------------------- */
2947
2948 StgWord
2949 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2950 {
2951 const StgRetInfoTable *info;
2952 StgPtr p, next;
2953
2954 p = tso->stackobj->sp;
2955 while (1) {
2956 info = get_ret_itbl((const StgClosure *)p);
2957 next = p + stack_frame_sizeW((StgClosure *)p);
2958 switch (info->i.type) {
2959
2960 case ATOMICALLY_FRAME:
2961 debugTrace(DEBUG_stm,
2962 "found ATOMICALLY_FRAME at %p during retry", p);
2963 tso->stackobj->sp = p;
2964 return ATOMICALLY_FRAME;
2965
2966 case CATCH_RETRY_FRAME:
2967 debugTrace(DEBUG_stm,
2968 "found CATCH_RETRY_FRAME at %p during retry", p);
2969 tso->stackobj->sp = p;
2970 return CATCH_RETRY_FRAME;
2971
2972 case CATCH_STM_FRAME: {
2973 StgTRecHeader *trec = tso -> trec;
2974 StgTRecHeader *outer = trec -> enclosing_trec;
2975 debugTrace(DEBUG_stm,
2976 "found CATCH_STM_FRAME at %p during retry", p);
2977 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2978 stmAbortTransaction(cap, trec);
2979 stmFreeAbortedTRec(cap, trec);
2980 tso -> trec = outer;
2981 p = next;
2982 continue;
2983 }
2984
2985 case UNDERFLOW_FRAME:
2986 tso->stackobj->sp = p;
2987 threadStackUnderflow(cap,tso);
2988 p = tso->stackobj->sp;
2989 continue;
2990
2991 default:
2992 ASSERT(info->i.type != CATCH_FRAME);
2993 ASSERT(info->i.type != STOP_FRAME);
2994 p = next;
2995 continue;
2996 }
2997 }
2998 }
2999
3000 /* -----------------------------------------------------------------------------
3001 resurrectThreads is called after garbage collection on the list of
3002 threads found to be garbage. Each of these threads will be woken
3003    up and sent a signal: BlockedIndefinitelyOnMVar if the thread was
3004    blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked on an
3005    STM transaction, or NonTermination if it was blocked on a Black Hole.
3006
3007 Locks: assumes we hold *all* the capabilities.
3008 -------------------------------------------------------------------------- */
3009
3010 void
3011 resurrectThreads (StgTSO *threads)
3012 {
3013 StgTSO *tso, *next;
3014 Capability *cap;
3015 generation *gen;
3016
3017 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3018 next = tso->global_link;
3019
3020 gen = Bdescr((P_)tso)->gen;
3021 tso->global_link = gen->threads;
3022 gen->threads = tso;
3023
3024 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3025
3026 // Wake up the thread on the Capability it was last on
3027 cap = tso->cap;
3028
3029 switch (tso->why_blocked) {
3030 case BlockedOnMVar:
3031 case BlockedOnMVarRead:
3032 /* Called by GC - sched_mutex lock is currently held. */
3033 throwToSingleThreaded(cap, tso,
3034 (StgClosure *)blockedIndefinitelyOnMVar_closure);
3035 break;
3036 case BlockedOnBlackHole:
3037 throwToSingleThreaded(cap, tso,
3038 (StgClosure *)nonTermination_closure);
3039 break;
3040 case BlockedOnSTM:
3041 throwToSingleThreaded(cap, tso,
3042 (StgClosure *)blockedIndefinitelyOnSTM_closure);
3043 break;
3044 case NotBlocked:
3045 /* This might happen if the thread was blocked on a black hole
3046 * belonging to a thread that we've just woken up (raiseAsync
3047 * can wake up threads, remember...).
3048 */
3049 continue;
3050 case BlockedOnMsgThrowTo:
3051 // This can happen if the target is masking, blocks on a
3052 // black hole, and then is found to be unreachable. In
3053 // this case, we want to let the target wake up and carry
3054 // on, and do nothing to this thread.
3055 continue;
3056 default:
3057 barf("resurrectThreads: thread blocked in a strange way: %d",
3058 tso->why_blocked);
3059 }
3060 }
3061 }