1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45 #include "TopHandler.h"
46
47 #if defined(HAVE_SYS_TYPES_H)
48 #include <sys/types.h>
49 #endif
50 #if defined(HAVE_UNISTD_H)
51 #include <unistd.h>
52 #endif
53
54 #include <string.h>
55 #include <stdlib.h>
56 #include <stdarg.h>
57
58 #if defined(HAVE_ERRNO_H)
59 #include <errno.h>
60 #endif
61
62 #if defined(TRACING)
63 #include "eventlog/EventLog.h"
64 #endif
65 /* -----------------------------------------------------------------------------
66 * Global variables
67 * -------------------------------------------------------------------------- */
68
69 #if !defined(THREADED_RTS)
70 // Blocked/sleeping threads
71 StgTSO *blocked_queue_hd = NULL;
72 StgTSO *blocked_queue_tl = NULL;
73 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
74 #endif
75
76 // Bytes allocated since the last time a HeapOverflow exception was thrown by
77 // the RTS
78 uint64_t allocated_bytes_at_heapoverflow = 0;
79
80 /* Set to true when the latest garbage collection failed to reclaim enough
81 * space, and the runtime should proceed to shut itself down in an orderly
82 * fashion (emitting profiling info etc.), OR throw an exception to the main
83 * thread, if it is still alive.
84 */
85 bool heap_overflow = false;
86
87 /* flag that tracks whether we have done any execution in this time slice.
88 * LOCK: currently none, perhaps we should lock (but needs to be
89 * updated in the fast path of the scheduler).
90 *
91 * NB. must be StgWord, we do xchg() on it.
92 */
93 volatile StgWord recent_activity = ACTIVITY_YES;
94
95 /* if this flag is set as well, give up execution
96 * LOCK: none (changes monotonically)
97 */
98 volatile StgWord sched_state = SCHED_RUNNING;
99
100 /*
101 * This mutex protects most of the global scheduler data in
102 * the THREADED_RTS runtime.
103 */
104 #if defined(THREADED_RTS)
105 Mutex sched_mutex;
106 #endif
107
108 #if !defined(mingw32_HOST_OS)
109 #define FORKPROCESS_PRIMOP_SUPPORTED
110 #endif
111
112 /* -----------------------------------------------------------------------------
113 * static function prototypes
114 * -------------------------------------------------------------------------- */
115
116 static Capability *schedule (Capability *initialCapability, Task *task);
117
118 //
119 // These functions all encapsulate parts of the scheduler loop, and are
120 // abstracted only to make the structure and control flow of the
121 // scheduler clearer.
122 //
123 static void schedulePreLoop (void);
124 static void scheduleFindWork (Capability **pcap);
125 #if defined(THREADED_RTS)
126 static void scheduleYield (Capability **pcap, Task *task);
127 #endif
128 #if defined(THREADED_RTS)
129 static bool requestSync (Capability **pcap, Task *task,
130 PendingSync *sync_type, SyncType *prev_sync_type);
131 static void acquireAllCapabilities(Capability *cap, Task *task);
132 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
133 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
134 uint32_t to USED_IF_THREADS);
135 #endif
136 static void scheduleStartSignalHandlers (Capability *cap);
137 static void scheduleCheckBlockedThreads (Capability *cap);
138 static void scheduleProcessInbox(Capability **cap);
139 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
140 static void schedulePushWork(Capability *cap, Task *task);
141 #if defined(THREADED_RTS)
142 static void scheduleActivateSpark(Capability *cap);
143 #endif
144 static void schedulePostRunThread(Capability *cap, StgTSO *t);
145 static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
146 static bool scheduleHandleYield( Capability *cap, StgTSO *t,
147 uint32_t prev_what_next );
148 static void scheduleHandleThreadBlocked( StgTSO *t );
149 static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
150 StgTSO *t );
151 static bool scheduleNeedHeapProfile(bool ready_to_gc);
152 static void scheduleDoGC(Capability **pcap, Task *task, bool force_major);
153
154 static void deleteThread (Capability *cap, StgTSO *tso);
155 static void deleteAllThreads (Capability *cap);
156
157 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
158 static void deleteThread_(Capability *cap, StgTSO *tso);
159 #endif
160
161 /* ---------------------------------------------------------------------------
162 Main scheduling loop.
163
164 We use round-robin scheduling, each thread returning to the
165 scheduler loop when one of these conditions is detected:
166
167 * out of heap space
168 * timer expires (thread yields)
169 * thread blocks
170 * thread ends
171 * stack overflow
172
173 ------------------------------------------------------------------------ */
174
175 static Capability *
176 schedule (Capability *initialCapability, Task *task)
177 {
178 StgTSO *t;
179 Capability *cap;
180 StgThreadReturnCode ret;
181 uint32_t prev_what_next;
182 bool ready_to_gc;
183 #if defined(THREADED_RTS)
184 bool first = true;
185 #endif
186
187 cap = initialCapability;
188
189 // Pre-condition: this task owns initialCapability.
190 // The sched_mutex is *NOT* held
191 // NB. on return, we still hold a capability.
192
193 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
194
195 schedulePreLoop();
196
197 // -----------------------------------------------------------
198 // Scheduler loop starts here:
199
200 while (1) {
201
202 // Check whether we have re-entered the RTS from Haskell without
203         // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
204 // call).
205 if (cap->in_haskell) {
206 errorBelch("schedule: re-entered unsafely.\n"
207 " Perhaps a 'foreign import unsafe' should be 'safe'?");
208 stg_exit(EXIT_FAILURE);
209 }
210
211 // Note [shutdown]: The interruption / shutdown sequence.
212 //
213 // In order to cleanly shut down the runtime, we want to:
214 // * make sure that all main threads return to their callers
215 // with the state 'Interrupted'.
216         //    * clean up all OS threads associated with the runtime
217 // * free all memory etc.
218 //
219 // So the sequence goes like this:
220 //
221 // * The shutdown sequence is initiated by calling hs_exit(),
222 // interruptStgRts(), or running out of memory in the GC.
223 //
224 // * Set sched_state = SCHED_INTERRUPTING
225 //
226 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
227 // scheduleDoGC(), which halts the whole runtime by acquiring all the
228 // capabilities, does a GC and then calls deleteAllThreads() to kill all
229 // the remaining threads. The zombies are left on the run queue for
230 // cleaning up. We can't kill threads involved in foreign calls.
231 //
232 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
233 //
234 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
235 // enforced by the scheduler, which won't run any Haskell code when
236 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
237 // other capabilities by doing the GC earlier.
238 //
239 // * all workers exit when the run queue on their capability
240 // drains. All main threads will also exit when their TSO
241 // reaches the head of the run queue and they can return.
242 //
243 // * eventually all Capabilities will shut down, and the RTS can
244 // exit.
245 //
246 // * We might be left with threads blocked in foreign calls,
247 // we should really attempt to kill these somehow (TODO).
248
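        //
        // As a rough illustration (not part of this file), an embedding C
        // program drives this whole sequence through the public API in
        // HsFFI.h:
        //
        //     #include "HsFFI.h"
        //     int main (int argc, char *argv[])
        //     {
        //         hs_init(&argc, &argv);   // boot the RTS
        //         /* ... call exported Haskell functions ... */
        //         hs_exit();               // initiates the shutdown described above
        //         return 0;
        //     }
        //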
249 switch (sched_state) {
250 case SCHED_RUNNING:
251 break;
252 case SCHED_INTERRUPTING:
253 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
254 /* scheduleDoGC() deletes all the threads */
255 scheduleDoGC(&cap,task,true);
256
257 // after scheduleDoGC(), we must be shutting down. Either some
258 // other Capability did the final GC, or we did it above,
259 // either way we can fall through to the SCHED_SHUTTING_DOWN
260 // case now.
261 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
262 // fall through
263
264 case SCHED_SHUTTING_DOWN:
265 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
266 // If we are a worker, just exit. If we're a bound thread
267 // then we will exit below when we've removed our TSO from
268 // the run queue.
269 if (!isBoundTask(task) && emptyRunQueue(cap)) {
270 return cap;
271 }
272 break;
273 default:
274 barf("sched_state: %" FMT_Word, sched_state);
275 }
276
277 scheduleFindWork(&cap);
278
279 /* work pushing, currently relevant only for THREADED_RTS:
280 (pushes threads, wakes up idle capabilities for stealing) */
281 schedulePushWork(cap,task);
282
283 scheduleDetectDeadlock(&cap,task);
284
285 // Normally, the only way we can get here with no threads to
286         // run is if a keyboard interrupt was received during
287 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
288 // Additionally, it is not fatal for the
289 // threaded RTS to reach here with no threads to run.
290 //
291 // win32: might be here due to awaitEvent() being abandoned
292 // as a result of a console event having been delivered.
293
294 #if defined(THREADED_RTS)
295 if (first)
296 {
297 // XXX: ToDo
298 // // don't yield the first time, we want a chance to run this
299 // // thread for a bit, even if there are others banging at the
300 // // door.
301 // first = false;
302 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
303 }
304
305 scheduleYield(&cap,task);
306
307 if (emptyRunQueue(cap)) continue; // look for work again
308 #endif
309
310 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
311 if ( emptyRunQueue(cap) ) {
312 ASSERT(sched_state >= SCHED_INTERRUPTING);
313 }
314 #endif
315
316 //
317 // Get a thread to run
318 //
319 t = popRunQueue(cap);
320
321 // Sanity check the thread we're about to run. This can be
322 // expensive if there is lots of thread switching going on...
323 IF_DEBUG(sanity,checkTSO(t));
324
325 #if defined(THREADED_RTS)
326 // Check whether we can run this thread in the current task.
327 // If not, we have to pass our capability to the right task.
328 {
329 InCall *bound = t->bound;
330
331 if (bound) {
332 if (bound->task == task) {
333 // yes, the Haskell thread is bound to the current native thread
334 } else {
335 debugTrace(DEBUG_sched,
336 "thread %lu bound to another OS thread",
337 (unsigned long)t->id);
338 // no, bound to a different Haskell thread: pass to that thread
339 pushOnRunQueue(cap,t);
340 continue;
341 }
342 } else {
343 // The thread we want to run is unbound.
344 if (task->incall->tso) {
345 debugTrace(DEBUG_sched,
346 "this OS thread cannot run thread %lu",
347 (unsigned long)t->id);
348 // no, the current native thread is bound to a different
349 // Haskell thread, so pass it to any worker thread
350 pushOnRunQueue(cap,t);
351 continue;
352 }
353 }
354 }
355 #endif
356
357 // If we're shutting down, and this thread has not yet been
358 // killed, kill it now. This sometimes happens when a finalizer
359 // thread is created by the final GC, or a thread previously
360 // in a foreign call returns.
361 if (sched_state >= SCHED_INTERRUPTING &&
362 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
363 deleteThread(cap,t);
364 }
365
366 // If this capability is disabled, migrate the thread away rather
367 // than running it. NB. but not if the thread is bound: it is
368 // really hard for a bound thread to migrate itself. Believe me,
369 // I tried several ways and couldn't find a way to do it.
370 // Instead, when everything is stopped for GC, we migrate all the
371 // threads on the run queue then (see scheduleDoGC()).
372 //
373 // ToDo: what about TSO_LOCKED? Currently we're migrating those
374 // when the number of capabilities drops, but we never migrate
375 // them back if it rises again. Presumably we should, but after
376 // the thread has been migrated we no longer know what capability
377 // it was originally on.
378 #if defined(THREADED_RTS)
379 if (cap->disabled && !t->bound) {
380 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
381 migrateThread(cap, t, dest_cap);
382 continue;
383 }
384 #endif
385
386 /* context switches are initiated by the timer signal, unless
387 * the user specified "context switch as often as possible", with
388 * +RTS -C0
389 */
390 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
391 && !emptyThreadQueues(cap)) {
392 cap->context_switch = 1;
393 }
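        // Usage sketch: a program started as `./prog +RTS -C0 -RTS` takes this
        // branch whenever other threads are queued, requesting a switch as
        // often as possible instead of waiting for the timer tick.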
394
395 run_thread:
396
397 // CurrentTSO is the thread to run. It might be different if we
398 // loop back to run_thread, so make sure to set CurrentTSO after
399 // that.
400 cap->r.rCurrentTSO = t;
401
402 startHeapProfTimer();
403
404 // ----------------------------------------------------------------------
405 // Run the current thread
406
407 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
408 ASSERT(t->cap == cap);
409 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
410
411 prev_what_next = t->what_next;
412
413 errno = t->saved_errno;
414 #if defined(mingw32_HOST_OS)
415 SetLastError(t->saved_winerror);
416 #endif
417
418 // reset the interrupt flag before running Haskell code
419 cap->interrupt = 0;
420
421 cap->in_haskell = true;
422 cap->idle = 0;
423
424 dirty_TSO(cap,t);
425 dirty_STACK(cap,t->stackobj);
426
427 switch (recent_activity)
428 {
429 case ACTIVITY_DONE_GC: {
430 // ACTIVITY_DONE_GC means we turned off the timer signal to
431 // conserve power (see #1623). Re-enable it here.
432 uint32_t prev;
433 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
434 if (prev == ACTIVITY_DONE_GC) {
435 #if !defined(PROFILING)
436 startTimer();
437 #endif
438 }
439 break;
440 }
441 case ACTIVITY_INACTIVE:
442 // If we reached ACTIVITY_INACTIVE, then don't reset it until
443 // we've done the GC. The thread running here might just be
444 // the IO manager thread that handle_tick() woke up via
445 // wakeUpRts().
446 break;
447 default:
448 recent_activity = ACTIVITY_YES;
449 }
450
451 traceEventRunThread(cap, t);
452
453 switch (prev_what_next) {
454
455 case ThreadKilled:
456 case ThreadComplete:
457 /* Thread already finished, return to scheduler. */
458 ret = ThreadFinished;
459 break;
460
461 case ThreadRunGHC:
462 {
463 StgRegTable *r;
464 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
465 cap = regTableToCapability(r);
466 ret = r->rRet;
467 break;
468 }
469
470 case ThreadInterpret:
471 cap = interpretBCO(cap);
472 ret = cap->r.rRet;
473 break;
474
475 default:
476 barf("schedule: invalid prev_what_next=%u field", prev_what_next);
477 }
478
479 cap->in_haskell = false;
480
481 // The TSO might have moved, eg. if it re-entered the RTS and a GC
482 // happened. So find the new location:
483 t = cap->r.rCurrentTSO;
484
485 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
486 // don't want it set when not running a Haskell thread.
487 cap->r.rCurrentTSO = NULL;
488
489 // And save the current errno in this thread.
490 // XXX: possibly bogus for SMP because this thread might already
491 // be running again, see code below.
492 t->saved_errno = errno;
493 #if defined(mingw32_HOST_OS)
494 // Similarly for Windows error code
495 t->saved_winerror = GetLastError();
496 #endif
497
498 if (ret == ThreadBlocked) {
499 if (t->why_blocked == BlockedOnBlackHole) {
500 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
501 traceEventStopThread(cap, t, t->why_blocked + 6,
502 owner != NULL ? owner->id : 0);
503 } else {
504 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
505 }
506 } else {
507 traceEventStopThread(cap, t, ret, 0);
508 }
509
510 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
511 ASSERT(t->cap == cap);
512
513 // ----------------------------------------------------------------------
514
515 // Costs for the scheduler are assigned to CCS_SYSTEM
516 stopHeapProfTimer();
517 #if defined(PROFILING)
518 cap->r.rCCCS = CCS_SYSTEM;
519 #endif
520
521 schedulePostRunThread(cap,t);
522
523 ready_to_gc = false;
524
525 switch (ret) {
526 case HeapOverflow:
527 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
528 break;
529
530 case StackOverflow:
531 // just adjust the stack for this thread, then pop it back
532 // on the run queue.
533 threadStackOverflow(cap, t);
534 pushOnRunQueue(cap,t);
535 break;
536
537 case ThreadYielding:
538 if (scheduleHandleYield(cap, t, prev_what_next)) {
539 // shortcut for switching between compiler/interpreter:
540 goto run_thread;
541 }
542 break;
543
544 case ThreadBlocked:
545 scheduleHandleThreadBlocked(t);
546 break;
547
548 case ThreadFinished:
549 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
550 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
551 break;
552
553 default:
554 barf("schedule: invalid thread return code %d", (int)ret);
555 }
556
557 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
558 scheduleDoGC(&cap,task,false);
559 }
560 } /* end of while() */
561 }
562
563 /* -----------------------------------------------------------------------------
564 * Run queue operations
565 * -------------------------------------------------------------------------- */
566
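// An informal sketch of the shape assumed below: the run queue is a
// doubly-linked list threaded through the TSOs themselves, with tso->_link as
// the forward pointer and tso->block_info.prev as the backward pointer, e.g.
//
//     run_queue_hd -> t1 <-> t2 <-> t3 <- run_queue_tl      (n_run_queue == 3)
//
// which is what lets removeFromRunQueue() unlink a thread in O(1).
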
567 static void
568 removeFromRunQueue (Capability *cap, StgTSO *tso)
569 {
570 if (tso->block_info.prev == END_TSO_QUEUE) {
571 ASSERT(cap->run_queue_hd == tso);
572 cap->run_queue_hd = tso->_link;
573 } else {
574 setTSOLink(cap, tso->block_info.prev, tso->_link);
575 }
576 if (tso->_link == END_TSO_QUEUE) {
577 ASSERT(cap->run_queue_tl == tso);
578 cap->run_queue_tl = tso->block_info.prev;
579 } else {
580 setTSOPrev(cap, tso->_link, tso->block_info.prev);
581 }
582 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
583 cap->n_run_queue--;
584
585 IF_DEBUG(sanity, checkRunQueue(cap));
586 }
587
588 void
589 promoteInRunQueue (Capability *cap, StgTSO *tso)
590 {
591 removeFromRunQueue(cap, tso);
592 pushOnRunQueue(cap, tso);
593 }
594
595 /* ----------------------------------------------------------------------------
596 * Setting up the scheduler loop
597 * ------------------------------------------------------------------------- */
598
599 static void
600 schedulePreLoop(void)
601 {
602 // initialisation for scheduler - what cannot go into initScheduler()
603
604 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
605 win32AllocStack();
606 #endif
607 }
608
609 /* -----------------------------------------------------------------------------
610 * scheduleFindWork()
611 *
612 * Search for work to do, and handle messages from elsewhere.
613 * -------------------------------------------------------------------------- */
614
615 static void
616 scheduleFindWork (Capability **pcap)
617 {
618 scheduleStartSignalHandlers(*pcap);
619
620 scheduleProcessInbox(pcap);
621
622 scheduleCheckBlockedThreads(*pcap);
623
624 #if defined(THREADED_RTS)
625 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
626 #endif
627 }
628
629 #if defined(THREADED_RTS)
630 STATIC_INLINE bool
631 shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
632 {
633 // we need to yield this capability to someone else if..
634 // - another thread is initiating a GC, and we didn't just do a GC
635 // (see Note [GC livelock])
636 // - another Task is returning from a foreign call
637 // - the thread at the head of the run queue cannot be run
638 // by this Task (it is bound to another Task, or it is unbound
639     //    and this task is bound).
640 //
641 // Note [GC livelock]
642 //
643 // If we are interrupted to do a GC, then we do not immediately do
644 // another one. This avoids a starvation situation where one
645 // Capability keeps forcing a GC and the other Capabilities make no
646 // progress at all.
647
648 return ((pending_sync && !didGcLast) ||
649 cap->n_returning_tasks != 0 ||
650 (!emptyRunQueue(cap) && (task->incall->tso == NULL
651 ? peekRunQueue(cap)->bound != NULL
652 : peekRunQueue(cap)->bound != task->incall)));
653 }
654
655 // This is the single place where a Task goes to sleep. There are
656 // two reasons it might need to sleep:
657 // - there are no threads to run
658 // - we need to yield this Capability to someone else
659 // (see shouldYieldCapability())
660 //
661 // Careful: the scheduler loop is quite delicate. Make sure you run
662 // the tests in testsuite/concurrent (all ways) after modifying this,
663 // and also check the benchmarks in nofib/parallel for regressions.
664
665 static void
666 scheduleYield (Capability **pcap, Task *task)
667 {
668 Capability *cap = *pcap;
669 bool didGcLast = false;
670
671 // if we have work, and we don't need to give up the Capability, continue.
672 //
673 if (!shouldYieldCapability(cap,task,false) &&
674 (!emptyRunQueue(cap) ||
675 !emptyInbox(cap) ||
676 sched_state >= SCHED_INTERRUPTING)) {
677 return;
678 }
679
680 // otherwise yield (sleep), and keep yielding if necessary.
681 do {
682 didGcLast = yieldCapability(&cap,task, !didGcLast);
683 }
684 while (shouldYieldCapability(cap,task,didGcLast));
685
686 // note there may still be no threads on the run queue at this
687 // point, the caller has to check.
688
689 *pcap = cap;
690 return;
691 }
692 #endif
693
694 /* -----------------------------------------------------------------------------
695 * schedulePushWork()
696 *
697 * Push work to other Capabilities if we have some.
698 * -------------------------------------------------------------------------- */
699
700 static void
701 schedulePushWork(Capability *cap USED_IF_THREADS,
702 Task *task USED_IF_THREADS)
703 {
704     /* The following code is not for PARALLEL_HASKELL. The call is kept general;
705        future GUM versions might use pushing in a distributed setup. */
706 #if defined(THREADED_RTS)
707
708 Capability *free_caps[n_capabilities], *cap0;
709 uint32_t i, n_wanted_caps, n_free_caps;
710
711 uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
712
713 // migration can be turned off with +RTS -qm
714 if (!RtsFlags.ParFlags.migrate) {
715 spare_threads = 0;
716 }
717
718 // Figure out how many capabilities we want to wake up. We need at least
719 // sparkPoolSize(cap) plus the number of spare threads we have.
720 n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
721 if (n_wanted_caps == 0) return;
722
723 // First grab as many free Capabilities as we can. ToDo: we should use
724 // capabilities on the same NUMA node preferably, but not exclusively.
725 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
726 n_free_caps < n_wanted_caps && i != cap->no;
727 i = (i + 1) % n_capabilities) {
728 cap0 = capabilities[i];
729 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
730 if (!emptyRunQueue(cap0)
731 || cap0->n_returning_tasks != 0
732 || !emptyInbox(cap0)) {
733 // it already has some work, we just grabbed it at
734 // the wrong moment. Or maybe it's deadlocked!
735 releaseCapability(cap0);
736 } else {
737 free_caps[n_free_caps++] = cap0;
738 }
739 }
740 }
741
742 // We now have n_free_caps free capabilities stashed in
743 // free_caps[]. Attempt to share our run queue equally with them.
744 // This is complicated slightly by the fact that we can't move
745 // some threads:
746 //
747 // - threads that have TSO_LOCKED cannot migrate
748 // - a thread that is bound to the current Task cannot be migrated
749 //
750 // This is about the simplest thing we could do; improvements we
751 // might want to do include:
752 //
753 // - giving high priority to moving relatively new threads, on
754     //     the grounds that they haven't had time to build up a
755 // working set in the cache on this CPU/Capability.
756 //
757 // - giving low priority to moving long-lived threads
758
759 if (n_free_caps > 0) {
760 StgTSO *prev, *t, *next;
761
762 debugTrace(DEBUG_sched,
763 "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
764 cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
765 n_free_caps);
766
767 // There are n_free_caps+1 caps in total. We will share the threads
768         // evenly between them, *except* that if the run queue does not divide
769 // evenly by n_free_caps+1 then we bias towards the current capability.
770 // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
771 uint32_t keep_threads =
772 (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
773
774 // This also ensures that we don't give away all our threads, since
775 // (x + y) / (y + 1) >= 1 when x >= 1.
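        //
        // A quick worked check of that bound: with n_run_queue = 1 and
        // n_free_caps = 3, keep_threads = (1 + 3) / (3 + 1) = 1, so the one
        // local thread is never given away.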
776
777 // The number of threads we have left.
778 uint32_t n = cap->n_run_queue;
779
780 // prev = the previous thread on this cap's run queue
781 prev = END_TSO_QUEUE;
782
783 // We're going to walk through the run queue, migrating threads to other
784 // capabilities until we have only keep_threads left. We might
785 // encounter a thread that cannot be migrated, in which case we add it
786 // to the current run queue and decrement keep_threads.
787 for (t = cap->run_queue_hd, i = 0;
788 t != END_TSO_QUEUE && n > keep_threads;
789 t = next)
790 {
791 next = t->_link;
792 t->_link = END_TSO_QUEUE;
793
794 // Should we keep this thread?
795 if (t->bound == task->incall // don't move my bound thread
796 || tsoLocked(t) // don't move a locked thread
797 ) {
798 if (prev == END_TSO_QUEUE) {
799 cap->run_queue_hd = t;
800 } else {
801 setTSOLink(cap, prev, t);
802 }
803 setTSOPrev(cap, t, prev);
804 prev = t;
805 if (keep_threads > 0) keep_threads--;
806 }
807
808 // Or migrate it?
809 else {
810 appendToRunQueue(free_caps[i],t);
811 traceEventMigrateThread (cap, t, free_caps[i]->no);
812
813 if (t->bound) { t->bound->task->cap = free_caps[i]; }
814 t->cap = free_caps[i];
815 n--; // we have one fewer threads now
816 i++; // move on to the next free_cap
817 if (i == n_free_caps) i = 0;
818 }
819 }
820
821 // Join up the beginning of the queue (prev)
822 // with the rest of the queue (t)
823 if (t == END_TSO_QUEUE) {
824 cap->run_queue_tl = prev;
825 } else {
826 setTSOPrev(cap, t, prev);
827 }
828 if (prev == END_TSO_QUEUE) {
829 cap->run_queue_hd = t;
830 } else {
831 setTSOLink(cap, prev, t);
832 }
833 cap->n_run_queue = n;
834
835 IF_DEBUG(sanity, checkRunQueue(cap));
836
837 // release the capabilities
838 for (i = 0; i < n_free_caps; i++) {
839 task->cap = free_caps[i];
840 if (sparkPoolSizeCap(cap) > 0) {
841 // If we have sparks to steal, wake up a worker on the
842 // capability, even if it has no threads to run.
843 releaseAndWakeupCapability(free_caps[i]);
844 } else {
845 releaseCapability(free_caps[i]);
846 }
847 }
848 }
849 task->cap = cap; // reset to point to our Capability.
850
851 #endif /* THREADED_RTS */
852
853 }
854
855 /* ----------------------------------------------------------------------------
856 * Start any pending signal handlers
857 * ------------------------------------------------------------------------- */
858
859 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
860 static void
861 scheduleStartSignalHandlers(Capability *cap)
862 {
863 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
864 // safe outside the lock
865 startSignalHandlers(cap);
866 }
867 }
868 #else
869 static void
870 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
871 {
872 }
873 #endif
874
875 /* ----------------------------------------------------------------------------
876 * Check for blocked threads that can be woken up.
877 * ------------------------------------------------------------------------- */
878
879 static void
880 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
881 {
882 #if !defined(THREADED_RTS)
883 //
884 // Check whether any waiting threads need to be woken up. If the
885 // run queue is empty, and there are no other tasks running, we
886 // can wait indefinitely for something to happen.
887 //
888 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
889 {
890 awaitEvent (emptyRunQueue(cap));
891 }
892 #endif
893 }
894
895 /* ----------------------------------------------------------------------------
896 * Detect deadlock conditions and attempt to resolve them.
897 * ------------------------------------------------------------------------- */
898
899 static void
900 scheduleDetectDeadlock (Capability **pcap, Task *task)
901 {
902 Capability *cap = *pcap;
903 /*
904 * Detect deadlock: when we have no threads to run, there are no
905 * threads blocked, waiting for I/O, or sleeping, and all the
906 * other tasks are waiting for work, we must have a deadlock of
907 * some description.
908 */
909 if ( emptyThreadQueues(cap) )
910 {
911 #if defined(THREADED_RTS)
912 /*
913 * In the threaded RTS, we only check for deadlock if there
914 * has been no activity in a complete timeslice. This means
915 * we won't eagerly start a full GC just because we don't have
916 * any threads to run currently.
917 */
918 if (recent_activity != ACTIVITY_INACTIVE) return;
919 #endif
920
921 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
922
923 // Garbage collection can release some new threads due to
924 // either (a) finalizers or (b) threads resurrected because
925 // they are unreachable and will therefore be sent an
926 // exception. Any threads thus released will be immediately
927 // runnable.
928 scheduleDoGC (pcap, task, true/*force major GC*/);
929 cap = *pcap;
930         // When force_major == true, scheduleDoGC sets
931 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
932 // signal.
933
934 if ( !emptyRunQueue(cap) ) return;
935
936 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
937 /* If we have user-installed signal handlers, then wait
938          * for signals to arrive rather than bombing out with a
939 * deadlock.
940 */
941 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
942 debugTrace(DEBUG_sched,
943 "still deadlocked, waiting for signals...");
944
945 awaitUserSignals();
946
947 if (signals_pending()) {
948 startSignalHandlers(cap);
949 }
950
951 // either we have threads to run, or we were interrupted:
952 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
953
954 return;
955 }
956 #endif
957
958 #if !defined(THREADED_RTS)
959 /* Probably a real deadlock. Send the current main thread the
960 * Deadlock exception.
961 */
962 if (task->incall->tso) {
963 switch (task->incall->tso->why_blocked) {
964 case BlockedOnSTM:
965 case BlockedOnBlackHole:
966 case BlockedOnMsgThrowTo:
967 case BlockedOnMVar:
968 case BlockedOnMVarRead:
969 throwToSingleThreaded(cap, task->incall->tso,
970 (StgClosure *)nonTermination_closure);
971 return;
972 default:
973 barf("deadlock: main thread blocked in a strange way");
974 }
975 }
976 return;
977 #endif
978 }
979 }
980
981
982 /* ----------------------------------------------------------------------------
983 * Process message in the current Capability's inbox
984 * ------------------------------------------------------------------------- */
985
986 static void
987 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
988 {
989 #if defined(THREADED_RTS)
990 Message *m, *next;
991 PutMVar *p, *pnext;
992 int r;
993 Capability *cap = *pcap;
994
995 while (!emptyInbox(cap)) {
996 // Executing messages might use heap, so we should check for GC.
997 if (doYouWantToGC(cap)) {
998 scheduleDoGC(pcap, cap->running_task, false);
999 cap = *pcap;
1000 }
1001
1002 // don't use a blocking acquire; if the lock is held by
1003 // another thread then just carry on. This seems to avoid
1004 // getting stuck in a message ping-pong situation with other
1005 // processors. We'll check the inbox again later anyway.
1006 //
1007 // We should really use a more efficient queue data structure
1008 // here. The trickiness is that we must ensure a Capability
1009 // never goes idle if the inbox is non-empty, which is why we
1010 // use cap->lock (cap->lock is released as the last thing
1011 // before going idle; see Capability.c:releaseCapability()).
1012 r = TRY_ACQUIRE_LOCK(&cap->lock);
1013 if (r != 0) return;
1014
1015 m = cap->inbox;
1016 p = cap->putMVars;
1017 cap->inbox = (Message*)END_TSO_QUEUE;
1018 cap->putMVars = NULL;
1019
1020 RELEASE_LOCK(&cap->lock);
1021
1022 while (m != (Message*)END_TSO_QUEUE) {
1023 next = m->link;
1024 executeMessage(cap, m);
1025 m = next;
1026 }
1027
1028 while (p != NULL) {
1029 pnext = p->link;
1030 performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
1031 Unit_closure);
1032 freeStablePtr(p->mvar);
1033 stgFree(p);
1034 p = pnext;
1035 }
1036 }
1037 #endif
1038 }
1039
1040
1041 /* ----------------------------------------------------------------------------
1042 * Activate spark threads (THREADED_RTS)
1043 * ------------------------------------------------------------------------- */
1044
1045 #if defined(THREADED_RTS)
1046 static void
1047 scheduleActivateSpark(Capability *cap)
1048 {
1049 if (anySparks() && !cap->disabled)
1050 {
1051 createSparkThread(cap);
1052 debugTrace(DEBUG_sched, "creating a spark thread");
1053 }
1054 }
1055 #endif // THREADED_RTS
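
// For context, a sketch of the Haskell-side counterpart: sparks land in a
// Capability's pool when user code evaluates something like  x `par` y
// (par from GHC.Conc / Control.Parallel), and the spark thread created above
// is what eventually evaluates them unless another Capability steals them
// first.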
1056
1057 /* ----------------------------------------------------------------------------
1058 * After running a thread...
1059 * ------------------------------------------------------------------------- */
1060
1061 static void
1062 schedulePostRunThread (Capability *cap, StgTSO *t)
1063 {
1064 // We have to be able to catch transactions that are in an
1065 // infinite loop as a result of seeing an inconsistent view of
1066 // memory, e.g.
1067 //
1068 // atomically $ do
1069 // [a,b] <- mapM readTVar [ta,tb]
1070 // when (a == b) loop
1071 //
1072 // and a is never equal to b given a consistent view of memory.
1073 //
1074 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1075 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1076 debugTrace(DEBUG_sched | DEBUG_stm,
1077 "trec %p found wasting its time", t);
1078
1079 // strip the stack back to the
1080 // ATOMICALLY_FRAME, aborting the (nested)
1081 // transaction, and saving the stack of any
1082 // partially-evaluated thunks on the heap.
1083 throwToSingleThreaded_(cap, t, NULL, true);
1084
1085 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1086 }
1087 }
1088
1089 //
1090 // If the current thread's allocation limit has run out, send it
1091 // the AllocationLimitExceeded exception.
1092
1093 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1094 // Use a throwToSelf rather than a throwToSingleThreaded, because
1095 // it correctly handles the case where the thread is currently
1096 // inside mask. Also the thread might be blocked (e.g. on an
1097 // MVar), and throwToSingleThreaded doesn't unblock it
1098 // correctly in that case.
1099 throwToSelf(cap, t, allocationLimitExceeded_closure);
1100 ASSIGN_Int64((W_*)&(t->alloc_limit),
1101 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1102 }
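
    // For reference, a sketch of the user-facing side (assuming the usual
    // GHC.Conc API): the counter consumed above is the one Haskell code sets
    // with setAllocationCounter, and the TSO_ALLOC_LIMIT flag corresponds to
    // enableAllocationLimit / disableAllocationLimit.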
1103
1104 /* some statistics gathering in the parallel case */
1105 }
1106
1107 /* -----------------------------------------------------------------------------
1108 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1109 * -------------------------------------------------------------------------- */
1110
1111 static bool
1112 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1113 {
1114 if (cap->r.rHpLim == NULL || cap->context_switch) {
1115 // Sometimes we miss a context switch, e.g. when calling
1116 // primitives in a tight loop, MAYBE_GC() doesn't check the
1117 // context switch flag, and we end up waiting for a GC.
1118 // See #1984, and concurrent/should_run/1984
1119 cap->context_switch = 0;
1120 appendToRunQueue(cap,t);
1121 } else {
1122 pushOnRunQueue(cap,t);
1123 }
1124
1125 // did the task ask for a large block?
1126 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1127 // if so, get one and push it on the front of the nursery.
1128 bdescr *bd;
1129 W_ blocks;
1130
1131 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1132
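        // A worked example (assuming the usual 4096-byte BLOCK_SIZE): a
        // request of rHpAlloc = 5000 bytes rounds up to 8192, so blocks = 2.
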
1133 if (blocks > BLOCKS_PER_MBLOCK) {
1134 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1135 }
1136
1137 debugTrace(DEBUG_sched,
1138 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1139 (long)t->id, what_next_strs[t->what_next], blocks);
1140
1141 // don't do this if the nursery is (nearly) full, we'll GC first.
1142 if (cap->r.rCurrentNursery->link != NULL ||
1143 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1144 // infinite loop if the
1145 // nursery has only one
1146 // block.
1147
1148 bd = allocGroupOnNode_lock(cap->node,blocks);
1149 cap->r.rNursery->n_blocks += blocks;
1150
1151 // link the new group after CurrentNursery
1152 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1153
1154 // initialise it as a nursery block. We initialise the
1155 // step, gen_no, and flags field of *every* sub-block in
1156 // this large block, because this is easier than making
1157 // sure that we always find the block head of a large
1158 // block whenever we call Bdescr() (eg. evacuate() and
1159 // isAlive() in the GC would both have to do this, at
1160 // least).
1161 {
1162 bdescr *x;
1163 for (x = bd; x < bd + blocks; x++) {
1164 initBdescr(x,g0,g0);
1165 x->free = x->start;
1166 x->flags = 0;
1167 }
1168 }
1169
1170 // This assert can be a killer if the app is doing lots
1171 // of large block allocations.
1172 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1173
1174 // now update the nursery to point to the new block
1175 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1176 cap->r.rCurrentNursery = bd;
1177
1178 // we might be unlucky and have another thread get on the
1179 // run queue before us and steal the large block, but in that
1180 // case the thread will just end up requesting another large
1181 // block.
1182 return false; /* not actually GC'ing */
1183 }
1184 }
1185
1186 return doYouWantToGC(cap);
1187 /* actual GC is done at the end of the while loop in schedule() */
1188 }
1189
1190 /* -----------------------------------------------------------------------------
1191 * Handle a thread that returned to the scheduler with ThreadYielding
1192 * -------------------------------------------------------------------------- */
1193
1194 static bool
1195 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1196 {
1197 /* put the thread back on the run queue. Then, if we're ready to
1198 * GC, check whether this is the last task to stop. If so, wake
1199 * up the GC thread. getThread will block during a GC until the
1200 * GC is finished.
1201 */
1202
1203 ASSERT(t->_link == END_TSO_QUEUE);
1204
1205 // Shortcut if we're just switching evaluators: just run the thread. See
1206 // Note [avoiding threadPaused] in Interpreter.c.
1207 if (t->what_next != prev_what_next) {
1208 debugTrace(DEBUG_sched,
1209 "--<< thread %ld (%s) stopped to switch evaluators",
1210 (long)t->id, what_next_strs[t->what_next]);
1211 return true;
1212 }
1213
1214 // Reset the context switch flag. We don't do this just before
1215 // running the thread, because that would mean we would lose ticks
1216 // during GC, which can lead to unfair scheduling (a thread hogs
1217 // the CPU because the tick always arrives during GC). This way
1218 // penalises threads that do a lot of allocation, but that seems
1219 // better than the alternative.
1220 if (cap->context_switch != 0) {
1221 cap->context_switch = 0;
1222 appendToRunQueue(cap,t);
1223 } else {
1224 pushOnRunQueue(cap,t);
1225 }
1226
1227 IF_DEBUG(sanity,
1228 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1229 checkTSO(t));
1230
1231 return false;
1232 }
1233
1234 /* -----------------------------------------------------------------------------
1235 * Handle a thread that returned to the scheduler with ThreadBlocked
1236 * -------------------------------------------------------------------------- */
1237
1238 static void
1239 scheduleHandleThreadBlocked( StgTSO *t
1240 #if !defined(DEBUG)
1241 STG_UNUSED
1242 #endif
1243 )
1244 {
1245
1246 // We don't need to do anything. The thread is blocked, and it
1247 // has tidied up its stack and placed itself on whatever queue
1248 // it needs to be on.
1249
1250 // ASSERT(t->why_blocked != NotBlocked);
1251 // Not true: for example,
1252 // - the thread may have woken itself up already, because
1253 // threadPaused() might have raised a blocked throwTo
1254 // exception, see maybePerformBlockedException().
1255
1256 #if defined(DEBUG)
1257 traceThreadStatus(DEBUG_sched, t);
1258 #endif
1259 }
1260
1261 /* -----------------------------------------------------------------------------
1262 * Handle a thread that returned to the scheduler with ThreadFinished
1263 * -------------------------------------------------------------------------- */
1264
1265 static bool
1266 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1267 {
1268 /* Need to check whether this was a main thread, and if so,
1269 * return with the return value.
1270 *
1271 * We also end up here if the thread kills itself with an
1272 * uncaught exception, see Exception.cmm.
1273 */
1274
1275 // blocked exceptions can now complete, even if the thread was in
1276 // blocked mode (see #2910).
1277 awakenBlockedExceptionQueue (cap, t);
1278
1279 //
1280 // Check whether the thread that just completed was a bound
1281 // thread, and if so return with the result.
1282 //
1283 // There is an assumption here that all thread completion goes
1284 // through this point; we need to make sure that if a thread
1285 // ends up in the ThreadKilled state, that it stays on the run
1286 // queue so it can be dealt with here.
1287 //
1288
1289 if (t->bound) {
1290
1291 if (t->bound != task->incall) {
1292 #if !defined(THREADED_RTS)
1293 // Must be a bound thread that is not the topmost one. Leave
1294 // it on the run queue until the stack has unwound to the
1295 // point where we can deal with this. Leaving it on the run
1296 // queue also ensures that the garbage collector knows about
1297 // this thread and its return value (it gets dropped from the
1298 // step->threads list so there's no other way to find it).
1299 appendToRunQueue(cap,t);
1300 return false;
1301 #else
1302 // this cannot happen in the threaded RTS, because a
1303 // bound thread can only be run by the appropriate Task.
1304 barf("finished bound thread that isn't mine");
1305 #endif
1306 }
1307
1308 ASSERT(task->incall->tso == t);
1309
1310 if (t->what_next == ThreadComplete) {
1311 if (task->incall->ret) {
1312 // NOTE: return val is stack->sp[1] (see StgStartup.cmm)
1313 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1314 }
1315 task->incall->rstat = Success;
1316 } else {
1317 if (task->incall->ret) {
1318 *(task->incall->ret) = NULL;
1319 }
1320 if (sched_state >= SCHED_INTERRUPTING) {
1321 if (heap_overflow) {
1322 task->incall->rstat = HeapExhausted;
1323 } else {
1324 task->incall->rstat = Interrupted;
1325 }
1326 } else {
1327 task->incall->rstat = Killed;
1328 }
1329 }
1330 #if defined(DEBUG)
1331 removeThreadLabel((StgWord)task->incall->tso->id);
1332 #endif
1333
1334 // We no longer consider this thread and task to be bound to
1335 // each other. The TSO lives on until it is GC'd, but the
1336 // task is about to be released by the caller, and we don't
1337 // want anyone following the pointer from the TSO to the
1338 // defunct task (which might have already been
1339 // re-used). This was a real bug: the GC updated
1340         // tso->bound->tso which led to a deadlock.
1341 t->bound = NULL;
1342 task->incall->tso = NULL;
1343
1344 return true; // tells schedule() to return
1345 }
1346
1347 return false;
1348 }
1349
1350 /* -----------------------------------------------------------------------------
1351 * Perform a heap census
1352 * -------------------------------------------------------------------------- */
1353
1354 static bool
1355 scheduleNeedHeapProfile( bool ready_to_gc STG_UNUSED )
1356 {
1357 // When we have +RTS -i0 and we're heap profiling, do a census at
1358 // every GC. This lets us get repeatable runs for debugging.
1359 if (performHeapProfile ||
1360 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1361 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1362 return true;
1363 } else {
1364 return false;
1365 }
1366 }
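
// Usage sketch: running a profiled binary as `./prog +RTS -hc -i0 -RTS` sets
// heapProfileInterval to 0, so the clause above takes a census at every GC
// instead of on a timer.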
1367
1368 /* -----------------------------------------------------------------------------
1369 * stopAllCapabilities()
1370 *
1371 * Stop all Haskell execution. This is used when we need to make some global
1372 * change to the system, such as altering the number of capabilities, or
1373 * forking.
1374 *
1375 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1376 * -------------------------------------------------------------------------- */
1377
1378 #if defined(THREADED_RTS)
1379 static void stopAllCapabilities (Capability **pCap, Task *task)
1380 {
1381 bool was_syncing;
1382 SyncType prev_sync_type;
1383
1384 PendingSync sync = {
1385 .type = SYNC_OTHER,
1386 .idle = NULL,
1387 .task = task
1388 };
1389
1390 do {
1391 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1392 } while (was_syncing);
1393
1394 acquireAllCapabilities(*pCap,task);
1395
1396 pending_sync = 0;
1397 }
1398 #endif
1399
1400 /* -----------------------------------------------------------------------------
1401 * requestSync()
1402 *
1403 * Commence a synchronisation between all capabilities. Normally not called
1404 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1405 * has some special synchronisation requirements.
1406 *
1407 * Returns:
1408 * false if we successfully got a sync
1409 * true if there was another sync request in progress,
1410  *             and we yielded to it.  The type of the other sync
1411  *             request is returned in *prev_sync_type.
1412 * -------------------------------------------------------------------------- */
1413
1414 #if defined(THREADED_RTS)
1415 static bool requestSync (
1416 Capability **pcap, Task *task, PendingSync *new_sync,
1417 SyncType *prev_sync_type)
1418 {
1419 PendingSync *sync;
1420
1421 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1422 (StgWord)NULL,
1423 (StgWord)new_sync);
1424
1425 if (sync != NULL)
1426 {
1427 // sync is valid until we have called yieldCapability().
1428 // After the sync is completed, we cannot read that struct any
1429 // more because it has been freed.
1430 *prev_sync_type = sync->type;
1431 do {
1432 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1433 sync->type);
1434 ASSERT(*pcap);
1435 yieldCapability(pcap,task,true);
1436 sync = pending_sync;
1437 } while (sync != NULL);
1438
1439 // NOTE: task->cap might have changed now
1440 return true;
1441 }
1442 else
1443 {
1444 return false;
1445 }
1446 }
1447 #endif
1448
1449 /* -----------------------------------------------------------------------------
1450 * acquireAllCapabilities()
1451 *
1452 * Grab all the capabilities except the one we already hold. Used
1453 * when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1454 * before a fork (SYNC_OTHER).
1455 *
1456 * Only call this after requestSync(), otherwise a deadlock might
1457 * ensue if another thread is trying to synchronise.
1458 * -------------------------------------------------------------------------- */
1459
1460 #if defined(THREADED_RTS)
1461 static void acquireAllCapabilities(Capability *cap, Task *task)
1462 {
1463 Capability *tmpcap;
1464 uint32_t i;
1465
1466 ASSERT(pending_sync != NULL);
1467 for (i=0; i < n_capabilities; i++) {
1468         debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1469 i, n_capabilities);
1470 tmpcap = capabilities[i];
1471 if (tmpcap != cap) {
1472 // we better hope this task doesn't get migrated to
1473 // another Capability while we're waiting for this one.
1474 // It won't, because load balancing happens while we have
1475 // all the Capabilities, but even so it's a slightly
1476 // unsavoury invariant.
1477 task->cap = tmpcap;
1478 waitForCapability(&tmpcap, task);
1479 if (tmpcap->no != i) {
1480 barf("acquireAllCapabilities: got the wrong capability");
1481 }
1482 }
1483 }
1484 task->cap = cap;
1485 }
1486 #endif
1487
1488 /* -----------------------------------------------------------------------------
1489  * releaseAllCapabilities()
1490 *
1491 * Assuming this thread holds all the capabilities, release them all except for
1492 * the one passed in as cap.
1493 * -------------------------------------------------------------------------- */
1494
1495 #if defined(THREADED_RTS)
1496 static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task)
1497 {
1498 uint32_t i;
1499
1500 for (i = 0; i < n; i++) {
1501 if (cap->no != i) {
1502 task->cap = capabilities[i];
1503 releaseCapability(capabilities[i]);
1504 }
1505 }
1506 task->cap = cap;
1507 }
1508 #endif
1509
1510 /* -----------------------------------------------------------------------------
1511 * Perform a garbage collection if necessary
1512 * -------------------------------------------------------------------------- */
1513
1514 static void
1515 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1516 bool force_major)
1517 {
1518 Capability *cap = *pcap;
1519 bool heap_census;
1520 uint32_t collect_gen;
1521 bool major_gc;
1522 #if defined(THREADED_RTS)
1523 uint32_t gc_type;
1524 uint32_t i;
1525 uint32_t need_idle;
1526 uint32_t n_gc_threads;
1527 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1528 StgTSO *tso;
1529 bool *idle_cap;
1530 // idle_cap is an array (allocated later) of size n_capabilities, where
1531     // idle_cap[i] is true if capability i will be idle during this GC
1532 // cycle.
1533 #endif
1534
1535 if (sched_state == SCHED_SHUTTING_DOWN) {
1536 // The final GC has already been done, and the system is
1537 // shutting down. We'll probably deadlock if we try to GC
1538 // now.
1539 return;
1540 }
1541
1542 heap_census = scheduleNeedHeapProfile(true);
1543
1544 // Figure out which generation we are collecting, so that we can
1545 // decide whether this is a parallel GC or not.
1546 collect_gen = calcNeeded(force_major || heap_census, NULL);
1547 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1548
1549 #if defined(THREADED_RTS)
1550 if (sched_state < SCHED_INTERRUPTING
1551 && RtsFlags.ParFlags.parGcEnabled
1552 && collect_gen >= RtsFlags.ParFlags.parGcGen
1553 && ! oldest_gen->mark)
1554 {
1555 gc_type = SYNC_GC_PAR;
1556 } else {
1557 gc_type = SYNC_GC_SEQ;
1558 }
1559
1560 // In order to GC, there must be no threads running Haskell code.
1561 // Therefore, for single-threaded GC, the GC thread needs to hold *all* the
1562 // capabilities, and release them after the GC has completed. For parallel
1563 // GC, we synchronise all the running threads using requestSync().
1564 //
1565 // Other capabilities are prevented from running yet more Haskell threads if
1566 // pending_sync is set. Tested inside yieldCapability() and
1567 // releaseCapability() in Capability.c
1568
1569 PendingSync sync = {
1570 .type = gc_type,
1571 .idle = NULL,
1572 .task = task
1573 };
1574
1575 {
1576 SyncType prev_sync = 0;
1577 bool was_syncing;
1578 do {
1579 // If -qn is not set and we have more capabilities than cores, set
1580 // the number of GC threads to #cores. We do this here rather than
1581 // in normaliseRtsOpts() because here it will work if the program
1582 // calls setNumCapabilities.
1583 //
1584 n_gc_threads = RtsFlags.ParFlags.parGcThreads;
1585 if (n_gc_threads == 0 &&
1586 enabled_capabilities > getNumberOfProcessors()) {
1587 n_gc_threads = getNumberOfProcessors();
1588 }
1589
1590 // This calculation must be inside the loop because
1591 // enabled_capabilities may change if requestSync() below fails and
1592 // we retry.
1593 if (gc_type == SYNC_GC_PAR && n_gc_threads > 0) {
1594 if (n_gc_threads >= enabled_capabilities) {
1595 need_idle = 0;
1596 } else {
1597 need_idle = enabled_capabilities - n_gc_threads;
1598 }
1599 } else {
1600 need_idle = 0;
1601 }
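            // A worked example (a sketch): with +RTS -N8 on a 4-core machine
            // and no -qn flag, n_gc_threads becomes 4 above, so a parallel GC
            // gets need_idle = 8 - 4 = 4 capabilities asked to sit this cycle
            // out.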
1602
1603 // We need an array of size n_capabilities, but since this may
1604 // change each time around the loop we must allocate it afresh.
1605 idle_cap = (bool *)stgMallocBytes(n_capabilities *
1606 sizeof(bool),
1607 "scheduleDoGC");
1608 sync.idle = idle_cap;
1609
1610 // When using +RTS -qn, we need some capabilities to be idle during
1611 // GC. The best bet is to choose some inactive ones, so we look for
1612 // those first:
1613 uint32_t n_idle = need_idle;
1614 for (i=0; i < n_capabilities; i++) {
1615 if (capabilities[i]->disabled) {
1616 idle_cap[i] = true;
1617 } else if (n_idle > 0 &&
1618 capabilities[i]->running_task == NULL) {
1619 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1620 n_idle--;
1621 idle_cap[i] = true;
1622 } else {
1623 idle_cap[i] = false;
1624 }
1625 }
1626 // If we didn't find enough inactive capabilities, just pick some
1627 // more to be idle.
1628 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1629 if (!idle_cap[i] && i != cap->no) {
1630 idle_cap[i] = true;
1631 n_idle--;
1632 }
1633 }
1634 ASSERT(n_idle == 0);
1635
1636 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1637 cap = *pcap;
1638 if (was_syncing) {
1639 stgFree(idle_cap);
1640 }
1641 if (was_syncing &&
1642 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1643 !(sched_state == SCHED_INTERRUPTING && force_major)) {
1644 // someone else had a pending sync request for a GC, so
1645 // let's assume GC has been done and we don't need to GC
1646 // again.
1647 // Exception to this: if SCHED_INTERRUPTING, then we still
1648 // need to do the final GC.
1649 return;
1650 }
1651 if (sched_state == SCHED_SHUTTING_DOWN) {
1652 // The scheduler might now be shutting down. We tested
1653 // this above, but it might have become true since then as
1654 // we yielded the capability in requestSync().
1655 return;
1656 }
1657 } while (was_syncing);
1658 }
1659
1660 stat_startGCSync(gc_threads[cap->no]);
1661
1662 #if defined(DEBUG)
1663 unsigned int old_n_capabilities = n_capabilities;
1664 #endif
1665
1666 interruptAllCapabilities();
1667
1668 // The final shutdown GC is always single-threaded, because it's
1669 // possible that some of the Capabilities have no worker threads.
1670
1671 if (gc_type == SYNC_GC_SEQ) {
1672 traceEventRequestSeqGc(cap);
1673 } else {
1674 traceEventRequestParGc(cap);
1675 }
1676
1677 if (gc_type == SYNC_GC_SEQ) {
1678 // single-threaded GC: grab all the capabilities
1679 acquireAllCapabilities(cap,task);
1680 }
1681 else
1682 {
1683 // If we are load-balancing collections in this
1684 // generation, then we require all GC threads to participate
1685 // in the collection. Otherwise, we only require active
1686 // threads to participate, and we set gc_threads[i]->idle for
1687 // any idle capabilities. The rationale here is that waking
1688 // up an idle Capability takes much longer than just doing any
1689 // GC work on its behalf.
1690
1691 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1692 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1693 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1694 {
1695 for (i=0; i < n_capabilities; i++) {
1696 if (capabilities[i]->disabled) {
1697 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1698 if (idle_cap[i]) {
1699 n_idle_caps++;
1700 }
1701 } else {
1702 if (i != cap->no && idle_cap[i]) {
1703 Capability *tmpcap = capabilities[i];
1704 task->cap = tmpcap;
1705 waitForCapability(&tmpcap, task);
1706 n_idle_caps++;
1707 }
1708 }
1709 }
1710 }
1711 else
1712 {
1713 for (i=0; i < n_capabilities; i++) {
1714 if (capabilities[i]->disabled) {
1715 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1716 if (idle_cap[i]) {
1717 n_idle_caps++;
1718 }
1719 } else if (i != cap->no &&
1720 capabilities[i]->idle >=
1721 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1722 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1723 if (idle_cap[i]) {
1724 n_idle_caps++;
1725 } else {
1726 n_failed_trygrab_idles++;
1727 }
1728 }
1729 }
1730 }
1731 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1732
1733 for (i=0; i < n_capabilities; i++) {
1734 capabilities[i]->idle++;
1735 }
1736
1737 // For all capabilities participating in this GC, wait until
1738 // they have stopped mutating and are standing by for GC.
1739 waitForGcThreads(cap, idle_cap);
1740
1741 #if defined(THREADED_RTS)
1742 // Stable point where we can do a global check on our spark counters
1743 ASSERT(checkSparkCountInvariant());
1744 #endif
1745 }
1746
1747 #endif
1748
1749 IF_DEBUG(scheduler, printAllThreads());
1750
1751 delete_threads_and_gc:
1752 /*
1753 * We now have all the capabilities; if we're in an interrupting
1754 * state, then we should take the opportunity to delete all the
1755 * threads in the system.
1756 * Checking for major_gc ensures that the last GC is major.
1757 */
1758 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1759 deleteAllThreads(cap);
1760 #if defined(THREADED_RTS)
1761 // Discard all the sparks from every Capability. Why?
1762 // They'll probably be GC'd anyway since we've killed all the
1763 // threads. It just avoids the GC having to do any work to
1764 // figure out that any remaining sparks are garbage.
1765 for (i = 0; i < n_capabilities; i++) {
1766 capabilities[i]->spark_stats.gcd +=
1767 sparkPoolSize(capabilities[i]->sparks);
1768 // No race here since all Caps are stopped.
1769 discardSparksCap(capabilities[i]);
1770 }
1771 #endif
1772 sched_state = SCHED_SHUTTING_DOWN;
1773 }
1774
1775 /*
1776 * When there are disabled capabilities, we want to migrate any
1777 * threads away from them. Normally this happens in the
1778 * scheduler's loop, but only for unbound threads - it's really
1779 * hard for a bound thread to migrate itself. So we have another
1780 * go here.
1781 */
1782 #if defined(THREADED_RTS)
1783 for (i = enabled_capabilities; i < n_capabilities; i++) {
1784 Capability *tmp_cap, *dest_cap;
1785 tmp_cap = capabilities[i];
1786 ASSERT(tmp_cap->disabled);
1787 if (i != cap->no) {
1788 dest_cap = capabilities[i % enabled_capabilities];
1789 while (!emptyRunQueue(tmp_cap)) {
1790 tso = popRunQueue(tmp_cap);
1791 migrateThread(tmp_cap, tso, dest_cap);
1792 if (tso->bound) {
1793 traceTaskMigrate(tso->bound->task,
1794 tso->bound->task->cap,
1795 dest_cap);
1796 tso->bound->task->cap = dest_cap;
1797 }
1798 }
1799 }
1800 }
1801 #endif
1802
1803 #if defined(THREADED_RTS)
1804 // reset pending_sync *before* GC, so that when the GC threads
1805 // emerge they don't immediately re-enter the GC.
1806 pending_sync = 0;
1807 GarbageCollect(collect_gen, heap_census, gc_type, cap, idle_cap);
1808 #else
1809 GarbageCollect(collect_gen, heap_census, 0, cap, NULL);
1810 #endif
1811
1812 traceSparkCounters(cap);
1813
1814 switch (recent_activity) {
1815 case ACTIVITY_INACTIVE:
1816 if (force_major) {
1817 // We are doing a GC because the system has been idle for a
1818 // timeslice and we need to check for deadlock. Record the
1819 // fact that we've done a GC and turn off the timer signal;
1820 // it will get re-enabled if we run any threads after the GC.
1821 recent_activity = ACTIVITY_DONE_GC;
1822 #if !defined(PROFILING)
1823 stopTimer();
1824 #endif
1825 break;
1826 }
1827 // fall through...
1828
1829 case ACTIVITY_MAYBE_NO:
1830 // the GC might have taken long enough for the timer to set
1831 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1832 // but we aren't necessarily deadlocked:
1833 recent_activity = ACTIVITY_YES;
1834 break;
1835
1836 case ACTIVITY_DONE_GC:
1837 // If we are actually active, the scheduler will reset the
1838 // recent_activity flag and re-enable the timer.
1839 break;
1840 }
1841
1842 #if defined(THREADED_RTS)
1843 // Stable point where we can do a global check on our spark counters
1844 ASSERT(checkSparkCountInvariant());
1845 #endif
1846
1847 // The heap census itself is done during GarbageCollect().
1848 if (heap_census) {
1849 performHeapProfile = false;
1850 }
1851
1852 #if defined(THREADED_RTS)
1853
1854 // If n_capabilities has changed during GC, we're in trouble.
1855 ASSERT(n_capabilities == old_n_capabilities);
1856
1857 if (gc_type == SYNC_GC_PAR)
1858 {
1859 for (i = 0; i < n_capabilities; i++) {
1860 if (i != cap->no) {
1861 if (idle_cap[i]) {
1862 ASSERT(capabilities[i]->running_task == task);
1863 task->cap = capabilities[i];
1864 releaseCapability(capabilities[i]);
1865 } else {
1866 ASSERT(capabilities[i]->running_task != task);
1867 }
1868 }
1869 }
1870 task->cap = cap;
1871
1872 // releaseGCThreads() happens *after* we have released idle
1873 // capabilities. Otherwise what can happen is one of the released
1874 // threads starts a new GC, and finds that it can't acquire some of
1875 // the disabled capabilities, because the previous GC still holds
1876 // them, so those disabled capabilities will not be idle during the
1877 // next GC round. However, if we release the capabilities first,
1878 // then they will be free (because they're disabled) when the next
1879 // GC cycle happens.
1880 releaseGCThreads(cap, idle_cap);
1881 }
1882 #endif
1883 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1884 // GC set the heap_overflow flag. We should throw an exception if we
1885 // can, or shut down otherwise.
1886
1887 // Get the thread to which Ctrl-C is thrown
1888 StgTSO *main_thread = getTopHandlerThread();
1889 if (main_thread == NULL) {
1890 // GC set the heap_overflow flag, and there is no main thread to
1891 // throw an exception to, so we should proceed with an orderly
1892 // shutdown now. Ultimately we want the main thread to return to
1893 // its caller with HeapExhausted, at which point the caller should
1894 // call hs_exit(). The first step is to delete all the threads.
1895 sched_state = SCHED_INTERRUPTING;
1896 goto delete_threads_and_gc;
1897 }
1898
1899 heap_overflow = false;
1900 const uint64_t allocation_count = getAllocations();
1901 if (RtsFlags.GcFlags.heapLimitGrace <
1902 allocation_count - allocated_bytes_at_heapoverflow ||
1903 allocated_bytes_at_heapoverflow == 0) {
1904 allocated_bytes_at_heapoverflow = allocation_count;
1905 // We used to simply exit, but throwing an exception gives the
1906 // program a chance to clean up. It also lets the exception be
1907 // caught.
1908
1909 // FIXME this is not a good way to tell a program to release
1910 // resources. It is neither reliable (the RTS crashes if it fails
1911 // to allocate memory from the OS) nor very usable (it is always
1912 // thrown to the main thread, which might not be able to do anything
1913 // useful with it). We really should have a more general way to
1914 // release resources in low-memory conditions. Nevertheless, this
1915 // is still a big improvement over just exiting.
1916
1917 // FIXME again: perhaps we should throw a synchronous exception
1918 // instead of an asynchronous one, or have a way for the program to
1919 // register a handler to be called when heap overflow happens.
1920 throwToSelf(cap, main_thread, heapOverflow_closure);
1921 }
1922 }
1923 #if defined(SPARKBALANCE)
1924 /* JB
1925 Once we are all together... this would be the place to balance all
1926 spark pools. No concurrent stealing or adding of new sparks can
1927 occur. Should be defined in Sparks.c. */
1928 balanceSparkPoolsCaps(n_capabilities, capabilities);
1929 #endif
1930
1931 #if defined(THREADED_RTS)
1932 stgFree(idle_cap);
1933
1934 if (gc_type == SYNC_GC_SEQ) {
1935 // release our stash of capabilities.
1936 releaseAllCapabilities(n_capabilities, cap, task);
1937 }
1938 #endif
1939
1940 return;
1941 }
1942
1943 /* ---------------------------------------------------------------------------
1944 * Singleton fork(). Do not copy any running threads.
1945 * ------------------------------------------------------------------------- */
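/*
 * Note: the expected caller of this entry point is the Haskell-level
 * forkProcess (e.g. System.Posix.Process in the unix package), which, as
 * far as we know, foreign-imports this symbol and hands us a StablePtr to
 * the IO action that the child should run.  The child branch below runs
 * that action via rts_evalStableIOMain() and then calls
 * shutdownHaskellAndExit(), so this function only returns in the parent,
 * with the child's pid.
 */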
1946
1947 pid_t
1948 forkProcess(HsStablePtr *entry
1949 #if !defined(FORKPROCESS_PRIMOP_SUPPORTED)
1950 STG_UNUSED
1951 #endif
1952 )
1953 {
1954 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
1955 pid_t pid;
1956 StgTSO* t,*next;
1957 Capability *cap;
1958 uint32_t g;
1959 Task *task = NULL;
1960 uint32_t i;
1961
1962 debugTrace(DEBUG_sched, "forking!");
1963
1964 task = newBoundTask();
1965
1966 cap = NULL;
1967 waitForCapability(&cap, task);
1968
1969 #if defined(THREADED_RTS)
1970 stopAllCapabilities(&cap, task);
1971 #endif
1972
1973 // no funny business: hold locks while we fork, otherwise if some
1974 // other thread is holding a lock when the fork happens, the data
1975 // structure protected by the lock will forever be in an
1976 // inconsistent state in the child. See also #1391.
1977 ACQUIRE_LOCK(&sched_mutex);
1978 ACQUIRE_LOCK(&sm_mutex);
1979 ACQUIRE_LOCK(&stable_mutex);
1980 ACQUIRE_LOCK(&task->lock);
1981
1982 for (i=0; i < n_capabilities; i++) {
1983 ACQUIRE_LOCK(&capabilities[i]->lock);
1984 }
1985
1986 #if defined(THREADED_RTS)
1987 ACQUIRE_LOCK(&all_tasks_mutex);
1988 #endif
1989
1990 stopTimer(); // See #4074
1991
1992 #if defined(TRACING)
1993 flushEventLog(); // so that child won't inherit dirty file buffers
1994 #endif
1995
1996 pid = fork();
1997
1998 if (pid) { // parent
1999
2000 startTimer(); // #4074
2001
2002 RELEASE_LOCK(&sched_mutex);
2003 RELEASE_LOCK(&sm_mutex);
2004 RELEASE_LOCK(&stable_mutex);
2005 RELEASE_LOCK(&task->lock);
2006
2007 for (i=0; i < n_capabilities; i++) {
2008 releaseCapability_(capabilities[i],false);
2009 RELEASE_LOCK(&capabilities[i]->lock);
2010 }
2011
2012 #if defined(THREADED_RTS)
2013 RELEASE_LOCK(&all_tasks_mutex);
2014 #endif
2015
2016 boundTaskExiting(task);
2017
2018 // just return the pid
2019 return pid;
2020
2021 } else { // child
2022
2023 #if defined(THREADED_RTS)
2024 initMutex(&sched_mutex);
2025 initMutex(&sm_mutex);
2026 initMutex(&stable_mutex);
2027 initMutex(&task->lock);
2028
2029 for (i=0; i < n_capabilities; i++) {
2030 initMutex(&capabilities[i]->lock);
2031 }
2032
2033 initMutex(&all_tasks_mutex);
2034 #endif
2035
2036 #if defined(TRACING)
2037 resetTracing();
2038 #endif
2039
2040 // Now, all OS threads except the thread that forked are
2041 // stopped. We need to stop all Haskell threads, including
2042 // those involved in foreign calls. Also we need to delete
2043 // all Tasks, because they correspond to OS threads that are
2044 // now gone.
2045
2046 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2047 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2048 next = t->global_link;
2049 // don't allow threads to catch the ThreadKilled
2050 // exception, but we do want to raiseAsync() because these
2051 // threads may be evaluating thunks that we need later.
2052 deleteThread_(t->cap,t);
2053
2054 // stop the GC from updating the InCall to point to
2055 // the TSO. This is only necessary because the
2056 // OSThread bound to the TSO has been killed, and
2057 // won't get a chance to exit in the usual way (see
2058 // also scheduleHandleThreadFinished).
2059 t->bound = NULL;
2060 }
2061 }
2062
2063 discardTasksExcept(task);
2064
2065 for (i=0; i < n_capabilities; i++) {
2066 cap = capabilities[i];
2067
2068 // Empty the run queue. It seems tempting to let all the
2069 // killed threads stay on the run queue as zombies to be
2070 // cleaned up later, but some of them may correspond to
2071 // bound threads for which the corresponding Task does not
2072 // exist.
2073 truncateRunQueue(cap);
2074 cap->n_run_queue = 0;
2075
2076 // Any suspended C-calling Tasks are no more; their OS threads
2077 // don't exist now:
2078 cap->suspended_ccalls = NULL;
2079 cap->n_suspended_ccalls = 0;
2080
2081 #if defined(THREADED_RTS)
2082 // Wipe our spare workers list, they no longer exist. New
2083 // workers will be created if necessary.
2084 cap->spare_workers = NULL;
2085 cap->n_spare_workers = 0;
2086 cap->returning_tasks_hd = NULL;
2087 cap->returning_tasks_tl = NULL;
2088 cap->n_returning_tasks = 0;
2089 #endif
2090
2091 // Release all caps except 0; we'll use that for starting
2092 // the IO manager and running the client action below.
2093 if (cap->no != 0) {
2094 task->cap = cap;
2095 releaseCapability(cap);
2096 }
2097 }
2098 cap = capabilities[0];
2099 task->cap = cap;
2100
2101 // Empty the threads lists. Otherwise, the garbage
2102 // collector may attempt to resurrect some of these threads.
2103 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2104 generations[g].threads = END_TSO_QUEUE;
2105 }
2106
2107 // On Unix, all timers are reset in the child, so we need to start
2108 // the timer again.
2109 initTimer();
2110 startTimer();
2111
2112 // TODO: need to trace various other things in the child
2113 // like startup event, capabilities, process info etc
2114 traceTaskCreate(task, cap);
2115
2116 #if defined(THREADED_RTS)
2117 ioManagerStartCap(&cap);
2118 #endif
2119
2120 // Install toplevel exception handlers, so the interruption
2121 // signal will be sent to the main thread.
2122 // See Trac #12903
2123 rts_evalStableIOMain(&cap, entry, NULL); // run the action
2124 rts_checkSchedStatus("forkProcess",cap);
2125
2126 rts_unlock(cap);
2127 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2128 }
2129 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2130 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2131 #endif
2132 }
2133
2134 /* ---------------------------------------------------------------------------
2135 * Changing the number of Capabilities
2136 *
2137 * Changing the number of Capabilities is very tricky! We can only do
2138 * it with the system fully stopped, so we do a full sync with
2139 * requestSync(SYNC_OTHER) and grab all the capabilities.
2140 *
2141 * Then we resize the appropriate data structures, and update all
2142 * references to the old data structures which have now moved.
2143 * Finally we release the Capabilities we are holding, and start
2144 * worker Tasks on the new Capabilities we created.
2145 *
2146 * ------------------------------------------------------------------------- */
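/*
 * Note: this is the C entry point behind the Haskell-level
 * setNumCapabilities (GHC.Conc / Control.Concurrent), which we believe is
 * a plain foreign import of this symbol.  A program embedding the RTS can
 * also call it directly, for example:
 *
 *     setNumCapabilities(4);   // grow or shrink to 4 capabilities
 *
 * Growing past the current n_capabilities reallocates the capabilities
 * array via moreCapabilities(); shrinking only marks the surplus
 * capabilities as disabled (see the comments in the body below).
 */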
2147
2148 void
2149 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2150 {
2151 #if !defined(THREADED_RTS)
2152 if (new_n_capabilities != 1) {
2153 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2154 }
2155 return;
2156 #elif defined(NOSMP)
2157 if (new_n_capabilities != 1) {
2158 errorBelch("setNumCapabilities: not supported on this platform");
2159 }
2160 return;
2161 #else
2162 Task *task;
2163 Capability *cap;
2164 uint32_t n;
2165 Capability *old_capabilities = NULL;
2166 uint32_t old_n_capabilities = n_capabilities;
2167
2168 if (new_n_capabilities == enabled_capabilities) {
2169 return;
2170 } else if (new_n_capabilities <= 0) {
2171 errorBelch("setNumCapabilities: Capability count must be positive");
2172 return;
2173 }
2174
2175
2176 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2177 enabled_capabilities, new_n_capabilities);
2178
2179 cap = rts_lock();
2180 task = cap->running_task;
2181
2182 stopAllCapabilities(&cap, task);
2183
2184 if (new_n_capabilities < enabled_capabilities)
2185 {
2186 // Reducing the number of capabilities: we do not actually
2187 // remove the extra capabilities, we just mark them as
2188 // "disabled". This has the following effects:
2189 //
2190 // - threads on a disabled capability are migrated away by the
2191 // scheduler loop
2192 //
2193 // - disabled capabilities do not participate in GC
2194 // (see scheduleDoGC())
2195 //
2196 // - No spark threads are created on this capability
2197 // (see scheduleActivateSpark())
2198 //
2199 // - We do not attempt to migrate threads *to* a disabled
2200 // capability (see schedulePushWork()).
2201 //
2202 // but in other respects, a disabled capability remains
2203 // alive. Threads may be woken up on a disabled capability,
2204 // but they will be immediately migrated away.
2205 //
2206 // This approach is much easier than trying to actually remove
2207 // the capability; we don't have to worry about GC data
2208 // structures, the nursery, etc.
2209 //
2210 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2211 capabilities[n]->disabled = true;
2212 traceCapDisable(capabilities[n]);
2213 }
2214 enabled_capabilities = new_n_capabilities;
2215 }
2216 else
2217 {
2218 // Increasing the number of enabled capabilities.
2219 //
2220 // enable any disabled capabilities, up to the required number
2221 for (n = enabled_capabilities;
2222 n < new_n_capabilities && n < n_capabilities; n++) {
2223 capabilities[n]->disabled = false;
2224 traceCapEnable(capabilities[n]);
2225 }
2226 enabled_capabilities = n;
2227
2228 if (new_n_capabilities > n_capabilities) {
2229 #if defined(TRACING)
2230 // Allocate eventlog buffers for the new capabilities. Note this
2231 // must be done before calling moreCapabilities(), because that
2232 // will emit events about creating the new capabilities and adding
2233 // them to existing capsets.
2234 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2235 #endif
2236
2237 // Resize the capabilities array
2238 // NB. after this, capabilities points somewhere new. Any pointers
2239 // of type (Capability *) are now invalid.
2240 moreCapabilities(n_capabilities, new_n_capabilities);
2241
2242 // Resize and update storage manager data structures
2243 storageAddCapabilities(n_capabilities, new_n_capabilities);
2244 }
2245 }
2246
2247 // update n_capabilities before things start running
2248 if (new_n_capabilities > n_capabilities) {
2249 n_capabilities = enabled_capabilities = new_n_capabilities;
2250 }
2251
2252 // We're done: release the original Capabilities
2253 releaseAllCapabilities(old_n_capabilities, cap,task);
2254
2255 // We can't free the old array until now, because we access it
2256 // while updating pointers in updateCapabilityRefs().
2257 if (old_capabilities) {
2258 stgFree(old_capabilities);
2259 }
2260
2261 // Notify IO manager that the number of capabilities has changed.
2262 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2263
2264 rts_unlock(cap);
2265
2266 #endif // THREADED_RTS
2267 }
2268
2269
2270
2271 /* ---------------------------------------------------------------------------
2272 * Delete all the threads in the system
2273 * ------------------------------------------------------------------------- */
2274
2275 static void
2276 deleteAllThreads ( Capability *cap )
2277 {
2278 // NOTE: only safe to call if we own all capabilities.
2279
2280 StgTSO* t, *next;
2281 uint32_t g;
2282
2283 debugTrace(DEBUG_sched,"deleting all threads");
2284 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2285 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2286 next = t->global_link;
2287 deleteThread(cap,t);
2288 }
2289 }
2290
2291 // The run queue now contains a bunch of ThreadKilled threads. We
2292 // must not throw these away: the main thread(s) will be in there
2293 // somewhere, and the main scheduler loop has to deal with it.
2294 // Also, the run queue is the only thing keeping these threads from
2295 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2296
2297 #if !defined(THREADED_RTS)
2298 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2299 ASSERT(sleeping_queue == END_TSO_QUEUE);
2300 #endif
2301 }
2302
2303 /* -----------------------------------------------------------------------------
2304 Managing the suspended_ccalls list.
2305 Locks required: sched_mutex
2306 -------------------------------------------------------------------------- */
2307
2308 STATIC_INLINE void
2309 suspendTask (Capability *cap, Task *task)
2310 {
2311 InCall *incall;
2312
2313 incall = task->incall;
2314 ASSERT(incall->next == NULL && incall->prev == NULL);
2315 incall->next = cap->suspended_ccalls;
2316 incall->prev = NULL;
2317 if (cap->suspended_ccalls) {
2318 cap->suspended_ccalls->prev = incall;
2319 }
2320 cap->suspended_ccalls = incall;
2321 cap->n_suspended_ccalls++;
2322 }
2323
2324 STATIC_INLINE void
2325 recoverSuspendedTask (Capability *cap, Task *task)
2326 {
2327 InCall *incall;
2328
2329 incall = task->incall;
2330 if (incall->prev) {
2331 incall->prev->next = incall->next;
2332 } else {
2333 ASSERT(cap->suspended_ccalls == incall);
2334 cap->suspended_ccalls = incall->next;
2335 }
2336 if (incall->next) {
2337 incall->next->prev = incall->prev;
2338 }
2339 incall->next = incall->prev = NULL;
2340 cap->n_suspended_ccalls--;
2341 }
2342
2343 /* ---------------------------------------------------------------------------
2344 * Suspending & resuming Haskell threads.
2345 *
2346 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2347 * its capability before calling the C function. This allows another
2348 * task to pick up the capability and carry on running Haskell
2349 * threads. It also means that if the C call blocks, it won't lock
2350 * the whole system.
2351 *
2352 * The Haskell thread making the C call is put to sleep for the
2353 * duration of the call, on the suspended_ccalling_threads queue. We
2354 * give out a token to the task, which it can use to resume the thread
2355 * on return from the C function.
2356 *
2357 * If this is an interruptible C call, this means that the FFI call may be
2358 * unceremoniously terminated and should be scheduled on an
2359 * unbound worker thread.
2360 * ------------------------------------------------------------------------- */
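/*
 * Note: a rough sketch of how this pair is used.  Conceptually the
 * compiler brackets every "safe" foreign call like this (the generated
 * code differs in detail, so treat this purely as an illustration):
 *
 *     void *token = suspendThread(BaseReg, interruptible); // give up the cap
 *     result = the_foreign_function(args);                 // may block freely
 *     BaseReg = resumeThread(token);                       // reacquire a cap
 *
 * The token is in fact the Task (see 'return task;' below), and the
 * Capability we come back on may differ from the one we left, as noted in
 * resumeThread().
 */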
2361
2362 void *
2363 suspendThread (StgRegTable *reg, bool interruptible)
2364 {
2365 Capability *cap;
2366 int saved_errno;
2367 StgTSO *tso;
2368 Task *task;
2369 #if defined(mingw32_HOST_OS)
2370 StgWord32 saved_winerror;
2371 #endif
2372
2373 saved_errno = errno;
2374 #if defined(mingw32_HOST_OS)
2375 saved_winerror = GetLastError();
2376 #endif
2377
2378 /* assume that 'reg' points to the StgRegTable part of a Capability.
2379 */
2380 cap = regTableToCapability(reg);
2381
2382 task = cap->running_task;
2383 tso = cap->r.rCurrentTSO;
2384
2385 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2386
2387 // XXX this might not be necessary --SDM
2388 tso->what_next = ThreadRunGHC;
2389
2390 threadPaused(cap,tso);
2391
2392 if (interruptible) {
2393 tso->why_blocked = BlockedOnCCall_Interruptible;
2394 } else {
2395 tso->why_blocked = BlockedOnCCall;
2396 }
2397
2398 // Hand back capability
2399 task->incall->suspended_tso = tso;
2400 task->incall->suspended_cap = cap;
2401
2402 // Otherwise allocate() will write to invalid memory.
2403 cap->r.rCurrentTSO = NULL;
2404
2405 ACQUIRE_LOCK(&cap->lock);
2406
2407 suspendTask(cap,task);
2408 cap->in_haskell = false;
2409 releaseCapability_(cap,false);
2410
2411 RELEASE_LOCK(&cap->lock);
2412
2413 errno = saved_errno;
2414 #if defined(mingw32_HOST_OS)
2415 SetLastError(saved_winerror);
2416 #endif
2417 return task;
2418 }
2419
2420 StgRegTable *
2421 resumeThread (void *task_)
2422 {
2423 StgTSO *tso;
2424 InCall *incall;
2425 Capability *cap;
2426 Task *task = task_;
2427 int saved_errno;
2428 #if defined(mingw32_HOST_OS)
2429 StgWord32 saved_winerror;
2430 #endif
2431
2432 saved_errno = errno;
2433 #if defined(mingw32_HOST_OS)
2434 saved_winerror = GetLastError();
2435 #endif
2436
2437 incall = task->incall;
2438 cap = incall->suspended_cap;
2439 task->cap = cap;
2440
2441 // Wait for permission to re-enter the RTS with the result.
2442 waitForCapability(&cap,task);
2443 // we might be on a different capability now... but if so, our
2444 // entry on the suspended_ccalls list will also have been
2445 // migrated.
2446
2447 // Remove the thread from the suspended list
2448 recoverSuspendedTask(cap,task);
2449
2450 tso = incall->suspended_tso;
2451 incall->suspended_tso = NULL;
2452 incall->suspended_cap = NULL;
2453 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2454
2455 traceEventRunThread(cap, tso);
2456
2457 /* Reset blocking status */
2458 tso->why_blocked = NotBlocked;
2459
2460 if ((tso->flags & TSO_BLOCKEX) == 0) {
2461 // avoid locking the TSO if we don't have to
2462 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2463 maybePerformBlockedException(cap,tso);
2464 }
2465 }
2466
2467 cap->r.rCurrentTSO = tso;
2468 cap->in_haskell = true;
2469 errno = saved_errno;
2470 #if defined(mingw32_HOST_OS)
2471 SetLastError(saved_winerror);
2472 #endif
2473
2474 /* We might have GC'd, mark the TSO dirty again */
2475 dirty_TSO(cap,tso);
2476 dirty_STACK(cap,tso->stackobj);
2477
2478 IF_DEBUG(sanity, checkTSO(tso));
2479
2480 return &cap->r;
2481 }
2482
2483 /* ---------------------------------------------------------------------------
2484 * scheduleThread()
2485 *
2486 * scheduleThread puts a thread on the end of the runnable queue.
2487 * This will usually be done immediately after a thread is created.
2488 * The caller of scheduleThread must create the thread using e.g.
2489 * createThread and push an appropriate closure
2490 * on this thread's stack before the scheduler is invoked.
2491 * ------------------------------------------------------------------------ */
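/*
 * Note: a minimal sketch of the calling sequence described above, assuming
 * the caller already owns 'cap' and using createThread() from Threads.c
 * (the stack-setup step is elided; it depends on the closure being run):
 *
 *     StgTSO *tso = createThread(cap, RtsFlags.GcFlags.initialStkSize);
 *     // ... push the closure / return frame onto tso's stack here ...
 *     scheduleThread(cap, tso);
 *
 * scheduleThread() only appends to the run queue; the new thread actually
 * runs the next time this Capability enters schedule().
 */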
2492
2493 void
2494 scheduleThread(Capability *cap, StgTSO *tso)
2495 {
2496 // The thread goes at the *end* of the run-queue, to avoid possible
2497 // starvation of any threads already on the queue.
2498 appendToRunQueue(cap,tso);
2499 }
2500
2501 void
2502 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2503 {
2504 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2505 // move this thread from now on.
2506 #if defined(THREADED_RTS)
2507 cpu %= enabled_capabilities;
2508 if (cpu == cap->no) {
2509 appendToRunQueue(cap,tso);
2510 } else {
2511 migrateThread(cap, tso, capabilities[cpu]);
2512 }
2513 #else
2514 appendToRunQueue(cap,tso);
2515 #endif
2516 }
2517
2518 void
2519 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2520 {
2521 Task *task;
2522 DEBUG_ONLY( StgThreadID id );
2523 Capability *cap;
2524
2525 cap = *pcap;
2526
2527 // We already created/initialised the Task
2528 task = cap->running_task;
2529
2530 // This TSO is now a bound thread; make the Task and TSO
2531 // point to each other.
2532 tso->bound = task->incall;
2533 tso->cap = cap;
2534
2535 task->incall->tso = tso;
2536 task->incall->ret = ret;
2537 task->incall->rstat = NoStatus;
2538
2539 appendToRunQueue(cap,tso);
2540
2541 DEBUG_ONLY( id = tso->id );
2542 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2543
2544 cap = schedule(cap,task);
2545
2546 ASSERT(task->incall->rstat != NoStatus);
2547 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2548
2549 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2550 *pcap = cap;
2551 }
2552
2553 /* ----------------------------------------------------------------------------
2554 * Starting Tasks
2555 * ------------------------------------------------------------------------- */
2556
2557 #if defined(THREADED_RTS)
2558 void scheduleWorker (Capability *cap, Task *task)
2559 {
2560 // schedule() runs without a lock.
2561 cap = schedule(cap,task);
2562
2563 // On exit from schedule(), we have a Capability, but possibly not
2564 // the same one we started with.
2565
2566 // During shutdown, the requirement is that after all the
2567 // Capabilities are shut down, all workers that are shutting down
2568 // have finished workerTaskStop(). This is why we hold on to
2569 // cap->lock until we've finished workerTaskStop() below.
2570 //
2571 // There may be workers still involved in foreign calls; those
2572 // will just block in waitForCapability() because the
2573 // Capability has been shut down.
2574 //
2575 ACQUIRE_LOCK(&cap->lock);
2576 releaseCapability_(cap,false);
2577 workerTaskStop(task);
2578 RELEASE_LOCK(&cap->lock);
2579 }
2580 #endif
2581
2582 /* ---------------------------------------------------------------------------
2583 * Start new worker tasks on Capabilities in the range [from, to)
2584 * -------------------------------------------------------------------------- */
2585
2586 static void
2587 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2588 {
2589 #if defined(THREADED_RTS)
2590 uint32_t i;
2591 Capability *cap;
2592
2593 for (i = from; i < to; i++) {
2594 cap = capabilities[i];
2595 ACQUIRE_LOCK(&cap->lock);
2596 startWorkerTask(cap);
2597 RELEASE_LOCK(&cap->lock);
2598 }
2599 #endif
2600 }
2601
2602 /* ---------------------------------------------------------------------------
2603 * initScheduler()
2604 *
2605 * Initialise the scheduler. This resets all the queues - if the
2606 * queues contained any threads, they'll be garbage collected at the
2607 * next pass.
2608 *
2609 * ------------------------------------------------------------------------ */
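/*
 * Note: initScheduler() is called once during RTS startup (from
 * RtsStartup.c, as far as we know), with exitScheduler()/freeScheduler()
 * called during shutdown.  An embedding program never calls these
 * directly; it goes through the public API, roughly:
 *
 *     hs_init(&argc, &argv);   // eventually runs initScheduler()
 *     // ... evaluate Haskell via the RTS API ...
 *     hs_exit();               // eventually runs exitScheduler()/freeScheduler()
 */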
2610
2611 void
2612 initScheduler(void)
2613 {
2614 #if !defined(THREADED_RTS)
2615 blocked_queue_hd = END_TSO_QUEUE;
2616 blocked_queue_tl = END_TSO_QUEUE;
2617 sleeping_queue = END_TSO_QUEUE;
2618 #endif
2619
2620 sched_state = SCHED_RUNNING;
2621 recent_activity = ACTIVITY_YES;
2622
2623 #if defined(THREADED_RTS)
2624 /* Initialise the mutex and condition variables used by
2625 * the scheduler. */
2626 initMutex(&sched_mutex);
2627 #endif
2628
2629 ACQUIRE_LOCK(&sched_mutex);
2630
2631 allocated_bytes_at_heapoverflow = 0;
2632
2633 /* A capability holds the state a native thread needs in
2634 * order to execute STG code. At least one capability is
2635 * floating around (only THREADED_RTS builds have more than one).
2636 */
2637 initCapabilities();
2638
2639 initTaskManager();
2640
2641 /*
2642 * Eagerly start one worker to run each Capability, except for
2643 * Capability 0. The idea is that we're probably going to start a
2644 * bound thread on Capability 0 pretty soon, so we don't want a
2645 * worker task hogging it.
2646 */
2647 startWorkerTasks(1, n_capabilities);
2648
2649 RELEASE_LOCK(&sched_mutex);
2650
2651 }
2652
2653 void
2654 exitScheduler (bool wait_foreign USED_IF_THREADS)
2655 /* see Capability.c, shutdownCapability() */
2656 {
2657 Task *task = NULL;
2658
2659 task = newBoundTask();
2660
2661 // If we haven't killed all the threads yet, do it now.
2662 if (sched_state < SCHED_SHUTTING_DOWN) {
2663 sched_state = SCHED_INTERRUPTING;
2664 Capability *cap = task->cap;
2665 waitForCapability(&cap,task);
2666 scheduleDoGC(&cap,task,true);
2667 ASSERT(task->incall->tso == NULL);
2668 releaseCapability(cap);
2669 }
2670 sched_state = SCHED_SHUTTING_DOWN;
2671
2672 shutdownCapabilities(task, wait_foreign);
2673
2674 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2675 // n_failed_trygrab_idles, n_idle_caps);
2676
2677 boundTaskExiting(task);
2678 }
2679
2680 void
2681 freeScheduler( void )
2682 {
2683 uint32_t still_running;
2684
2685 ACQUIRE_LOCK(&sched_mutex);
2686 still_running = freeTaskManager();
2687 // We can only free the Capabilities if there are no Tasks still
2688 // running. We might have a Task about to return from a foreign
2689 // call into waitForCapability(), for example (actually,
2690 // this should be the *only* thing that a still-running Task can
2691 // do at this point, and it will block waiting for the
2692 // Capability).
2693 if (still_running == 0) {
2694 freeCapabilities();
2695 }
2696 RELEASE_LOCK(&sched_mutex);
2697 #if defined(THREADED_RTS)
2698 closeMutex(&sched_mutex);
2699 #endif
2700 }
2701
2702 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2703 void *user USED_IF_NOT_THREADS)
2704 {
2705 #if !defined(THREADED_RTS)
2706 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2707 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2708 evac(user, (StgClosure **)(void *)&sleeping_queue);
2709 #endif
2710 }
2711
2712 /* -----------------------------------------------------------------------------
2713 performGC
2714
2715 This is the interface to the garbage collector from Haskell land.
2716 We provide this so that external C code can allocate and garbage
2717 collect when called from Haskell via _ccall_GC.
2718 -------------------------------------------------------------------------- */
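/*
 * Note: the Haskell-level System.Mem.performGC / performMajorGC are
 * believed to be thin foreign imports of the two entry points below, and C
 * code linked against the RTS can call them directly, e.g.:
 *
 *     extern void performMajorGC(void);
 *     ...
 *     performMajorGC();   // force a major collection from C land
 *
 * Both wrappers bind a fresh Task and acquire a Capability before handing
 * off to scheduleDoGC() (see performGC_ below).
 */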
2719
2720 static void
2721 performGC_(bool force_major)
2722 {
2723 Task *task;
2724 Capability *cap = NULL;
2725
2726 // We must grab a new Task here, because the existing Task may be
2727 // associated with a particular Capability, and chained onto the
2728 // suspended_ccalls queue.
2729 task = newBoundTask();
2730
2731 // TODO: do we need to traceTask*() here?
2732
2733 waitForCapability(&cap,task);
2734 scheduleDoGC(&cap,task,force_major);
2735 releaseCapability(cap);
2736 boundTaskExiting(task);
2737 }
2738
2739 void
2740 performGC(void)
2741 {
2742 performGC_(false);
2743 }
2744
2745 void
2746 performMajorGC(void)
2747 {
2748 performGC_(true);
2749 }
2750
2751 /* ---------------------------------------------------------------------------
2752 Interrupt execution.
2753 Might be called inside a signal handler so it mustn't do anything fancy.
2754 ------------------------------------------------------------------------ */
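/*
 * Note: this is what the RTS's interrupt/Ctrl-C handlers invoke (see
 * RtsSignals.h and the platform signal code), hence the "nothing fancy"
 * constraint above: we only set sched_state, poke the capabilities, and,
 * in the threaded RTS, wake the IO manager so some OS thread notices.
 */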
2755
2756 void
2757 interruptStgRts(void)
2758 {
2759 sched_state = SCHED_INTERRUPTING;
2760 interruptAllCapabilities();
2761 #if defined(THREADED_RTS)
2762 wakeUpRts();
2763 #endif
2764 }
2765
2766 /* -----------------------------------------------------------------------------
2767 Wake up the RTS
2768
2769 This function causes at least one OS thread to wake up and run the
2770 scheduler loop. It is invoked when the RTS might be deadlocked, or
2771 an external event has arrived that may need servicing (eg. a
2772 keyboard interrupt).
2773
2774 In the single-threaded RTS we don't do anything here; we only have
2775 one thread anyway, and the event that caused us to want to wake up
2776 will have interrupted any blocking system call in progress anyway.
2777 -------------------------------------------------------------------------- */
2778
2779 #if defined(THREADED_RTS)
2780 void wakeUpRts(void)
2781 {
2782 // This forces the IO Manager thread to wakeup, which will
2783 // in turn ensure that some OS thread wakes up and runs the
2784 // scheduler loop, which will cause a GC and deadlock check.
2785 ioManagerWakeup();
2786 }
2787 #endif
2788
2789 /* -----------------------------------------------------------------------------
2790 Deleting threads
2791
2792 This is used for interruption (^C) and forking, and corresponds to
2793 raising an exception but without letting the thread catch the
2794 exception.
2795 -------------------------------------------------------------------------- */
2796
2797 static void
2798 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2799 {
2800 // NOTE: must only be called on a TSO that we have exclusive
2801 // access to, because we will call throwToSingleThreaded() below.
2802 // The TSO must be on the run queue of the Capability we own, or
2803 // we must own all Capabilities.
2804
2805 if (tso->why_blocked != BlockedOnCCall &&
2806 tso->why_blocked != BlockedOnCCall_Interruptible) {
2807 throwToSingleThreaded(tso->cap,tso,NULL);
2808 }
2809 }
2810
2811 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
2812 static void
2813 deleteThread_(Capability *cap, StgTSO *tso)
2814 { // for forkProcess only:
2815 // like deleteThread(), but we delete threads in foreign calls, too.
2816
2817 if (tso->why_blocked == BlockedOnCCall ||
2818 tso->why_blocked == BlockedOnCCall_Interruptible) {
2819 tso->what_next = ThreadKilled;
2820 appendToRunQueue(tso->cap, tso);
2821 } else {
2822 deleteThread(cap,tso);
2823 }
2824 }
2825 #endif
2826
2827 /* -----------------------------------------------------------------------------
2828 raiseExceptionHelper
2829
2830 This function is called by the raise# primitive, just so that we can
2831 move some of the tricky bits of raising an exception from C-- into
2832 C. Who knows, it might be a useful, reusable thing here too.
2833 -------------------------------------------------------------------------- */
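/*
 * Note: the contract, as implemented below: on return, tso->stackobj->sp
 * points at the frame whose type is returned (one of CATCH_FRAME,
 * CATCH_STM_FRAME, ATOMICALLY_FRAME or STOP_FRAME), and every update frame
 * passed on the way up has had its updatee overwritten with a thunk for
 * 'raise# exception'.  The raise# code (in PrimOps.cmm) is then expected
 * to dispatch on that frame type; STOP_FRAME means the exception reached
 * the top of the stack uncaught.
 */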
2834
2835 StgWord
2836 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2837 {
2838 Capability *cap = regTableToCapability(reg);
2839 StgThunk *raise_closure = NULL;
2840 StgPtr p, next;
2841 const StgRetInfoTable *info;
2842 //
2843 // This closure represents the expression 'raise# E' where E
2844 // is the exception being raised. It is used to overwrite all the
2845 // thunks which are currently under evaluation.
2846 //
2847
2848 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2849 // LDV profiling: stg_raise_info has THUNK as its closure
2850 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2851 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2852 // 1 does not cause any problem unless profiling is performed.
2853 // However, when LDV profiling goes on, we need to linearly scan
2854 // the small object pool, where raise_closure is stored, so we should
2855 // use MIN_UPD_SIZE.
2856 //
2857 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2858 // sizeofW(StgClosure)+1);
2859 //
2860
2861 //
2862 // Walk up the stack, looking for the catch frame. On the way,
2863 // we update any closures pointed to from update frames with the
2864 // raise closure that we just built.
2865 //
2866 p = tso->stackobj->sp;
2867 while(1) {
2868 info = get_ret_itbl((StgClosure *)p);
2869 next = p + stack_frame_sizeW((StgClosure *)p);
2870 switch (info->i.type) {
2871
2872 case UPDATE_FRAME:
2873 // Only create raise_closure if we need to.
2874 if (raise_closure == NULL) {
2875 raise_closure =
2876 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2877 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2878 raise_closure->payload[0] = exception;
2879 }
2880 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2881 (StgClosure *)raise_closure);
2882 p = next;
2883 continue;
2884
2885 case ATOMICALLY_FRAME:
2886 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2887 tso->stackobj->sp = p;
2888 return ATOMICALLY_FRAME;
2889
2890 case CATCH_FRAME:
2891 tso->stackobj->sp = p;
2892 return CATCH_FRAME;
2893
2894 case CATCH_STM_FRAME:
2895 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2896 tso->stackobj->sp = p;
2897 return CATCH_STM_FRAME;
2898
2899 case UNDERFLOW_FRAME:
2900 tso->stackobj->sp = p;
2901 threadStackUnderflow(cap,tso);
2902 p = tso->stackobj->sp;
2903 continue;
2904
2905 case STOP_FRAME:
2906 tso->stackobj->sp = p;
2907 return STOP_FRAME;
2908
2909 case CATCH_RETRY_FRAME: {
2910 StgTRecHeader *trec = tso -> trec;
2911 StgTRecHeader *outer = trec -> enclosing_trec;
2912 debugTrace(DEBUG_stm,
2913 "found CATCH_RETRY_FRAME at %p during raise", p);
2914 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2915 stmAbortTransaction(cap, trec);
2916 stmFreeAbortedTRec(cap, trec);
2917 tso -> trec = outer;
2918 p = next;
2919 continue;
2920 }
2921
2922 default:
2923 p = next;
2924 continue;
2925 }
2926 }
2927 }
2928
2929
2930 /* -----------------------------------------------------------------------------
2931 findRetryFrameHelper
2932
2933 This function is called by the retry# primitive. It traverses the stack
2934 leaving tso->sp referring to the frame which should handle the retry.
2935
2936 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2937 or an ATOMICALLY_FRAME (if the retry# reaches the top level).
2938
2939 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2940 create) because retries are not considered to be exceptions, despite the
2941 similar implementation.
2942
2943 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2944 not be created within memory transactions.
2945 -------------------------------------------------------------------------- */
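/*
 * Note: a hedged sketch of the caller's side; the real retry#
 * implementation lives in PrimOps.cmm and differs in detail:
 *
 *     StgWord frame_type = findRetryFrameHelper(cap, tso);
 *     if (frame_type == ATOMICALLY_FRAME) {
 *         // top-level retry: block the TSO until a TVar it read changes
 *     } else {
 *         // CATCH_RETRY_FRAME: inside an orElse#; run the alternative
 *     }
 */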
2946
2947 StgWord
2948 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2949 {
2950 const StgRetInfoTable *info;
2951 StgPtr p, next;
2952
2953 p = tso->stackobj->sp;
2954 while (1) {
2955 info = get_ret_itbl((const StgClosure *)p);
2956 next = p + stack_frame_sizeW((StgClosure *)p);
2957 switch (info->i.type) {
2958
2959 case ATOMICALLY_FRAME:
2960 debugTrace(DEBUG_stm,
2961 "found ATOMICALLY_FRAME at %p during retry", p);
2962 tso->stackobj->sp = p;
2963 return ATOMICALLY_FRAME;
2964
2965 case CATCH_RETRY_FRAME:
2966 debugTrace(DEBUG_stm,
2967 "found CATCH_RETRY_FRAME at %p during retry", p);
2968 tso->stackobj->sp = p;
2969 return CATCH_RETRY_FRAME;
2970
2971 case CATCH_STM_FRAME: {
2972 StgTRecHeader *trec = tso -> trec;
2973 StgTRecHeader *outer = trec -> enclosing_trec;
2974 debugTrace(DEBUG_stm,
2975 "found CATCH_STM_FRAME at %p during retry", p);
2976 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2977 stmAbortTransaction(cap, trec);
2978 stmFreeAbortedTRec(cap, trec);
2979 tso -> trec = outer;
2980 p = next;
2981 continue;
2982 }
2983
2984 case UNDERFLOW_FRAME:
2985 tso->stackobj->sp = p;
2986 threadStackUnderflow(cap,tso);
2987 p = tso->stackobj->sp;
2988 continue;
2989
2990 default:
2991 ASSERT(info->i.type != CATCH_FRAME);
2992 ASSERT(info->i.type != STOP_FRAME);
2993 p = next;
2994 continue;
2995 }
2996 }
2997 }
2998
2999 /* -----------------------------------------------------------------------------
3000 resurrectThreads is called after garbage collection on the list of
3001 threads found to be garbage. Each of these threads will be woken
3002 up and sent an exception: BlockedIndefinitelyOnMVar if the thread was
3003 blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked on an STM
3004 transaction, or NonTermination if the thread was blocked on a Black Hole.
3005
3006 Locks: assumes we hold *all* the capabilities.
3007 -------------------------------------------------------------------------- */
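/*
 * Note: the caller here is the garbage collector itself (sm/GC.c, as far
 * as we know), once it has identified the unreachable but blocked threads;
 * we need to hold *all* the capabilities because throwToSingleThreaded()
 * is called on TSOs belonging to arbitrary Capabilities.
 */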
3008
3009 void
3010 resurrectThreads (StgTSO *threads)
3011 {
3012 StgTSO *tso, *next;
3013 Capability *cap;
3014 generation *gen;
3015
3016 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3017 next = tso->global_link;
3018
3019 gen = Bdescr((P_)tso)->gen;
3020 tso->global_link = gen->threads;
3021 gen->threads = tso;
3022
3023 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3024
3025 // Wake up the thread on the Capability it was last on
3026 cap = tso->cap;
3027
3028 switch (tso->why_blocked) {
3029 case BlockedOnMVar:
3030 case BlockedOnMVarRead:
3031 /* Called by GC - sched_mutex lock is currently held. */
3032 throwToSingleThreaded(cap, tso,
3033 (StgClosure *)blockedIndefinitelyOnMVar_closure);
3034 break;
3035 case BlockedOnBlackHole:
3036 throwToSingleThreaded(cap, tso,
3037 (StgClosure *)nonTermination_closure);
3038 break;
3039 case BlockedOnSTM:
3040 throwToSingleThreaded(cap, tso,
3041 (StgClosure *)blockedIndefinitelyOnSTM_closure);
3042 break;
3043 case NotBlocked:
3044 /* This might happen if the thread was blocked on a black hole
3045 * belonging to a thread that we've just woken up (raiseAsync
3046 * can wake up threads, remember...).
3047 */
3048 continue;
3049 case BlockedOnMsgThrowTo:
3050 // This can happen if the target is masking, blocks on a
3051 // black hole, and then is found to be unreachable. In
3052 // this case, we want to let the target wake up and carry
3053 // on, and do nothing to this thread.
3054 continue;
3055 default:
3056 barf("resurrectThreads: thread blocked in a strange way: %d",
3057 tso->why_blocked);
3058 }
3059 }
3060 }