/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2006
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery, etc. During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in a THREADED_RTS build will there be multiple capabilities;
 * for non-threaded builds there is only one global capability, namely
 * MainCapability.
 *
 * --------------------------------------------------------------------------*/
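
/* A small illustrative sketch (not part of the original file): because BaseReg
 * points at cap->r rather than at the Capability itself, RTS code that only
 * has a register table in hand recovers the owning Capability by subtracting
 * the offset of the 'r' field. This assumes the regTableToCapability() helper
 * declared in Capability.h:
 *
 *     StgRegTable *reg = ...;                       // e.g. what BaseReg points to
 *     Capability *cap = regTableToCapability(reg);  // the enclosing Capability
 */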

#include "PosixSource.h"
#include "Rts.h"

#include "Capability.h"
#include "Schedule.h"
#include "Sparks.h"
#include "Trace.h"
#include "sm/GC.h" // for gcWorkerThread()
#include "STM.h"
#include "RtsUtils.h"

#include <string.h>

// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
Capability MainCapability;

nat n_capabilities = 0;
Capability *capabilities = NULL;

// Holds the Capability which last became free. This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability = NULL;

/*
 * Indicates that the RTS wants to synchronise all the Capabilities
 * for some reason. All Capabilities should stop and return to the
 * scheduler.
 */
volatile StgWord pending_sync = 0;
/* Let foreign code get the current Capability -- assuming there is one!
 * This is useful for unsafe foreign calls because they are called with
 * the current Capability held, but they are not passed it. For example,
 * see the integer-gmp package, which calls allocate() in its
 * stgAllocForGMP() function (which gets called by gmp functions).
 */
Capability * rts_unsafeGetMyCapability (void)
{
#if defined(THREADED_RTS)
    return myTask()->cap;
#else
    return &MainCapability;
#endif
}
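
/* A minimal usage sketch (illustrative only; stgAllocForGMP lives in the
 * integer-gmp package, not in this file): code running inside an unsafe
 * foreign call already holds a Capability, so it can fetch that Capability
 * and pass it to Capability-taking RTS entry points such as allocate().
 *
 *     Capability *cap = rts_unsafeGetMyCapability();
 *     StgPtr p = allocate(cap, size_in_words);   // size_in_words: hypothetical
 */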

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
    return sched_state >= SCHED_INTERRUPTING
        || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
}
#endif

#if defined(THREADED_RTS)
StgClosure *
findSpark (Capability *cap)
{
    Capability *robbed;
    StgClosurePtr spark;
    rtsBool retry;
    nat i = 0;

    if (!emptyRunQueue(cap) || cap->returning_tasks_hd != NULL) {
        // If there are other threads, don't try to run any new
        // sparks: sparks might be speculative, we don't want to take
        // resources away from the main computation.
        return 0;
    }

    do {
        retry = rtsFalse;

        // first try to get a spark from our own pool.
        // We should be using reclaimSpark(), because it works without
        // needing any atomic instructions:
        //     spark = reclaimSpark(cap->sparks);
        // However, measurements show that this makes at least one benchmark
        // slower (prsa) and doesn't affect the others.
        spark = tryStealSpark(cap->sparks);
        while (spark != NULL && fizzledSpark(spark)) {
            cap->spark_stats.fizzled++;
            traceEventSparkFizzle(cap);
            spark = tryStealSpark(cap->sparks);
        }
        if (spark != NULL) {
            cap->spark_stats.converted++;

            // Post event for running a spark from capability's own pool.
            traceEventSparkRun(cap);

            return spark;
        }
        if (!emptySparkPoolCap(cap)) {
            retry = rtsTrue;
        }

        if (n_capabilities == 1) { return NULL; } // makes no sense...

        debugTrace(DEBUG_sched,
                   "cap %d: Trying to steal work from other capabilities",
                   cap->no);

        /* visit Capabilities 0..n-1 in sequence until a theft succeeds. We could
           start at a random place instead of 0 as well. */
        for ( i=0 ; i < n_capabilities ; i++ ) {
            robbed = &capabilities[i];
            if (cap == robbed)  // ourselves...
                continue;

            if (emptySparkPoolCap(robbed)) // nothing to steal here
                continue;

            spark = tryStealSpark(robbed->sparks);
            while (spark != NULL && fizzledSpark(spark)) {
                cap->spark_stats.fizzled++;
                traceEventSparkFizzle(cap);
                spark = tryStealSpark(robbed->sparks);
            }
            if (spark == NULL && !emptySparkPoolCap(robbed)) {
                // we conflicted with another thread while trying to steal;
                // try again later.
                retry = rtsTrue;
            }

            if (spark != NULL) {
                cap->spark_stats.converted++;
                traceEventSparkSteal(cap, robbed->no);

                return spark;
            }
            // otherwise: no success, try next one
        }
    } while (retry);

    debugTrace(DEBUG_sched, "No sparks stolen");
    return NULL;
}

// Returns rtsTrue if any spark pool is non-empty at this moment in time.
// The result is only valid for an instant, of course, so in a sense it
// is immediately invalid, and should not be relied upon for
// correctness.
rtsBool
anySparks (void)
{
    nat i;

    for (i=0; i < n_capabilities; i++) {
        if (!emptySparkPoolCap(&capabilities[i])) {
            return rtsTrue;
        }
    }
    return rtsFalse;
}
#endif

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
 *
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->next == NULL);
    if (cap->returning_tasks_hd) {
        ASSERT(cap->returning_tasks_tl->next == NULL);
        cap->returning_tasks_tl->next = task;
    } else {
        cap->returning_tasks_hd = task;
    }
    cap->returning_tasks_tl = task;
}

STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    ASSERT_LOCK_HELD(&cap->lock);
    Task *task;
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->next;
    if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
    }
    task->next = NULL;
    return task;
}
#endif
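
/* Sketch of the expected locking discipline (illustrative; the real call
 * sites are in waitForReturnCapability() below): callers bracket these
 * helpers with the Capability's own lock, e.g.
 *
 *     ACQUIRE_LOCK(&cap->lock);
 *     newReturningTask(cap, task);
 *     RELEASE_LOCK(&cap->lock);
 */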

/* ----------------------------------------------------------------------------
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */

static void
initCapability( Capability *cap, nat i )
{
    nat g;

    cap->no = i;
    cap->in_haskell = rtsFalse;
    cap->idle = 0;

    cap->run_queue_hd = END_TSO_QUEUE;
    cap->run_queue_tl = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task = NULL; // indicates cap is free
    cap->spare_workers = NULL;
    cap->n_spare_workers = 0;
    cap->suspended_ccalls = NULL;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
    cap->inbox = (Message*)END_TSO_QUEUE;
    cap->sparks = allocSparkPool();
    cap->spark_stats.created = 0;
    cap->spark_stats.dud = 0;
    cap->spark_stats.overflowed = 0;
    cap->spark_stats.converted = 0;
    cap->spark_stats.gcd = 0;
    cap->spark_stats.fizzled = 0;
#endif

    cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
    cap->f.stgGCEnter1 = (StgFunPtr)__stg_gc_enter_1;
    cap->f.stgGCFun = (StgFunPtr)__stg_gc_fun;

    cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
                                    RtsFlags.GcFlags.generations,
                                    "initCapability");
    cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
                                          RtsFlags.GcFlags.generations,
                                          "initCapability");

    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        cap->mut_lists[g] = NULL;
    }

    cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
    cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE;
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
    cap->free_trec_headers = NO_TREC;
    cap->transaction_tokens = 0;
    cap->context_switch = 0;
    cap->pinned_object_block = NULL;

#ifdef PROFILING
    cap->r.rCCCS = CCS_SYSTEM;
#else
    cap->r.rCCCS = NULL;
#endif

    traceCapsetAssignCap(CAPSET_OSPROCESS_DEFAULT, i);
    traceCapsetAssignCap(CAPSET_CLOCKDOMAIN_DEFAULT, i);
#if defined(THREADED_RTS)
    traceSparkCounters(cap);
#endif
}

/* ---------------------------------------------------------------------------
 * Function:  initCapabilities()
 *
 * Purpose:   set up the Capability handling. For the THREADED_RTS build,
 *            we keep a table of them, the size of which is
 *            controlled by the user via the RTS flag -N.
 *
 * ------------------------------------------------------------------------- */
void
initCapabilities( void )
{
    /* Declare a couple of capability sets representing the process and
       clock domain. Each capability will get added to these capsets. */
    traceCapsetCreate(CAPSET_OSPROCESS_DEFAULT, CapsetTypeOsProcess);
    traceCapsetCreate(CAPSET_CLOCKDOMAIN_DEFAULT, CapsetTypeClockdomain);

#if defined(THREADED_RTS)

#ifndef REG_Base
    // We can't support multiple CPUs if BaseReg is not a register
    if (RtsFlags.ParFlags.nNodes > 1) {
        errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
        RtsFlags.ParFlags.nNodes = 1;
    }
#endif

    n_capabilities = 0;
    moreCapabilities(0, RtsFlags.ParFlags.nNodes);
    n_capabilities = RtsFlags.ParFlags.nNodes;

#else /* !THREADED_RTS */

    n_capabilities = 1;
    capabilities = &MainCapability;
    initCapability(&MainCapability, 0);

#endif

    // There are no free capabilities to begin with. We will start
    // a worker Task on each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    last_free_capability = &capabilities[0];
}

Capability *
moreCapabilities (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    nat i;
    Capability *old_capabilities = capabilities;

    if (to == 1) {
        // THREADED_RTS must work on builds that don't have a mutable
        // BaseReg (eg. unregisterised), so in this case
        // capabilities[0] must coincide with &MainCapability.
        capabilities = &MainCapability;
    } else {
        capabilities = stgMallocBytes(to * sizeof(Capability),
                                      "moreCapabilities");

        if (from > 0) {
            memcpy(capabilities, old_capabilities, from * sizeof(Capability));
        }
    }

    for (i = from; i < to; i++) {
        initCapability(&capabilities[i], i);
    }

    last_free_capability = NULL;

    debugTrace(DEBUG_sched, "allocated %d more capabilities", to - from);

    // Return the old array to free later.
    if (from > 1) {
        return old_capabilities;
    } else {
        return NULL;
    }
#else
    return NULL;
#endif
}

/* ----------------------------------------------------------------------------
 * contextSwitchAllCapabilities: cause all capabilities to context switch as
 * soon as possible.
 * ------------------------------------------------------------------------- */

void contextSwitchAllCapabilities(void)
{
    nat i;
    for (i=0; i < n_capabilities; i++) {
        contextSwitchCapability(&capabilities[i]);
    }
}

void interruptAllCapabilities(void)
{
    nat i;
    for (i=0; i < n_capabilities; i++) {
        interruptCapability(&capabilities[i]);
    }
}

/* ----------------------------------------------------------------------------
 * Give a Capability to a Task. The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrator must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrator should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    debugTrace(DEBUG_sched, "passing capability %d to %s %p",
               cap->no, task->incall->tso ? "bound task" : "worker",
               (void *)task->id);
    ACQUIRE_LOCK(&task->lock);
    if (task->wakeup == rtsFalse) {
        task->wakeup = rtsTrue;
        // the wakeup flag is needed because signalCondition() doesn't
        // flag the condition if the thread is already running, but we want
        // it to be sticky.
        signalCondition(&task->cond);
    }
    RELEASE_LOCK(&task->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * Function:  releaseCapability(Capability*)
 *
 * Purpose:   Letting go of a capability. Causes a
 *            'returning worker' thread or a 'waiting worker'
 *            to wake up, in that order.
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap,
                    rtsBool always_wakeup)
{
    Task *task;

    task = cap->running_task;

    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);

    cap->running_task = NULL;

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call.
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }

    // If there is a pending sync, then we should just leave the
    // Capability free. The thread trying to sync will be about to
    // call waitForReturnCapability().
    if (pending_sync != 0 && pending_sync != SYNC_GC_PAR) {
        last_free_capability = cap; // needed?
        debugTrace(DEBUG_sched, "sync pending, set capability %d free", cap->no);
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
        // Make sure we're not about to try to wake ourselves up
        // ASSERT(task != cap->run_queue_hd->bound);
        // assertion is false: in schedule() we force a yield after
        // ThreadBlocked, but the thread may be back on the run queue
        // by now.
        task = cap->run_queue_hd->bound->task;
        giveCapabilityToTask(cap,task);
        return;
    }

    if (!cap->spare_workers) {
        // Create a worker thread if we don't have one. If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed. If the system is
        // shutting down, we never create a new worker.
        if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
            debugTrace(DEBUG_sched,
                       "starting new worker on capability %d", cap->no);
            startWorkerTask(cap);
            return;
        }
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (always_wakeup ||
        !emptyRunQueue(cap) || !emptyInbox(cap) ||
        !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue;
            return;
        }
    }

#ifdef PROFILING
    cap->r.rCCCS = CCS_IDLE;
#endif
    last_free_capability = cap;
    debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
}

void
releaseCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, rtsFalse);
    RELEASE_LOCK(&cap->lock);
}

void
releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, rtsTrue);
    RELEASE_LOCK(&cap->lock);
}

static void
releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the Task is stopped, we shouldn't be yielding, we should
    // be just exiting.
    ASSERT(!task->stopped);

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability. A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_worker queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    if (!isBoundTask(task))
    {
        if (cap->n_spare_workers < MAX_SPARE_WORKERS)
        {
            task->next = cap->spare_workers;
            cap->spare_workers = task;
            cap->n_spare_workers++;
        }
        else
        {
            debugTrace(DEBUG_sched, "%d spare workers already, exiting",
                       cap->n_spare_workers);
            releaseCapability_(cap,rtsFalse);
            // hold the lock until after workerTaskStop; c.f. scheduleWorker()
            workerTaskStop(task);
            RELEASE_LOCK(&cap->lock);
            shutdownThread();
        }
    }
    // Bound tasks just float around attached to their TSOs.

    releaseCapability_(cap,rtsFalse);

    RELEASE_LOCK(&cap->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * waitForReturnCapability (Capability **pCap, Task *task)
 *
 * Purpose:  when an OS thread returns from an external call,
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * ------------------------------------------------------------------------- */
void
waitForReturnCapability (Capability **pCap, Task *task)
{
#if !defined(THREADED_RTS)

    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;

#else
    Capability *cap = *pCap;

    if (cap == NULL) {
        // Try last_free_capability first
        cap = last_free_capability;
        if (cap->running_task) {
            nat i;
            // otherwise, search for a free capability
            cap = NULL;
            for (i = 0; i < n_capabilities; i++) {
                if (!capabilities[i].running_task) {
                    cap = &capabilities[i];
                    break;
                }
            }
            if (cap == NULL) {
                // Can't find a free one, use last_free_capability.
                cap = last_free_capability;
            }
        }

        // record the Capability as the one this Task is now associated with.
        task->cap = cap;

    } else {
        ASSERT(task->cap == cap);
    }

    ACQUIRE_LOCK(&cap->lock);

    debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);

    if (!cap->running_task) {
        // It's free; just grab it
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
    } else {
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            // now check whether we should wake up...
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task == NULL) {
                if (cap->returning_tasks_hd != task) {
                    giveCapabilityToTask(cap,cap->returning_tasks_hd);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->running_task = task;
                popReturningTask(cap);
                RELEASE_LOCK(&cap->lock);
                break;
            }
            RELEASE_LOCK(&cap->lock);
        }

    }

#ifdef PROFILING
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);

    *pCap = cap;
#endif
}

#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
 * yieldCapability
 * ------------------------------------------------------------------------- */

void
yieldCapability (Capability** pCap, Task *task)
{
    Capability *cap = *pCap;

    if (pending_sync == SYNC_GC_PAR) {
        traceEventGcStart(cap);
        gcWorkerThread(cap);
        traceEventGcEnd(cap);
        traceSparkCounters(cap);
        return;
    }

    debugTrace(DEBUG_sched, "giving up capability %d", cap->no);

    // We must now release the capability and wait to be woken up
    // again.
    task->wakeup = rtsFalse;
    releaseCapabilityAndQueueWorker(cap);

    for (;;) {
        ACQUIRE_LOCK(&task->lock);
        // task->lock held, cap->lock not held
        if (!task->wakeup) waitCondition(&task->cond, &task->lock);
        cap = task->cap;
        task->wakeup = rtsFalse;
        RELEASE_LOCK(&task->lock);

        debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);

        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task != NULL) {
            debugTrace(DEBUG_sched,
                       "capability %d is owned by another task", cap->no);
            RELEASE_LOCK(&cap->lock);
            continue;
        }

        if (task->cap != cap) {
            // see Note [migrated bound threads]
            debugTrace(DEBUG_sched,
                       "task has been migrated to cap %d", task->cap->no);
            RELEASE_LOCK(&cap->lock);
            continue;
        }

        if (task->incall->tso == NULL) {
            ASSERT(cap->spare_workers != NULL);
            // if we're not at the front of the queue, release it
            // again. This is unlikely to happen.
            if (cap->spare_workers != task) {
                giveCapabilityToTask(cap,cap->spare_workers);
                RELEASE_LOCK(&cap->lock);
                continue;
            }
            cap->spare_workers = task->next;
            task->next = NULL;
            cap->n_spare_workers--;
        }

        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
        break;
    }

    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
    ASSERT(cap->running_task == task);

#ifdef PROFILING
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    *pCap = cap;

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    return;
}

// Note [migrated bound threads]
//
// There's a tricky case where:
//    - cap A is running an unbound thread T1
//    - there is a bound thread T2 at the head of the run queue on cap A
//    - T1 makes a safe foreign call, the task bound to T2 is woken up on cap A
//    - T1 returns quickly, grabbing A again (T2 is still waking up on A)
//    - T1 blocks, the scheduler migrates T2 to cap B
//    - the task bound to T2 wakes up on cap B
//
// We take advantage of the following invariant:
//
//    - A bound thread can only be migrated by the holder of the
//      Capability on which the bound thread currently lives. So, if we
//      hold Capability C, and task->cap == C, then task cannot be
//      migrated under our feet.
/* ----------------------------------------------------------------------------
 * prodCapability
 *
 * If a Capability is currently idle, wake up a Task on it. Used to
 * get every Capability into the GC.
 * ------------------------------------------------------------------------- */

void
prodCapability (Capability *cap, Task *task)
{
    ACQUIRE_LOCK(&cap->lock);
    if (!cap->running_task) {
        cap->running_task = task;
        releaseCapability_(cap,rtsTrue);
    }
    RELEASE_LOCK(&cap->lock);
}

/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
 *
 * ------------------------------------------------------------------------- */

rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
    if (cap->running_task != NULL) return rtsFalse;
    ACQUIRE_LOCK(&cap->lock);
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
        return rtsFalse;
    }
    task->cap = cap;
    cap->running_task = task;
    RELEASE_LOCK(&cap->lock);
    return rtsTrue;
}


#endif /* THREADED_RTS */

/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible. For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted and
 * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
 * will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller. Runnable threads are
 * killed.
 *
 * ------------------------------------------------------------------------- */

void
shutdownCapability (Capability *cap,
                    Task *task USED_IF_THREADS,
                    rtsBool safe USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    nat i;

    task->cap = cap;

    // Loop indefinitely until all the workers have exited and there
    // are no Haskell threads left. We used to bail out after 50
    // iterations of this loop, but that occasionally left a worker
    // running which caused problems later (the closeMutex() below
    // isn't safe, for one thing).

    for (i = 0; /* i < 50 */; i++) {
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);

        debugTrace(DEBUG_sched,
                   "shutting down capability %d, attempt %d", cap->no, i);
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            RELEASE_LOCK(&cap->lock);
            debugTrace(DEBUG_sched, "not owner, yielding");
            yieldThread();
            continue;
        }
        cap->running_task = task;

        if (cap->spare_workers) {
            // Look for workers that have died without removing
            // themselves from the list; this could happen if the OS
            // summarily killed the thread, for example. This
            // actually happens on Windows when the system is
            // terminating the program, and the RTS is running in a
            // DLL.
            Task *t, *prev;
            prev = NULL;
            for (t = cap->spare_workers; t != NULL; t = t->next) {
                if (!osThreadIsAlive(t->id)) {
                    debugTrace(DEBUG_sched,
                               "worker thread %p has died unexpectedly", (void *)t->id);
                    cap->n_spare_workers--;
                    if (!prev) {
                        cap->spare_workers = t->next;
                    } else {
                        prev->next = t->next;
                    }
                    // do not advance prev here: the unlinked Task no longer
                    // precedes t->next, and advancing it would drop live
                    // workers that precede a dead one from the list.
                } else {
                    prev = t;
                }
            }
        }

        if (!emptyRunQueue(cap) || cap->spare_workers) {
            debugTrace(DEBUG_sched,
                       "runnable threads or workers still alive, yielding");
            releaseCapability_(cap,rtsFalse); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }

        // If "safe", then busy-wait for any threads currently doing
        // foreign calls. If we're about to unload this DLL, for
        // example, we need to be sure that there are no OS threads
        // that will try to return to code that has been unloaded.
        // We can be a bit more relaxed when this is a standalone
        // program that is about to terminate, and let safe=false.
        if (cap->suspended_ccalls && safe) {
            debugTrace(DEBUG_sched,
                       "thread(s) are involved in foreign calls, yielding");
            cap->running_task = NULL;
            RELEASE_LOCK(&cap->lock);
            // The IO manager thread might have been slow to start up,
            // so the first attempt to kill it might not have
            // succeeded. Just in case, try again - the kill message
            // will only be sent once.
            //
            // To reproduce this deadlock: run ffi002(threaded1)
            // repeatedly on a loaded machine.
            ioManagerDie();
            yieldThread();
            continue;
        }

        traceEventShutdown(cap);
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // We now have the Capability; its run queue and spare_workers
    // list are both empty.

    // ToDo: we can't drop this mutex, because there might still be
    // threads performing foreign calls that will eventually try to
    // return via resumeThread() and attempt to grab cap->lock.
    // closeMutex(&cap->lock);

    traceSparkCounters(cap);

#endif /* THREADED_RTS */

    traceCapsetRemoveCap(CAPSET_OSPROCESS_DEFAULT, cap->no);
    traceCapsetRemoveCap(CAPSET_CLOCKDOMAIN_DEFAULT, cap->no);
}

void
shutdownCapabilities(Task *task, rtsBool safe)
{
    nat i;
    for (i=0; i < n_capabilities; i++) {
        ASSERT(task->incall->tso == NULL);
        shutdownCapability(&capabilities[i], task, safe);
    }
    traceCapsetDelete(CAPSET_OSPROCESS_DEFAULT);
    traceCapsetDelete(CAPSET_CLOCKDOMAIN_DEFAULT);

#if defined(THREADED_RTS)
    ASSERT(checkSparkCountInvariant());
#endif
}

static void
freeCapability (Capability *cap)
{
    stgFree(cap->mut_lists);
    stgFree(cap->saved_mut_lists);
#if defined(THREADED_RTS)
    freeSparkPool(cap->sparks);
#endif
}

void
freeCapabilities (void)
{
#if defined(THREADED_RTS)
    nat i;
    for (i=0; i < n_capabilities; i++) {
        freeCapability(&capabilities[i]);
    }
#else
    freeCapability(&MainCapability);
#endif
}

/* ---------------------------------------------------------------------------
   Mark everything directly reachable from the Capabilities. When
   using multiple GC threads, each GC thread marks every Capability
   whose number, modulo the number of GC threads, equals that thread's
   index.
   ------------------------------------------------------------------------ */

void
markCapability (evac_fn evac, void *user, Capability *cap,
                rtsBool no_mark_sparks USED_IF_THREADS)
{
    InCall *incall;

    // Each GC thread is responsible for following roots from the
    // Capability of the same number. There will usually be the same
    // number of Capabilities as GC threads, or fewer, but just in case
    // there are more, we mark every Capability whose number is the GC
    // thread's index plus a multiple of the number of GC threads.
    evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
    evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
#if defined(THREADED_RTS)
    evac(user, (StgClosure **)(void *)&cap->inbox);
#endif
    for (incall = cap->suspended_ccalls; incall != NULL;
         incall=incall->next) {
        evac(user, (StgClosure **)(void *)&incall->suspended_tso);
    }

#if defined(THREADED_RTS)
    if (!no_mark_sparks) {
        traverseSparkQueue (evac, user, cap);
    }
#endif

    // Free STM structures for this Capability
    stmPreGCHook(cap);
}

void
markCapabilities (evac_fn evac, void *user)
{
    nat n;
    for (n = 0; n < n_capabilities; n++) {
        markCapability(evac, user, &capabilities[n], rtsFalse);
    }
}

#if defined(THREADED_RTS)
rtsBool checkSparkCountInvariant (void)
{
    SparkCounters sparks = { 0, 0, 0, 0, 0, 0 };
    StgWord64 remaining = 0;
    nat i;

    for (i = 0; i < n_capabilities; i++) {
        sparks.created   += capabilities[i].spark_stats.created;
        sparks.dud       += capabilities[i].spark_stats.dud;
        sparks.overflowed += capabilities[i].spark_stats.overflowed;
        sparks.converted += capabilities[i].spark_stats.converted;
        sparks.gcd       += capabilities[i].spark_stats.gcd;
        sparks.fizzled   += capabilities[i].spark_stats.fizzled;
        remaining        += sparkPoolSize(capabilities[i].sparks);
    }

    /* The invariant is
     *   created = converted + remaining + gcd + fizzled
     */
    debugTrace(DEBUG_sparks,"spark invariant: %ld == %ld + %ld + %ld + %ld "
                            "(created == converted + remaining + gcd + fizzled)",
                            sparks.created, sparks.converted, remaining,
                            sparks.gcd, sparks.fizzled);

    return (sparks.created ==
              sparks.converted + remaining + sparks.gcd + sparks.fizzled);

}
#endif