1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #include "GC.h"
35 #include "Weak.h"
36 #include "EventLog.h"
37
38 /* PARALLEL_HASKELL includes go here */
39
40 #include "Sparks.h"
41 #include "Capability.h"
42 #include "Task.h"
43 #include "AwaitEvent.h"
44 #if defined(mingw32_HOST_OS)
45 #include "win32/IOManager.h"
46 #endif
47 #include "Trace.h"
48 #include "RaiseAsync.h"
49 #include "Threads.h"
50 #include "ThrIOManager.h"
51
52 #ifdef HAVE_SYS_TYPES_H
53 #include <sys/types.h>
54 #endif
55 #ifdef HAVE_UNISTD_H
56 #include <unistd.h>
57 #endif
58
59 #include <string.h>
60 #include <stdlib.h>
61 #include <stdarg.h>
62
63 #ifdef HAVE_ERRNO_H
64 #include <errno.h>
65 #endif
66
67 // Turn off inlining when debugging - it obfuscates things
68 #ifdef DEBUG
69 # undef STATIC_INLINE
70 # define STATIC_INLINE static
71 #endif
72
73 /* -----------------------------------------------------------------------------
74 * Global variables
75 * -------------------------------------------------------------------------- */
76
77 #if !defined(THREADED_RTS)
78 // Blocked/sleeping threads
79 StgTSO *blocked_queue_hd = NULL;
80 StgTSO *blocked_queue_tl = NULL;
81 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
82 #endif
83
84 /* Threads blocked on blackholes.
85 * LOCK: sched_mutex+capability, or all capabilities
86 */
87 StgTSO *blackhole_queue = NULL;
88
89 /* The blackhole_queue should be checked for threads to wake up. See
90 * Schedule.h for more thorough comment.
91 * LOCK: none (doesn't matter if we miss an update)
92 */
93 rtsBool blackholes_need_checking = rtsFalse;
94
95 /* Set to true when the latest garbage collection failed to reclaim
96 * enough space, and the runtime should proceed to shut itself down in
97 * an orderly fashion (emitting profiling info etc.)
98 */
99 rtsBool heap_overflow = rtsFalse;
100
101 /* flag that tracks whether we have done any execution in this time slice.
102 * LOCK: currently none, perhaps we should lock (but needs to be
103 * updated in the fast path of the scheduler).
104 *
105 * NB. must be StgWord, we do xchg() on it.
106 */
107 volatile StgWord recent_activity = ACTIVITY_YES;
108
109 /* if this flag is set as well, give up execution
110 * LOCK: none (changes monotonically)
111 */
112 volatile StgWord sched_state = SCHED_RUNNING;
113
114 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
115 * exists - earlier gccs apparently didn't.
116 * -= chak
117 */
118 StgTSO dummy_tso;
119
120 /*
121 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
122 * in an MT setting, needed to signal that a worker thread shouldn't hang around
123 * in the scheduler when it is out of work.
124 */
125 rtsBool shutting_down_scheduler = rtsFalse;
126
127 /*
128 * This mutex protects most of the global scheduler data in
129 * the THREADED_RTS runtime.
130 */
131 #if defined(THREADED_RTS)
132 Mutex sched_mutex;
133 #endif
134
135 #if !defined(mingw32_HOST_OS)
136 #define FORKPROCESS_PRIMOP_SUPPORTED
137 #endif
138
139 /* -----------------------------------------------------------------------------
140 * static function prototypes
141 * -------------------------------------------------------------------------- */
142
143 static Capability *schedule (Capability *initialCapability, Task *task);
144
145 //
146 // These functions all encapsulate parts of the scheduler loop, and are
147 // abstracted only to make the structure and control flow of the
148 // scheduler clearer.
149 //
150 static void schedulePreLoop (void);
151 static void scheduleFindWork (Capability *cap);
152 #if defined(THREADED_RTS)
153 static void scheduleYield (Capability **pcap, Task *task);
154 #endif
155 static void scheduleStartSignalHandlers (Capability *cap);
156 static void scheduleCheckBlockedThreads (Capability *cap);
157 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS);
158 static void scheduleCheckBlackHoles (Capability *cap);
159 static void scheduleDetectDeadlock (Capability *cap, Task *task);
160 static void schedulePushWork(Capability *cap, Task *task);
161 #if defined(PARALLEL_HASKELL)
162 static rtsBool scheduleGetRemoteWork(Capability *cap);
163 static void scheduleSendPendingMessages(void);
164 #endif
165 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
166 static void scheduleActivateSpark(Capability *cap);
167 #endif
168 static void schedulePostRunThread(Capability *cap, StgTSO *t);
169 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
170 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
171 StgTSO *t);
172 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
173 nat prev_what_next );
174 static void scheduleHandleThreadBlocked( StgTSO *t );
175 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
176 StgTSO *t );
177 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
178 static Capability *scheduleDoGC(Capability *cap, Task *task,
179 rtsBool force_major);
180
181 static rtsBool checkBlackHoles(Capability *cap);
182
183 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
184 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
185
186 static void deleteThread (Capability *cap, StgTSO *tso);
187 static void deleteAllThreads (Capability *cap);
188
189 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
190 static void deleteThread_(Capability *cap, StgTSO *tso);
191 #endif
192
193 #ifdef DEBUG
194 static char *whatNext_strs[] = {
195 "(unknown)",
196 "ThreadRunGHC",
197 "ThreadInterpret",
198 "ThreadKilled",
199 "ThreadRelocated",
200 "ThreadComplete"
201 };
202 #endif
203
204 /* -----------------------------------------------------------------------------
205 * Putting a thread on the run queue: different scheduling policies
206 * -------------------------------------------------------------------------- */
207
208 STATIC_INLINE void
209 addToRunQueue( Capability *cap, StgTSO *t )
210 {
211 #if defined(PARALLEL_HASKELL)
212 if (RtsFlags.ParFlags.doFairScheduling) {
213 // this does round-robin scheduling; good for concurrency
214 appendToRunQueue(cap,t);
215 } else {
216 // this does unfair scheduling; good for parallelism
217 pushOnRunQueue(cap,t);
218 }
219 #else
220 // this does round-robin scheduling; good for concurrency
221 appendToRunQueue(cap,t);
222 #endif
223 }
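/* The two queue disciplines referred to above, sketched.  The real
 * appendToRunQueue()/pushOnRunQueue() are defined elsewhere (not in this
 * file) and also take care of the write barrier (cf. setTSOLink() used
 * later in this file); the helpers below are an illustrative assumption of
 * the queue structure only and are not compiled.
 */
#if 0
static void appendToRunQueueSketch (Capability *cap, StgTSO *t)
{
    // round-robin: add at the tail of the run queue
    t->_link = END_TSO_QUEUE;
    if (cap->run_queue_hd == END_TSO_QUEUE) {
        cap->run_queue_hd = t;
    } else {
        cap->run_queue_tl->_link = t;
    }
    cap->run_queue_tl = t;
}

static void pushOnRunQueueSketch (Capability *cap, StgTSO *t)
{
    // unfair/LIFO: add at the head, so this thread runs next
    t->_link = cap->run_queue_hd;
    cap->run_queue_hd = t;
    if (cap->run_queue_tl == END_TSO_QUEUE) {
        cap->run_queue_tl = t;
    }
}
#endif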
224
225 /* ---------------------------------------------------------------------------
226 Main scheduling loop.
227
228 We use round-robin scheduling, each thread returning to the
229 scheduler loop when one of these conditions is detected:
230
231 * out of heap space
232 * timer expires (thread yields)
233 * thread blocks
234 * thread ends
235 * stack overflow
236
237 GRAN version:
238 In a GranSim setup this loop iterates over the global event queue.
239 This revolves around the global event queue, which determines what
240 to do next. Therefore, it's more complicated than either the
241 concurrent or the parallel (GUM) setup.
242 This version has been entirely removed (JB 2008/08).
243
244 GUM version:
245 GUM iterates over incoming messages.
246 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
247 and sends out a fish whenever it has nothing to do; in-between
248 doing the actual reductions (shared code below) it processes the
249 incoming messages and deals with delayed operations
250 (see PendingFetches).
251 This is not the ugliest code you could imagine, but it's bloody close.
252
253 (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
254 now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
255 as well as future GUM versions. This file has been refurbished to
256 only contain valid code, which is however incomplete and refers to
257 invalid includes etc.
258
259 ------------------------------------------------------------------------ */
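/* For orientation, a condensed sketch of the loop defined below; assertions,
 * tracing, the PARALLEL_HASKELL paths and several details are omitted.  This
 * is a summary of the code that follows, not a separate implementation.
 */
#if 0
while (TERMINATION_CONDITION) {
    /* honour SCHED_INTERRUPTING / SCHED_SHUTTING_DOWN */
    scheduleFindWork(cap);
    schedulePushWork(cap,task);
    scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
    scheduleYield(&cap,task);               /* sleep here if there is nothing to do */
    if (emptyRunQueue(cap)) continue;
#endif
    t = popRunQueue(cap);
run_thread:
    /* run t via StgRun() or interpretBCO(), yielding a return code 'ret' */
    switch (ret) {
    case HeapOverflow:   ready_to_gc = scheduleHandleHeapOverflow(cap,t); break;
    case StackOverflow:  scheduleHandleStackOverflow(cap,task,t); break;
    case ThreadYielding: if (scheduleHandleYield(cap,t,prev_what_next)) goto run_thread; break;
    case ThreadBlocked:  scheduleHandleThreadBlocked(t); break;
    case ThreadFinished: if (scheduleHandleThreadFinished(cap,task,t)) return cap; break;
    }
    if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
        cap = scheduleDoGC(cap,task,rtsFalse);
    }
}
#endif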
260
261 static Capability *
262 schedule (Capability *initialCapability, Task *task)
263 {
264 StgTSO *t;
265 Capability *cap;
266 StgThreadReturnCode ret;
267 #if defined(PARALLEL_HASKELL)
268 rtsBool receivedFinish = rtsFalse;
269 #endif
270 nat prev_what_next;
271 rtsBool ready_to_gc;
272 #if defined(THREADED_RTS)
273 rtsBool first = rtsTrue;
274 #endif
275
276 cap = initialCapability;
277
278 // Pre-condition: this task owns initialCapability.
279 // The sched_mutex is *NOT* held
280 // NB. on return, we still hold a capability.
281
282 debugTrace (DEBUG_sched,
283 "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
284 task, initialCapability);
285
286 if (running_finalizers) {
287 errorBelch("error: a C finalizer called back into Haskell.\n"
288 " This was previously allowed, but is disallowed in GHC 6.10.2 and later.\n"
289 " To create finalizers that may call back into Haskll, use\n"
290 " Foreign.Concurrent.newForeignPtr instead of Foreign.newForeignPtr.");
291 stg_exit(EXIT_FAILURE);
292 }
293
294 schedulePreLoop();
295
296 // -----------------------------------------------------------
297 // Scheduler loop starts here:
298
299 #if defined(PARALLEL_HASKELL)
300 #define TERMINATION_CONDITION (!receivedFinish)
301 #else
302 #define TERMINATION_CONDITION rtsTrue
303 #endif
304
305 while (TERMINATION_CONDITION) {
306
307 // Check whether we have re-entered the RTS from Haskell without
308 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
309 // call).
310 if (cap->in_haskell) {
311 errorBelch("schedule: re-entered unsafely.\n"
312 " Perhaps a 'foreign import unsafe' should be 'safe'?");
313 stg_exit(EXIT_FAILURE);
314 }
315
316 // The interruption / shutdown sequence.
317 //
318 // In order to cleanly shut down the runtime, we want to:
319 // * make sure that all main threads return to their callers
320 // with the state 'Interrupted'.
321 // * clean up all OS threads associated with the runtime
322 // * free all memory etc.
323 //
324 // So the sequence for ^C goes like this:
325 //
326 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
327 // arranges for some Capability to wake up
328 //
329 // * all threads in the system are halted, and the zombies are
330 // placed on the run queue for cleaning up. We acquire all
331 // the capabilities in order to delete the threads; this is
332 // done by scheduleDoGC() for convenience (because GC already
333 // needs to acquire all the capabilities). We can't kill
334 // threads involved in foreign calls.
335 //
336 // * somebody calls shutdownHaskell(), which calls exitScheduler()
337 //
338 // * sched_state := SCHED_SHUTTING_DOWN
339 //
340 // * all workers exit when the run queue on their capability
341 // drains. All main threads will also exit when their TSO
342 // reaches the head of the run queue and they can return.
343 //
344 // * eventually all Capabilities will shut down, and the RTS can
345 // exit.
346 //
347 // * We might be left with threads blocked in foreign calls;
348 // we should really attempt to kill these somehow (TODO).
349
350 switch (sched_state) {
351 case SCHED_RUNNING:
352 break;
353 case SCHED_INTERRUPTING:
354 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
355 #if defined(THREADED_RTS)
356 discardSparksCap(cap);
357 #endif
358 /* scheduleDoGC() deletes all the threads */
359 cap = scheduleDoGC(cap,task,rtsFalse);
360
361 // after scheduleDoGC(), we must be shutting down. Either some
362 // other Capability did the final GC, or we did it above,
363 // either way we can fall through to the SCHED_SHUTTING_DOWN
364 // case now.
365 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
366 // fall through
367
368 case SCHED_SHUTTING_DOWN:
369 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
370 // If we are a worker, just exit. If we're a bound thread
371 // then we will exit below when we've removed our TSO from
372 // the run queue.
373 if (task->tso == NULL && emptyRunQueue(cap)) {
374 return cap;
375 }
376 break;
377 default:
378 barf("sched_state: %d", sched_state);
379 }
380
381 scheduleFindWork(cap);
382
383 /* work pushing, currently relevant only for THREADED_RTS:
384 (pushes threads, wakes up idle capabilities for stealing) */
385 schedulePushWork(cap,task);
386
387 #if defined(PARALLEL_HASKELL)
388 /* since we perform a blocking receive and continue otherwise,
389 either we never reach here or we definitely have work! */
390 // from here: non-empty run queue
391 ASSERT(!emptyRunQueue(cap));
392
393 if (PacketsWaiting()) { /* now process incoming messages, if any
394 pending...
395
396 CAUTION: scheduleGetRemoteWork called
397 above, waits for messages as well! */
398 processMessages(cap, &receivedFinish);
399 }
400 #endif // PARALLEL_HASKELL: non-empty run queue!
401
402 scheduleDetectDeadlock(cap,task);
403
404 #if defined(THREADED_RTS)
405 cap = task->cap; // reload cap, it might have changed
406 #endif
407
408 // Normally, the only way we can get here with no threads to
409 // run is if a keyboard interrupt was received during
410 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
411 // Additionally, it is not fatal for the
412 // threaded RTS to reach here with no threads to run.
413 //
414 // win32: might be here due to awaitEvent() being abandoned
415 // as a result of a console event having been delivered.
416
417 #if defined(THREADED_RTS)
418 if (first)
419 {
420 // XXX: ToDo
421 // // don't yield the first time, we want a chance to run this
422 // // thread for a bit, even if there are others banging at the
423 // // door.
424 // first = rtsFalse;
425 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
426 }
427
428 yield:
429 scheduleYield(&cap,task);
430 if (emptyRunQueue(cap)) continue; // look for work again
431 #endif
432
433 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
434 if ( emptyRunQueue(cap) ) {
435 ASSERT(sched_state >= SCHED_INTERRUPTING);
436 }
437 #endif
438
439 //
440 // Get a thread to run
441 //
442 t = popRunQueue(cap);
443
444 // Sanity check the thread we're about to run. This can be
445 // expensive if there is lots of thread switching going on...
446 IF_DEBUG(sanity,checkTSO(t));
447
448 #if defined(THREADED_RTS)
449 // Check whether we can run this thread in the current task.
450 // If not, we have to pass our capability to the right task.
451 {
452 Task *bound = t->bound;
453
454 if (bound) {
455 if (bound == task) {
456 debugTrace(DEBUG_sched,
457 "### Running thread %lu in bound thread", (unsigned long)t->id);
458 // yes, the Haskell thread is bound to the current native thread
459 } else {
460 debugTrace(DEBUG_sched,
461 "### thread %lu bound to another OS thread", (unsigned long)t->id);
462 // no, bound to a different Haskell thread: pass to that thread
463 pushOnRunQueue(cap,t);
464 continue;
465 }
466 } else {
467 // The thread we want to run is unbound.
468 if (task->tso) {
469 debugTrace(DEBUG_sched,
470 "### this OS thread cannot run thread %lu", (unsigned long)t->id);
471 // no, the current native thread is bound to a different
472 // Haskell thread, so pass it to any worker thread
473 pushOnRunQueue(cap,t);
474 continue;
475 }
476 }
477 }
478 #endif
479
480 // If we're shutting down, and this thread has not yet been
481 // killed, kill it now. This sometimes happens when a finalizer
482 // thread is created by the final GC, or a thread previously
483 // in a foreign call returns.
484 if (sched_state >= SCHED_INTERRUPTING &&
485 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
486 deleteThread(cap,t);
487 }
488
489 /* context switches are initiated by the timer signal, unless
490 * the user specified "context switch as often as possible", with
491 * +RTS -C0
492 */
493 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
494 && !emptyThreadQueues(cap)) {
495 cap->context_switch = 1;
496 }
497
498 run_thread:
499
500 // CurrentTSO is the thread to run. t might be different if we
501 // loop back to run_thread, so make sure to set CurrentTSO after
502 // that.
503 cap->r.rCurrentTSO = t;
504
505 debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
506 (long)t->id, whatNext_strs[t->what_next]);
507
508 startHeapProfTimer();
509
510 // Check for exceptions blocked on this thread
511 maybePerformBlockedException (cap, t);
512
513 // ----------------------------------------------------------------------
514 // Run the current thread
515
516 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
517 ASSERT(t->cap == cap);
518 ASSERT(t->bound ? t->bound->cap == cap : 1);
519
520 prev_what_next = t->what_next;
521
522 errno = t->saved_errno;
523 #if mingw32_HOST_OS
524 SetLastError(t->saved_winerror);
525 #endif
526
527 cap->in_haskell = rtsTrue;
528
529 dirty_TSO(cap,t);
530
531 #if defined(THREADED_RTS)
532 if (recent_activity == ACTIVITY_DONE_GC) {
533 // ACTIVITY_DONE_GC means we turned off the timer signal to
534 // conserve power (see #1623). Re-enable it here.
535 nat prev;
536 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
537 if (prev == ACTIVITY_DONE_GC) {
538 startTimer();
539 }
540 } else {
541 recent_activity = ACTIVITY_YES;
542 }
543 #endif
544
545 postEvent(cap, EVENT_RUN_THREAD, t->id, 0);
546
547 switch (prev_what_next) {
548
549 case ThreadKilled:
550 case ThreadComplete:
551 /* Thread already finished, return to scheduler. */
552 ret = ThreadFinished;
553 break;
554
555 case ThreadRunGHC:
556 {
557 StgRegTable *r;
558 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
559 cap = regTableToCapability(r);
560 ret = r->rRet;
561 break;
562 }
563
564 case ThreadInterpret:
565 cap = interpretBCO(cap);
566 ret = cap->r.rRet;
567 break;
568
569 default:
570 barf("schedule: invalid what_next field");
571 }
572
573 cap->in_haskell = rtsFalse;
574
575 // The TSO might have moved, eg. if it re-entered the RTS and a GC
576 // happened. So find the new location:
577 t = cap->r.rCurrentTSO;
578
579 // We have run some Haskell code: there might be blackhole-blocked
580 // threads to wake up now.
581 // Lock-free test here should be ok, we're just setting a flag.
582 if ( blackhole_queue != END_TSO_QUEUE ) {
583 blackholes_need_checking = rtsTrue;
584 }
585
586 // And save the current errno in this thread.
587 // XXX: possibly bogus for SMP because this thread might already
588 // be running again, see code below.
589 t->saved_errno = errno;
590 #if mingw32_HOST_OS
591 // Similarly for Windows error code
592 t->saved_winerror = GetLastError();
593 #endif
594
595 postEvent (cap, EVENT_STOP_THREAD, t->id, ret);
596
597 #if defined(THREADED_RTS)
598 // If ret is ThreadBlocked, and this Task is bound to the TSO that
599 // blocked, we are in limbo - the TSO is now owned by whatever it
600 // is blocked on, and may in fact already have been woken up,
601 // perhaps even on a different Capability. It may be the case
602 // that task->cap != cap. We better yield this Capability
603 // immediately and return to normality.
604 if (ret == ThreadBlocked) {
605 debugTrace(DEBUG_sched,
606 "--<< thread %lu (%s) stopped: blocked",
607 (unsigned long)t->id, whatNext_strs[t->what_next]);
608 goto yield;
609 }
610 #endif
611
612 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
613 ASSERT(t->cap == cap);
614
615 // ----------------------------------------------------------------------
616
617 // Costs for the scheduler are assigned to CCS_SYSTEM
618 stopHeapProfTimer();
619 #if defined(PROFILING)
620 CCCS = CCS_SYSTEM;
621 #endif
622
623 schedulePostRunThread(cap,t);
624
625 t = threadStackUnderflow(task,t);
626
627 ready_to_gc = rtsFalse;
628
629 switch (ret) {
630 case HeapOverflow:
631 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
632 break;
633
634 case StackOverflow:
635 scheduleHandleStackOverflow(cap,task,t);
636 break;
637
638 case ThreadYielding:
639 if (scheduleHandleYield(cap, t, prev_what_next)) {
640 // shortcut for switching between compiler/interpreter:
641 goto run_thread;
642 }
643 break;
644
645 case ThreadBlocked:
646 scheduleHandleThreadBlocked(t);
647 break;
648
649 case ThreadFinished:
650 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
651 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
652 break;
653
654 default:
655 barf("schedule: invalid thread return code %d", (int)ret);
656 }
657
658 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
659 cap = scheduleDoGC(cap,task,rtsFalse);
660 }
661 } /* end of while() */
662 }
663
664 /* ----------------------------------------------------------------------------
665 * Setting up the scheduler loop
666 * ------------------------------------------------------------------------- */
667
668 static void
669 schedulePreLoop(void)
670 {
671 // initialisation for scheduler - what cannot go into initScheduler()
672 }
673
674 /* -----------------------------------------------------------------------------
675 * scheduleFindWork()
676 *
677 * Search for work to do, and handle messages from elsewhere.
678 * -------------------------------------------------------------------------- */
679
680 static void
681 scheduleFindWork (Capability *cap)
682 {
683 scheduleStartSignalHandlers(cap);
684
685 // Only check the black holes here if we've nothing else to do.
686 // During normal execution, the black hole list only gets checked
687 // at GC time, to avoid repeatedly traversing this possibly long
688 // list each time around the scheduler.
689 if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
690
691 scheduleCheckWakeupThreads(cap);
692
693 scheduleCheckBlockedThreads(cap);
694
695 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
696 if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
697 #endif
698
699 #if defined(PARALLEL_HASKELL)
700 // if messages have been buffered...
701 scheduleSendPendingMessages();
702 #endif
703
704 #if defined(PARALLEL_HASKELL)
705 if (emptyRunQueue(cap)) {
706 receivedFinish = scheduleGetRemoteWork(cap);
707 continue; // a new round, (hopefully) with new work
708 /*
709 in GUM, this a) sends out a FISH and returns IF no fish is
710 out already
711 b) (blocking) awaits and receives messages
712
713 in Eden, this is only the blocking receive, as b) in GUM.
714 */
715 }
716 #endif
717 }
718
719 #if defined(THREADED_RTS)
720 STATIC_INLINE rtsBool
721 shouldYieldCapability (Capability *cap, Task *task)
722 {
723 // we need to yield this capability to someone else if..
724 // - another thread is initiating a GC
725 // - another Task is returning from a foreign call
726 // - the thread at the head of the run queue cannot be run
727 // by this Task (it is bound to another Task, or it is unbound
728 // and this Task is bound).
729 return (waiting_for_gc ||
730 cap->returning_tasks_hd != NULL ||
731 (!emptyRunQueue(cap) && (task->tso == NULL
732 ? cap->run_queue_hd->bound != NULL
733 : cap->run_queue_hd->bound != task)));
734 }
735
736 // This is the single place where a Task goes to sleep. There are
737 // two reasons it might need to sleep:
738 // - there are no threads to run
739 // - we need to yield this Capability to someone else
740 // (see shouldYieldCapability())
741 //
742 // Careful: the scheduler loop is quite delicate. Make sure you run
743 // the tests in testsuite/concurrent (all ways) after modifying this,
744 // and also check the benchmarks in nofib/parallel for regressions.
745
746 static void
747 scheduleYield (Capability **pcap, Task *task)
748 {
749 Capability *cap = *pcap;
750
751 // if we have work, and we don't need to give up the Capability, continue.
752 if (!shouldYieldCapability(cap,task) &&
753 (!emptyRunQueue(cap) ||
754 !emptyWakeupQueue(cap) ||
755 blackholes_need_checking ||
756 sched_state >= SCHED_INTERRUPTING))
757 return;
758
759 // otherwise yield (sleep), and keep yielding if necessary.
760 do {
761 yieldCapability(&cap,task);
762 }
763 while (shouldYieldCapability(cap,task));
764
765 // note there may still be no threads on the run queue at this
766 // point, the caller has to check.
767
768 *pcap = cap;
769 return;
770 }
771 #endif
772
773 /* -----------------------------------------------------------------------------
774 * schedulePushWork()
775 *
776 * Push work to other Capabilities if we have some.
777 * -------------------------------------------------------------------------- */
778
779 static void
780 schedulePushWork(Capability *cap USED_IF_THREADS,
781 Task *task USED_IF_THREADS)
782 {
783 /* following code not for PARALLEL_HASKELL. I kept the call general,
784 future GUM versions might use pushing in a distributed setup */
785 #if defined(THREADED_RTS)
786
787 Capability *free_caps[n_capabilities], *cap0;
788 nat i, n_free_caps;
789
790 // migration can be turned off with +RTS -qg
791 if (!RtsFlags.ParFlags.migrate) return;
792
793 // Check whether we have more threads on our run queue, or sparks
794 // in our pool, that we could hand to another Capability.
795 if (cap->run_queue_hd == END_TSO_QUEUE) {
796 if (sparkPoolSizeCap(cap) < 2) return;
797 } else {
798 if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
799 sparkPoolSizeCap(cap) < 1) return;
800 }
801
802 // First grab as many free Capabilities as we can.
803 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
804 cap0 = &capabilities[i];
805 if (cap != cap0 && tryGrabCapability(cap0,task)) {
806 if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
807 // it already has some work, we just grabbed it at
808 // the wrong moment. Or maybe it's deadlocked!
809 releaseCapability(cap0);
810 } else {
811 free_caps[n_free_caps++] = cap0;
812 }
813 }
814 }
815
816 // we now have n_free_caps free capabilities stashed in
817 // free_caps[]. Share our run queue equally with them. This is
818 // probably the simplest thing we could do; improvements we might
819 // want to do include:
820 //
821 // - giving high priority to moving relatively new threads, on
822 // the grounds that they haven't had time to build up a
823 // working set in the cache on this CPU/Capability.
824 //
825 // - giving low priority to moving long-lived threads
826
827 if (n_free_caps > 0) {
828 StgTSO *prev, *t, *next;
829 rtsBool pushed_to_all;
830
831 debugTrace(DEBUG_sched,
832 "cap %d: %s and %d free capabilities, sharing...",
833 cap->no,
834 (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
835 "excess threads on run queue":"sparks to share (>=2)",
836 n_free_caps);
837
838 i = 0;
839 pushed_to_all = rtsFalse;
840
841 if (cap->run_queue_hd != END_TSO_QUEUE) {
842 prev = cap->run_queue_hd;
843 t = prev->_link;
844 prev->_link = END_TSO_QUEUE;
845 for (; t != END_TSO_QUEUE; t = next) {
846 next = t->_link;
847 t->_link = END_TSO_QUEUE;
848 if (t->what_next == ThreadRelocated
849 || t->bound == task // don't move my bound thread
850 || tsoLocked(t)) { // don't move a locked thread
851 setTSOLink(cap, prev, t);
852 prev = t;
853 } else if (i == n_free_caps) {
854 pushed_to_all = rtsTrue;
855 i = 0;
856 // keep one for us
857 setTSOLink(cap, prev, t);
858 prev = t;
859 } else {
860 debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
861 appendToRunQueue(free_caps[i],t);
862
863 postEvent (cap, EVENT_MIGRATE_THREAD, t->id, free_caps[i]->no);
864
865 if (t->bound) { t->bound->cap = free_caps[i]; }
866 t->cap = free_caps[i];
867 i++;
868 }
869 }
870 cap->run_queue_tl = prev;
871 }
872
873 #ifdef SPARK_PUSHING
874 /* JB I left this code in place, it would work but is not necessary */
875
876 // If there are some free capabilities that we didn't push any
877 // threads to, then try to push a spark to each one.
878 if (!pushed_to_all) {
879 StgClosure *spark;
880 // i is the next free capability to push to
881 for (; i < n_free_caps; i++) {
882 if (emptySparkPoolCap(free_caps[i])) {
883 spark = tryStealSpark(cap->sparks);
884 if (spark != NULL) {
885 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
886
887 postEvent(free_caps[i], EVENT_STEAL_SPARK, t->id, cap->no);
888
889 newSpark(&(free_caps[i]->r), spark);
890 }
891 }
892 }
893 }
894 #endif /* SPARK_PUSHING */
895
896 // release the capabilities
897 for (i = 0; i < n_free_caps; i++) {
898 task->cap = free_caps[i];
899 releaseAndWakeupCapability(free_caps[i]);
900 }
901 }
902 task->cap = cap; // reset to point to our Capability.
903
904 #endif /* THREADED_RTS */
905
906 }
907
908 /* ----------------------------------------------------------------------------
909 * Start any pending signal handlers
910 * ------------------------------------------------------------------------- */
911
912 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
913 static void
914 scheduleStartSignalHandlers(Capability *cap)
915 {
916 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
917 // safe outside the lock
918 startSignalHandlers(cap);
919 }
920 }
921 #else
922 static void
923 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
924 {
925 }
926 #endif
927
928 /* ----------------------------------------------------------------------------
929 * Check for blocked threads that can be woken up.
930 * ------------------------------------------------------------------------- */
931
932 static void
933 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
934 {
935 #if !defined(THREADED_RTS)
936 //
937 // Check whether any waiting threads need to be woken up. If the
938 // run queue is empty, and there are no other tasks running, we
939 // can wait indefinitely for something to happen.
940 //
941 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
942 {
943 awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
944 }
945 #endif
946 }
947
948
949 /* ----------------------------------------------------------------------------
950 * Check for threads woken up by other Capabilities
951 * ------------------------------------------------------------------------- */
952
953 static void
954 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
955 {
956 #if defined(THREADED_RTS)
957 // Any threads that were woken up by other Capabilities get
958 // appended to our run queue.
959 if (!emptyWakeupQueue(cap)) {
960 ACQUIRE_LOCK(&cap->lock);
961 if (emptyRunQueue(cap)) {
962 cap->run_queue_hd = cap->wakeup_queue_hd;
963 cap->run_queue_tl = cap->wakeup_queue_tl;
964 } else {
965 setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
966 cap->run_queue_tl = cap->wakeup_queue_tl;
967 }
968 cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
969 RELEASE_LOCK(&cap->lock);
970 }
971 #endif
972 }
973
974 /* ----------------------------------------------------------------------------
975 * Check for threads blocked on BLACKHOLEs that can be woken up
976 * ------------------------------------------------------------------------- */
977 static void
978 scheduleCheckBlackHoles (Capability *cap)
979 {
980 if ( blackholes_need_checking ) // check without the lock first
981 {
982 ACQUIRE_LOCK(&sched_mutex);
983 if ( blackholes_need_checking ) {
984 blackholes_need_checking = rtsFalse;
985 // important that we reset the flag *before* checking the
986 // blackhole queue, otherwise we could get deadlock. This
987 // happens as follows: we wake up a thread that
988 // immediately runs on another Capability, blocks on a
989 // blackhole, and then we reset the blackholes_need_checking flag.
990 checkBlackHoles(cap);
991 }
992 RELEASE_LOCK(&sched_mutex);
993 }
994 }
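/* A generic illustration of why checkBlackHoles() is called with the flag
 * already reset above, rather than resetting it afterwards.  The names below
 * are hypothetical and the block is not compiled; it is only a sketch of the
 * "reset before scan" rule for a lossy wakeup flag such as
 * blackholes_need_checking.
 */
#if 0
static volatile int work_available = 0;   /* stand-in for blackholes_need_checking */

static void notify (void)                 /* e.g. a thread blocks on a blackhole */
{
    work_available = 1;
}

static void consume (void)                /* e.g. the caller of checkBlackHoles() */
{
    if (work_available) {
        work_available = 0;               /* reset FIRST ...                       */
        scan_queue();                     /* ... then scan (hypothetical helper).
                                             A notify() that races with the scan
                                             leaves the flag set again, so the
                                             next pass around the scheduler will
                                             rescan; resetting after the scan
                                             could lose that notification.        */
    }
}
#endif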
995
996 /* ----------------------------------------------------------------------------
997 * Detect deadlock conditions and attempt to resolve them.
998 * ------------------------------------------------------------------------- */
999
1000 static void
1001 scheduleDetectDeadlock (Capability *cap, Task *task)
1002 {
1003
1004 #if defined(PARALLEL_HASKELL)
1005 // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
1006 return;
1007 #endif
1008
1009 /*
1010 * Detect deadlock: when we have no threads to run, there are no
1011 * threads blocked, waiting for I/O, or sleeping, and all the
1012 * other tasks are waiting for work, we must have a deadlock of
1013 * some description.
1014 */
1015 if ( emptyThreadQueues(cap) )
1016 {
1017 #if defined(THREADED_RTS)
1018 /*
1019 * In the threaded RTS, we only check for deadlock if there
1020 * has been no activity in a complete timeslice. This means
1021 * we won't eagerly start a full GC just because we don't have
1022 * any threads to run currently.
1023 */
1024 if (recent_activity != ACTIVITY_INACTIVE) return;
1025 #endif
1026
1027 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
1028
1029 // Garbage collection can release some new threads due to
1030 // either (a) finalizers or (b) threads resurrected because
1031 // they are unreachable and will therefore be sent an
1032 // exception. Any threads thus released will be immediately
1033 // runnable.
1034 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
1035 // when force_major == rtsTrue, scheduleDoGC sets
1036 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
1037 // signal.
1038
1039 if ( !emptyRunQueue(cap) ) return;
1040
1041 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
1042 /* If we have user-installed signal handlers, then wait
1043 * for signals to arrive rather than bombing out with a
1044 * deadlock.
1045 */
1046 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1047 debugTrace(DEBUG_sched,
1048 "still deadlocked, waiting for signals...");
1049
1050 awaitUserSignals();
1051
1052 if (signals_pending()) {
1053 startSignalHandlers(cap);
1054 }
1055
1056 // either we have threads to run, or we were interrupted:
1057 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1058
1059 return;
1060 }
1061 #endif
1062
1063 #if !defined(THREADED_RTS)
1064 /* Probably a real deadlock. Send the current main thread the
1065 * Deadlock exception.
1066 */
1067 if (task->tso) {
1068 switch (task->tso->why_blocked) {
1069 case BlockedOnSTM:
1070 case BlockedOnBlackHole:
1071 case BlockedOnException:
1072 case BlockedOnMVar:
1073 throwToSingleThreaded(cap, task->tso,
1074 (StgClosure *)nonTermination_closure);
1075 return;
1076 default:
1077 barf("deadlock: main thread blocked in a strange way");
1078 }
1079 }
1080 return;
1081 #endif
1082 }
1083 }
1084
1085
1086 /* ----------------------------------------------------------------------------
1087 * Send pending messages (PARALLEL_HASKELL only)
1088 * ------------------------------------------------------------------------- */
1089
1090 #if defined(PARALLEL_HASKELL)
1091 static void
1092 scheduleSendPendingMessages(void)
1093 {
1094
1095 # if defined(PAR) // global Mem.Mgmt., omit for now
1096 if (PendingFetches != END_BF_QUEUE) {
1097 processFetches();
1098 }
1099 # endif
1100
1101 if (RtsFlags.ParFlags.BufferTime) {
1102 // if we use message buffering, we must send away all message
1103 // packets which have become too old...
1104 sendOldBuffers();
1105 }
1106 }
1107 #endif
1108
1109 /* ----------------------------------------------------------------------------
1110 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1111 * ------------------------------------------------------------------------- */
1112
1113 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1114 static void
1115 scheduleActivateSpark(Capability *cap)
1116 {
1117 if (anySparks())
1118 {
1119 createSparkThread(cap);
1120 debugTrace(DEBUG_sched, "creating a spark thread");
1121 }
1122 }
1123 #endif // PARALLEL_HASKELL || THREADED_RTS
1124
1125 /* ----------------------------------------------------------------------------
1126 * Get work from a remote node (PARALLEL_HASKELL only)
1127 * ------------------------------------------------------------------------- */
1128
1129 #if defined(PARALLEL_HASKELL)
1130 static rtsBool /* return value used in PARALLEL_HASKELL only */
1131 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1132 {
1133 #if defined(PARALLEL_HASKELL)
1134 rtsBool receivedFinish = rtsFalse;
1135
1136 // idle() , i.e. send all buffers, wait for work
1137 if (RtsFlags.ParFlags.BufferTime) {
1138 IF_PAR_DEBUG(verbose,
1139 debugBelch("...send all pending data,"));
1140 {
1141 nat i;
1142 for (i=1; i<=nPEs; i++)
1143 sendImmediately(i); // send all messages away immediately
1144 }
1145 }
1146
1147 /* this would be the place for fishing in GUM...
1148
1149 if (no-earlier-fish-around)
1150 sendFish(choosePe());
1151 */
1152
1153 // Eden:just look for incoming messages (blocking receive)
1154 IF_PAR_DEBUG(verbose,
1155 debugBelch("...wait for incoming messages...\n"));
1156 processMessages(cap, &receivedFinish); // blocking receive...
1157
1158
1159 return receivedFinish;
1160 // re-enter the scheduling loop after having received something
1161
1162 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1163
1164 return rtsFalse; /* return value unused in THREADED_RTS */
1165
1166 #endif /* PARALLEL_HASKELL */
1167 }
1168 #endif // PARALLEL_HASKELL
1169
1170 /* ----------------------------------------------------------------------------
1171 * After running a thread...
1172 * ------------------------------------------------------------------------- */
1173
1174 static void
1175 schedulePostRunThread (Capability *cap, StgTSO *t)
1176 {
1177 // We have to be able to catch transactions that are in an
1178 // infinite loop as a result of seeing an inconsistent view of
1179 // memory, e.g.
1180 //
1181 // atomically $ do
1182 // [a,b] <- mapM readTVar [ta,tb]
1183 // when (a == b) loop
1184 //
1185 // and a is never equal to b given a consistent view of memory.
1186 //
1187 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1188 if (!stmValidateNestOfTransactions (t -> trec)) {
1189 debugTrace(DEBUG_sched | DEBUG_stm,
1190 "trec %p found wasting its time", t);
1191
1192 // strip the stack back to the
1193 // ATOMICALLY_FRAME, aborting the (nested)
1194 // transaction, and saving the stack of any
1195 // partially-evaluated thunks on the heap.
1196 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1197
1198 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1199 }
1200 }
1201
1202 /* some statistics gathering in the parallel case */
1203 }
1204
1205 /* -----------------------------------------------------------------------------
1206 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1207 * -------------------------------------------------------------------------- */
1208
1209 static rtsBool
1210 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1211 {
1212 // did the task ask for a large block?
1213 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1214 // if so, get one and push it on the front of the nursery.
1215 bdescr *bd;
1216 lnat blocks;
1217
1218 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1219
1220 debugTrace(DEBUG_sched,
1221 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1222 (long)t->id, whatNext_strs[t->what_next], blocks);
1223
1224 // don't do this if the nursery is (nearly) full, we'll GC first.
1225 if (cap->r.rCurrentNursery->link != NULL ||
1226 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1227 // if the nursery has only one block.
1228
1229 ACQUIRE_SM_LOCK
1230 bd = allocGroup( blocks );
1231 RELEASE_SM_LOCK
1232 cap->r.rNursery->n_blocks += blocks;
1233
1234 // link the new group into the list
1235 bd->link = cap->r.rCurrentNursery;
1236 bd->u.back = cap->r.rCurrentNursery->u.back;
1237 if (cap->r.rCurrentNursery->u.back != NULL) {
1238 cap->r.rCurrentNursery->u.back->link = bd;
1239 } else {
1240 #if !defined(THREADED_RTS)
1241 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1242 g0s0 == cap->r.rNursery);
1243 #endif
1244 cap->r.rNursery->blocks = bd;
1245 }
1246 cap->r.rCurrentNursery->u.back = bd;
1247
1248 // initialise it as a nursery block. We initialise the
1249 // step, gen_no, and flags field of *every* sub-block in
1250 // this large block, because this is easier than making
1251 // sure that we always find the block head of a large
1252 // block whenever we call Bdescr() (eg. evacuate() and
1253 // isAlive() in the GC would both have to do this, at
1254 // least).
1255 {
1256 bdescr *x;
1257 for (x = bd; x < bd + blocks; x++) {
1258 x->step = cap->r.rNursery;
1259 x->gen_no = 0;
1260 x->flags = 0;
1261 }
1262 }
1263
1264 // This assert can be a killer if the app is doing lots
1265 // of large block allocations.
1266 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1267
1268 // now update the nursery to point to the new block
1269 cap->r.rCurrentNursery = bd;
1270
1271 // we might be unlucky and have another thread get on the
1272 // run queue before us and steal the large block, but in that
1273 // case the thread will just end up requesting another large
1274 // block.
1275 pushOnRunQueue(cap,t);
1276 return rtsFalse; /* not actually GC'ing */
1277 }
1278 }
1279
1280 debugTrace(DEBUG_sched,
1281 "--<< thread %ld (%s) stopped: HeapOverflow",
1282 (long)t->id, whatNext_strs[t->what_next]);
1283
1284 if (cap->r.rHpLim == NULL || cap->context_switch) {
1285 // Sometimes we miss a context switch, e.g. when calling
1286 // primitives in a tight loop, MAYBE_GC() doesn't check the
1287 // context switch flag, and we end up waiting for a GC.
1288 // See #1984, and concurrent/should_run/1984
1289 cap->context_switch = 0;
1290 addToRunQueue(cap,t);
1291 } else {
1292 pushOnRunQueue(cap,t);
1293 }
1294 return rtsTrue;
1295 /* actual GC is done at the end of the while loop in schedule() */
1296 }
1297
1298 /* -----------------------------------------------------------------------------
1299 * Handle a thread that returned to the scheduler with ThreadStackOverflow
1300 * -------------------------------------------------------------------------- */
1301
1302 static void
1303 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1304 {
1305 debugTrace (DEBUG_sched,
1306 "--<< thread %ld (%s) stopped, StackOverflow",
1307 (long)t->id, whatNext_strs[t->what_next]);
1308
1309 /* just adjust the stack for this thread, then pop it back
1310 * on the run queue.
1311 */
1312 {
1313 /* enlarge the stack */
1314 StgTSO *new_t = threadStackOverflow(cap, t);
1315
1316 /* The TSO attached to this Task may have moved, so update the
1317 * pointer to it.
1318 */
1319 if (task->tso == t) {
1320 task->tso = new_t;
1321 }
1322 pushOnRunQueue(cap,new_t);
1323 }
1324 }
1325
1326 /* -----------------------------------------------------------------------------
1327 * Handle a thread that returned to the scheduler with ThreadYielding
1328 * -------------------------------------------------------------------------- */
1329
1330 static rtsBool
1331 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1332 {
1333 // Reset the context switch flag. We don't do this just before
1334 // running the thread, because that would mean we would lose ticks
1335 // during GC, which can lead to unfair scheduling (a thread hogs
1336 // the CPU because the tick always arrives during GC). This way
1337 // penalises threads that do a lot of allocation, but that seems
1338 // better than the alternative.
1339 cap->context_switch = 0;
1340
1341 /* put the thread back on the run queue. Then, if we're ready to
1342 * GC, check whether this is the last task to stop. If so, wake
1343 * up the GC thread. getThread will block during a GC until the
1344 * GC is finished.
1345 */
1346 #ifdef DEBUG
1347 if (t->what_next != prev_what_next) {
1348 debugTrace(DEBUG_sched,
1349 "--<< thread %ld (%s) stopped to switch evaluators",
1350 (long)t->id, whatNext_strs[t->what_next]);
1351 } else {
1352 debugTrace(DEBUG_sched,
1353 "--<< thread %ld (%s) stopped, yielding",
1354 (long)t->id, whatNext_strs[t->what_next]);
1355 }
1356 #endif
1357
1358 IF_DEBUG(sanity,
1359 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1360 checkTSO(t));
1361 ASSERT(t->_link == END_TSO_QUEUE);
1362
1363 // Shortcut if we're just switching evaluators: don't bother
1364 // doing stack squeezing (which can be expensive), just run the
1365 // thread.
1366 if (t->what_next != prev_what_next) {
1367 return rtsTrue;
1368 }
1369
1370 addToRunQueue(cap,t);
1371
1372 return rtsFalse;
1373 }
1374
1375 /* -----------------------------------------------------------------------------
1376 * Handle a thread that returned to the scheduler with ThreadBlocked
1377 * -------------------------------------------------------------------------- */
1378
1379 static void
1380 scheduleHandleThreadBlocked( StgTSO *t
1381 #if !defined(GRAN) && !defined(DEBUG)
1382 STG_UNUSED
1383 #endif
1384 )
1385 {
1386
1387 // We don't need to do anything. The thread is blocked, and it
1388 // has tidied up its stack and placed itself on whatever queue
1389 // it needs to be on.
1390
1391 // ASSERT(t->why_blocked != NotBlocked);
1392 // Not true: for example,
1393 // - in THREADED_RTS, the thread may already have been woken
1394 // up by another Capability. This actually happens: try
1395 // conc023 +RTS -N2.
1396 // - the thread may have woken itself up already, because
1397 // threadPaused() might have raised a blocked throwTo
1398 // exception, see maybePerformBlockedException().
1399
1400 #ifdef DEBUG
1401 if (traceClass(DEBUG_sched)) {
1402 debugTraceBegin("--<< thread %lu (%s) stopped: ",
1403 (unsigned long)t->id, whatNext_strs[t->what_next]);
1404 printThreadBlockage(t);
1405 debugTraceEnd();
1406 }
1407 #endif
1408 }
1409
1410 /* -----------------------------------------------------------------------------
1411 * Handle a thread that returned to the scheduler with ThreadFinished
1412 * -------------------------------------------------------------------------- */
1413
1414 static rtsBool
1415 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1416 {
1417 /* Need to check whether this was a main thread, and if so,
1418 * return with the return value.
1419 *
1420 * We also end up here if the thread kills itself with an
1421 * uncaught exception, see Exception.cmm.
1422 */
1423 debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
1424 (unsigned long)t->id, whatNext_strs[t->what_next]);
1425
1426 // blocked exceptions can now complete, even if the thread was in
1427 // blocked mode (see #2910). This unconditionally calls
1428 // lockTSO(), which ensures that we don't miss any threads that
1429 // are engaged in throwTo() with this thread as a target.
1430 awakenBlockedExceptionQueue (cap, t);
1431
1432 //
1433 // Check whether the thread that just completed was a bound
1434 // thread, and if so return with the result.
1435 //
1436 // There is an assumption here that all thread completion goes
1437 // through this point; we need to make sure that if a thread
1438 // ends up in the ThreadKilled state, that it stays on the run
1439 // queue so it can be dealt with here.
1440 //
1441
1442 if (t->bound) {
1443
1444 if (t->bound != task) {
1445 #if !defined(THREADED_RTS)
1446 // Must be a bound thread that is not the topmost one. Leave
1447 // it on the run queue until the stack has unwound to the
1448 // point where we can deal with this. Leaving it on the run
1449 // queue also ensures that the garbage collector knows about
1450 // this thread and its return value (it gets dropped from the
1451 // step->threads list so there's no other way to find it).
1452 appendToRunQueue(cap,t);
1453 return rtsFalse;
1454 #else
1455 // this cannot happen in the threaded RTS, because a
1456 // bound thread can only be run by the appropriate Task.
1457 barf("finished bound thread that isn't mine");
1458 #endif
1459 }
1460
1461 ASSERT(task->tso == t);
1462
1463 if (t->what_next == ThreadComplete) {
1464 if (task->ret) {
1465 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1466 *(task->ret) = (StgClosure *)task->tso->sp[1];
1467 }
1468 task->stat = Success;
1469 } else {
1470 if (task->ret) {
1471 *(task->ret) = NULL;
1472 }
1473 if (sched_state >= SCHED_INTERRUPTING) {
1474 if (heap_overflow) {
1475 task->stat = HeapExhausted;
1476 } else {
1477 task->stat = Interrupted;
1478 }
1479 } else {
1480 task->stat = Killed;
1481 }
1482 }
1483 #ifdef DEBUG
1484 removeThreadLabel((StgWord)task->tso->id);
1485 #endif
1486 return rtsTrue; // tells schedule() to return
1487 }
1488
1489 return rtsFalse;
1490 }
1491
1492 /* -----------------------------------------------------------------------------
1493 * Perform a heap census
1494 * -------------------------------------------------------------------------- */
1495
1496 static rtsBool
1497 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1498 {
1499 // When we have +RTS -i0 and we're heap profiling, do a census at
1500 // every GC. This lets us get repeatable runs for debugging.
1501 if (performHeapProfile ||
1502 (RtsFlags.ProfFlags.profileInterval==0 &&
1503 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1504 return rtsTrue;
1505 } else {
1506 return rtsFalse;
1507 }
1508 }
1509
1510 /* -----------------------------------------------------------------------------
1511 * Perform a garbage collection if necessary
1512 * -------------------------------------------------------------------------- */
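/* For orientation, a condensed sketch of the THREADED_RTS path through
 * scheduleDoGC() below; error paths, tracing, the heap-census details and
 * the non-threaded variant are omitted.  This is a summary of the function
 * that follows, not a separate implementation ('parallel_gc_possible' is a
 * placeholder for the real test below).
 */
#if 0
gc_type = parallel_gc_possible ? PENDING_GC_PAR : PENDING_GC_SEQ;
if (cas(&waiting_for_gc, 0, gc_type) != 0) {
    /* someone else is already initiating a GC: yield until it has finished */
    do { yieldCapability(&cap,task); } while (waiting_for_gc);
    return cap;
}
setContextSwitches();                  /* make running Haskell threads stop soon */
if (gc_type == PENDING_GC_SEQ) {
    /* grab every other Capability with waitForReturnCapability() */
} else {
    waitForGcThreads(cap);             /* each Capability donates one GC thread */
}
if (sched_state == SCHED_INTERRUPTING) {
    deleteAllThreads(cap);
    sched_state = SCHED_SHUTTING_DOWN;
}
waiting_for_gc = 0;                    /* reset *before* the GC itself */
GarbageCollect(force_major || heap_census, gc_type, cap);
/* ... then perform a heap census if needed and release the GC threads or
 * the grabbed Capabilities. */
#endif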
1513
1514 static Capability *
1515 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1516 {
1517 rtsBool heap_census;
1518 #ifdef THREADED_RTS
1519 /* extern static volatile StgWord waiting_for_gc;
1520 lives inside capability.c */
1521 rtsBool gc_type, prev_pending_gc;
1522 nat i;
1523 #endif
1524
1525 if (sched_state == SCHED_SHUTTING_DOWN) {
1526 // The final GC has already been done, and the system is
1527 // shutting down. We'll probably deadlock if we try to GC
1528 // now.
1529 return cap;
1530 }
1531
1532 #ifdef THREADED_RTS
1533 if (sched_state < SCHED_INTERRUPTING
1534 && RtsFlags.ParFlags.parGcEnabled
1535 && N >= RtsFlags.ParFlags.parGcGen
1536 && ! oldest_gen->steps[0].mark)
1537 {
1538 gc_type = PENDING_GC_PAR;
1539 } else {
1540 gc_type = PENDING_GC_SEQ;
1541 }
1542
1543 // In order to GC, there must be no threads running Haskell code.
1544 // Therefore, the GC thread needs to hold *all* the capabilities,
1545 // and release them after the GC has completed.
1546 //
1547 // This seems to be the simplest way: previous attempts involved
1548 // making all the threads with capabilities give up their
1549 // capabilities and sleep except for the *last* one, which
1550 // actually did the GC. But it's quite hard to arrange for all
1551 // the other tasks to sleep and stay asleep.
1552 //
1553
1554 /* Other capabilities are prevented from running yet more Haskell
1555 threads if waiting_for_gc is set. Tested inside
1556 yieldCapability() and releaseCapability() in Capability.c */
1557
1558 prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
1559 if (prev_pending_gc) {
1560 do {
1561 debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...",
1562 prev_pending_gc);
1563 ASSERT(cap);
1564 yieldCapability(&cap,task);
1565 } while (waiting_for_gc);
1566 return cap; // NOTE: task->cap might have changed here
1567 }
1568
1569 setContextSwitches();
1570
1571 // The final shutdown GC is always single-threaded, because it's
1572 // possible that some of the Capabilities have no worker threads.
1573
1574 if (gc_type == PENDING_GC_SEQ)
1575 {
1576 postEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
1577 // single-threaded GC: grab all the capabilities
1578 for (i=0; i < n_capabilities; i++) {
1579 debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
1580 if (cap != &capabilities[i]) {
1581 Capability *pcap = &capabilities[i];
1582 // we better hope this task doesn't get migrated to
1583 // another Capability while we're waiting for this one.
1584 // It won't, because load balancing happens while we have
1585 // all the Capabilities, but even so it's a slightly
1586 // unsavoury invariant.
1587 task->cap = pcap;
1588 waitForReturnCapability(&pcap, task);
1589 if (pcap != &capabilities[i]) {
1590 barf("scheduleDoGC: got the wrong capability");
1591 }
1592 }
1593 }
1594 }
1595 else
1596 {
1597 // multi-threaded GC: make sure all the Capabilities donate one
1598 // GC thread each.
1599 postEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
1600 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1601
1602 waitForGcThreads(cap);
1603 }
1604 #endif
1605
1606 // so this happens periodically:
1607 if (cap) scheduleCheckBlackHoles(cap);
1608
1609 IF_DEBUG(scheduler, printAllThreads());
1610
1611 delete_threads_and_gc:
1612 /*
1613 * We now have all the capabilities; if we're in an interrupting
1614 * state, then we should take the opportunity to delete all the
1615 * threads in the system.
1616 */
1617 if (sched_state == SCHED_INTERRUPTING) {
1618 deleteAllThreads(cap);
1619 sched_state = SCHED_SHUTTING_DOWN;
1620 }
1621
1622 heap_census = scheduleNeedHeapProfile(rtsTrue);
1623
1624 #if defined(THREADED_RTS)
1625 postEvent(cap, EVENT_GC_START, 0, 0);
1626 debugTrace(DEBUG_sched, "doing GC");
1627 // reset waiting_for_gc *before* GC, so that when the GC threads
1628 // emerge they don't immediately re-enter the GC.
1629 waiting_for_gc = 0;
1630 GarbageCollect(force_major || heap_census, gc_type, cap);
1631 #else
1632 GarbageCollect(force_major || heap_census, 0, cap);
1633 #endif
1634 postEvent(cap, EVENT_GC_END, 0, 0);
1635
1636 if (recent_activity == ACTIVITY_INACTIVE && force_major)
1637 {
1638 // We are doing a GC because the system has been idle for a
1639 // timeslice and we need to check for deadlock. Record the
1640 // fact that we've done a GC and turn off the timer signal;
1641 // it will get re-enabled if we run any threads after the GC.
1642 recent_activity = ACTIVITY_DONE_GC;
1643 stopTimer();
1644 }
1645 else
1646 {
1647 // the GC might have taken long enough for the timer to set
1648 // recent_activity = ACTIVITY_INACTIVE, but we aren't
1649 // necessarily deadlocked:
1650 recent_activity = ACTIVITY_YES;
1651 }
1652
1653 #if defined(THREADED_RTS)
1654 if (gc_type == PENDING_GC_PAR)
1655 {
1656 releaseGCThreads(cap);
1657 }
1658 #endif
1659
1660 if (heap_census) {
1661 debugTrace(DEBUG_sched, "performing heap census");
1662 heapCensus();
1663 performHeapProfile = rtsFalse;
1664 }
1665
1666 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1667 // GC set the heap_overflow flag, so we should proceed with
1668 // an orderly shutdown now. Ultimately we want the main
1669 // thread to return to its caller with HeapExhausted, at which
1670 // point the caller should call hs_exit(). The first step is
1671 // to delete all the threads.
1672 //
1673 // Another way to do this would be to raise an exception in
1674 // the main thread, which we really should do because it gives
1675 // the program a chance to clean up. But how do we find the
1676 // main thread? It should presumably be the same one that
1677 // gets ^C exceptions, but that's all done on the Haskell side
1678 // (GHC.TopHandler).
1679 sched_state = SCHED_INTERRUPTING;
1680 goto delete_threads_and_gc;
1681 }
1682
1683 #ifdef SPARKBALANCE
1684 /* JB
1685 Once we are all together... this would be the place to balance all
1686 spark pools. No concurrent stealing or adding of new sparks can
1687 occur. Should be defined in Sparks.c. */
1688 balanceSparkPoolsCaps(n_capabilities, capabilities);
1689 #endif
1690
1691 #if defined(THREADED_RTS)
1692 if (gc_type == PENDING_GC_SEQ) {
1693 // release our stash of capabilities.
1694 for (i = 0; i < n_capabilities; i++) {
1695 if (cap != &capabilities[i]) {
1696 task->cap = &capabilities[i];
1697 releaseCapability(&capabilities[i]);
1698 }
1699 }
1700 }
1701 if (cap) {
1702 task->cap = cap;
1703 } else {
1704 task->cap = NULL;
1705 }
1706 #endif
1707
1708 return cap;
1709 }
1710
1711 /* ---------------------------------------------------------------------------
1712 * Singleton fork(). Do not copy any running threads.
1713 * ------------------------------------------------------------------------- */
1714
1715 pid_t
1716 forkProcess(HsStablePtr *entry
1717 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1718 STG_UNUSED
1719 #endif
1720 )
1721 {
1722 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1723 Task *task;
1724 pid_t pid;
1725 StgTSO* t,*next;
1726 Capability *cap;
1727 nat s;
1728
1729 #if defined(THREADED_RTS)
1730 if (RtsFlags.ParFlags.nNodes > 1) {
1731 errorBelch("forking not supported with +RTS -N<n> greater than 1");
1732 stg_exit(EXIT_FAILURE);
1733 }
1734 #endif
1735
1736 debugTrace(DEBUG_sched, "forking!");
1737
1738 // ToDo: for SMP, we should probably acquire *all* the capabilities
1739 cap = rts_lock();
1740
1741 // no funny business: hold locks while we fork, otherwise if some
1742 // other thread is holding a lock when the fork happens, the data
1743 // structure protected by the lock will forever be in an
1744 // inconsistent state in the child. See also #1391.
1745 ACQUIRE_LOCK(&sched_mutex);
1746 ACQUIRE_LOCK(&cap->lock);
1747 ACQUIRE_LOCK(&cap->running_task->lock);
1748
1749 pid = fork();
1750
1751 if (pid) { // parent
1752
1753 RELEASE_LOCK(&sched_mutex);
1754 RELEASE_LOCK(&cap->lock);
1755 RELEASE_LOCK(&cap->running_task->lock);
1756
1757 // just return the pid
1758 rts_unlock(cap);
1759 return pid;
1760
1761 } else { // child
1762
1763 #if defined(THREADED_RTS)
1764 initMutex(&sched_mutex);
1765 initMutex(&cap->lock);
1766 initMutex(&cap->running_task->lock);
1767 #endif
1768
1769 // Now, all OS threads except the thread that forked are
1770 // stopped. We need to stop all Haskell threads, including
1771 // those involved in foreign calls. Also we need to delete
1772 // all Tasks, because they correspond to OS threads that are
1773 // now gone.
1774
1775 for (s = 0; s < total_steps; s++) {
1776 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1777 if (t->what_next == ThreadRelocated) {
1778 next = t->_link;
1779 } else {
1780 next = t->global_link;
1781 // don't allow threads to catch the ThreadKilled
1782 // exception, but we do want to raiseAsync() because these
1783 // threads may be evaluating thunks that we need later.
1784 deleteThread_(cap,t);
1785 }
1786 }
1787 }
1788
1789 // Empty the run queue. It seems tempting to let all the
1790 // killed threads stay on the run queue as zombies to be
1791 // cleaned up later, but some of them correspond to bound
1792 // threads for which the corresponding Task does not exist.
1793 cap->run_queue_hd = END_TSO_QUEUE;
1794 cap->run_queue_tl = END_TSO_QUEUE;
1795
1796         // Any suspended C-calling Tasks are no more; their OS threads
1797 // don't exist now:
1798 cap->suspended_ccalling_tasks = NULL;
1799
1800 // Empty the threads lists. Otherwise, the garbage
1801 // collector may attempt to resurrect some of these threads.
1802 for (s = 0; s < total_steps; s++) {
1803 all_steps[s].threads = END_TSO_QUEUE;
1804 }
1805
1806 // Wipe the task list, except the current Task.
1807 ACQUIRE_LOCK(&sched_mutex);
1808 for (task = all_tasks; task != NULL; task=task->all_link) {
1809 if (task != cap->running_task) {
1810 #if defined(THREADED_RTS)
1811 initMutex(&task->lock); // see #1391
1812 #endif
1813 discardTask(task);
1814 }
1815 }
1816 RELEASE_LOCK(&sched_mutex);
1817
1818 #if defined(THREADED_RTS)
1819         // Wipe our spare workers list; they no longer exist. New
1820 // workers will be created if necessary.
1821 cap->spare_workers = NULL;
1822 cap->returning_tasks_hd = NULL;
1823 cap->returning_tasks_tl = NULL;
1824 #endif
1825
1826 // On Unix, all timers are reset in the child, so we need to start
1827 // the timer again.
1828 initTimer();
1829 startTimer();
1830
1831 cap = rts_evalStableIO(cap, entry, NULL); // run the action
1832 rts_checkSchedStatus("forkProcess",cap);
1833
1834 rts_unlock(cap);
1835 hs_exit(); // clean up and exit
1836 stg_exit(EXIT_SUCCESS);
1837 }
1838 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1839 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1840 return -1;
1841 #endif
1842 }
1843
1844 /* ---------------------------------------------------------------------------
1845 * Delete all the threads in the system
1846 * ------------------------------------------------------------------------- */
1847
1848 static void
1849 deleteAllThreads ( Capability *cap )
1850 {
1851 // NOTE: only safe to call if we own all capabilities.
1852
1853 StgTSO* t, *next;
1854 nat s;
1855
1856 debugTrace(DEBUG_sched,"deleting all threads");
1857 for (s = 0; s < total_steps; s++) {
1858 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1859 if (t->what_next == ThreadRelocated) {
1860 next = t->_link;
1861 } else {
1862 next = t->global_link;
1863 deleteThread(cap,t);
1864 }
1865 }
1866 }
1867
1868 // The run queue now contains a bunch of ThreadKilled threads. We
1869 // must not throw these away: the main thread(s) will be in there
1870 // somewhere, and the main scheduler loop has to deal with it.
1871 // Also, the run queue is the only thing keeping these threads from
1872 // being GC'd, and we don't want the "main thread has been GC'd" panic.
1873
1874 #if !defined(THREADED_RTS)
1875 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1876 ASSERT(sleeping_queue == END_TSO_QUEUE);
1877 #endif
1878 }
1879
1880 /* -----------------------------------------------------------------------------
1881 Managing the suspended_ccalling_tasks list.
1882 Locks required: sched_mutex
1883 -------------------------------------------------------------------------- */
1884
1885 STATIC_INLINE void
1886 suspendTask (Capability *cap, Task *task)
1887 {
1888 ASSERT(task->next == NULL && task->prev == NULL);
1889 task->next = cap->suspended_ccalling_tasks;
1890 task->prev = NULL;
1891 if (cap->suspended_ccalling_tasks) {
1892 cap->suspended_ccalling_tasks->prev = task;
1893 }
1894 cap->suspended_ccalling_tasks = task;
1895 }
1896
1897 STATIC_INLINE void
1898 recoverSuspendedTask (Capability *cap, Task *task)
1899 {
1900 if (task->prev) {
1901 task->prev->next = task->next;
1902 } else {
1903 ASSERT(cap->suspended_ccalling_tasks == task);
1904 cap->suspended_ccalling_tasks = task->next;
1905 }
1906 if (task->next) {
1907 task->next->prev = task->prev;
1908 }
1909 task->next = task->prev = NULL;
1910 }
1911
1912 /* ---------------------------------------------------------------------------
1913 * Suspending & resuming Haskell threads.
1914 *
1915 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1916 * its capability before calling the C function. This allows another
1917 * task to pick up the capability and carry on running Haskell
1918 * threads. It also means that if the C call blocks, it won't lock
1919 * the whole system.
1920 *
1921 * The Haskell thread making the C call is put to sleep for the
1922  * duration of the call, on the suspended_ccalling_tasks queue. We
1923 * give out a token to the task, which it can use to resume the thread
1924 * on return from the C function.
1925 * ------------------------------------------------------------------------- */
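/*
 * Illustrative sketch (not part of this file): for a call such as
 * "foreign import ccall safe foo", the generated code brackets the call
 * roughly like
 *
 *     token   = suspendThread(BaseReg);   // give up the Capability
 *     r       = foo(a, b);                // the actual C call
 *     BaseReg = resumeThread(token);      // get a Capability back
 *
 * where foo, a and b are placeholders; the token is the Task returned
 * by suspendThread() below.
 */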
1926
1927 void *
1928 suspendThread (StgRegTable *reg)
1929 {
1930 Capability *cap;
1931 int saved_errno;
1932 StgTSO *tso;
1933 Task *task;
1934 #if mingw32_HOST_OS
1935 StgWord32 saved_winerror;
1936 #endif
1937
1938 saved_errno = errno;
1939 #if mingw32_HOST_OS
1940 saved_winerror = GetLastError();
1941 #endif
1942
1943 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1944 */
1945 cap = regTableToCapability(reg);
1946
1947 task = cap->running_task;
1948 tso = cap->r.rCurrentTSO;
1949
1950 postEvent(cap, EVENT_STOP_THREAD, tso->id, THREAD_SUSPENDED_FOREIGN_CALL);
1951 debugTrace(DEBUG_sched,
1952 "thread %lu did a safe foreign call",
1953 (unsigned long)cap->r.rCurrentTSO->id);
1954
1955 // XXX this might not be necessary --SDM
1956 tso->what_next = ThreadRunGHC;
1957
1958 threadPaused(cap,tso);
1959
1960 if ((tso->flags & TSO_BLOCKEX) == 0) {
1961 tso->why_blocked = BlockedOnCCall;
1962 tso->flags |= TSO_BLOCKEX;
1963 tso->flags &= ~TSO_INTERRUPTIBLE;
1964 } else {
1965 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1966 }
1967
1968 // Hand back capability
1969 task->suspended_tso = tso;
1970
1971 ACQUIRE_LOCK(&cap->lock);
1972
1973 suspendTask(cap,task);
1974 cap->in_haskell = rtsFalse;
1975 releaseCapability_(cap,rtsFalse);
1976
1977 RELEASE_LOCK(&cap->lock);
1978
1979 #if defined(THREADED_RTS)
1980 /* Preparing to leave the RTS, so ensure there's a native thread/task
1981 waiting to take over.
1982 */
1983 debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1984 #endif
1985
1986 errno = saved_errno;
1987 #if mingw32_HOST_OS
1988 SetLastError(saved_winerror);
1989 #endif
1990 return task;
1991 }
1992
1993 StgRegTable *
1994 resumeThread (void *task_)
1995 {
1996 StgTSO *tso;
1997 Capability *cap;
1998 Task *task = task_;
1999 int saved_errno;
2000 #if mingw32_HOST_OS
2001 StgWord32 saved_winerror;
2002 #endif
2003
2004 saved_errno = errno;
2005 #if mingw32_HOST_OS
2006 saved_winerror = GetLastError();
2007 #endif
2008
2009 cap = task->cap;
2010 // Wait for permission to re-enter the RTS with the result.
2011 waitForReturnCapability(&cap,task);
2012 // we might be on a different capability now... but if so, our
2013 // entry on the suspended_ccalling_tasks list will also have been
2014 // migrated.
2015
2016 // Remove the thread from the suspended list
2017 recoverSuspendedTask(cap,task);
2018
2019 tso = task->suspended_tso;
2020 task->suspended_tso = NULL;
2021 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2022
2023 postEvent(cap, EVENT_RUN_THREAD, tso->id, 0);
2024 debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2025
2026 if (tso->why_blocked == BlockedOnCCall) {
2027 // avoid locking the TSO if we don't have to
2028 if (tso->blocked_exceptions != END_TSO_QUEUE) {
2029 awakenBlockedExceptionQueue(cap,tso);
2030 }
2031 tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2032 }
2033
2034 /* Reset blocking status */
2035 tso->why_blocked = NotBlocked;
2036
2037 cap->r.rCurrentTSO = tso;
2038 cap->in_haskell = rtsTrue;
2039 errno = saved_errno;
2040 #if mingw32_HOST_OS
2041 SetLastError(saved_winerror);
2042 #endif
2043
2044 /* We might have GC'd, mark the TSO dirty again */
2045 dirty_TSO(cap,tso);
2046
2047 IF_DEBUG(sanity, checkTSO(tso));
2048
2049 return &cap->r;
2050 }
2051
2052 /* ---------------------------------------------------------------------------
2053 * scheduleThread()
2054 *
2055 * scheduleThread puts a thread on the end of the runnable queue.
2056 * This will usually be done immediately after a thread is created.
2057 * The caller of scheduleThread must create the thread using e.g.
2058 * createThread and push an appropriate closure
2059 * on this thread's stack before the scheduler is invoked.
2060 * ------------------------------------------------------------------------ */
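/*
 * Minimal usage sketch from C, assuming the RTS API helpers shown
 * (rts_lock(), createIOThread(), rts_unlock()) are available to the
 * caller and "action" is a placeholder closure of type IO ():
 *
 *     Capability *cap = rts_lock();
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  action);
 *     scheduleThread(cap, tso);
 *     rts_unlock(cap);
 */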
2061
2062 void
2063 scheduleThread(Capability *cap, StgTSO *tso)
2064 {
2065 // The thread goes at the *end* of the run-queue, to avoid possible
2066 // starvation of any threads already on the queue.
2067 appendToRunQueue(cap,tso);
2068 }
2069
2070 void
2071 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2072 {
2073 #if defined(THREADED_RTS)
2074 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2075 // move this thread from now on.
2076 cpu %= RtsFlags.ParFlags.nNodes;
2077 if (cpu == cap->no) {
2078 appendToRunQueue(cap,tso);
2079 } else {
2080 postEvent (cap, EVENT_MIGRATE_THREAD, tso->id, capabilities[cpu].no);
2081 wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
2082 }
2083 #else
2084 appendToRunQueue(cap,tso);
2085 #endif
2086 }
2087
2088 Capability *
2089 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2090 {
2091 Task *task;
2092
2093 // We already created/initialised the Task
2094 task = cap->running_task;
2095
2096 // This TSO is now a bound thread; make the Task and TSO
2097 // point to each other.
2098 tso->bound = task;
2099 tso->cap = cap;
2100
2101 task->tso = tso;
2102 task->ret = ret;
2103 task->stat = NoStatus;
2104
2105 appendToRunQueue(cap,tso);
2106
2107 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2108
2109 cap = schedule(cap,task);
2110
2111 ASSERT(task->stat != NoStatus);
2112 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2113
2114 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2115 return cap;
2116 }
2117
2118 /* ----------------------------------------------------------------------------
2119 * Starting Tasks
2120 * ------------------------------------------------------------------------- */
2121
2122 #if defined(THREADED_RTS)
2123 void OSThreadProcAttr
2124 workerStart(Task *task)
2125 {
2126 Capability *cap;
2127
2128 // See startWorkerTask().
2129 ACQUIRE_LOCK(&task->lock);
2130 cap = task->cap;
2131 RELEASE_LOCK(&task->lock);
2132
2133 if (RtsFlags.ParFlags.setAffinity) {
2134 setThreadAffinity(cap->no, n_capabilities);
2135 }
2136
2137 // set the thread-local pointer to the Task:
2138 taskEnter(task);
2139
2140 // schedule() runs without a lock.
2141 cap = schedule(cap,task);
2142
2143 // On exit from schedule(), we have a Capability, but possibly not
2144 // the same one we started with.
2145
2146 // During shutdown, the requirement is that after all the
2147 // Capabilities are shut down, all workers that are shutting down
2148 // have finished workerTaskStop(). This is why we hold on to
2149 // cap->lock until we've finished workerTaskStop() below.
2150 //
2151 // There may be workers still involved in foreign calls; those
2152 // will just block in waitForReturnCapability() because the
2153 // Capability has been shut down.
2154 //
2155 ACQUIRE_LOCK(&cap->lock);
2156 releaseCapability_(cap,rtsFalse);
2157 workerTaskStop(task);
2158 RELEASE_LOCK(&cap->lock);
2159 }
2160 #endif
2161
2162 /* ---------------------------------------------------------------------------
2163 * initScheduler()
2164 *
2165 * Initialise the scheduler. This resets all the queues - if the
2166 * queues contained any threads, they'll be garbage collected at the
2167 * next pass.
2168 *
2169 * ------------------------------------------------------------------------ */
2170
2171 void
2172 initScheduler(void)
2173 {
2174 #if !defined(THREADED_RTS)
2175 blocked_queue_hd = END_TSO_QUEUE;
2176 blocked_queue_tl = END_TSO_QUEUE;
2177 sleeping_queue = END_TSO_QUEUE;
2178 #endif
2179
2180 blackhole_queue = END_TSO_QUEUE;
2181
2182 sched_state = SCHED_RUNNING;
2183 recent_activity = ACTIVITY_YES;
2184
2185 #if defined(THREADED_RTS)
2186 /* Initialise the mutex and condition variables used by
2187 * the scheduler. */
2188 initMutex(&sched_mutex);
2189 #endif
2190
2191 ACQUIRE_LOCK(&sched_mutex);
2192
2193 /* A capability holds the state a native thread needs in
2194 * order to execute STG code. At least one capability is
2195 * floating around (only THREADED_RTS builds have more than one).
2196 */
2197 initCapabilities();
2198
2199 initTaskManager();
2200
2201 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2202 initSparkPools();
2203 #endif
2204
2205 #if defined(THREADED_RTS)
2206 /*
2207 * Eagerly start one worker to run each Capability, except for
2208 * Capability 0. The idea is that we're probably going to start a
2209 * bound thread on Capability 0 pretty soon, so we don't want a
2210 * worker task hogging it.
2211 */
2212 {
2213 nat i;
2214 Capability *cap;
2215 for (i = 1; i < n_capabilities; i++) {
2216 cap = &capabilities[i];
2217 ACQUIRE_LOCK(&cap->lock);
2218 startWorkerTask(cap, workerStart);
2219 RELEASE_LOCK(&cap->lock);
2220 }
2221 }
2222 #endif
2223
2224 RELEASE_LOCK(&sched_mutex);
2225 }
2226
2227 void
2228 exitScheduler(
2229 rtsBool wait_foreign
2230 #if !defined(THREADED_RTS)
2231 __attribute__((unused))
2232 #endif
2233 )
2234 /* see Capability.c, shutdownCapability() */
2235 {
2236 Task *task = NULL;
2237
2238 task = newBoundTask();
2239
2240 // If we haven't killed all the threads yet, do it now.
2241 if (sched_state < SCHED_SHUTTING_DOWN) {
2242 sched_state = SCHED_INTERRUPTING;
2243 waitForReturnCapability(&task->cap,task);
2244 scheduleDoGC(task->cap,task,rtsFalse);
2245 releaseCapability(task->cap);
2246 }
2247 sched_state = SCHED_SHUTTING_DOWN;
2248
2249 #if defined(THREADED_RTS)
2250 {
2251 nat i;
2252
2253 for (i = 0; i < n_capabilities; i++) {
2254 shutdownCapability(&capabilities[i], task, wait_foreign);
2255 }
2256 boundTaskExiting(task);
2257 }
2258 #endif
2259 }
2260
2261 void
2262 freeScheduler( void )
2263 {
2264 nat still_running;
2265
2266 ACQUIRE_LOCK(&sched_mutex);
2267 still_running = freeTaskManager();
2268 // We can only free the Capabilities if there are no Tasks still
2269 // running. We might have a Task about to return from a foreign
2270 // call into waitForReturnCapability(), for example (actually,
2271 // this should be the *only* thing that a still-running Task can
2272 // do at this point, and it will block waiting for the
2273 // Capability).
2274 if (still_running == 0) {
2275 freeCapabilities();
2276 if (n_capabilities != 1) {
2277 stgFree(capabilities);
2278 }
2279 }
2280 RELEASE_LOCK(&sched_mutex);
2281 #if defined(THREADED_RTS)
2282 closeMutex(&sched_mutex);
2283 #endif
2284 }
2285
2286 /* -----------------------------------------------------------------------------
2287 performGC
2288
2289 This is the interface to the garbage collector from Haskell land.
2290 We provide this so that external C code can allocate and garbage
2291 collect when called from Haskell via _ccall_GC.
2292 -------------------------------------------------------------------------- */
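/*
 * For example (sketch): on the Haskell side, System.Mem.performGC is
 * essentially a foreign import of one of these entry points; C code
 * running inside a safe foreign call may call performGC() or
 * performMajorGC() directly, since it has already released its
 * Capability (see suspendThread() above).
 */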
2293
2294 static void
2295 performGC_(rtsBool force_major)
2296 {
2297 Task *task;
2298
2299 // We must grab a new Task here, because the existing Task may be
2300 // associated with a particular Capability, and chained onto the
2301 // suspended_ccalling_tasks queue.
2302 task = newBoundTask();
2303
2304 waitForReturnCapability(&task->cap,task);
2305 scheduleDoGC(task->cap,task,force_major);
2306 releaseCapability(task->cap);
2307 boundTaskExiting(task);
2308 }
2309
2310 void
2311 performGC(void)
2312 {
2313 performGC_(rtsFalse);
2314 }
2315
2316 void
2317 performMajorGC(void)
2318 {
2319 performGC_(rtsTrue);
2320 }
2321
2322 /* -----------------------------------------------------------------------------
2323 Stack overflow
2324
2325 If the thread has reached its maximum stack size, then raise the
2326 StackOverflow exception in the offending thread. Otherwise
2327 relocate the TSO into a larger chunk of memory and adjust its stack
2328 size appropriately.
2329 -------------------------------------------------------------------------- */
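/*
 * Sizing sketch (mirrors the code below): new_stack_size is
 * min(2 * stack_size, max_stack_size), or just 2 * stack_size when the
 * thread is already at its max but cannot be sent StackOverflow; the
 * whole TSO is then rounded up to whole blocks (mblock-friendly), and
 * new_stack_size is recomputed as new_tso_size - TSO_STRUCT_SIZEW.
 */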
2330
2331 static StgTSO *
2332 threadStackOverflow(Capability *cap, StgTSO *tso)
2333 {
2334 nat new_stack_size, stack_words;
2335 lnat new_tso_size;
2336 StgPtr new_sp;
2337 StgTSO *dest;
2338
2339 IF_DEBUG(sanity,checkTSO(tso));
2340
2341 // don't allow throwTo() to modify the blocked_exceptions queue
2342 // while we are moving the TSO:
2343 lockClosure((StgClosure *)tso);
2344
2345 if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2346 // NB. never raise a StackOverflow exception if the thread is
2347         // inside Control.Exception.block. It is impractical to protect
2348 // against stack overflow exceptions, since virtually anything
2349 // can raise one (even 'catch'), so this is the only sensible
2350 // thing to do here. See bug #767.
2351
2352 debugTrace(DEBUG_gc,
2353 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2354 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2355 IF_DEBUG(gc,
2356 /* If we're debugging, just print out the top of the stack */
2357 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2358 tso->sp+64)));
2359
2360 // Send this thread the StackOverflow exception
2361 unlockTSO(tso);
2362 throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2363 return tso;
2364 }
2365
2366 /* Try to double the current stack size. If that takes us over the
2367 * maximum stack size for this thread, then use the maximum instead
2368 * (that is, unless we're already at or over the max size and we
2369 * can't raise the StackOverflow exception (see above), in which
2370 * case just double the size). Finally round up so the TSO ends up as
2371 * a whole number of blocks.
2372 */
2373 if (tso->stack_size >= tso->max_stack_size) {
2374 new_stack_size = tso->stack_size * 2;
2375 } else {
2376 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2377 }
2378 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2379 TSO_STRUCT_SIZE)/sizeof(W_);
2380 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2381 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2382
2383 debugTrace(DEBUG_sched,
2384 "increasing stack size from %ld words to %d.",
2385 (long)tso->stack_size, new_stack_size);
2386
2387 dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2388 TICK_ALLOC_TSO(new_stack_size,0);
2389
2390 /* copy the TSO block and the old stack into the new area */
2391 memcpy(dest,tso,TSO_STRUCT_SIZE);
2392 stack_words = tso->stack + tso->stack_size - tso->sp;
2393 new_sp = (P_)dest + new_tso_size - stack_words;
2394 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2395
2396 /* relocate the stack pointers... */
2397 dest->sp = new_sp;
2398 dest->stack_size = new_stack_size;
2399
2400 /* Mark the old TSO as relocated. We have to check for relocated
2401 * TSOs in the garbage collector and any primops that deal with TSOs.
2402 *
2403 * It's important to set the sp value to just beyond the end
2404 * of the stack, so we don't attempt to scavenge any part of the
2405 * dead TSO's stack.
2406 */
2407 tso->what_next = ThreadRelocated;
2408 setTSOLink(cap,tso,dest);
2409 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2410 tso->why_blocked = NotBlocked;
2411
2412 IF_PAR_DEBUG(verbose,
2413 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2414 tso->id, tso, tso->stack_size);
2415 /* If we're debugging, just print out the top of the stack */
2416 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2417 tso->sp+64)));
2418
2419 unlockTSO(dest);
2420 unlockTSO(tso);
2421
2422 IF_DEBUG(sanity,checkTSO(dest));
2423 #if 0
2424 IF_DEBUG(scheduler,printTSO(dest));
2425 #endif
2426
2427 return dest;
2428 }
2429
2430 static StgTSO *
2431 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2432 {
2433 bdescr *bd, *new_bd;
2434 lnat free_w, tso_size_w;
2435 StgTSO *new_tso;
2436
2437 tso_size_w = tso_sizeW(tso);
2438
2439 if (tso_size_w < MBLOCK_SIZE_W ||
2440 // TSO is less than 2 mblocks (since the first mblock is
2441 // shorter than MBLOCK_SIZE_W)
2442 (tso_size_w - BLOCKS_PER_MBLOCK*BLOCK_SIZE_W) % MBLOCK_SIZE_W != 0 ||
2443 // or TSO is not a whole number of megablocks (ensuring
2444 // precondition of splitLargeBlock() below)
2445 (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4)
2446 // or stack is using more than 1/4 of the available space
2447 {
2448 // then do nothing
2449 return tso;
2450 }
2451
2452 // don't allow throwTo() to modify the blocked_exceptions queue
2453 // while we are moving the TSO:
2454 lockClosure((StgClosure *)tso);
2455
2456 // this is the number of words we'll free
2457 free_w = round_to_mblocks(tso_size_w/2);
2458
2459 bd = Bdescr((StgPtr)tso);
2460 new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2461 bd->free = bd->start + TSO_STRUCT_SIZEW;
2462
2463 new_tso = (StgTSO *)new_bd->start;
2464 memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2465 new_tso->stack_size = new_bd->free - new_tso->stack;
2466
2467 debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2468 (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2469
2470 tso->what_next = ThreadRelocated;
2471 tso->_link = new_tso; // no write barrier reqd: same generation
2472
2473 // The TSO attached to this Task may have moved, so update the
2474 // pointer to it.
2475 if (task->tso == tso) {
2476 task->tso = new_tso;
2477 }
2478
2479 unlockTSO(new_tso);
2480 unlockTSO(tso);
2481
2482 IF_DEBUG(sanity,checkTSO(new_tso));
2483
2484 return new_tso;
2485 }
2486
2487 /* ---------------------------------------------------------------------------
2488 Interrupt execution
2489 - usually called inside a signal handler so it mustn't do anything fancy.
2490 ------------------------------------------------------------------------ */
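/*
 * (Sketch: this is what the default ^C/console signal handler ends up
 * calling (see RtsSignals.c and posix/Signals.c), which is why it only
 * sets flags and wakes the RTS rather than doing any real work here.)
 */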
2491
2492 void
2493 interruptStgRts(void)
2494 {
2495 sched_state = SCHED_INTERRUPTING;
2496 setContextSwitches();
2497 wakeUpRts();
2498 }
2499
2500 /* -----------------------------------------------------------------------------
2501 Wake up the RTS
2502
2503 This function causes at least one OS thread to wake up and run the
2504 scheduler loop. It is invoked when the RTS might be deadlocked, or
2505 an external event has arrived that may need servicing (eg. a
2506 keyboard interrupt).
2507
2508 In the single-threaded RTS we don't do anything here; we only have
2509 one thread anyway, and the event that caused us to want to wake up
2510 will have interrupted any blocking system call in progress anyway.
2511 -------------------------------------------------------------------------- */
2512
2513 void
2514 wakeUpRts(void)
2515 {
2516 #if defined(THREADED_RTS)
2517 // This forces the IO Manager thread to wakeup, which will
2518 // in turn ensure that some OS thread wakes up and runs the
2519 // scheduler loop, which will cause a GC and deadlock check.
2520 ioManagerWakeup();
2521 #endif
2522 }
2523
2524 /* -----------------------------------------------------------------------------
2525 * checkBlackHoles()
2526 *
2527 * Check the blackhole_queue for threads that can be woken up. We do
2528 * this periodically: before every GC, and whenever the run queue is
2529 * empty.
2530 *
2531 * An elegant solution might be to just wake up all the blocked
2532 * threads with awakenBlockedQueue occasionally: they'll go back to
2533 * sleep again if the object is still a BLACKHOLE. Unfortunately this
2534 * doesn't give us a way to tell whether we've actually managed to
2535 * wake up any threads, so we would be busy-waiting.
2536 *
2537 * -------------------------------------------------------------------------- */
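/*
 * (Traversal sketch: we walk blackhole_queue with a pointer-to-pointer
 * "prev", so a woken thread can be unlinked in place; unblockOne()
 * returns the next thread on the queue, which we splice in via *prev.)
 */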
2538
2539 static rtsBool
2540 checkBlackHoles (Capability *cap)
2541 {
2542 StgTSO **prev, *t;
2543 rtsBool any_woke_up = rtsFalse;
2544 StgHalfWord type;
2545
2546 // blackhole_queue is global:
2547 ASSERT_LOCK_HELD(&sched_mutex);
2548
2549 debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2550
2551 // ASSUMES: sched_mutex
2552 prev = &blackhole_queue;
2553 t = blackhole_queue;
2554 while (t != END_TSO_QUEUE) {
2555 if (t->what_next == ThreadRelocated) {
2556 t = t->_link;
2557 continue;
2558 }
2559 ASSERT(t->why_blocked == BlockedOnBlackHole);
2560 type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2561 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2562 IF_DEBUG(sanity,checkTSO(t));
2563 t = unblockOne(cap, t);
2564 *prev = t;
2565 any_woke_up = rtsTrue;
2566 } else {
2567 prev = &t->_link;
2568 t = t->_link;
2569 }
2570 }
2571
2572 return any_woke_up;
2573 }
2574
2575 /* -----------------------------------------------------------------------------
2576 Deleting threads
2577
2578 This is used for interruption (^C) and forking, and corresponds to
2579 raising an exception but without letting the thread catch the
2580 exception.
2581 -------------------------------------------------------------------------- */
2582
2583 static void
2584 deleteThread (Capability *cap, StgTSO *tso)
2585 {
2586 // NOTE: must only be called on a TSO that we have exclusive
2587 // access to, because we will call throwToSingleThreaded() below.
2588 // The TSO must be on the run queue of the Capability we own, or
2589 // we must own all Capabilities.
2590
2591 if (tso->why_blocked != BlockedOnCCall &&
2592 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2593 throwToSingleThreaded(cap,tso,NULL);
2594 }
2595 }
2596
2597 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2598 static void
2599 deleteThread_(Capability *cap, StgTSO *tso)
2600 { // for forkProcess only:
2601 // like deleteThread(), but we delete threads in foreign calls, too.
2602
2603 if (tso->why_blocked == BlockedOnCCall ||
2604 tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2605 unblockOne(cap,tso);
2606 tso->what_next = ThreadKilled;
2607 } else {
2608 deleteThread(cap,tso);
2609 }
2610 }
2611 #endif
2612
2613 /* -----------------------------------------------------------------------------
2614 raiseExceptionHelper
2615
2616    This function is called by the raise# primitive, just so that we can
2617 move some of the tricky bits of raising an exception from C-- into
2618    C. Who knows, it might be a useful re-usable thing here too.
2619 -------------------------------------------------------------------------- */
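/*
 * Conceptual caller sketch (the real caller is the C-- code for raise#
 * in Exception.cmm; the exact syntax there differs):
 *
 *     frame_type = raiseExceptionHelper(BaseReg, CurrentTSO, exception);
 *     // dispatch on frame_type: CATCH_FRAME, ATOMICALLY_FRAME,
 *     // CATCH_STM_FRAME or STOP_FRAME, with tso->sp now pointing at
 *     // that frame.
 */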
2620
2621 StgWord
2622 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2623 {
2624 Capability *cap = regTableToCapability(reg);
2625 StgThunk *raise_closure = NULL;
2626 StgPtr p, next;
2627 StgRetInfoTable *info;
2628 //
2629 // This closure represents the expression 'raise# E' where E
2630     // is the exception to raise. It is used to overwrite all the
2631     // thunks which are currently under evaluation.
2632 //
2633
2634 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2635 // LDV profiling: stg_raise_info has THUNK as its closure
2636 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2637     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2638 // 1 does not cause any problem unless profiling is performed.
2639 // However, when LDV profiling goes on, we need to linearly scan
2640 // small object pool, where raise_closure is stored, so we should
2641 // use MIN_UPD_SIZE.
2642 //
2643 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2644 // sizeofW(StgClosure)+1);
2645 //
2646
2647 //
2648 // Walk up the stack, looking for the catch frame. On the way,
2649 // we update any closures pointed to from update frames with the
2650 // raise closure that we just built.
2651 //
2652 p = tso->sp;
2653 while(1) {
2654 info = get_ret_itbl((StgClosure *)p);
2655 next = p + stack_frame_sizeW((StgClosure *)p);
2656 switch (info->i.type) {
2657
2658 case UPDATE_FRAME:
2659 // Only create raise_closure if we need to.
2660 if (raise_closure == NULL) {
2661 raise_closure =
2662 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2663 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2664 raise_closure->payload[0] = exception;
2665 }
2666 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2667 p = next;
2668 continue;
2669
2670 case ATOMICALLY_FRAME:
2671 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2672 tso->sp = p;
2673 return ATOMICALLY_FRAME;
2674
2675 case CATCH_FRAME:
2676 tso->sp = p;
2677 return CATCH_FRAME;
2678
2679 case CATCH_STM_FRAME:
2680 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2681 tso->sp = p;
2682 return CATCH_STM_FRAME;
2683
2684 case STOP_FRAME:
2685 tso->sp = p;
2686 return STOP_FRAME;
2687
2688 case CATCH_RETRY_FRAME:
2689 default:
2690 p = next;
2691 continue;
2692 }
2693 }
2694 }
2695
2696
2697 /* -----------------------------------------------------------------------------
2698 findRetryFrameHelper
2699
2700 This function is called by the retry# primitive. It traverses the stack
2701 leaving tso->sp referring to the frame which should handle the retry.
2702
2703 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2704    or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2705
2706 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2707 create) because retries are not considered to be exceptions, despite the
2708 similar implementation.
2709
2710 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2711 not be created within memory transactions.
2712 -------------------------------------------------------------------------- */
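/*
 * Conceptual caller sketch (the real caller is the C-- code for retry#
 * in PrimOps.cmm; details differ):
 *
 *     frame_type = findRetryFrameHelper(CurrentTSO);
 *     // frame_type is CATCH_RETRY_FRAME (retry# inside an orElse#) or
 *     // ATOMICALLY_FRAME (retry# at the top of the transaction).
 */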
2713
2714 StgWord
2715 findRetryFrameHelper (StgTSO *tso)
2716 {
2717 StgPtr p, next;
2718 StgRetInfoTable *info;
2719
2720 p = tso -> sp;
2721 while (1) {
2722 info = get_ret_itbl((StgClosure *)p);
2723 next = p + stack_frame_sizeW((StgClosure *)p);
2724 switch (info->i.type) {
2725
2726 case ATOMICALLY_FRAME:
2727 debugTrace(DEBUG_stm,
2728 "found ATOMICALLY_FRAME at %p during retry", p);
2729 tso->sp = p;
2730 return ATOMICALLY_FRAME;
2731
2732 case CATCH_RETRY_FRAME:
2733 debugTrace(DEBUG_stm,
2734                "found CATCH_RETRY_FRAME at %p during retry", p);
2735 tso->sp = p;
2736 return CATCH_RETRY_FRAME;
2737
2738 case CATCH_STM_FRAME: {
2739 StgTRecHeader *trec = tso -> trec;
2740 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2741 debugTrace(DEBUG_stm,
2742 "found CATCH_STM_FRAME at %p during retry", p);
2743 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2744 stmAbortTransaction(tso -> cap, trec);
2745 stmFreeAbortedTRec(tso -> cap, trec);
2746 tso -> trec = outer;
2747 p = next;
2748 continue;
2749 }
2750
2751
2752 default:
2753 ASSERT(info->i.type != CATCH_FRAME);
2754 ASSERT(info->i.type != STOP_FRAME);
2755 p = next;
2756 continue;
2757 }
2758 }
2759 }
2760
2761 /* -----------------------------------------------------------------------------
2762 resurrectThreads is called after garbage collection on the list of
2763 threads found to be garbage. Each of these threads will be woken
2764 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2765 on an MVar, or NonTermination if the thread was blocked on a Black
2766 Hole.
2767
2768 Locks: assumes we hold *all* the capabilities.
2769 -------------------------------------------------------------------------- */
2770
2771 void
2772 resurrectThreads (StgTSO *threads)
2773 {
2774 StgTSO *tso, *next;
2775 Capability *cap;
2776 step *step;
2777
2778 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2779 next = tso->global_link;
2780
2781 step = Bdescr((P_)tso)->step;
2782 tso->global_link = step->threads;
2783 step->threads = tso;
2784
2785 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2786
2787 // Wake up the thread on the Capability it was last on
2788 cap = tso->cap;
2789
2790 switch (tso->why_blocked) {
2791 case BlockedOnMVar:
2792 case BlockedOnException:
2793 /* Called by GC - sched_mutex lock is currently held. */
2794 throwToSingleThreaded(cap, tso,
2795 (StgClosure *)blockedOnDeadMVar_closure);
2796 break;
2797 case BlockedOnBlackHole:
2798 throwToSingleThreaded(cap, tso,
2799 (StgClosure *)nonTermination_closure);
2800 break;
2801 case BlockedOnSTM:
2802 throwToSingleThreaded(cap, tso,
2803 (StgClosure *)blockedIndefinitely_closure);
2804 break;
2805 case NotBlocked:
2806 /* This might happen if the thread was blocked on a black hole
2807 * belonging to a thread that we've just woken up (raiseAsync
2808 * can wake up threads, remember...).
2809 */
2810 continue;
2811 default:
2812 barf("resurrectThreads: thread blocked in a strange way");
2813 }
2814 }
2815 }
2816
2817 /* -----------------------------------------------------------------------------
2818 performPendingThrowTos is called after garbage collection, and
2819 passed a list of threads that were found to have pending throwTos
2820 (tso->blocked_exceptions was not empty), and were blocked.
2821 Normally this doesn't happen, because we would deliver the
2822 exception directly if the target thread is blocked, but there are
2823 small windows where it might occur on a multiprocessor (see
2824 throwTo()).
2825
2826 NB. we must be holding all the capabilities at this point, just
2827 like resurrectThreads().
2828 -------------------------------------------------------------------------- */
2829
2830 void
2831 performPendingThrowTos (StgTSO *threads)
2832 {
2833 StgTSO *tso, *next;
2834 Capability *cap;
2835 step *step;
2836
2837 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2838 next = tso->global_link;
2839
2840 step = Bdescr((P_)tso)->step;
2841 tso->global_link = step->threads;
2842 step->threads = tso;
2843
2844 debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2845
2846 cap = tso->cap;
2847 maybePerformBlockedException(cap, tso);
2848 }
2849 }