1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "StablePtr.h"
45 #include "StableName.h"
46 #include "TopHandler.h"
47
48 #if defined(HAVE_SYS_TYPES_H)
49 #include <sys/types.h>
50 #endif
51 #if defined(HAVE_UNISTD_H)
52 #include <unistd.h>
53 #endif
54
55 #include <string.h>
56 #include <stdlib.h>
57 #include <stdarg.h>
58
59 #if defined(HAVE_ERRNO_H)
60 #include <errno.h>
61 #endif
62
63 #if defined(TRACING)
64 #include "eventlog/EventLog.h"
65 #endif
66 /* -----------------------------------------------------------------------------
67 * Global variables
68 * -------------------------------------------------------------------------- */
69
70 #if !defined(THREADED_RTS)
71 // Blocked/sleeping threads
72 StgTSO *blocked_queue_hd = NULL;
73 StgTSO *blocked_queue_tl = NULL;
74 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
75 #endif
76
77 // Bytes allocated since the last time a HeapOverflow exception was thrown by
78 // the RTS
79 uint64_t allocated_bytes_at_heapoverflow = 0;
80
81 /* Set to true when the latest garbage collection failed to reclaim enough
82 * space, and the runtime should proceed to shut itself down in an orderly
83 * fashion (emitting profiling info etc.), OR throw an exception to the main
84 * thread, if it is still alive.
85 */
86 bool heap_overflow = false;
87
88 /* flag that tracks whether we have done any execution in this time slice.
89 * LOCK: currently none, perhaps we should lock (but needs to be
90 * updated in the fast path of the scheduler).
91 *
92 * NB. must be StgWord, we do xchg() on it.
93 */
94 volatile StgWord recent_activity = ACTIVITY_YES;
95
96 /* The overall scheduler state: once this reaches SCHED_INTERRUPTING,
97 * give up execution. LOCK: none (changes monotonically)
98 */
99 volatile StgWord sched_state = SCHED_RUNNING;
100
101 /*
102 * This mutex protects most of the global scheduler data in
103 * the THREADED_RTS runtime.
104 */
105 #if defined(THREADED_RTS)
106 Mutex sched_mutex;
107 #endif
108
109 #if !defined(mingw32_HOST_OS)
110 #define FORKPROCESS_PRIMOP_SUPPORTED
111 #endif
112
113 /* -----------------------------------------------------------------------------
114 * static function prototypes
115 * -------------------------------------------------------------------------- */
116
117 static Capability *schedule (Capability *initialCapability, Task *task);
118
119 //
120 // These functions all encapsulate parts of the scheduler loop, and are
121 // abstracted only to make the structure and control flow of the
122 // scheduler clearer.
123 //
124 static void schedulePreLoop (void);
125 static void scheduleFindWork (Capability **pcap);
126 #if defined(THREADED_RTS)
127 static void scheduleYield (Capability **pcap, Task *task);
128 #endif
129 #if defined(THREADED_RTS)
130 static bool requestSync (Capability **pcap, Task *task,
131 PendingSync *sync_type, SyncType *prev_sync_type);
132 static void acquireAllCapabilities(Capability *cap, Task *task);
133 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
134 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
135 uint32_t to USED_IF_THREADS);
136 #endif
137 static void scheduleStartSignalHandlers (Capability *cap);
138 static void scheduleCheckBlockedThreads (Capability *cap);
139 static void scheduleProcessInbox(Capability **cap);
140 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
141 static void schedulePushWork(Capability *cap, Task *task);
142 #if defined(THREADED_RTS)
143 static void scheduleActivateSpark(Capability *cap);
144 #endif
145 static void schedulePostRunThread(Capability *cap, StgTSO *t);
146 static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
147 static bool scheduleHandleYield( Capability *cap, StgTSO *t,
148 uint32_t prev_what_next );
149 static void scheduleHandleThreadBlocked( StgTSO *t );
150 static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
151 StgTSO *t );
152 static bool scheduleNeedHeapProfile(bool ready_to_gc);
153 static void scheduleDoGC(Capability **pcap, Task *task, bool force_major);
154
155 static void deleteThread (StgTSO *tso);
156 static void deleteAllThreads (void);
157
158 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
159 static void deleteThread_(StgTSO *tso);
160 #endif
161
162 /* ---------------------------------------------------------------------------
163 Main scheduling loop.
164
165 We use round-robin scheduling, each thread returning to the
166 scheduler loop when one of these conditions is detected:
167
168 * out of heap space
169 * timer expires (thread yields)
170 * thread blocks
171 * thread ends
172 * stack overflow
173
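(In order, these correspond to the StgThreadReturnCode values
HeapOverflow, ThreadYielding, ThreadBlocked, ThreadFinished and
StackOverflow, which are handled by the switch near the end of the
scheduler loop below.)
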
174 ------------------------------------------------------------------------ */
175
176 static Capability *
177 schedule (Capability *initialCapability, Task *task)
178 {
179 StgTSO *t;
180 Capability *cap;
181 StgThreadReturnCode ret;
182 uint32_t prev_what_next;
183 bool ready_to_gc;
184
185 cap = initialCapability;
186
187 // Pre-condition: this task owns initialCapability.
188 // The sched_mutex is *NOT* held
189 // NB. on return, we still hold a capability.
190
191 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
192
193 schedulePreLoop();
194
195 // -----------------------------------------------------------
196 // Scheduler loop starts here:
197
198 while (1) {
199
200 // Check whether we have re-entered the RTS from Haskell without
201 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
202 // call).
203 if (cap->in_haskell) {
204 errorBelch("schedule: re-entered unsafely.\n"
205 " Perhaps a 'foreign import unsafe' should be 'safe'?");
206 stg_exit(EXIT_FAILURE);
207 }
208
209 // Note [shutdown]: The interruption / shutdown sequence.
210 //
211 // In order to cleanly shut down the runtime, we want to:
212 // * make sure that all main threads return to their callers
213 // with the state 'Interrupted'.
214 // * clean up all OS threads associated with the runtime
215 // * free all memory etc.
216 //
217 // So the sequence goes like this:
218 //
219 // * The shutdown sequence is initiated by calling hs_exit(),
220 // interruptStgRts(), or running out of memory in the GC.
221 //
222 // * Set sched_state = SCHED_INTERRUPTING
223 //
224 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
225 // scheduleDoGC(), which halts the whole runtime by acquiring all the
226 // capabilities, does a GC and then calls deleteAllThreads() to kill all
227 // the remaining threads. The zombies are left on the run queue for
228 // cleaning up. We can't kill threads involved in foreign calls.
229 //
230 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
231 //
232 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
233 // enforced by the scheduler, which won't run any Haskell code when
234 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
235 // other capabilities by doing the GC earlier.
236 //
237 // * all workers exit when the run queue on their capability
238 // drains. All main threads will also exit when their TSO
239 // reaches the head of the run queue and they can return.
240 //
241 // * eventually all Capabilities will shut down, and the RTS can
242 // exit.
243 //
244 // * We might be left with threads blocked in foreign calls,
245 // we should really attempt to kill these somehow (TODO).
246
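/* A minimal embedding sketch (illustrative only, using the standard FFI
 * entry points declared in HsFFI.h). The shutdown sequence described in
 * the Note above is normally driven by the caller of hs_exit():
 *
 *     #include "HsFFI.h"
 *
 *     int main (int argc, char *argv[])
 *     {
 *         hs_init(&argc, &argv);  // boots the RTS and the scheduler
 *         // ... call into Haskell via foreign exports ...
 *         hs_exit();              // eventually sets sched_state to
 *                                 // SCHED_INTERRUPTING, as described above
 *         return 0;
 *     }
 */
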
247 switch (sched_state) {
248 case SCHED_RUNNING:
249 break;
250 case SCHED_INTERRUPTING:
251 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
252 /* scheduleDoGC() deletes all the threads */
253 scheduleDoGC(&cap,task,true);
254
255 // after scheduleDoGC(), we must be shutting down. Either some
256 // other Capability did the final GC, or we did it above,
257 // either way we can fall through to the SCHED_SHUTTING_DOWN
258 // case now.
259 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
260 // fall through
261
262 case SCHED_SHUTTING_DOWN:
263 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
264 // If we are a worker, just exit. If we're a bound thread
265 // then we will exit below when we've removed our TSO from
266 // the run queue.
267 if (!isBoundTask(task) && emptyRunQueue(cap)) {
268 return cap;
269 }
270 break;
271 default:
272 barf("sched_state: %" FMT_Word, sched_state);
273 }
274
275 scheduleFindWork(&cap);
276
277 /* work pushing, currently relevant only for THREADED_RTS:
278 (pushes threads, wakes up idle capabilities for stealing) */
279 schedulePushWork(cap,task);
280
281 scheduleDetectDeadlock(&cap,task);
282
283 // Normally, the only way we can get here with no threads to
284 // run is if a keyboard interrupt was received during
285 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
286 // Additionally, it is not fatal for the
287 // threaded RTS to reach here with no threads to run.
288 //
289 // win32: might be here due to awaitEvent() being abandoned
290 // as a result of a console event having been delivered.
291
292 #if defined(THREADED_RTS)
293 scheduleYield(&cap,task);
294
295 if (emptyRunQueue(cap)) continue; // look for work again
296 #endif
297
298 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
299 if ( emptyRunQueue(cap) ) {
300 ASSERT(sched_state >= SCHED_INTERRUPTING);
301 }
302 #endif
303
304 //
305 // Get a thread to run
306 //
307 t = popRunQueue(cap);
308
309 // Sanity check the thread we're about to run. This can be
310 // expensive if there is lots of thread switching going on...
311 IF_DEBUG(sanity,checkTSO(t));
312
313 #if defined(THREADED_RTS)
314 // Check whether we can run this thread in the current task.
315 // If not, we have to pass our capability to the right task.
316 {
317 InCall *bound = t->bound;
318
319 if (bound) {
320 if (bound->task == task) {
321 // yes, the Haskell thread is bound to the current native thread
322 } else {
323 debugTrace(DEBUG_sched,
324 "thread %lu bound to another OS thread",
325 (unsigned long)t->id);
326 // no, bound to a different Haskell thread: pass to that thread
327 pushOnRunQueue(cap,t);
328 continue;
329 }
330 } else {
331 // The thread we want to run is unbound.
332 if (task->incall->tso) {
333 debugTrace(DEBUG_sched,
334 "this OS thread cannot run thread %lu",
335 (unsigned long)t->id);
336 // no, the current native thread is bound to a different
337 // Haskell thread, so pass it to any worker thread
338 pushOnRunQueue(cap,t);
339 continue;
340 }
341 }
342 }
343 #endif
344
345 // If we're shutting down, and this thread has not yet been
346 // killed, kill it now. This sometimes happens when a finalizer
347 // thread is created by the final GC, or a thread previously
348 // in a foreign call returns.
349 if (sched_state >= SCHED_INTERRUPTING &&
350 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
351 deleteThread(t);
352 }
353
354 // If this capability is disabled, migrate the thread away rather
355 // than running it. NB. but not if the thread is bound: it is
356 // really hard for a bound thread to migrate itself. Believe me,
357 // I tried several ways and couldn't find a way to do it.
358 // Instead, when everything is stopped for GC, we migrate all the
359 // threads on the run queue then (see scheduleDoGC()).
360 //
361 // ToDo: what about TSO_LOCKED? Currently we're migrating those
362 // when the number of capabilities drops, but we never migrate
363 // them back if it rises again. Presumably we should, but after
364 // the thread has been migrated we no longer know what capability
365 // it was originally on.
366 #if defined(THREADED_RTS)
367 if (cap->disabled && !t->bound) {
368 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
369 migrateThread(cap, t, dest_cap);
370 continue;
371 }
372 #endif
373
374 /* context switches are initiated by the timer signal, unless
375 * the user specified "context switch as often as possible", with
376 * +RTS -C0
377 */
378 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
379 && !emptyThreadQueues(cap)) {
380 cap->context_switch = 1;
381 }
382
383 run_thread:
384
385 // CurrentTSO is the thread to run. It might be different if we
386 // loop back to run_thread, so make sure to set CurrentTSO after
387 // that.
388 cap->r.rCurrentTSO = t;
389
390 startHeapProfTimer();
391
392 // ----------------------------------------------------------------------
393 // Run the current thread
394
395 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
396 ASSERT(t->cap == cap);
397 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
398
399 prev_what_next = t->what_next;
400
401 errno = t->saved_errno;
402 #if defined(mingw32_HOST_OS)
403 SetLastError(t->saved_winerror);
404 #endif
405
406 // reset the interrupt flag before running Haskell code
407 cap->interrupt = 0;
408
409 cap->in_haskell = true;
410 cap->idle = 0;
411
412 dirty_TSO(cap,t);
413 dirty_STACK(cap,t->stackobj);
414
415 switch (recent_activity)
416 {
417 case ACTIVITY_DONE_GC: {
418 // ACTIVITY_DONE_GC means we turned off the timer signal to
419 // conserve power (see #1623). Re-enable it here.
420 uint32_t prev;
421 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
422 if (prev == ACTIVITY_DONE_GC) {
423 #if !defined(PROFILING)
424 startTimer();
425 #endif
426 }
427 break;
428 }
429 case ACTIVITY_INACTIVE:
430 // If we reached ACTIVITY_INACTIVE, then don't reset it until
431 // we've done the GC. The thread running here might just be
432 // the IO manager thread that handle_tick() woke up via
433 // wakeUpRts().
434 break;
435 default:
436 recent_activity = ACTIVITY_YES;
437 }
438
439 traceEventRunThread(cap, t);
440
441 switch (prev_what_next) {
442
443 case ThreadKilled:
444 case ThreadComplete:
445 /* Thread already finished, return to scheduler. */
446 ret = ThreadFinished;
447 break;
448
449 case ThreadRunGHC:
450 {
451 StgRegTable *r;
452 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
453 cap = regTableToCapability(r);
454 ret = r->rRet;
455 break;
456 }
457
458 case ThreadInterpret:
459 cap = interpretBCO(cap);
460 ret = cap->r.rRet;
461 break;
462
463 default:
464 barf("schedule: invalid prev_what_next=%u field", prev_what_next);
465 }
466
467 cap->in_haskell = false;
468
469 // The TSO might have moved, eg. if it re-entered the RTS and a GC
470 // happened. So find the new location:
471 t = cap->r.rCurrentTSO;
472
473 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
474 // don't want it set when not running a Haskell thread.
475 cap->r.rCurrentTSO = NULL;
476
477 // And save the current errno in this thread.
478 // XXX: possibly bogus for SMP because this thread might already
479 // be running again, see code below.
480 t->saved_errno = errno;
481 #if defined(mingw32_HOST_OS)
482 // Similarly for Windows error code
483 t->saved_winerror = GetLastError();
484 #endif
485
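// Note: the '+ 6' below converts the TSO's why_blocked value into the
// eventlog's thread-stop status encoding, in which the blocked statuses
// follow the StgThreadReturnCode values and the foreign-call status
// (see includes/rts/EventLogFormat.h).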
486 if (ret == ThreadBlocked) {
487 if (t->why_blocked == BlockedOnBlackHole) {
488 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
489 traceEventStopThread(cap, t, t->why_blocked + 6,
490 owner != NULL ? owner->id : 0);
491 } else {
492 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
493 }
494 } else {
495 if (ret == StackOverflow) {
496 traceEventStopThread(cap, t, ret, t->tot_stack_size);
497 } else {
498 traceEventStopThread(cap, t, ret, 0);
499 }
500 }
501
502 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
503 ASSERT(t->cap == cap);
504
505 // ----------------------------------------------------------------------
506
507 // Costs for the scheduler are assigned to CCS_SYSTEM
508 stopHeapProfTimer();
509 #if defined(PROFILING)
510 cap->r.rCCCS = CCS_SYSTEM;
511 #endif
512
513 schedulePostRunThread(cap,t);
514
515 ready_to_gc = false;
516
517 switch (ret) {
518 case HeapOverflow:
519 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
520 break;
521
522 case StackOverflow:
523 // just adjust the stack for this thread, then pop it back
524 // on the run queue.
525 threadStackOverflow(cap, t);
526 pushOnRunQueue(cap,t);
527 break;
528
529 case ThreadYielding:
530 if (scheduleHandleYield(cap, t, prev_what_next)) {
531 // shortcut for switching between compiler/interpreter:
532 goto run_thread;
533 }
534 break;
535
536 case ThreadBlocked:
537 scheduleHandleThreadBlocked(t);
538 break;
539
540 case ThreadFinished:
541 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
542 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
543 break;
544
545 default:
546 barf("schedule: invalid thread return code %d", (int)ret);
547 }
548
549 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
550 scheduleDoGC(&cap,task,false);
551 }
552 } /* end of while() */
553 }
554
555 /* -----------------------------------------------------------------------------
556 * Run queue operations
557 * -------------------------------------------------------------------------- */
558
559 static void
560 removeFromRunQueue (Capability *cap, StgTSO *tso)
561 {
562 if (tso->block_info.prev == END_TSO_QUEUE) {
563 ASSERT(cap->run_queue_hd == tso);
564 cap->run_queue_hd = tso->_link;
565 } else {
566 setTSOLink(cap, tso->block_info.prev, tso->_link);
567 }
568 if (tso->_link == END_TSO_QUEUE) {
569 ASSERT(cap->run_queue_tl == tso);
570 cap->run_queue_tl = tso->block_info.prev;
571 } else {
572 setTSOPrev(cap, tso->_link, tso->block_info.prev);
573 }
574 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
575 cap->n_run_queue--;
576
577 IF_DEBUG(sanity, checkRunQueue(cap));
578 }
579
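// Move a thread to the front of its Capability's run queue, so that it
// will be the next thread returned by popRunQueue(). (pushOnRunQueue()
// pushes onto the head of the queue; appendToRunQueue() onto the tail.)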
580 void
581 promoteInRunQueue (Capability *cap, StgTSO *tso)
582 {
583 removeFromRunQueue(cap, tso);
584 pushOnRunQueue(cap, tso);
585 }
586
587 /* ----------------------------------------------------------------------------
588 * Setting up the scheduler loop
589 * ------------------------------------------------------------------------- */
590
591 static void
592 schedulePreLoop(void)
593 {
594 // initialisation for scheduler - what cannot go into initScheduler()
595
596 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
597 win32AllocStack();
598 #endif
599 }
600
601 /* -----------------------------------------------------------------------------
602 * scheduleFindWork()
603 *
604 * Search for work to do, and handle messages from elsewhere.
605 * -------------------------------------------------------------------------- */
606
607 static void
608 scheduleFindWork (Capability **pcap)
609 {
610 scheduleStartSignalHandlers(*pcap);
611
612 scheduleProcessInbox(pcap);
613
614 scheduleCheckBlockedThreads(*pcap);
615
616 #if defined(THREADED_RTS)
617 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
618 #endif
619 }
620
621 #if defined(THREADED_RTS)
622 STATIC_INLINE bool
623 shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
624 {
625 // we need to yield this capability to someone else if:
626 // - another thread is initiating a GC, and we didn't just do a GC
627 // (see Note [GC livelock])
628 // - another Task is returning from a foreign call
629 // - the thread at the head of the run queue cannot be run
630 // by this Task (it is bound to another Task, or it is unbound
631 // and this task is bound).
632 //
633 // Note [GC livelock]
634 //
635 // If we are interrupted to do a GC, then we do not immediately do
636 // another one. This avoids a starvation situation where one
637 // Capability keeps forcing a GC and the other Capabilities make no
638 // progress at all.
639
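// (The three disjuncts in the expression below correspond, in order, to
// the three reasons listed above.)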
640 return ((pending_sync && !didGcLast) ||
641 cap->n_returning_tasks != 0 ||
642 (!emptyRunQueue(cap) && (task->incall->tso == NULL
643 ? peekRunQueue(cap)->bound != NULL
644 : peekRunQueue(cap)->bound != task->incall)));
645 }
646
647 // This is the single place where a Task goes to sleep. There are
648 // two reasons it might need to sleep:
649 // - there are no threads to run
650 // - we need to yield this Capability to someone else
651 // (see shouldYieldCapability())
652 //
653 // Careful: the scheduler loop is quite delicate. Make sure you run
654 // the tests in testsuite/concurrent (all ways) after modifying this,
655 // and also check the benchmarks in nofib/parallel for regressions.
656
657 static void
658 scheduleYield (Capability **pcap, Task *task)
659 {
660 Capability *cap = *pcap;
661 bool didGcLast = false;
662
663 // if we have work, and we don't need to give up the Capability, continue.
664 //
665 if (!shouldYieldCapability(cap,task,false) &&
666 (!emptyRunQueue(cap) ||
667 !emptyInbox(cap) ||
668 sched_state >= SCHED_INTERRUPTING)) {
669 return;
670 }
671
672 // otherwise yield (sleep), and keep yielding if necessary.
673 do {
674 if (doIdleGCWork(cap, false)) {
675 // there's more idle GC work to do
676 didGcLast = false;
677 } else {
678 // no more idle GC work to do
679 didGcLast = yieldCapability(&cap,task, !didGcLast);
680 }
681 }
682 while (shouldYieldCapability(cap,task,didGcLast));
683
684 // note there may still be no threads on the run queue at this
685 // point, the caller has to check.
686
687 *pcap = cap;
688 return;
689 }
690 #endif
691
692 /* -----------------------------------------------------------------------------
693 * schedulePushWork()
694 *
695 * Push work to other Capabilities if we have some.
696 * -------------------------------------------------------------------------- */
697
698 static void
699 schedulePushWork(Capability *cap USED_IF_THREADS,
700 Task *task USED_IF_THREADS)
701 {
702 #if defined(THREADED_RTS)
703
704 Capability *free_caps[n_capabilities], *cap0;
705 uint32_t i, n_wanted_caps, n_free_caps;
706
707 uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
708
709 // migration can be turned off with +RTS -qm
710 if (!RtsFlags.ParFlags.migrate) {
711 spare_threads = 0;
712 }
713
714 // Figure out how many capabilities we want to wake up. We need at least
715 // sparkPoolSizeCap(cap) plus the number of spare threads we have.
716 n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
717 if (n_wanted_caps == 0) return;
718
719 // First grab as many free Capabilities as we can. ToDo: we should use
720 // capabilities on the same NUMA node preferably, but not exclusively.
721 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
722 n_free_caps < n_wanted_caps && i != cap->no;
723 i = (i + 1) % n_capabilities) {
724 cap0 = capabilities[i];
725 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
726 if (!emptyRunQueue(cap0)
727 || cap0->n_returning_tasks != 0
728 || !emptyInbox(cap0)) {
729 // it already has some work, we just grabbed it at
730 // the wrong moment. Or maybe it's deadlocked!
731 releaseCapability(cap0);
732 } else {
733 free_caps[n_free_caps++] = cap0;
734 }
735 }
736 }
737
738 // We now have n_free_caps free capabilities stashed in
739 // free_caps[]. Attempt to share our run queue equally with them.
740 // This is complicated slightly by the fact that we can't move
741 // some threads:
742 //
743 // - threads that have TSO_LOCKED cannot migrate
744 // - a thread that is bound to the current Task cannot be migrated
745 //
746 // This is about the simplest thing we could do; improvements we
747 // might want to make include:
748 //
749 // - giving high priority to moving relatively new threads, on
750 // the grounds that they haven't had time to build up a
751 // working set in the cache on this CPU/Capability.
752 //
753 // - giving low priority to moving long-lived threads
754
755 if (n_free_caps > 0) {
756 StgTSO *prev, *t, *next;
757
758 debugTrace(DEBUG_sched,
759 "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
760 cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
761 n_free_caps);
762
763 // There are n_free_caps+1 caps in total. We will share the threads
764 // evenly between them, *except* that if the run queue does not divide
765 // evenly by n_free_caps+1 then we bias towards the current capability.
766 // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
767 uint32_t keep_threads =
768 (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
769
770 // This also ensures that we don't give away all our threads, since
771 // (x + y) / (y + 1) >= 1 when x >= 1.
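// A worked example (illustrative numbers): with n_run_queue = 7 and
// n_free_caps = 2 we keep (7 + 2) / (2 + 1) = 3 threads here and
// migrate the remaining 4, two to each of the free capabilities.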
772
773 // The number of threads we have left.
774 uint32_t n = cap->n_run_queue;
775
776 // prev = the previous thread on this cap's run queue
777 prev = END_TSO_QUEUE;
778
779 // We're going to walk through the run queue, migrating threads to other
780 // capabilities until we have only keep_threads left. We might
781 // encounter a thread that cannot be migrated, in which case we add it
782 // to the current run queue and decrement keep_threads.
783 for (t = cap->run_queue_hd, i = 0;
784 t != END_TSO_QUEUE && n > keep_threads;
785 t = next)
786 {
787 next = t->_link;
788 t->_link = END_TSO_QUEUE;
789
790 // Should we keep this thread?
791 if (t->bound == task->incall // don't move my bound thread
792 || tsoLocked(t) // don't move a locked thread
793 ) {
794 if (prev == END_TSO_QUEUE) {
795 cap->run_queue_hd = t;
796 } else {
797 setTSOLink(cap, prev, t);
798 }
799 setTSOPrev(cap, t, prev);
800 prev = t;
801 if (keep_threads > 0) keep_threads--;
802 }
803
804 // Or migrate it?
805 else {
806 appendToRunQueue(free_caps[i],t);
807 traceEventMigrateThread (cap, t, free_caps[i]->no);
808
809 if (t->bound) { t->bound->task->cap = free_caps[i]; }
810 t->cap = free_caps[i];
811 n--; // we have one fewer threads now
812 i++; // move on to the next free_cap
813 if (i == n_free_caps) i = 0;
814 }
815 }
816
817 // Join up the beginning of the queue (prev)
818 // with the rest of the queue (t)
819 if (t == END_TSO_QUEUE) {
820 cap->run_queue_tl = prev;
821 } else {
822 setTSOPrev(cap, t, prev);
823 }
824 if (prev == END_TSO_QUEUE) {
825 cap->run_queue_hd = t;
826 } else {
827 setTSOLink(cap, prev, t);
828 }
829 cap->n_run_queue = n;
830
831 IF_DEBUG(sanity, checkRunQueue(cap));
832
833 // release the capabilities
834 for (i = 0; i < n_free_caps; i++) {
835 task->cap = free_caps[i];
836 if (sparkPoolSizeCap(cap) > 0) {
837 // If we have sparks to steal, wake up a worker on the
838 // capability, even if it has no threads to run.
839 releaseAndWakeupCapability(free_caps[i]);
840 } else {
841 releaseCapability(free_caps[i]);
842 }
843 }
844 }
845 task->cap = cap; // reset to point to our Capability.
846
847 #endif /* THREADED_RTS */
848
849 }
850
851 /* ----------------------------------------------------------------------------
852 * Start any pending signal handlers
853 * ------------------------------------------------------------------------- */
854
855 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
856 static void
857 scheduleStartSignalHandlers(Capability *cap)
858 {
859 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
860 // safe outside the lock
861 startSignalHandlers(cap);
862 }
863 }
864 #else
865 static void
866 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
867 {
868 }
869 #endif
870
871 /* ----------------------------------------------------------------------------
872 * Check for blocked threads that can be woken up.
873 * ------------------------------------------------------------------------- */
874
875 static void
876 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
877 {
878 #if !defined(THREADED_RTS)
879 //
880 // Check whether any waiting threads need to be woken up. If the
881 // run queue is empty, and there are no other tasks running, we
882 // can wait indefinitely for something to happen.
883 //
884 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
885 {
886 awaitEvent (emptyRunQueue(cap));
887 }
888 #endif
889 }
890
891 /* ----------------------------------------------------------------------------
892 * Detect deadlock conditions and attempt to resolve them.
893 * ------------------------------------------------------------------------- */
894
895 static void
896 scheduleDetectDeadlock (Capability **pcap, Task *task)
897 {
898 Capability *cap = *pcap;
899 /*
900 * Detect deadlock: when we have no threads to run, there are no
901 * threads blocked, waiting for I/O, or sleeping, and all the
902 * other tasks are waiting for work, we must have a deadlock of
903 * some description.
904 */
905 if ( emptyThreadQueues(cap) )
906 {
907 #if defined(THREADED_RTS)
908 /*
909 * In the threaded RTS, we only check for deadlock if there
910 * has been no activity in a complete timeslice. This means
911 * we won't eagerly start a full GC just because we don't have
912 * any threads to run currently.
913 */
914 if (recent_activity != ACTIVITY_INACTIVE) return;
915 #endif
916
917 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
918
919 // Garbage collection can release some new threads due to
920 // either (a) finalizers or (b) threads resurrected because
921 // they are unreachable and will therefore be sent an
922 // exception. Any threads thus released will be immediately
923 // runnable.
924 scheduleDoGC (pcap, task, true/*force major GC*/);
925 cap = *pcap;
926 // When force_major == true, scheduleDoGC sets
927 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
928 // signal.
929
930 if ( !emptyRunQueue(cap) ) return;
931
932 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
933 /* If we have user-installed signal handlers, then wait
934 * for signals to arrive rather than bombing out with a
935 * deadlock.
936 */
937 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
938 debugTrace(DEBUG_sched,
939 "still deadlocked, waiting for signals...");
940
941 awaitUserSignals();
942
943 if (signals_pending()) {
944 startSignalHandlers(cap);
945 }
946
947 // either we have threads to run, or we were interrupted:
948 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
949
950 return;
951 }
952 #endif
953
954 #if !defined(THREADED_RTS)
955 /* Probably a real deadlock. Send the current main thread the
956 * NonTermination exception (nonTermination_closure, reported as <<loop>>).
957 */
958 if (task->incall->tso) {
959 switch (task->incall->tso->why_blocked) {
960 case BlockedOnSTM:
961 case BlockedOnBlackHole:
962 case BlockedOnMsgThrowTo:
963 case BlockedOnMVar:
964 case BlockedOnMVarRead:
965 throwToSingleThreaded(cap, task->incall->tso,
966 (StgClosure *)nonTermination_closure);
967 return;
968 default:
969 barf("deadlock: main thread blocked in a strange way");
970 }
971 }
972 return;
973 #endif
974 }
975 }
976
977
978 /* ----------------------------------------------------------------------------
979 * Process messages in the current Capability's inbox
980 * ------------------------------------------------------------------------- */
981
982 static void
983 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
984 {
985 #if defined(THREADED_RTS)
986 Message *m, *next;
987 PutMVar *p, *pnext;
988 int r;
989 Capability *cap = *pcap;
990
991 while (!emptyInbox(cap)) {
992 // Executing messages might use heap, so we should check for GC.
993 if (doYouWantToGC(cap)) {
994 scheduleDoGC(pcap, cap->running_task, false);
995 cap = *pcap;
996 }
997
998 // don't use a blocking acquire; if the lock is held by
999 // another thread then just carry on. This seems to avoid
1000 // getting stuck in a message ping-pong situation with other
1001 // processors. We'll check the inbox again later anyway.
1002 //
1003 // We should really use a more efficient queue data structure
1004 // here. The trickiness is that we must ensure a Capability
1005 // never goes idle if the inbox is non-empty, which is why we
1006 // use cap->lock (cap->lock is released as the last thing
1007 // before going idle; see Capability.c:releaseCapability()).
1008 r = TRY_ACQUIRE_LOCK(&cap->lock);
1009 if (r != 0) return;
1010
1011 m = cap->inbox;
1012 p = cap->putMVars;
1013 cap->inbox = (Message*)END_TSO_QUEUE;
1014 cap->putMVars = NULL;
1015
1016 RELEASE_LOCK(&cap->lock);
1017
1018 while (m != (Message*)END_TSO_QUEUE) {
1019 next = m->link;
1020 executeMessage(cap, m);
1021 m = next;
1022 }
1023
1024 while (p != NULL) {
1025 pnext = p->link;
1026 performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
1027 Unit_closure);
1028 freeStablePtr(p->mvar);
1029 stgFree(p);
1030 p = pnext;
1031 }
1032 }
1033 #endif
1034 }
1035
1036
1037 /* ----------------------------------------------------------------------------
1038 * Activate spark threads (THREADED_RTS)
1039 * ------------------------------------------------------------------------- */
1040
1041 #if defined(THREADED_RTS)
1042 static void
1043 scheduleActivateSpark(Capability *cap)
1044 {
1045 if (anySparks() && !cap->disabled)
1046 {
1047 createSparkThread(cap);
1048 debugTrace(DEBUG_sched, "creating a spark thread");
1049 }
1050 }
1051 #endif // THREADED_RTS
1052
1053 /* ----------------------------------------------------------------------------
1054 * After running a thread...
1055 * ------------------------------------------------------------------------- */
1056
1057 static void
1058 schedulePostRunThread (Capability *cap, StgTSO *t)
1059 {
1060 // We have to be able to catch transactions that are in an
1061 // infinite loop as a result of seeing an inconsistent view of
1062 // memory, e.g.
1063 //
1064 // atomically $ do
1065 // [a,b] <- mapM readTVar [ta,tb]
1066 // when (a == b) loop
1067 //
1068 // and a is never equal to b given a consistent view of memory.
1069 //
1070 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1071 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1072 debugTrace(DEBUG_sched | DEBUG_stm,
1073 "trec %p found wasting its time", t);
1074
1075 // strip the stack back to the
1076 // ATOMICALLY_FRAME, aborting the (nested)
1077 // transaction, and saving the stack of any
1078 // partially-evaluated thunks on the heap.
1079 throwToSingleThreaded_(cap, t, NULL, true);
1080
1081 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1082 }
1083 }
1084
1085 //
1086 // If the current thread's allocation limit has run out, send it
1087 // the AllocationLimitExceeded exception.
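// (For reference: the per-thread counter and the TSO_ALLOC_LIMIT flag
// are exposed to Haskell code via GHC.Conc.setAllocationCounter and
// GHC.Conc.enableAllocationLimit.)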
1088
1089 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1090 // Use a throwToSelf rather than a throwToSingleThreaded, because
1091 // it correctly handles the case where the thread is currently
1092 // inside mask. Also the thread might be blocked (e.g. on an
1093 // MVar), and throwToSingleThreaded doesn't unblock it
1094 // correctly in that case.
1095 throwToSelf(cap, t, allocationLimitExceeded_closure);
1096 ASSIGN_Int64((W_*)&(t->alloc_limit),
1097 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1098 }
1099
1100 /* some statistics gathering in the parallel case */
1101 }
1102
1103 /* -----------------------------------------------------------------------------
1104 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1105 * -------------------------------------------------------------------------- */
1106
1107 static bool
1108 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1109 {
1110 if (cap->r.rHpLim == NULL || cap->context_switch) {
1111 // Sometimes we miss a context switch, e.g. when calling
1112 // primitives in a tight loop: MAYBE_GC() doesn't check the
1113 // context-switch flag, and we end up waiting for a GC.
1114 // See #1984, and concurrent/should_run/1984
1115 cap->context_switch = 0;
1116 appendToRunQueue(cap,t);
1117 } else {
1118 pushOnRunQueue(cap,t);
1119 }
1120
1121 // did the task ask for a large block?
1122 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1123 // if so, get one and push it on the front of the nursery.
1124 bdescr *bd;
1125 W_ blocks;
1126
1127 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
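// For example, with the default 4Kbyte block size a request of
// rHpAlloc = 10000 bytes rounds up to 3 blocks.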
1128
1129 if (blocks > BLOCKS_PER_MBLOCK) {
1130 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1131 }
1132
1133 debugTrace(DEBUG_sched,
1134 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1135 (long)t->id, what_next_strs[t->what_next], blocks);
1136
1137 // don't do this if the nursery is (nearly) full, we'll GC first.
1138 if (cap->r.rCurrentNursery->link != NULL ||
1139 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1140 // infinite loop if the
1141 // nursery has only one
1142 // block.
1143
1144 bd = allocGroupOnNode_lock(cap->node,blocks);
1145 cap->r.rNursery->n_blocks += blocks;
1146
1147 // link the new group after CurrentNursery
1148 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1149
1150 // initialise it as a nursery block. We initialise the
1151 // step, gen_no, and flags fields of *every* sub-block in
1152 // this large block, because this is easier than making
1153 // sure that we always find the block head of a large
1154 // block whenever we call Bdescr() (eg. evacuate() and
1155 // isAlive() in the GC would both have to do this, at
1156 // least).
1157 {
1158 bdescr *x;
1159 for (x = bd; x < bd + blocks; x++) {
1160 initBdescr(x,g0,g0);
1161 x->free = x->start;
1162 x->flags = 0;
1163 }
1164 }
1165
1166 // This assert can be a killer if the app is doing lots
1167 // of large block allocations.
1168 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1169
1170 // now update the nursery to point to the new block
1171 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1172 cap->r.rCurrentNursery = bd;
1173
1174 // we might be unlucky and have another thread get on the
1175 // run queue before us and steal the large block, but in that
1176 // case the thread will just end up requesting another large
1177 // block.
1178 return false; /* not actually GC'ing */
1179 }
1180 }
1181
1182 return doYouWantToGC(cap);
1183 /* actual GC is done at the end of the while loop in schedule() */
1184 }
1185
1186 /* -----------------------------------------------------------------------------
1187 * Handle a thread that returned to the scheduler with ThreadYielding
1188 * -------------------------------------------------------------------------- */
1189
1190 static bool
1191 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1192 {
1193 /* put the thread back on the run queue. Then, if we're ready to
1194 * GC, check whether this is the last task to stop. If so, wake
1195 * up the GC thread. getThread will block during a GC until the
1196 * GC is finished.
1197 */
1198
1199 ASSERT(t->_link == END_TSO_QUEUE);
1200
1201 // Shortcut if we're just switching evaluators: just run the thread. See
1202 // Note [avoiding threadPaused] in Interpreter.c.
1203 if (t->what_next != prev_what_next) {
1204 debugTrace(DEBUG_sched,
1205 "--<< thread %ld (%s) stopped to switch evaluators",
1206 (long)t->id, what_next_strs[t->what_next]);
1207 return true;
1208 }
1209
1210 // Reset the context switch flag. We don't do this just before
1211 // running the thread, because that would mean we would lose ticks
1212 // during GC, which can lead to unfair scheduling (a thread hogs
1213 // the CPU because the tick always arrives during GC). This way
1214 // penalises threads that do a lot of allocation, but that seems
1215 // better than the alternative.
1216 if (cap->context_switch != 0) {
1217 cap->context_switch = 0;
1218 appendToRunQueue(cap,t);
1219 } else {
1220 pushOnRunQueue(cap,t);
1221 }
1222
1223 IF_DEBUG(sanity,
1224 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1225 checkTSO(t));
1226
1227 return false;
1228 }
1229
1230 /* -----------------------------------------------------------------------------
1231 * Handle a thread that returned to the scheduler with ThreadBlocked
1232 * -------------------------------------------------------------------------- */
1233
1234 static void
1235 scheduleHandleThreadBlocked( StgTSO *t
1236 #if !defined(DEBUG)
1237 STG_UNUSED
1238 #endif
1239 )
1240 {
1241
1242 // We don't need to do anything. The thread is blocked, and it
1243 // has tidied up its stack and placed itself on whatever queue
1244 // it needs to be on.
1245
1246 // ASSERT(t->why_blocked != NotBlocked);
1247 // Not true: for example,
1248 // - the thread may have woken itself up already, because
1249 // threadPaused() might have raised a blocked throwTo
1250 // exception, see maybePerformBlockedException().
1251
1252 #if defined(DEBUG)
1253 traceThreadStatus(DEBUG_sched, t);
1254 #endif
1255 }
1256
1257 /* -----------------------------------------------------------------------------
1258 * Handle a thread that returned to the scheduler with ThreadFinished
1259 * -------------------------------------------------------------------------- */
1260
1261 static bool
1262 scheduleHandleThreadFinished (Capability *cap, Task *task, StgTSO *t)
1263 {
1264 /* Need to check whether this was a main thread, and if so,
1265 * return with the return value.
1266 *
1267 * We also end up here if the thread kills itself with an
1268 * uncaught exception, see Exception.cmm.
1269 */
1270
1271 // blocked exceptions can now complete, even if the thread was in
1272 // blocked mode (see #2910).
1273 awakenBlockedExceptionQueue (cap, t);
1274
1275 //
1276 // Check whether the thread that just completed was a bound
1277 // thread, and if so return with the result.
1278 //
1279 // There is an assumption here that all thread completion goes
1280 // through this point; we need to make sure that if a thread
1281 // ends up in the ThreadKilled state, that it stays on the run
1282 // queue so it can be dealt with here.
1283 //
1284
1285 if (t->bound) {
1286
1287 if (t->bound != task->incall) {
1288 #if !defined(THREADED_RTS)
1289 // Must be a bound thread that is not the topmost one. Leave
1290 // it on the run queue until the stack has unwound to the
1291 // point where we can deal with this. Leaving it on the run
1292 // queue also ensures that the garbage collector knows about
1293 // this thread and its return value (it gets dropped from the
1294 // step->threads list so there's no other way to find it).
1295 appendToRunQueue(cap,t);
1296 return false;
1297 #else
1298 // this cannot happen in the threaded RTS, because a
1299 // bound thread can only be run by the appropriate Task.
1300 barf("finished bound thread that isn't mine");
1301 #endif
1302 }
1303
1304 ASSERT(task->incall->tso == t);
1305
1306 if (t->what_next == ThreadComplete) {
1307 if (task->incall->ret) {
1308 // NOTE: return val is stack->sp[1] (see StgStartup.cmm)
1309 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1310 }
1311 task->incall->rstat = Success;
1312 } else {
1313 if (task->incall->ret) {
1314 *(task->incall->ret) = NULL;
1315 }
1316 if (sched_state >= SCHED_INTERRUPTING) {
1317 if (heap_overflow) {
1318 task->incall->rstat = HeapExhausted;
1319 } else {
1320 task->incall->rstat = Interrupted;
1321 }
1322 } else {
1323 task->incall->rstat = Killed;
1324 }
1325 }
1326 #if defined(DEBUG)
1327 removeThreadLabel((StgWord)task->incall->tso->id);
1328 #endif
1329
1330 // We no longer consider this thread and task to be bound to
1331 // each other. The TSO lives on until it is GC'd, but the
1332 // task is about to be released by the caller, and we don't
1333 // want anyone following the pointer from the TSO to the
1334 // defunct task (which might have already been
1335 // re-used). This was a real bug: the GC updated
1336 // tso->bound->tso which led to a deadlock.
1337 t->bound = NULL;
1338 task->incall->tso = NULL;
1339
1340 return true; // tells schedule() to return
1341 }
1342
1343 return false;
1344 }
1345
1346 /* -----------------------------------------------------------------------------
1347 * Perform a heap census
1348 * -------------------------------------------------------------------------- */
1349
1350 static bool
1351 scheduleNeedHeapProfile( bool ready_to_gc )
1352 {
1353 // When we have +RTS -i0 and we're heap profiling, do a census at
1354 // every GC. This lets us get repeatable runs for debugging.
1355 if (performHeapProfile ||
1356 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1357 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1358 return true;
1359 } else {
1360 return false;
1361 }
1362 }
1363
1364 /* -----------------------------------------------------------------------------
1365 * stopAllCapabilities()
1366 *
1367 * Stop all Haskell execution. This is used when we need to make some global
1368 * change to the system, such as altering the number of capabilities, or
1369 * forking.
1370 *
1371 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1372 * -------------------------------------------------------------------------- */
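/* A usage sketch (illustrative only): a global operation such as changing
 * the number of capabilities follows this pattern:
 *
 *     stopAllCapabilities(&cap, task);
 *     // ... no Haskell code can run anywhere at this point ...
 *     releaseAllCapabilities(n_capabilities, cap, task);
 */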
1373
1374 #if defined(THREADED_RTS)
1375 static void stopAllCapabilities (Capability **pCap, Task *task)
1376 {
1377 bool was_syncing;
1378 SyncType prev_sync_type;
1379
1380 PendingSync sync = {
1381 .type = SYNC_OTHER,
1382 .idle = NULL,
1383 .task = task
1384 };
1385
1386 do {
1387 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1388 } while (was_syncing);
1389
1390 acquireAllCapabilities(*pCap,task);
1391
1392 pending_sync = 0;
1393 }
1394 #endif
1395
1396 /* -----------------------------------------------------------------------------
1397 * requestSync()
1398 *
1399 * Commence a synchronisation between all capabilities. Normally not called
1400 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1401 * has some special synchronisation requirements.
1402 *
1403 * Returns:
1404 * false if we successfully got a sync
1405 * true if there was another sync request in progress,
1406 * and we yielded to it; the type of the other sync
1407 * request is written to *prev_sync_type.
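* For the usual call pattern (retrying while another sync is already in
* progress) see stopAllCapabilities() above.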
1408 * -------------------------------------------------------------------------- */
1409
1410 #if defined(THREADED_RTS)
1411 static bool requestSync (
1412 Capability **pcap, Task *task, PendingSync *new_sync,
1413 SyncType *prev_sync_type)
1414 {
1415 PendingSync *sync;
1416
1417 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1418 (StgWord)NULL,
1419 (StgWord)new_sync);
1420
1421 if (sync != NULL)
1422 {
1423 // sync is valid until we have called yieldCapability().
1424 // After the sync is completed, we cannot read that struct any
1425 // more because it has been freed.
1426 *prev_sync_type = sync->type;
1427 do {
1428 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1429 sync->type);
1430 ASSERT(*pcap);
1431 yieldCapability(pcap,task,true);
1432 sync = pending_sync;
1433 } while (sync != NULL);
1434
1435 // NOTE: task->cap might have changed now
1436 return true;
1437 }
1438 else
1439 {
1440 return false;
1441 }
1442 }
1443 #endif
1444
1445 /* -----------------------------------------------------------------------------
1446 * acquireAllCapabilities()
1447 *
1448 * Grab all the capabilities except the one we already hold. Used
1449 * when synchronising before a single-threaded GC (SYNC_GC_SEQ), and
1450 * before a fork (SYNC_OTHER).
1451 *
1452 * Only call this after requestSync(), otherwise a deadlock might
1453 * ensue if another thread is trying to synchronise.
1454 * -------------------------------------------------------------------------- */
1455
1456 #if defined(THREADED_RTS)
1457 static void acquireAllCapabilities(Capability *cap, Task *task)
1458 {
1459 Capability *tmpcap;
1460 uint32_t i;
1461
1462 ASSERT(pending_sync != NULL);
1463 for (i=0; i < n_capabilities; i++) {
1464 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1465 i, n_capabilities);
1466 tmpcap = capabilities[i];
1467 if (tmpcap != cap) {
1468 // we better hope this task doesn't get migrated to
1469 // another Capability while we're waiting for this one.
1470 // It won't, because load balancing happens while we have
1471 // all the Capabilities, but even so it's a slightly
1472 // unsavoury invariant.
1473 task->cap = tmpcap;
1474 waitForCapability(&tmpcap, task);
1475 if (tmpcap->no != i) {
1476 barf("acquireAllCapabilities: got the wrong capability");
1477 }
1478 }
1479 }
1480 task->cap = cap;
1481 }
1482 #endif
1483
1484 /* -----------------------------------------------------------------------------
1485 * releaseAllCapabilities()
1486 *
1487 * Assuming this thread holds all the capabilities, release them all except for
1488 * the one passed in as cap.
1489 * -------------------------------------------------------------------------- */
1490
1491 #if defined(THREADED_RTS)
1492 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task)
1493 {
1494 uint32_t i;
1495
1496 for (i = 0; i < n; i++) {
1497 if (cap->no != i) {
1498 task->cap = capabilities[i];
1499 releaseCapability(capabilities[i]);
1500 }
1501 }
1502 task->cap = cap;
1503 }
1504 #endif
1505
1506 /* -----------------------------------------------------------------------------
1507 * Perform a garbage collection if necessary
1508 * -------------------------------------------------------------------------- */
1509
1510 static void
1511 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1512 bool force_major)
1513 {
1514 Capability *cap = *pcap;
1515 bool heap_census;
1516 uint32_t collect_gen;
1517 bool major_gc;
1518 #if defined(THREADED_RTS)
1519 uint32_t gc_type;
1520 uint32_t i;
1521 uint32_t need_idle;
1522 uint32_t n_gc_threads;
1523 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1524 StgTSO *tso;
1525 bool *idle_cap;
1526 // idle_cap is an array (allocated later) of size n_capabilities, where
1527 // idle_cap[i] is true if capability i will be idle during this GC
1528 // cycle.
1529 #endif
1530
1531 if (sched_state == SCHED_SHUTTING_DOWN) {
1532 // The final GC has already been done, and the system is
1533 // shutting down. We'll probably deadlock if we try to GC
1534 // now.
1535 return;
1536 }
1537
1538 heap_census = scheduleNeedHeapProfile(true);
1539
1540 // Figure out which generation we are collecting, so that we can
1541 // decide whether this is a parallel GC or not.
1542 collect_gen = calcNeeded(force_major || heap_census, NULL);
1543 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1544
1545 #if defined(THREADED_RTS)
1546 if (sched_state < SCHED_INTERRUPTING
1547 && RtsFlags.ParFlags.parGcEnabled
1548 && collect_gen >= RtsFlags.ParFlags.parGcGen
1549 && ! oldest_gen->mark)
1550 {
1551 gc_type = SYNC_GC_PAR;
1552 } else {
1553 gc_type = SYNC_GC_SEQ;
1554 }
1555
1556 // In order to GC, there must be no threads running Haskell code.
1557 // Therefore, for single-threaded GC, the GC thread needs to hold *all* the
1558 // capabilities, and release them after the GC has completed. For parallel
1559 // GC, we synchronise all the running threads using requestSync().
1560 //
1561 // Other capabilities are prevented from running yet more Haskell threads if
1562 // pending_sync is set. Tested inside yieldCapability() and
1563 // releaseCapability() in Capability.c
1564
1565 PendingSync sync = {
1566 .type = gc_type,
1567 .idle = NULL,
1568 .task = task
1569 };
1570
1571 {
1572 SyncType prev_sync = 0;
1573 bool was_syncing;
1574 do {
1575 // If -qn is not set and we have more capabilities than cores, set
1576 // the number of GC threads to #cores. We do this here rather than
1577 // in normaliseRtsOpts() because here it will work if the program
1578 // calls setNumCapabilities.
1579 //
1580 n_gc_threads = RtsFlags.ParFlags.parGcThreads;
1581 if (n_gc_threads == 0 &&
1582 enabled_capabilities > getNumberOfProcessors()) {
1583 n_gc_threads = getNumberOfProcessors();
1584 }
1585
1586 // This calculation must be inside the loop because
1587 // enabled_capabilities may change if requestSync() below fails and
1588 // we retry.
1589 if (gc_type == SYNC_GC_PAR && n_gc_threads > 0) {
1590 if (n_gc_threads >= enabled_capabilities) {
1591 need_idle = 0;
1592 } else {
1593 need_idle = enabled_capabilities - n_gc_threads;
1594 }
1595 } else {
1596 need_idle = 0;
1597 }
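// An illustrative example: with 8 enabled capabilities, +RTS -qn4 and a
// parallel GC (SYNC_GC_PAR), n_gc_threads = 4 and need_idle = 8 - 4 = 4,
// i.e. four capabilities will be asked to sit out this GC cycle.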
1598
1599 // We need an array of size n_capabilities, but since this may
1600 // change each time around the loop we must allocate it afresh.
1601 idle_cap = (bool *)stgMallocBytes(n_capabilities *
1602 sizeof(bool),
1603 "scheduleDoGC");
1604 sync.idle = idle_cap;
1605
1606 // When using +RTS -qn, we need some capabilities to be idle during
1607 // GC. The best bet is to choose some inactive ones, so we look for
1608 // those first:
1609 uint32_t n_idle = need_idle;
1610 for (i=0; i < n_capabilities; i++) {
1611 if (capabilities[i]->disabled) {
1612 idle_cap[i] = true;
1613 } else if (n_idle > 0 &&
1614 capabilities[i]->running_task == NULL) {
1615 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1616 n_idle--;
1617 idle_cap[i] = true;
1618 } else {
1619 idle_cap[i] = false;
1620 }
1621 }
1622 // If we didn't find enough inactive capabilities, just pick some
1623 // more to be idle.
1624 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1625 if (!idle_cap[i] && i != cap->no) {
1626 idle_cap[i] = true;
1627 n_idle--;
1628 }
1629 }
1630 ASSERT(n_idle == 0);
1631
1632 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1633 cap = *pcap;
1634 if (was_syncing) {
1635 stgFree(idle_cap);
1636 }
1637 if (was_syncing &&
1638 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1639 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1640 // someone else had a pending sync request for a GC, so
1641 // let's assume GC has been done and we don't need to GC
1642 // again.
1643 // Exception to this: if SCHED_INTERRUPTING, then we still
1644 // need to do the final GC.
1645 return;
1646 }
1647 if (sched_state == SCHED_SHUTTING_DOWN) {
1648 // The scheduler might now be shutting down. We tested
1649 // this above, but it might have become true since then as
1650 // we yielded the capability in requestSync().
1651 return;
1652 }
1653 } while (was_syncing);
1654 }
1655
1656 stat_startGCSync(gc_threads[cap->no]);
1657
1658 #if defined(DEBUG)
1659 unsigned int old_n_capabilities = n_capabilities;
1660 #endif
1661
1662 interruptAllCapabilities();
1663
1664 // The final shutdown GC is always single-threaded, because it's
1665 // possible that some of the Capabilities have no worker threads.
1666
1667 if (gc_type == SYNC_GC_SEQ) {
1668 traceEventRequestSeqGc(cap);
1669 } else {
1670 traceEventRequestParGc(cap);
1671 }
1672
1673 if (gc_type == SYNC_GC_SEQ) {
1674 // single-threaded GC: grab all the capabilities
1675 acquireAllCapabilities(cap,task);
1676 }
1677 else
1678 {
1679 // If we are load-balancing collections in this
1680 // generation, then we require all GC threads to participate
1681 // in the collection. Otherwise, we only require active
1682 // threads to participate, and we set gc_threads[i]->idle for
1683 // any idle capabilities. The rationale here is that waking
1684 // up an idle Capability takes much longer than just doing any
1685 // GC work on its behalf.
1686
1687 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1688 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1689 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1690 {
1691 for (i=0; i < n_capabilities; i++) {
1692 if (capabilities[i]->disabled) {
1693 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1694 if (idle_cap[i]) {
1695 n_idle_caps++;
1696 }
1697 } else {
1698 if (i != cap->no && idle_cap[i]) {
1699 Capability *tmpcap = capabilities[i];
1700 task->cap = tmpcap;
1701 waitForCapability(&tmpcap, task);
1702 n_idle_caps++;
1703 }
1704 }
1705 }
1706 }
1707 else
1708 {
1709 for (i=0; i < n_capabilities; i++) {
1710 if (capabilities[i]->disabled) {
1711 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1712 if (idle_cap[i]) {
1713 n_idle_caps++;
1714 }
1715 } else if (i != cap->no &&
1716 capabilities[i]->idle >=
1717 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1718 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1719 if (idle_cap[i]) {
1720 n_idle_caps++;
1721 } else {
1722 n_failed_trygrab_idles++;
1723 }
1724 }
1725 }
1726 }
1727 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1728
1729 for (i=0; i < n_capabilities; i++) {
1730 capabilities[i]->idle++;
1731 }
1732
1733 // For all capabilities participating in this GC, wait until
1734 // they have stopped mutating and are standing by for GC.
1735 waitForGcThreads(cap, idle_cap);
1736
1737 // Stable point where we can do a global check on our spark counters
1738 ASSERT(checkSparkCountInvariant());
1739 }
1740
1741 #endif
1742
1743 IF_DEBUG(scheduler, printAllThreads());
1744
1745 delete_threads_and_gc:
1746 /*
1747 * We now have all the capabilities; if we're in an interrupting
1748 * state, then we should take the opportunity to delete all the
1749 * threads in the system.
1750 * Checking for major_gc ensures that the last GC is major.
1751 */
1752 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1753 deleteAllThreads();
1754 #if defined(THREADED_RTS)
1755 // Discard all the sparks from every Capability. Why?
1756 // They'll probably be GC'd anyway since we've killed all the
1757 // threads. It just avoids the GC having to do any work to
1758 // figure out that any remaining sparks are garbage.
1759 for (i = 0; i < n_capabilities; i++) {
1760 capabilities[i]->spark_stats.gcd +=
1761 sparkPoolSize(capabilities[i]->sparks);
1762 // No race here since all Caps are stopped.
1763 discardSparksCap(capabilities[i]);
1764 }
1765 #endif
1766 sched_state = SCHED_SHUTTING_DOWN;
1767 }
1768
1769 /*
1770 * When there are disabled capabilities, we want to migrate any
1771 * threads away from them. Normally this happens in the
1772 * scheduler's loop, but only for unbound threads - it's really
1773 * hard for a bound thread to migrate itself. So we have another
1774 * go here.
1775 */
1776 #if defined(THREADED_RTS)
1777 for (i = enabled_capabilities; i < n_capabilities; i++) {
1778 Capability *tmp_cap, *dest_cap;
1779 tmp_cap = capabilities[i];
1780 ASSERT(tmp_cap->disabled);
1781 if (i != cap->no) {
1782 dest_cap = capabilities[i % enabled_capabilities];
1783 while (!emptyRunQueue(tmp_cap)) {
1784 tso = popRunQueue(tmp_cap);
1785 migrateThread(tmp_cap, tso, dest_cap);
1786 if (tso->bound) {
1787 traceTaskMigrate(tso->bound->task,
1788 tso->bound->task->cap,
1789 dest_cap);
1790 tso->bound->task->cap = dest_cap;
1791 }
1792 }
1793 }
1794 }
1795 #endif
1796
1797 // Do any remaining idle GC work from the previous GC
1798 doIdleGCWork(cap, true /* all of it */);
1799
1800 #if defined(THREADED_RTS)
1801 // reset pending_sync *before* GC, so that when the GC threads
1802 // emerge they don't immediately re-enter the GC.
1803 pending_sync = 0;
1804 GarbageCollect(collect_gen, heap_census, gc_type, cap, idle_cap);
1805 #else
1806 GarbageCollect(collect_gen, heap_census, 0, cap, NULL);
1807 #endif
1808
1809 // If we're shutting down, don't leave any idle GC work to do.
1810 if (sched_state == SCHED_SHUTTING_DOWN) {
1811 doIdleGCWork(cap, true /* all of it */);
1812 }
1813
1814 traceSparkCounters(cap);
1815
1816 switch (recent_activity) {
1817 case ACTIVITY_INACTIVE:
1818 if (force_major) {
1819 // We are doing a GC because the system has been idle for a
1820 // timeslice and we need to check for deadlock. Record the
1821 // fact that we've done a GC and turn off the timer signal;
1822 // it will get re-enabled if we run any threads after the GC.
1823 recent_activity = ACTIVITY_DONE_GC;
1824 #if !defined(PROFILING)
1825 stopTimer();
1826 #endif
1827 break;
1828 }
1829 // fall through...
1830
1831 case ACTIVITY_MAYBE_NO:
1832 // the GC might have taken long enough for the timer to set
1833 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1834 // but we aren't necessarily deadlocked:
1835 recent_activity = ACTIVITY_YES;
1836 break;
1837
1838 case ACTIVITY_DONE_GC:
1839 // If we are actually active, the scheduler will reset the
1840 // recent_activity flag and re-enable the timer.
1841 break;
1842 }
1843
1844 #if defined(THREADED_RTS)
1845 // Stable point where we can do a global check on our spark counters
1846 ASSERT(checkSparkCountInvariant());
1847 #endif
1848
1849 // The heap census itself is done during GarbageCollect().
1850 if (heap_census) {
1851 performHeapProfile = false;
1852 }
1853
1854 #if defined(THREADED_RTS)
1855
1856 // If n_capabilities has changed during GC, we're in trouble.
1857 ASSERT(n_capabilities == old_n_capabilities);
1858
1859 if (gc_type == SYNC_GC_PAR)
1860 {
1861 for (i = 0; i < n_capabilities; i++) {
1862 if (i != cap->no) {
1863 if (idle_cap[i]) {
1864 ASSERT(capabilities[i]->running_task == task);
1865 task->cap = capabilities[i];
1866 releaseCapability(capabilities[i]);
1867 } else {
1868 ASSERT(capabilities[i]->running_task != task);
1869 }
1870 }
1871 }
1872 task->cap = cap;
1873
1874 // releaseGCThreads() happens *after* we have released idle
1875 // capabilities. Otherwise what can happen is one of the released
1876 // threads starts a new GC, and finds that it can't acquire some of
1877 // the disabled capabilities, because the previous GC still holds
1878 // them, so those disabled capabilities will not be idle during the
1879 // next GC round. However, if we release the capabilities first,
1880 // then they will be free (because they're disabled) when the next
1881 // GC cycle happens.
1882 releaseGCThreads(cap, idle_cap);
1883 }
1884 #endif
1885 if (heap_overflow && sched_state == SCHED_RUNNING) {
1886 // GC set the heap_overflow flag. We should throw an exception if we
1887 // can, or shut down otherwise.
1888
1889 // Get the thread to which Ctrl-C is thrown
1890 StgTSO *main_thread = getTopHandlerThread();
1891 if (main_thread == NULL) {
1892 // GC set the heap_overflow flag, and there is no main thread to
1893 // throw an exception to, so we should proceed with an orderly
1894 // shutdown now. Ultimately we want the main thread to return to
1895 // its caller with HeapExhausted, at which point the caller should
1896 // call hs_exit(). The first step is to delete all the threads.
1897 sched_state = SCHED_INTERRUPTING;
1898 goto delete_threads_and_gc;
1899 }
1900
1901 heap_overflow = false;
1902 const uint64_t allocation_count = getAllocations();
1903 if (RtsFlags.GcFlags.heapLimitGrace <
1904 allocation_count - allocated_bytes_at_heapoverflow ||
1905 allocated_bytes_at_heapoverflow == 0) {
1906 allocated_bytes_at_heapoverflow = allocation_count;
1907 // We used to simply exit, but throwing an exception gives the
1908 // program a chance to clean up. It also lets the exception be
1909 // caught.
1910
1911 // FIXME this is not a good way to tell a program to release
1912 // resources. It is neither reliable (the RTS crashes if it fails
1913 // to allocate memory from the OS) nor very usable (it is always
1914 // thrown to the main thread, which might not be able to do anything
1915 // useful with it). We really should have a more general way to
1916 // release resources in low-memory conditions. Nevertheless, this
1917 // is still a big improvement over just exiting.
1918
1919 // FIXME again: perhaps we should throw a synchronous exception
1920 // instead of an asynchronous one, or have a way for the program to
1921 // register a handler to be called when heap overflow happens.
1922 throwToSelf(cap, main_thread, heapOverflow_closure);
1923 }
1924 }
1925
1926 #if defined(THREADED_RTS)
1927 stgFree(idle_cap);
1928
1929 if (gc_type == SYNC_GC_SEQ) {
1930 // release our stash of capabilities.
1931 releaseAllCapabilities(n_capabilities, cap, task);
1932 }
1933 #endif
1934
1935 return;
1936 }
1937
1938 /* ---------------------------------------------------------------------------
1939 * Singleton fork(). Do not copy any running threads.
1940 * ------------------------------------------------------------------------- */
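/* An informal sketch of the contract, for orientation (the C caller shown is
 * hypothetical): `entry` is a stable pointer to the IO action that the child
 * process should run.
 *
 *     pid_t pid = forkProcess(&entry);
 *     // parent: pid is the child's pid, and every lock taken below has
 *     //         been released again before returning.
 *     // child : never returns here; it runs the action via
 *     //         rts_evalStableIOMain() and then calls
 *     //         shutdownHaskellAndExit().
 */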
1941
1942 pid_t
1943 forkProcess(HsStablePtr *entry
1944 #if !defined(FORKPROCESS_PRIMOP_SUPPORTED)
1945 STG_UNUSED
1946 #endif
1947 )
1948 {
1949 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
1950 pid_t pid;
1951 StgTSO* t,*next;
1952 Capability *cap;
1953 uint32_t g;
1954 Task *task = NULL;
1955 uint32_t i;
1956
1957 debugTrace(DEBUG_sched, "forking!");
1958
1959 task = newBoundTask();
1960
1961 cap = NULL;
1962 waitForCapability(&cap, task);
1963
1964 #if defined(THREADED_RTS)
1965 stopAllCapabilities(&cap, task);
1966 #endif
1967
1968 // no funny business: hold locks while we fork, otherwise if some
1969 // other thread is holding a lock when the fork happens, the data
1970 // structure protected by the lock will forever be in an
1971 // inconsistent state in the child. See also #1391.
1972 ACQUIRE_LOCK(&sched_mutex);
1973 ACQUIRE_LOCK(&sm_mutex);
1974 ACQUIRE_LOCK(&stable_ptr_mutex);
1975 ACQUIRE_LOCK(&stable_name_mutex);
1976 ACQUIRE_LOCK(&task->lock);
1977
1978 for (i=0; i < n_capabilities; i++) {
1979 ACQUIRE_LOCK(&capabilities[i]->lock);
1980 }
1981
1982 #if defined(THREADED_RTS)
1983 ACQUIRE_LOCK(&all_tasks_mutex);
1984 #endif
1985
1986 stopTimer(); // See #4074
1987
1988 #if defined(TRACING)
1989 flushEventLog(); // so that child won't inherit dirty file buffers
1990 #endif
1991
1992 pid = fork();
1993
1994 if (pid) { // parent
1995
1996 startTimer(); // #4074
1997
1998 RELEASE_LOCK(&sched_mutex);
1999 RELEASE_LOCK(&sm_mutex);
2000 RELEASE_LOCK(&stable_ptr_mutex);
2001 RELEASE_LOCK(&stable_name_mutex);
2002 RELEASE_LOCK(&task->lock);
2003
2004 #if defined(THREADED_RTS)
2005 /* N.B. releaseCapability_ below may need to take all_tasks_mutex */
2006 RELEASE_LOCK(&all_tasks_mutex);
2007 #endif
2008
2009 for (i=0; i < n_capabilities; i++) {
2010 releaseCapability_(capabilities[i],false);
2011 RELEASE_LOCK(&capabilities[i]->lock);
2012 }
2013
2014 boundTaskExiting(task);
2015
2016 // just return the pid
2017 return pid;
2018
2019 } else { // child
2020
2021 #if defined(THREADED_RTS)
2022 initMutex(&sched_mutex);
2023 initMutex(&sm_mutex);
2024 initMutex(&stable_ptr_mutex);
2025 initMutex(&stable_name_mutex);
2026 initMutex(&task->lock);
2027
2028 for (i=0; i < n_capabilities; i++) {
2029 initMutex(&capabilities[i]->lock);
2030 }
2031
2032 initMutex(&all_tasks_mutex);
2033 #endif
2034
2035 #if defined(TRACING)
2036 resetTracing();
2037 #endif
2038
2039 // Now, all OS threads except the thread that forked are
2040 // stopped. We need to stop all Haskell threads, including
2041 // those involved in foreign calls. Also we need to delete
2042 // all Tasks, because they correspond to OS threads that are
2043 // now gone.
2044
2045 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2046 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2047 next = t->global_link;
2048 // don't allow threads to catch the ThreadKilled
2049 // exception, but we do want to raiseAsync() because these
2050 // threads may be evaluating thunks that we need later.
2051 deleteThread_(t);
2052
2053 // stop the GC from updating the InCall to point to
2054 // the TSO. This is only necessary because the
2055 // OSThread bound to the TSO has been killed, and
2056 // won't get a chance to exit in the usual way (see
2057 // also scheduleHandleThreadFinished).
2058 t->bound = NULL;
2059 }
2060 }
2061
2062 discardTasksExcept(task);
2063
2064 for (i=0; i < n_capabilities; i++) {
2065 cap = capabilities[i];
2066
2067 // Empty the run queue. It seems tempting to let all the
2068 // killed threads stay on the run queue as zombies to be
2069 // cleaned up later, but some of them may correspond to
2070 // bound threads for which the corresponding Task does not
2071 // exist.
2072 truncateRunQueue(cap);
2073 cap->n_run_queue = 0;
2074
2075 // Any suspended C-calling Tasks are gone, and their OS threads
2076 // no longer exist:
2077 cap->suspended_ccalls = NULL;
2078 cap->n_suspended_ccalls = 0;
2079
2080 #if defined(THREADED_RTS)
2081 // Wipe our spare workers list, they no longer exist. New
2082 // workers will be created if necessary.
2083 cap->spare_workers = NULL;
2084 cap->n_spare_workers = 0;
2085 cap->returning_tasks_hd = NULL;
2086 cap->returning_tasks_tl = NULL;
2087 cap->n_returning_tasks = 0;
2088 #endif
2089
2090 // Release all caps except 0; we'll use that for starting
2091 // the IO manager and running the client action below.
2092 if (cap->no != 0) {
2093 task->cap = cap;
2094 releaseCapability(cap);
2095 }
2096 }
2097 cap = capabilities[0];
2098 task->cap = cap;
2099
2100 // Empty the threads lists. Otherwise, the garbage
2101 // collector may attempt to resurrect some of these threads.
2102 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2103 generations[g].threads = END_TSO_QUEUE;
2104 }
2105
2106 // On Unix, all timers are reset in the child, so we need to start
2107 // the timer again.
2108 initTimer();
2109 startTimer();
2110
2111 // TODO: need to trace various other things in the child
2112 // like startup event, capabilities, process info etc
2113 traceTaskCreate(task, cap);
2114
2115 #if defined(THREADED_RTS)
2116 ioManagerStartCap(&cap);
2117 #endif
2118
2119 // Install toplevel exception handlers, so the interruption
2120 // signal will be sent to the main thread.
2121 // See Trac #12903
2122 rts_evalStableIOMain(&cap, entry, NULL); // run the action
2123 rts_checkSchedStatus("forkProcess",cap);
2124
2125 rts_unlock(cap);
2126 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2127 }
2128 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2129 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2130 #endif
2131 }
2132
2133 /* ---------------------------------------------------------------------------
2134 * Changing the number of Capabilities
2135 *
2136 * Changing the number of Capabilities is very tricky! We can only do
2137 * it with the system fully stopped, so we do a full sync with
2138 * requestSync(SYNC_OTHER) and grab all the capabilities.
2139 *
2140 * Then we resize the appropriate data structures, and update all
2141 * references to the old data structures which have now moved.
2142 * Finally we release the Capabilities we are holding, and start
2143 * worker Tasks on the new Capabilities we created.
2144 *
2145 * ------------------------------------------------------------------------- */
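/* A minimal usage sketch (illustrative only, not part of this file): a caller
 * that does not currently own a Capability can simply do
 *
 *     setNumCapabilities(4);   // stop the world, resize, release, restart
 *
 * Growing the count really allocates new Capabilities, whereas shrinking only
 * marks the surplus ones as disabled, as explained in the body below.
 */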
2146
2147 void
2148 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2149 {
2150 #if !defined(THREADED_RTS)
2151 if (new_n_capabilities != 1) {
2152 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2153 }
2154 return;
2155 #elif defined(NOSMP)
2156 if (new_n_capabilities != 1) {
2157 errorBelch("setNumCapabilities: not supported on this platform");
2158 }
2159 return;
2160 #else
2161 Task *task;
2162 Capability *cap;
2163 uint32_t n;
2164 Capability *old_capabilities = NULL;
2165 uint32_t old_n_capabilities = n_capabilities;
2166
2167 if (new_n_capabilities == enabled_capabilities) {
2168 return;
2169 } else if (new_n_capabilities <= 0) {
2170 errorBelch("setNumCapabilities: Capability count must be positive");
2171 return;
2172 }
2173
2174
2175 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2176 enabled_capabilities, new_n_capabilities);
2177
2178 cap = rts_lock();
2179 task = cap->running_task;
2180
2181 stopAllCapabilities(&cap, task);
2182
2183 if (new_n_capabilities < enabled_capabilities)
2184 {
2185 // Reducing the number of capabilities: we do not actually
2186 // remove the extra capabilities, we just mark them as
2187 // "disabled". This has the following effects:
2188 //
2189 // - threads on a disabled capability are migrated away by the
2190 // scheduler loop
2191 //
2192 // - disabled capabilities do not participate in GC
2193 // (see scheduleDoGC())
2194 //
2195 // - No spark threads are created on this capability
2196 // (see scheduleActivateSpark())
2197 //
2198 // - We do not attempt to migrate threads *to* a disabled
2199 // capability (see schedulePushWork()).
2200 //
2201 // but in other respects, a disabled capability remains
2202 // alive. Threads may be woken up on a disabled capability,
2203 // but they will be immediately migrated away.
2204 //
2205 // This approach is much easier than trying to actually remove
2206 // the capability; we don't have to worry about GC data
2207 // structures, the nursery, etc.
2208 //
2209 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2210 capabilities[n]->disabled = true;
2211 traceCapDisable(capabilities[n]);
2212 }
2213 enabled_capabilities = new_n_capabilities;
2214 }
2215 else
2216 {
2217 // Increasing the number of enabled capabilities.
2218 //
2219 // enable any disabled capabilities, up to the required number
2220 for (n = enabled_capabilities;
2221 n < new_n_capabilities && n < n_capabilities; n++) {
2222 capabilities[n]->disabled = false;
2223 traceCapEnable(capabilities[n]);
2224 }
2225 enabled_capabilities = n;
2226
2227 if (new_n_capabilities > n_capabilities) {
2228 #if defined(TRACING)
2229 // Allocate eventlog buffers for the new capabilities. Note this
2230 // must be done before calling moreCapabilities(), because that
2231 // will emit events about creating the new capabilities and adding
2232 // them to existing capsets.
2233 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2234 #endif
2235
2236 // Resize the capabilities array
2237 // NB. after this, capabilities points somewhere new. Any pointers
2238 // of type (Capability *) are now invalid.
2239 moreCapabilities(n_capabilities, new_n_capabilities);
2240
2241 // Resize and update storage manager data structures
2242 storageAddCapabilities(n_capabilities, new_n_capabilities);
2243 }
2244 }
2245
2246 // update n_capabilities before things start running
2247 if (new_n_capabilities > n_capabilities) {
2248 n_capabilities = enabled_capabilities = new_n_capabilities;
2249 }
2250
2251 // We're done: release the original Capabilities
2252 releaseAllCapabilities(old_n_capabilities, cap,task);
2253
2254 // We can't free the old array until now, because we access it
2255 // while updating pointers in updateCapabilityRefs().
2256 if (old_capabilities) {
2257 stgFree(old_capabilities);
2258 }
2259
2260 // Notify IO manager that the number of capabilities has changed.
2261 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2262
2263 rts_unlock(cap);
2264
2265 #endif // THREADED_RTS
2266 }
2267
2268
2269
2270 /* ---------------------------------------------------------------------------
2271 * Delete all the threads in the system
2272 * ------------------------------------------------------------------------- */
2273
2274 static void
2275 deleteAllThreads ()
2276 {
2277 // NOTE: only safe to call if we own all capabilities.
2278
2279 StgTSO* t, *next;
2280 uint32_t g;
2281
2282 debugTrace(DEBUG_sched,"deleting all threads");
2283 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2284 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2285 next = t->global_link;
2286 deleteThread(t);
2287 }
2288 }
2289
2290 // The run queue now contains a bunch of ThreadKilled threads. We
2291 // must not throw these away: the main thread(s) will be in there
2292 // somewhere, and the main scheduler loop has to deal with it.
2293 // Also, the run queue is the only thing keeping these threads from
2294 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2295
2296 #if !defined(THREADED_RTS)
2297 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2298 ASSERT(sleeping_queue == END_TSO_QUEUE);
2299 #endif
2300 }
2301
2302 /* -----------------------------------------------------------------------------
2303 Managing the suspended_ccalls list.
2304 Locks required: sched_mutex
2305 -------------------------------------------------------------------------- */
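/* Informally, the list is doubly linked through InCall.prev/next with the
 * most recently suspended call at the head:
 *
 *     cap->suspended_ccalls -> incall_N <-> ... <-> incall_1 -> NULL
 *
 * suspendTask() pushes at the head and recoverSuspendedTask() unlinks from
 * anywhere in the list, so both operations are O(1).
 */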
2306
2307 STATIC_INLINE void
2308 suspendTask (Capability *cap, Task *task)
2309 {
2310 InCall *incall;
2311
2312 incall = task->incall;
2313 ASSERT(incall->next == NULL && incall->prev == NULL);
2314 incall->next = cap->suspended_ccalls;
2315 incall->prev = NULL;
2316 if (cap->suspended_ccalls) {
2317 cap->suspended_ccalls->prev = incall;
2318 }
2319 cap->suspended_ccalls = incall;
2320 cap->n_suspended_ccalls++;
2321 }
2322
2323 STATIC_INLINE void
2324 recoverSuspendedTask (Capability *cap, Task *task)
2325 {
2326 InCall *incall;
2327
2328 incall = task->incall;
2329 if (incall->prev) {
2330 incall->prev->next = incall->next;
2331 } else {
2332 ASSERT(cap->suspended_ccalls == incall);
2333 cap->suspended_ccalls = incall->next;
2334 }
2335 if (incall->next) {
2336 incall->next->prev = incall->prev;
2337 }
2338 incall->next = incall->prev = NULL;
2339 cap->n_suspended_ccalls--;
2340 }
2341
2342 /* ---------------------------------------------------------------------------
2343 * Suspending & resuming Haskell threads.
2344 *
2345 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2346 * its capability before calling the C function. This allows another
2347 * task to pick up the capability and carry on running Haskell
2348 * threads. It also means that if the C call blocks, it won't lock
2349 * the whole system.
2350 *
2351 * The Haskell thread making the C call is put to sleep for the
2352 * duration of the call, on the Capability's suspended_ccalls list. We
2353 * give out a token to the task, which it can use to resume the thread
2354 * on return from the C function.
2355 *
2356 * If this is an interruptible C call, the FFI call may be
2357 * unceremoniously terminated and should be scheduled on an
2358 * unbound worker thread.
2359 * ------------------------------------------------------------------------- */
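/* For orientation, the code generated for a safe foreign call performs,
 * in outline (an illustrative sketch, not a literal quote of the code
 * generator's output):
 *
 *     token = suspendThread(BaseReg, interruptible);  // give up the Capability
 *     result = foreign_function(args);                // may block freely
 *     BaseReg = resumeThread(token);                  // reacquire a Capability
 *
 * The token handed out is in fact the Task; the suspended TSO sits on a
 * Capability's suspended_ccalls list until resumeThread() is called.
 */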
2360
2361 void *
2362 suspendThread (StgRegTable *reg, bool interruptible)
2363 {
2364 Capability *cap;
2365 int saved_errno;
2366 StgTSO *tso;
2367 Task *task;
2368 #if defined(mingw32_HOST_OS)
2369 StgWord32 saved_winerror;
2370 #endif
2371
2372 saved_errno = errno;
2373 #if defined(mingw32_HOST_OS)
2374 saved_winerror = GetLastError();
2375 #endif
2376
2377 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2378 */
2379 cap = regTableToCapability(reg);
2380
2381 task = cap->running_task;
2382 tso = cap->r.rCurrentTSO;
2383
2384 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2385
2386 // XXX this might not be necessary --SDM
2387 tso->what_next = ThreadRunGHC;
2388
2389 threadPaused(cap,tso);
2390
2391 if (interruptible) {
2392 tso->why_blocked = BlockedOnCCall_Interruptible;
2393 } else {
2394 tso->why_blocked = BlockedOnCCall;
2395 }
2396
2397 // Hand back capability
2398 task->incall->suspended_tso = tso;
2399 task->incall->suspended_cap = cap;
2400
2401 // Clear rCurrentTSO; otherwise allocate() will write to invalid memory.
2402 cap->r.rCurrentTSO = NULL;
2403
2404 ACQUIRE_LOCK(&cap->lock);
2405
2406 suspendTask(cap,task);
2407 cap->in_haskell = false;
2408 releaseCapability_(cap,false);
2409
2410 RELEASE_LOCK(&cap->lock);
2411
2412 errno = saved_errno;
2413 #if defined(mingw32_HOST_OS)
2414 SetLastError(saved_winerror);
2415 #endif
2416 return task;
2417 }
2418
2419 StgRegTable *
2420 resumeThread (void *task_)
2421 {
2422 StgTSO *tso;
2423 InCall *incall;
2424 Capability *cap;
2425 Task *task = task_;
2426 int saved_errno;
2427 #if defined(mingw32_HOST_OS)
2428 StgWord32 saved_winerror;
2429 #endif
2430
2431 saved_errno = errno;
2432 #if defined(mingw32_HOST_OS)
2433 saved_winerror = GetLastError();
2434 #endif
2435
2436 incall = task->incall;
2437 cap = incall->suspended_cap;
2438 task->cap = cap;
2439
2440 // Wait for permission to re-enter the RTS with the result.
2441 waitForCapability(&cap,task);
2442 // we might be on a different capability now... but if so, our
2443 // entry on the suspended_ccalls list will also have been
2444 // migrated.
2445
2446 // Remove the thread from the suspended list
2447 recoverSuspendedTask(cap,task);
2448
2449 tso = incall->suspended_tso;
2450 incall->suspended_tso = NULL;
2451 incall->suspended_cap = NULL;
2452 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2453
2454 traceEventRunThread(cap, tso);
2455
2456 /* Reset blocking status */
2457 tso->why_blocked = NotBlocked;
2458
2459 if ((tso->flags & TSO_BLOCKEX) == 0) {
2460 // avoid locking the TSO if we don't have to
2461 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2462 maybePerformBlockedException(cap,tso);
2463 }
2464 }
2465
2466 cap->r.rCurrentTSO = tso;
2467 cap->in_haskell = true;
2468 errno = saved_errno;
2469 #if defined(mingw32_HOST_OS)
2470 SetLastError(saved_winerror);
2471 #endif
2472
2473 /* We might have GC'd, mark the TSO dirty again */
2474 dirty_TSO(cap,tso);
2475 dirty_STACK(cap,tso->stackobj);
2476
2477 IF_DEBUG(sanity, checkTSO(tso));
2478
2479 return &cap->r;
2480 }
2481
2482 /* ---------------------------------------------------------------------------
2483 * scheduleThread()
2484 *
2485 * scheduleThread puts a thread on the end of the runnable queue.
2486 * This will usually be done immediately after a thread is created.
2487 * The caller of scheduleThread must create the thread using e.g.
2488 * createThread and push an appropriate closure
2489 * on this thread's stack before the scheduler is invoked.
2490 * ------------------------------------------------------------------------ */
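/* For illustration (a sketch assuming the caller already owns `cap`): the RTS
 * API builds such threads with helpers like createIOThread() and then hands
 * them to the scheduler, roughly
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  closure);
 *     scheduleThread(cap, tso);
 *
 * after which the thread runs the next time this Capability enters the
 * scheduler loop.
 */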
2491
2492 void
2493 scheduleThread(Capability *cap, StgTSO *tso)
2494 {
2495 // The thread goes at the *end* of the run-queue, to avoid possible
2496 // starvation of any threads already on the queue.
2497 appendToRunQueue(cap,tso);
2498 }
2499
2500 void
2501 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2502 {
2503 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2504 // move this thread from now on.
2505 #if defined(THREADED_RTS)
2506 cpu %= enabled_capabilities;
2507 if (cpu == cap->no) {
2508 appendToRunQueue(cap,tso);
2509 } else {
2510 migrateThread(cap, tso, capabilities[cpu]);
2511 }
2512 #else
2513 appendToRunQueue(cap,tso);
2514 #endif
2515 }
2516
2517 void
2518 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2519 {
2520 Task *task;
2521 DEBUG_ONLY( StgThreadID id );
2522 Capability *cap;
2523
2524 cap = *pcap;
2525
2526 // We already created/initialised the Task
2527 task = cap->running_task;
2528
2529 // This TSO is now a bound thread; make the Task and TSO
2530 // point to each other.
2531 tso->bound = task->incall;
2532 tso->cap = cap;
2533
2534 task->incall->tso = tso;
2535 task->incall->ret = ret;
2536 task->incall->rstat = NoStatus;
2537
2538 appendToRunQueue(cap,tso);
2539
2540 DEBUG_ONLY( id = tso->id );
2541 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2542
2543 cap = schedule(cap,task);
2544
2545 ASSERT(task->incall->rstat != NoStatus);
2546 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2547
2548 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2549 *pcap = cap;
2550 }
2551
2552 /* ----------------------------------------------------------------------------
2553 * Starting Tasks
2554 * ------------------------------------------------------------------------- */
2555
2556 #if defined(THREADED_RTS)
2557 void scheduleWorker (Capability *cap, Task *task)
2558 {
2559 // schedule() runs without a lock.
2560 cap = schedule(cap,task);
2561
2562 // On exit from schedule(), we have a Capability, but possibly not
2563 // the same one we started with.
2564
2565 // During shutdown, the requirement is that after all the
2566 // Capabilities are shut down, all workers that are shutting down
2567 // have finished workerTaskStop(). This is why we hold on to
2568 // cap->lock until we've finished workerTaskStop() below.
2569 //
2570 // There may be workers still involved in foreign calls; those
2571 // will just block in waitForCapability() because the
2572 // Capability has been shut down.
2573 //
2574 ACQUIRE_LOCK(&cap->lock);
2575 releaseCapability_(cap,false);
2576 workerTaskStop(task);
2577 RELEASE_LOCK(&cap->lock);
2578 }
2579 #endif
2580
2581 /* ---------------------------------------------------------------------------
2582 * Start new worker tasks on Capabilities from--to
2583 * -------------------------------------------------------------------------- */
2584
2585 static void
2586 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2587 {
2588 #if defined(THREADED_RTS)
2589 uint32_t i;
2590 Capability *cap;
2591
2592 for (i = from; i < to; i++) {
2593 cap = capabilities[i];
2594 ACQUIRE_LOCK(&cap->lock);
2595 startWorkerTask(cap);
2596 RELEASE_LOCK(&cap->lock);
2597 }
2598 #endif
2599 }
2600
2601 /* ---------------------------------------------------------------------------
2602 * initScheduler()
2603 *
2604 * Initialise the scheduler. This resets all the queues - if the
2605 * queues contained any threads, they'll be garbage collected at the
2606 * next pass.
2607 *
2608 * ------------------------------------------------------------------------ */
2609
2610 void
2611 initScheduler(void)
2612 {
2613 #if !defined(THREADED_RTS)
2614 blocked_queue_hd = END_TSO_QUEUE;
2615 blocked_queue_tl = END_TSO_QUEUE;
2616 sleeping_queue = END_TSO_QUEUE;
2617 #endif
2618
2619 sched_state = SCHED_RUNNING;
2620 recent_activity = ACTIVITY_YES;
2621
2622 #if defined(THREADED_RTS)
2623 /* Initialise the mutex and condition variables used by
2624 * the scheduler. */
2625 initMutex(&sched_mutex);
2626 #endif
2627
2628 ACQUIRE_LOCK(&sched_mutex);
2629
2630 allocated_bytes_at_heapoverflow = 0;
2631
2632 /* A capability holds the state a native thread needs in
2633 * order to execute STG code. At least one capability is
2634 * floating around (only THREADED_RTS builds have more than one).
2635 */
2636 initCapabilities();
2637
2638 initTaskManager();
2639
2640 /*
2641 * Eagerly start one worker to run each Capability, except for
2642 * Capability 0. The idea is that we're probably going to start a
2643 * bound thread on Capability 0 pretty soon, so we don't want a
2644 * worker task hogging it.
2645 */
2646 startWorkerTasks(1, n_capabilities);
2647
2648 RELEASE_LOCK(&sched_mutex);
2649
2650 }
2651
2652 void
2653 exitScheduler (bool wait_foreign USED_IF_THREADS)
2654 /* see Capability.c, shutdownCapability() */
2655 {
2656 Task *task = newBoundTask();
2657
2658 // If we haven't killed all the threads yet, do it now.
2659 if (sched_state < SCHED_SHUTTING_DOWN) {
2660 sched_state = SCHED_INTERRUPTING;
2661 Capability *cap = task->cap;
2662 waitForCapability(&cap,task);
2663 scheduleDoGC(&cap,task,true);
2664 ASSERT(task->incall->tso == NULL);
2665 releaseCapability(cap);
2666 }
2667 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
2668
2669 shutdownCapabilities(task, wait_foreign);
2670
2671 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2672 // n_failed_trygrab_idles, n_idle_caps);
2673
2674 boundTaskExiting(task);
2675 }
2676
2677 void
2678 freeScheduler( void )
2679 {
2680 uint32_t still_running;
2681
2682 ACQUIRE_LOCK(&sched_mutex);
2683 still_running = freeTaskManager();
2684 // We can only free the Capabilities if there are no Tasks still
2685 // running. We might have a Task about to return from a foreign
2686 // call into waitForCapability(), for example (actually,
2687 // this should be the *only* thing that a still-running Task can
2688 // do at this point, and it will block waiting for the
2689 // Capability).
2690 if (still_running == 0) {
2691 freeCapabilities();
2692 }
2693 RELEASE_LOCK(&sched_mutex);
2694 #if defined(THREADED_RTS)
2695 closeMutex(&sched_mutex);
2696 #endif
2697 }
2698
2699 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2700 void *user USED_IF_NOT_THREADS)
2701 {
2702 #if !defined(THREADED_RTS)
2703 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2704 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2705 evac(user, (StgClosure **)(void *)&sleeping_queue);
2706 #endif
2707 }
2708
2709 /* -----------------------------------------------------------------------------
2710 performGC
2711
2712 This is the interface to the garbage collector from Haskell land.
2713 We provide this so that external C code can allocate and garbage
2714 collect when called from Haskell via _ccall_GC.
2715 -------------------------------------------------------------------------- */
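/* For example (illustrative): foreign code that has initialised the RTS and
 * wants to force a full collection can call
 *
 *     performMajorGC();   // or performGC() if the GC need not be major
 *
 * Both may be called from a thread that does not currently own a Capability;
 * a fresh bound Task is created for the duration of the call.
 */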
2716
2717 static void
2718 performGC_(bool force_major)
2719 {
2720 Task *task;
2721 Capability *cap = NULL;
2722
2723 // We must grab a new Task here, because the existing Task may be
2724 // associated with a particular Capability, and chained onto the
2725 // suspended_ccalls queue.
2726 task = newBoundTask();
2727
2728 // TODO: do we need to traceTask*() here?
2729
2730 waitForCapability(&cap,task);
2731 scheduleDoGC(&cap,task,force_major);
2732 releaseCapability(cap);
2733 boundTaskExiting(task);
2734 }
2735
2736 void
2737 performGC(void)
2738 {
2739 performGC_(false);
2740 }
2741
2742 void
2743 performMajorGC(void)
2744 {
2745 performGC_(true);
2746 }
2747
2748 /* ---------------------------------------------------------------------------
2749 Interrupt execution.
2750 Might be called inside a signal handler so it mustn't do anything fancy.
2751 ------------------------------------------------------------------------ */
2752
2753 void
2754 interruptStgRts(void)
2755 {
2756 ASSERT(sched_state != SCHED_SHUTTING_DOWN);
2757 sched_state = SCHED_INTERRUPTING;
2758 interruptAllCapabilities();
2759 #if defined(THREADED_RTS)
2760 wakeUpRts();
2761 #endif
2762 }
2763
2764 /* -----------------------------------------------------------------------------
2765 Wake up the RTS
2766
2767 This function causes at least one OS thread to wake up and run the
2768 scheduler loop. It is invoked when the RTS might be deadlocked, or
2769 an external event has arrived that may need servicing (eg. a
2770 keyboard interrupt).
2771
2772 In the single-threaded RTS we don't do anything here; we only have
2773 one thread anyway, and the event that caused us to want to wake up
2774 will have interrupted any blocking system call in progress.
2775 -------------------------------------------------------------------------- */
2776
2777 #if defined(THREADED_RTS)
2778 void wakeUpRts(void)
2779 {
2780 // This forces the IO Manager thread to wakeup, which will
2781 // in turn ensure that some OS thread wakes up and runs the
2782 // scheduler loop, which will cause a GC and deadlock check.
2783 ioManagerWakeup();
2784 }
2785 #endif
2786
2787 /* -----------------------------------------------------------------------------
2788 Deleting threads
2789
2790 This is used for interruption (^C) and forking, and corresponds to
2791 raising an exception but without letting the thread catch the
2792 exception.
2793 -------------------------------------------------------------------------- */
2794
2795 static void
2796 deleteThread (StgTSO *tso)
2797 {
2798 // NOTE: must only be called on a TSO that we have exclusive
2799 // access to, because we will call throwToSingleThreaded() below.
2800 // The TSO must be on the run queue of the Capability we own, or
2801 // we must own all Capabilities.
2802
2803 if (tso->why_blocked != BlockedOnCCall &&
2804 tso->why_blocked != BlockedOnCCall_Interruptible) {
2805 throwToSingleThreaded(tso->cap,tso,NULL);
2806 }
2807 }
2808
2809 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
2810 static void
2811 deleteThread_(StgTSO *tso)
2812 { // for forkProcess only:
2813 // like deleteThread(), but we delete threads in foreign calls, too.
2814
2815 if (tso->why_blocked == BlockedOnCCall ||
2816 tso->why_blocked == BlockedOnCCall_Interruptible) {
2817 tso->what_next = ThreadKilled;
2818 appendToRunQueue(tso->cap, tso);
2819 } else {
2820 deleteThread(tso);
2821 }
2822 }
2823 #endif
2824
2825 /* -----------------------------------------------------------------------------
2826 raiseExceptionHelper
2827
2828 This function is called by the raise# primitive, just so that we can
2829 move some of the tricky bits of raising an exception from C-- into
2830 C. Who knows, it might be a useful reusable thing here too.
2831 -------------------------------------------------------------------------- */
2832
2833 StgWord
2834 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2835 {
2836 Capability *cap = regTableToCapability(reg);
2837 StgThunk *raise_closure = NULL;
2838 StgPtr p, next;
2839 const StgRetInfoTable *info;
2840 //
2841 // This closure represents the expression 'raise# E', where E
2842 // is the exception being raised. It is used to overwrite all the
2843 // thunks which are currently under evaluation.
2844 //
2845
2846 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2847 // LDV profiling: stg_raise_info has THUNK as its closure
2848 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2849 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2850 // 1 does not cause any problem unless profiling is performed.
2851 // However, when LDV profiling goes on, we need to linearly scan
2852 // small object pool, where raise_closure is stored, so we should
2853 // use MIN_UPD_SIZE.
2854 //
2855 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2856 // sizeofW(StgClosure)+1);
2857 //
2858
2859 //
2860 // Walk up the stack, looking for the catch frame. On the way,
2861 // we update any closures pointed to from update frames with the
2862 // raise closure that we just built.
2863 //
2864 p = tso->stackobj->sp;
2865 while(1) {
2866 info = get_ret_itbl((StgClosure *)p);
2867 next = p + stack_frame_sizeW((StgClosure *)p);
2868 switch (info->i.type) {
2869
2870 case UPDATE_FRAME:
2871 // Only create raise_closure if we need to.
2872 if (raise_closure == NULL) {
2873 raise_closure =
2874 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2875 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2876 raise_closure->payload[0] = exception;
2877 }
2878 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2879 (StgClosure *)raise_closure);
2880 p = next;
2881 continue;
2882
2883 case ATOMICALLY_FRAME:
2884 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2885 tso->stackobj->sp = p;
2886 return ATOMICALLY_FRAME;
2887
2888 case CATCH_FRAME:
2889 tso->stackobj->sp = p;
2890 return CATCH_FRAME;
2891
2892 case CATCH_STM_FRAME:
2893 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2894 tso->stackobj->sp = p;
2895 return CATCH_STM_FRAME;
2896
2897 case UNDERFLOW_FRAME:
2898 tso->stackobj->sp = p;
2899 threadStackUnderflow(cap,tso);
2900 p = tso->stackobj->sp;
2901 continue;
2902
2903 case STOP_FRAME:
2904 tso->stackobj->sp = p;
2905 return STOP_FRAME;
2906
2907 case CATCH_RETRY_FRAME: {
2908 StgTRecHeader *trec = tso -> trec;
2909 StgTRecHeader *outer = trec -> enclosing_trec;
2910 debugTrace(DEBUG_stm,
2911 "found CATCH_RETRY_FRAME at %p during raise", p);
2912 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2913 stmAbortTransaction(cap, trec);
2914 stmFreeAbortedTRec(cap, trec);
2915 tso -> trec = outer;
2916 p = next;
2917 continue;
2918 }
2919
2920 default:
2921 p = next;
2922 continue;
2923 }
2924 }
2925 }
2926
2927
2928 /* -----------------------------------------------------------------------------
2929 findRetryFrameHelper
2930
2931 This function is called by the retry# primitive. It traverses the stack
2932 leaving tso->sp referring to the frame which should handle the retry.
2933
2934 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2935 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2936
2937 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2938 create) because retries are not considered to be exceptions, despite the
2939 similar implementation.
2940
2941 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2942 not be created within memory transactions.
2943 -------------------------------------------------------------------------- */
2944
2945 StgWord
2946 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2947 {
2948 const StgRetInfoTable *info;
2949 StgPtr p, next;
2950
2951 p = tso->stackobj->sp;
2952 while (1) {
2953 info = get_ret_itbl((const StgClosure *)p);
2954 next = p + stack_frame_sizeW((StgClosure *)p);
2955 switch (info->i.type) {
2956
2957 case ATOMICALLY_FRAME:
2958 debugTrace(DEBUG_stm,
2959 "found ATOMICALLY_FRAME at %p during retry", p);
2960 tso->stackobj->sp = p;
2961 return ATOMICALLY_FRAME;
2962
2963 case CATCH_RETRY_FRAME:
2964 debugTrace(DEBUG_stm,
2965 "found CATCH_RETRY_FRAME at %p during retry", p);
2966 tso->stackobj->sp = p;
2967 return CATCH_RETRY_FRAME;
2968
2969 case CATCH_STM_FRAME: {
2970 StgTRecHeader *trec = tso -> trec;
2971 StgTRecHeader *outer = trec -> enclosing_trec;
2972 debugTrace(DEBUG_stm,
2973 "found CATCH_STM_FRAME at %p during retry", p);
2974 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2975 stmAbortTransaction(cap, trec);
2976 stmFreeAbortedTRec(cap, trec);
2977 tso -> trec = outer;
2978 p = next;
2979 continue;
2980 }
2981
2982 case UNDERFLOW_FRAME:
2983 tso->stackobj->sp = p;
2984 threadStackUnderflow(cap,tso);
2985 p = tso->stackobj->sp;
2986 continue;
2987
2988 default:
2989 ASSERT(info->i.type != CATCH_FRAME);
2990 ASSERT(info->i.type != STOP_FRAME);
2991 p = next;
2992 continue;
2993 }
2994 }
2995 }
2996
2997 /* -----------------------------------------------------------------------------
2998 resurrectThreads is called after garbage collection on the list of
2999 threads found to be garbage. Each of these threads will be woken
3000 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3001 on an MVar, or NonTermination if the thread was blocked on a Black
3002 Hole.
3003
3004 Locks: assumes we hold *all* the capabilities.
3005 -------------------------------------------------------------------------- */
3006
3007 void
3008 resurrectThreads (StgTSO *threads)
3009 {
3010 StgTSO *tso, *next;
3011 Capability *cap;
3012 generation *gen;
3013
3014 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3015 next = tso->global_link;
3016
3017 gen = Bdescr((P_)tso)->gen;
3018 tso->global_link = gen->threads;
3019 gen->threads = tso;
3020
3021 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3022
3023 // Wake up the thread on the Capability it was last on
3024 cap = tso->cap;
3025
3026 switch (tso->why_blocked) {
3027 case BlockedOnMVar:
3028 case BlockedOnMVarRead:
3029 /* Called by GC - sched_mutex lock is currently held. */
3030 throwToSingleThreaded(cap, tso,
3031 (StgClosure *)blockedIndefinitelyOnMVar_closure);
3032 break;
3033 case BlockedOnBlackHole:
3034 throwToSingleThreaded(cap, tso,
3035 (StgClosure *)nonTermination_closure);
3036 break;
3037 case BlockedOnSTM:
3038 throwToSingleThreaded(cap, tso,
3039 (StgClosure *)blockedIndefinitelyOnSTM_closure);
3040 break;
3041 case NotBlocked:
3042 /* This might happen if the thread was blocked on a black hole
3043 * belonging to a thread that we've just woken up (raiseAsync
3044 * can wake up threads, remember...).
3045 */
3046 continue;
3047 case BlockedOnMsgThrowTo:
3048 // This can happen if the target is masking, blocks on a
3049 // black hole, and then is found to be unreachable. In
3050 // this case, we want to let the target wake up and carry
3051 // on, and do nothing to this thread.
3052 continue;
3053 default:
3054 barf("resurrectThreads: thread blocked in a strange way: %d",
3055 tso->why_blocked);
3056 }
3057 }
3058 }