rts: enable parallel GC scan of large (32M+) allocation area
[ghc.git] / rts / Capability.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 2003-2012
4 *
5 * Capabilities
6 *
7 * A Capability represents the token required to execute STG code,
8 * and all the state an OS thread/task needs to run Haskell code:
9 * its STG registers, a pointer to its TSO, a nursery etc. During
10 * STG execution, a pointer to the capability is kept in a
11 * register (BaseReg; actually it is a pointer to cap->r).
12 *
13 * Only in a THREADED_RTS build will there be multiple capabilities;
14 * in non-threaded builds there is only one global capability, namely
15 * MainCapability.
16 *
17 * --------------------------------------------------------------------------*/
18
19 #include "PosixSource.h"
20 #include "Rts.h"
21
22 #include "Capability.h"
23 #include "Schedule.h"
24 #include "Sparks.h"
25 #include "Trace.h"
26 #include "sm/GC.h" // for gcWorkerThread()
27 #include "STM.h"
28 #include "RtsUtils.h"
29 #include "sm/OSMem.h"
30
31 #if !defined(mingw32_HOST_OS)
32 #include "rts/IOManager.h" // for setIOManagerControlFd()
33 #endif
34
35 #include <string.h>
36
37 // one global capability, this is the Capability for non-threaded
38 // builds, and for +RTS -N1
39 Capability MainCapability;
40
41 uint32_t n_capabilities = 0;
42 uint32_t enabled_capabilities = 0;
43
44 // The array of Capabilities. It's important that when we need
45 // to allocate more Capabilities we don't have to move the existing
46 // Capabilities, because there may be pointers to them in use
47 // (e.g. threads in waitForCapability(), see #8209), so this is
48 // an array of Capability* rather than an array of Capability.
49 Capability **capabilities = NULL;
50
51 // Holds the Capability which last became free. This is used so that
52 // an in-call has a chance of quickly finding a free Capability.
53 // Maintaining a global free list of Capabilities would require global
54 // locking, so we don't do that.
55 static Capability *last_free_capability[MAX_NUMA_NODES];
56
57 /*
58 * Indicates that the RTS wants to synchronise all the Capabilities
59 * for some reason. All Capabilities should yieldCapability().
60 */
61 PendingSync * volatile pending_sync = 0;
62
63 // Number of logical NUMA nodes
64 uint32_t n_numa_nodes;
65
66 // Map logical NUMA node to OS node numbers
67 uint32_t numa_map[MAX_NUMA_NODES];
68
69 /* Let foreign code get the current Capability -- assuming there is one!
70 * This is useful for unsafe foreign calls because they are called with
71 * the current Capability held, but they are not passed it. For example,
72 * see the integer-gmp package, which calls allocate() in its
73 * stgAllocForGMP() function (which gets called by gmp functions).
74 * */
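/* A minimal sketch (not part of the RTS) of how an unsafe foreign function
 * might use this, assuming it wants to allocate 'n' words on the Haskell
 * heap ('n' is a hypothetical variable):
 *
 *     Capability *cap = rts_unsafeGetMyCapability();
 *     StgPtr p = allocate(cap, n);   // allocate n words from cap's nursery
 *
 * This relies on unsafe foreign calls running on the Capability that made
 * the call.
 */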
75 Capability * rts_unsafeGetMyCapability (void)
76 {
77 #if defined(THREADED_RTS)
78 return myTask()->cap;
79 #else
80 return &MainCapability;
81 #endif
82 }
83
84 #if defined(THREADED_RTS)
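// Is there global work that an idle Capability should wake up for?  True
// when the scheduler is interrupting or shutting down, or when the RTS has
// been inactive and we may need to check for deadlock.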
85 STATIC_INLINE rtsBool
86 globalWorkToDo (void)
87 {
88 return sched_state >= SCHED_INTERRUPTING
89 || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
90 }
91 #endif
92
93 #if defined(THREADED_RTS)
94 StgClosure *
95 findSpark (Capability *cap)
96 {
97 Capability *robbed;
98 StgClosurePtr spark;
99 rtsBool retry;
100 uint32_t i = 0;
101
102 if (!emptyRunQueue(cap) || cap->n_returning_tasks != 0) {
103 // If there are other threads, don't try to run any new
104 // sparks: sparks might be speculative, and we don't want to take
105 // resources away from the main computation.
106 return 0;
107 }
108
109 do {
110 retry = rtsFalse;
111
112 // first try to get a spark from our own pool.
113 // We should be using reclaimSpark(), because it works without
114 // needing any atomic instructions:
115 // spark = reclaimSpark(cap->sparks);
116 // However, measurements show that this makes at least one benchmark
117 // slower (prsa) and doesn't affect the others.
118 spark = tryStealSpark(cap->sparks);
119 while (spark != NULL && fizzledSpark(spark)) {
120 cap->spark_stats.fizzled++;
121 traceEventSparkFizzle(cap);
122 spark = tryStealSpark(cap->sparks);
123 }
124 if (spark != NULL) {
125 cap->spark_stats.converted++;
126
127 // Post event for running a spark from capability's own pool.
128 traceEventSparkRun(cap);
129
130 return spark;
131 }
132 if (!emptySparkPoolCap(cap)) {
133 retry = rtsTrue;
134 }
135
136 if (n_capabilities == 1) { return NULL; } // makes no sense...
137
138 debugTrace(DEBUG_sched,
139 "cap %d: Trying to steal work from other capabilities",
140 cap->no);
141
142 /* visit Capabilities 0..n-1 in sequence until a theft succeeds; we
143 could also start at a random place instead of 0. */
144 for ( i=0 ; i < n_capabilities ; i++ ) {
145 robbed = capabilities[i];
146 if (cap == robbed) // ourselves...
147 continue;
148
149 if (emptySparkPoolCap(robbed)) // nothing to steal here
150 continue;
151
152 spark = tryStealSpark(robbed->sparks);
153 while (spark != NULL && fizzledSpark(spark)) {
154 cap->spark_stats.fizzled++;
155 traceEventSparkFizzle(cap);
156 spark = tryStealSpark(robbed->sparks);
157 }
158 if (spark == NULL && !emptySparkPoolCap(robbed)) {
159 // we conflicted with another thread while trying to steal;
160 // try again later.
161 retry = rtsTrue;
162 }
163
164 if (spark != NULL) {
165 cap->spark_stats.converted++;
166 traceEventSparkSteal(cap, robbed->no);
167
168 return spark;
169 }
170 // otherwise: no success, try next one
171 }
172 } while (retry);
173
174 debugTrace(DEBUG_sched, "No sparks stolen");
175 return NULL;
176 }
177
178 // Returns rtsTrue if any spark pool is non-empty at this moment in time.
179 // The result is only valid for an instant, of course, so in a sense it
180 // is immediately stale, and should not be relied upon for
181 // correctness.
182 rtsBool
183 anySparks (void)
184 {
185 uint32_t i;
186
187 for (i=0; i < n_capabilities; i++) {
188 if (!emptySparkPoolCap(capabilities[i])) {
189 return rtsTrue;
190 }
191 }
192 return rtsFalse;
193 }
194 #endif
195
196 /* -----------------------------------------------------------------------------
197 * Manage the returning_tasks lists.
198 *
199 * These functions require cap->lock
200 * -------------------------------------------------------------------------- */
201
202 #if defined(THREADED_RTS)
203 STATIC_INLINE void
204 newReturningTask (Capability *cap, Task *task)
205 {
206 ASSERT_LOCK_HELD(&cap->lock);
207 ASSERT(task->next == NULL);
208 if (cap->returning_tasks_hd) {
209 ASSERT(cap->returning_tasks_tl->next == NULL);
210 cap->returning_tasks_tl->next = task;
211 } else {
212 cap->returning_tasks_hd = task;
213 }
214 cap->returning_tasks_tl = task;
215 cap->n_returning_tasks++;
216 ASSERT_RETURNING_TASKS(cap,task);
217 }
218
219 STATIC_INLINE Task *
220 popReturningTask (Capability *cap)
221 {
222 ASSERT_LOCK_HELD(&cap->lock);
223 Task *task;
224 task = cap->returning_tasks_hd;
225 ASSERT(task);
226 cap->returning_tasks_hd = task->next;
227 if (!cap->returning_tasks_hd) {
228 cap->returning_tasks_tl = NULL;
229 }
230 task->next = NULL;
231 cap->n_returning_tasks--;
232 ASSERT_RETURNING_TASKS(cap,task);
233 return task;
234 }
235 #endif
236
237 /* ----------------------------------------------------------------------------
238 * Initialisation
239 *
240 * The Capability is initially marked not free.
241 * ------------------------------------------------------------------------- */
242
243 static void
244 initCapability (Capability *cap, uint32_t i)
245 {
246 uint32_t g;
247
248 cap->no = i;
249 cap->node = capNoToNumaNode(i);
250 cap->in_haskell = rtsFalse;
251 cap->idle = 0;
252 cap->disabled = rtsFalse;
253
254 cap->run_queue_hd = END_TSO_QUEUE;
255 cap->run_queue_tl = END_TSO_QUEUE;
256 cap->n_run_queue = 0;
257
258 #if defined(THREADED_RTS)
259 initMutex(&cap->lock);
260 cap->running_task = NULL; // indicates cap is free
261 cap->spare_workers = NULL;
262 cap->n_spare_workers = 0;
263 cap->suspended_ccalls = NULL;
264 cap->n_suspended_ccalls = 0;
265 cap->returning_tasks_hd = NULL;
266 cap->returning_tasks_tl = NULL;
267 cap->n_returning_tasks = 0;
268 cap->inbox = (Message*)END_TSO_QUEUE;
269 cap->sparks = allocSparkPool();
270 cap->spark_stats.created = 0;
271 cap->spark_stats.dud = 0;
272 cap->spark_stats.overflowed = 0;
273 cap->spark_stats.converted = 0;
274 cap->spark_stats.gcd = 0;
275 cap->spark_stats.fizzled = 0;
276 #if !defined(mingw32_HOST_OS)
277 cap->io_manager_control_wr_fd = -1;
278 #endif
279 #endif
280 cap->total_allocated = 0;
281
282 cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
283 cap->f.stgGCEnter1 = (StgFunPtr)__stg_gc_enter_1;
284 cap->f.stgGCFun = (StgFunPtr)__stg_gc_fun;
285
286 cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
287 RtsFlags.GcFlags.generations,
288 "initCapability");
289 cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
290 RtsFlags.GcFlags.generations,
291 "initCapability");
292
293 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
294 cap->mut_lists[g] = NULL;
295 }
296
297 cap->weak_ptr_list_hd = NULL;
298 cap->weak_ptr_list_tl = NULL;
299 cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
300 cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE;
301 cap->free_trec_chunks = END_STM_CHUNK_LIST;
302 cap->free_trec_headers = NO_TREC;
303 cap->transaction_tokens = 0;
304 cap->context_switch = 0;
305 cap->pinned_object_block = NULL;
306 cap->pinned_object_blocks = NULL;
307
308 #ifdef PROFILING
309 cap->r.rCCCS = CCS_SYSTEM;
310 #else
311 cap->r.rCCCS = NULL;
312 #endif
313
314 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
315 // don't want it set when not running a Haskell thread.
316 cap->r.rCurrentTSO = NULL;
317
318 traceCapCreate(cap);
319 traceCapsetAssignCap(CAPSET_OSPROCESS_DEFAULT, i);
320 traceCapsetAssignCap(CAPSET_CLOCKDOMAIN_DEFAULT, i);
321 #if defined(THREADED_RTS)
322 traceSparkCounters(cap);
323 #endif
324 }
325
326 /* ---------------------------------------------------------------------------
327 * Function: initCapabilities()
328 *
329 * Purpose: set up the Capability handling. For the THREADED_RTS build,
330 * we keep a table of them, the size of which is
331 * controlled by the user via the RTS flag -N.
332 *
333 * ------------------------------------------------------------------------- */
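// For example, running a program with "+RTS -N4" sets
// RtsFlags.ParFlags.nCapabilities to 4, and four Capabilities are created.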
334 void initCapabilities (void)
335 {
336 uint32_t i;
337
338 /* Declare a couple of capability sets representing the process and
339 clock domain. Each capability will get added to these capsets. */
340 traceCapsetCreate(CAPSET_OSPROCESS_DEFAULT, CapsetTypeOsProcess);
341 traceCapsetCreate(CAPSET_CLOCKDOMAIN_DEFAULT, CapsetTypeClockdomain);
342
343 // Initialise NUMA
344 if (!RtsFlags.GcFlags.numa) {
345 n_numa_nodes = 1;
346 for (i = 0; i < MAX_NUMA_NODES; i++) {
347 numa_map[i] = 0;
348 }
349 } else {
350 uint32_t nNodes = osNumaNodes();
351 if (nNodes > MAX_NUMA_NODES) {
352 barf("Too many NUMA nodes (max %d)", MAX_NUMA_NODES);
353 }
354 StgWord mask = RtsFlags.GcFlags.numaMask & osNumaMask();
355 uint32_t logical = 0, physical = 0;
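// Assign consecutive logical node numbers to the physical nodes selected
// by the user's NUMA mask (intersected with the nodes the OS reports).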
356 for (; physical < MAX_NUMA_NODES; physical++) {
357 if (mask & 1) {
358 numa_map[logical++] = physical;
359 }
360 mask = mask >> 1;
361 }
362 n_numa_nodes = logical;
363 if (logical == 0) {
364 barf("available NUMA node set is empty");
365 }
366 }
367
368 #if defined(THREADED_RTS)
369
370 #ifndef REG_Base
371 // We can't support multiple CPUs if BaseReg is not a register
372 if (RtsFlags.ParFlags.nCapabilities > 1) {
373 errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
374 RtsFlags.ParFlags.nCapabilities = 1;
375 }
376 #endif
377
378 n_capabilities = 0;
379 moreCapabilities(0, RtsFlags.ParFlags.nCapabilities);
380 n_capabilities = RtsFlags.ParFlags.nCapabilities;
381
382 #else /* !THREADED_RTS */
383
384 n_capabilities = 1;
385 capabilities = stgMallocBytes(sizeof(Capability*), "initCapabilities");
386 capabilities[0] = &MainCapability;
387
388 initCapability(&MainCapability, 0);
389
390 #endif
391
392 enabled_capabilities = n_capabilities;
393
394 // There are no free capabilities to begin with. We will start
395 // a worker Task on each Capability, which will quickly put the
396 // Capability on the free list when it finds nothing to do.
397 for (i = 0; i < n_numa_nodes; i++) {
398 last_free_capability[i] = capabilities[0];
399 }
400 }
401
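/* ----------------------------------------------------------------------------
 * moreCapabilities
 *
 * Grow the capabilities[] array so that it holds 'to' Capability pointers,
 * keeping the first 'from' existing Capabilities and initialising the rest.
 * Only the array of pointers is reallocated, so Capability* pointers held
 * elsewhere remain valid (see the comment on 'capabilities' above).
 * ------------------------------------------------------------------------- */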
402 void
403 moreCapabilities (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
404 {
405 #if defined(THREADED_RTS)
406 uint32_t i;
407 Capability **old_capabilities = capabilities;
408
409 capabilities = stgMallocBytes(to * sizeof(Capability*), "moreCapabilities");
410
411 if (to == 1) {
412 // THREADED_RTS must work on builds that don't have a mutable
413 // BaseReg (eg. unregisterised), so in this case
414 // capabilities[0] must coincide with &MainCapability.
415 capabilities[0] = &MainCapability;
416 initCapability(&MainCapability, 0);
417 }
418 else
419 {
420 for (i = 0; i < to; i++) {
421 if (i < from) {
422 capabilities[i] = old_capabilities[i];
423 } else {
424 capabilities[i] = stgMallocBytes(sizeof(Capability),
425 "moreCapabilities");
426 initCapability(capabilities[i], i);
427 }
428 }
429 }
430
431 debugTrace(DEBUG_sched, "allocated %d more capabilities", to - from);
432
433 if (old_capabilities != NULL) {
434 stgFree(old_capabilities);
435 }
436 #endif
437 }
438
439 /* ----------------------------------------------------------------------------
440 * contextSwitchAllCapabilities: cause all capabilities to context switch
441 * as soon as possible.
442 * ------------------------------------------------------------------------- */
443
444 void contextSwitchAllCapabilities(void)
445 {
446 uint32_t i;
447 for (i=0; i < n_capabilities; i++) {
448 contextSwitchCapability(capabilities[i]);
449 }
450 }
451
452 void interruptAllCapabilities(void)
453 {
454 uint32_t i;
455 for (i=0; i < n_capabilities; i++) {
456 interruptCapability(capabilities[i]);
457 }
458 }
459
460 /* ----------------------------------------------------------------------------
461 * Give a Capability to a Task. The task must currently be sleeping
462 * on its condition variable.
463 *
464 * Requires cap->lock (modifies cap->running_task).
465 *
466 * When migrating a Task, the migrator must take task->lock before
467 * modifying task->cap, to synchronise with the waking up Task.
468 * Additionally, the migrator should own the Capability (when
469 * migrating the run queue), or cap->lock (when migrating
470 * returning_workers).
471 *
472 * ------------------------------------------------------------------------- */
473
474 #if defined(THREADED_RTS)
475 static void
476 giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
477 {
478 ASSERT_LOCK_HELD(&cap->lock);
479 ASSERT(task->cap == cap);
480 debugTrace(DEBUG_sched, "passing capability %d to %s %#" FMT_HexWord64,
481 cap->no, task->incall->tso ? "bound task" : "worker",
482 serialisableTaskId(task));
483 ACQUIRE_LOCK(&task->lock);
484 if (task->wakeup == rtsFalse) {
485 task->wakeup = rtsTrue;
486 // the wakeup flag is needed because signalCondition() doesn't
487 // flag the condition if the thread is already running, but we want
488 // it to be sticky.
489 signalCondition(&task->cond);
490 }
491 RELEASE_LOCK(&task->lock);
492 }
493 #endif
494
495 /* ----------------------------------------------------------------------------
496 * releaseCapability
497 *
498 * The current Task (cap->running_task) releases the Capability. The Capability is
499 * marked free, and if there is any work to do, an appropriate Task is woken up.
500 * ------------------------------------------------------------------------- */
501
502 #if defined(THREADED_RTS)
503 void
504 releaseCapability_ (Capability* cap,
505 rtsBool always_wakeup)
506 {
507 Task *task;
508
509 task = cap->running_task;
510
511 ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
512 ASSERT_RETURNING_TASKS(cap,task);
513
514 cap->running_task = NULL;
515
516 // Check to see whether a worker thread can be given
517 // the go-ahead to return the result of an external call.
518 if (cap->n_returning_tasks != 0) {
519 giveCapabilityToTask(cap,cap->returning_tasks_hd);
520 // The Task pops itself from the queue (see waitForCapability())
521 return;
522 }
523
524 // If there is a pending sync, then we should just leave the Capability
525 // free. The thread trying to sync will be about to call
526 // waitForCapability().
527 //
528 // Note: this is *after* we check for a returning task above,
529 // because the task attempting to acquire all the capabilities may
530 // be currently in waitForCapability() waiting for this
531 // capability, in which case simply setting it as free would not
532 // wake up the waiting task.
533 PendingSync *sync = pending_sync;
534 if (sync && (sync->type != SYNC_GC_PAR || sync->idle[cap->no])) {
535 debugTrace(DEBUG_sched, "sync pending, freeing capability %d", cap->no);
536 return;
537 }
538
539 // If the next thread on the run queue is a bound thread,
540 // give this Capability to the appropriate Task.
541 if (!emptyRunQueue(cap) && peekRunQueue(cap)->bound) {
542 // Make sure we're not about to try to wake ourselves up
543 // ASSERT(task != cap->run_queue_hd->bound);
544 // assertion is false: in schedule() we force a yield after
545 // ThreadBlocked, but the thread may be back on the run queue
546 // by now.
547 task = peekRunQueue(cap)->bound->task;
548 giveCapabilityToTask(cap, task);
549 return;
550 }
551
552 if (!cap->spare_workers) {
553 // Create a worker thread if we don't have one. If the system
554 // is interrupted, we only create a worker task if there
555 // are threads that need to be completed. If the system is
556 // shutting down, we never create a new worker.
557 if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
558 debugTrace(DEBUG_sched,
559 "starting new worker on capability %d", cap->no);
560 startWorkerTask(cap);
561 return;
562 }
563 }
564
565 // If we have an unbound thread on the run queue, or if there's
566 // anything else to do, give the Capability to a worker thread.
567 if (always_wakeup ||
568 !emptyRunQueue(cap) || !emptyInbox(cap) ||
569 (!cap->disabled && !emptySparkPoolCap(cap)) || globalWorkToDo()) {
570 if (cap->spare_workers) {
571 giveCapabilityToTask(cap, cap->spare_workers);
572 // The worker Task pops itself from the queue;
573 return;
574 }
575 }
576
577 #ifdef PROFILING
578 cap->r.rCCCS = CCS_IDLE;
579 #endif
580 last_free_capability[cap->node] = cap;
581 debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
582 }
583
584 void
585 releaseCapability (Capability* cap USED_IF_THREADS)
586 {
587 ACQUIRE_LOCK(&cap->lock);
588 releaseCapability_(cap, rtsFalse);
589 RELEASE_LOCK(&cap->lock);
590 }
591
592 void
593 releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
594 {
595 ACQUIRE_LOCK(&cap->lock);
596 releaseCapability_(cap, rtsTrue);
597 RELEASE_LOCK(&cap->lock);
598 }
599
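/* ----------------------------------------------------------------------------
 * enqueueWorker
 *
 * Push the current worker Task onto cap->spare_workers, unless there are
 * already MAX_SPARE_WORKERS spares, in which case the worker releases the
 * Capability and exits.  Requires cap->lock; on the exit path the lock is
 * released inside this function.
 * ------------------------------------------------------------------------- */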
600 static void
601 enqueueWorker (Capability* cap USED_IF_THREADS)
602 {
603 Task *task;
604
605 task = cap->running_task;
606
607 // If the Task is stopped, we shouldn't be yielding; we should
608 // just be exiting.
609 ASSERT(!task->stopped);
610 ASSERT(task->worker);
611
612 if (cap->n_spare_workers < MAX_SPARE_WORKERS)
613 {
614 task->next = cap->spare_workers;
615 cap->spare_workers = task;
616 cap->n_spare_workers++;
617 }
618 else
619 {
620 debugTrace(DEBUG_sched, "%d spare workers already, exiting",
621 cap->n_spare_workers);
622 releaseCapability_(cap,rtsFalse);
623 // hold the lock until after workerTaskStop; c.f. scheduleWorker()
624 workerTaskStop(task);
625 RELEASE_LOCK(&cap->lock);
626 shutdownThread();
627 }
628 }
629
630 #endif
631
632 /* ----------------------------------------------------------------------------
633 * waitForWorkerCapability(task)
634 *
635 * waits to be given a Capability, and then returns the Capability. The task
636 * must be either a worker (and on a cap->spare_workers queue), or a bound Task.
637 * ------------------------------------------------------------------------- */
638
639 #if defined(THREADED_RTS)
640
641 static Capability * waitForWorkerCapability (Task *task)
642 {
643 Capability *cap;
644
645 for (;;) {
646 ACQUIRE_LOCK(&task->lock);
647 // task->lock held, cap->lock not held
648 if (!task->wakeup) waitCondition(&task->cond, &task->lock);
649 cap = task->cap;
650 task->wakeup = rtsFalse;
651 RELEASE_LOCK(&task->lock);
652
653 debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);
654
655 ACQUIRE_LOCK(&cap->lock);
656 if (cap->running_task != NULL) {
657 debugTrace(DEBUG_sched,
658 "capability %d is owned by another task", cap->no);
659 RELEASE_LOCK(&cap->lock);
660 continue;
661 }
662
663 if (task->cap != cap) {
664 // see Note [migrated bound threads]
665 debugTrace(DEBUG_sched,
666 "task has been migrated to cap %d", task->cap->no);
667 RELEASE_LOCK(&cap->lock);
668 continue;
669 }
670
671 if (task->incall->tso == NULL) {
672 ASSERT(cap->spare_workers != NULL);
673 // If we're not at the front of the spare_workers queue, give the
674 // Capability to the Task that is, and wait again. This is unlikely to happen.
675 if (cap->spare_workers != task) {
676 giveCapabilityToTask(cap,cap->spare_workers);
677 RELEASE_LOCK(&cap->lock);
678 continue;
679 }
680 cap->spare_workers = task->next;
681 task->next = NULL;
682 cap->n_spare_workers--;
683 }
684
685 cap->running_task = task;
686 RELEASE_LOCK(&cap->lock);
687 break;
688 }
689
690 return cap;
691 }
692
693 #endif /* THREADED_RTS */
694
695 /* ----------------------------------------------------------------------------
696 * waitForReturnCapability (Task *task)
697 *
698 * The Task should be on the cap->returning_tasks queue of a Capability. This
699 * function waits for the Task to be woken up, and returns the Capability that
700 * it was woken up on.
701 *
702 * ------------------------------------------------------------------------- */
703
704 #if defined(THREADED_RTS)
705
706 static Capability * waitForReturnCapability (Task *task)
707 {
708 Capability *cap;
709
710 for (;;) {
711 ACQUIRE_LOCK(&task->lock);
712 // task->lock held, cap->lock not held
713 if (!task->wakeup) waitCondition(&task->cond, &task->lock);
714 cap = task->cap;
715 task->wakeup = rtsFalse;
716 RELEASE_LOCK(&task->lock);
717
718 // now check whether we should wake up...
719 ACQUIRE_LOCK(&cap->lock);
720 if (cap->running_task == NULL) {
721 if (cap->returning_tasks_hd != task) {
722 giveCapabilityToTask(cap,cap->returning_tasks_hd);
723 RELEASE_LOCK(&cap->lock);
724 continue;
725 }
726 cap->running_task = task;
727 popReturningTask(cap);
728 RELEASE_LOCK(&cap->lock);
729 break;
730 }
731 RELEASE_LOCK(&cap->lock);
732 }
733
734 return cap;
735 }
736
737 #endif /* THREADED_RTS */
738
739 /* ----------------------------------------------------------------------------
740 * waitForCapability (Capability **pCap, Task *task)
741 *
742 * Purpose: when an OS thread returns from an external call,
743 * it calls waitForCapability() (via Schedule.resumeThread())
744 * to wait for permission to enter the RTS & communicate the
745 * result of the external call back to the Haskell thread that
746 * made it.
747 *
748 * ------------------------------------------------------------------------- */
749
750 void waitForCapability (Capability **pCap, Task *task)
751 {
752 #if !defined(THREADED_RTS)
753
754 MainCapability.running_task = task;
755 task->cap = &MainCapability;
756 *pCap = &MainCapability;
757
758 #else
759 uint32_t i;
760 Capability *cap = *pCap;
761
762 if (cap == NULL) {
763 if (task->preferred_capability != -1) {
764 cap = capabilities[task->preferred_capability %
765 enabled_capabilities];
766 } else {
767 // Try last_free_capability first
768 cap = last_free_capability[task->node];
769 if (cap->running_task) {
770 // Otherwise, search for a free capability on this node.
771 cap = NULL;
772 for (i = task->node; i < enabled_capabilities;
773 i += n_numa_nodes) {
774 // visits all the capabilities on this node, because
775 // capabilities[i]->node == i % n_numa_nodes
776 if (!capabilities[i]->running_task) {
777 cap = capabilities[i];
778 break;
779 }
780 }
781 if (cap == NULL) {
782 // Can't find a free one, use last_free_capability.
783 cap = last_free_capability[task->node];
784 }
785 }
786 }
787
788 // record the Capability as the one this Task is now associated with.
789 task->cap = cap;
790
791 } else {
792 ASSERT(task->cap == cap);
793 }
794
795 debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);
796
797 ACQUIRE_LOCK(&cap->lock);
798 if (!cap->running_task) {
799 // It's free; just grab it
800 cap->running_task = task;
801 RELEASE_LOCK(&cap->lock);
802 } else {
803 newReturningTask(cap,task);
804 RELEASE_LOCK(&cap->lock);
805 cap = waitForReturnCapability(task);
806 }
807
808 #ifdef PROFILING
809 cap->r.rCCCS = CCS_SYSTEM;
810 #endif
811
812 ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
813
814 debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
815
816 *pCap = cap;
817 #endif
818 }
819
820 /* ----------------------------------------------------------------------------
821 * yieldCapability
822 *
823 * Give up the Capability, and return when we have it again. This is called
824 * when either we know that the Capability should be given to another Task, or
825 * there is nothing to do right now. One of the following is true:
826 *
827 * - The current Task is a worker, and there's a bound thread at the head of
828 * the run queue (or vice versa)
829 *
830 * - The run queue is empty. We'll be woken up again when there's work to
831 * do.
832 *
833 * - Another Task is trying to do parallel GC (pending_sync == SYNC_GC_PAR).
834 * We should become a GC worker for a while.
835 *
836 * - Another Task is trying to acquire all the Capabilities (pending_sync !=
837 * SYNC_GC_PAR), either to do a sequential GC, forkProcess, or
838 * setNumCapabilities. We should give up the Capability temporarily.
839 *
840 * ------------------------------------------------------------------------- */
841
842 #if defined (THREADED_RTS)
843
844 /* See Note [GC livelock] in Schedule.c for why we have gcAllowed
845 and return the rtsBool */
846 rtsBool /* Did we GC? */
847 yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed)
848 {
849 Capability *cap = *pCap;
850
851 if (gcAllowed)
852 {
853 PendingSync *sync = pending_sync;
854
855 if (sync && sync->type == SYNC_GC_PAR) {
856 if (! sync->idle[cap->no]) {
857 traceEventGcStart(cap);
858 gcWorkerThread(cap);
859 traceEventGcEnd(cap);
860 traceSparkCounters(cap);
861 // See Note [migrated bound threads 2]
862 if (task->cap == cap) {
863 return rtsTrue;
864 }
865 }
866 }
867 }
868
869 debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
870
871 // We must now release the capability and wait to be woken up again.
872 task->wakeup = rtsFalse;
873
874 ACQUIRE_LOCK(&cap->lock);
875
876 // If this is a worker thread, put it on the spare_workers queue
877 if (isWorker(task)) {
878 enqueueWorker(cap);
879 }
880
881 releaseCapability_(cap, rtsFalse);
882
883 if (isWorker(task) || isBoundTask(task)) {
884 RELEASE_LOCK(&cap->lock);
885 cap = waitForWorkerCapability(task);
886 } else {
887 // Neither a worker Task nor a bound Task. The only way we can be woken
888 // up again is to put ourselves on the returning_tasks queue, so that's
889 // what we do. We still hold cap->lock at this point.
890 // The Task waiting for this Capability does not have it
891 // yet, so we can be sure to be woken up later. (see #10545)
892 newReturningTask(cap,task);
893 RELEASE_LOCK(&cap->lock);
894 cap = waitForReturnCapability(task);
895 }
896
897 debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
898 ASSERT(cap->running_task == task);
899
900 #ifdef PROFILING
901 cap->r.rCCCS = CCS_SYSTEM;
902 #endif
903
904 *pCap = cap;
905
906 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
907
908 return rtsFalse;
909 }
910
911 #endif /* THREADED_RTS */
912
913 // Note [migrated bound threads]
914 //
915 // There's a tricky case where:
916 // - cap A is running an unbound thread T1
917 // - there is a bound thread T2 at the head of the run queue on cap A
918 // - T1 makes a safe foreign call, the task bound to T2 is woken up on cap A
919 // - T1 returns quickly grabbing A again (T2 is still waking up on A)
920 // - T1 blocks, the scheduler migrates T2 to cap B
921 // - the task bound to T2 wakes up on cap B
922 //
923 // We take advantage of the following invariant:
924 //
925 // - A bound thread can only be migrated by the holder of the
926 // Capability on which the bound thread currently lives. So, if we
927 // hold Capability C, and task->cap == C, then task cannot be
928 // migrated under our feet.
929
930 // Note [migrated bound threads 2]
931 //
932 // Second tricky case:
933 // - A bound Task becomes a GC thread
934 // - scheduleDoGC() migrates the thread belonging to this Task,
935 // because the Capability it is on is disabled
936 // - after GC, gcWorkerThread() returns, but now we are
937 // holding a Capability that is not the same as task->cap
938 // - Hence we must check for this case and immediately give up the
939 // cap we hold.
940
941 /* ----------------------------------------------------------------------------
942 * prodCapability
943 *
944 * If a Capability is currently idle, wake up a Task on it. Used to
945 * get every Capability into the GC.
946 * ------------------------------------------------------------------------- */
947
948 #if defined (THREADED_RTS)
949
950 void
951 prodCapability (Capability *cap, Task *task)
952 {
953 ACQUIRE_LOCK(&cap->lock);
954 if (!cap->running_task) {
955 cap->running_task = task;
956 releaseCapability_(cap,rtsTrue);
957 }
958 RELEASE_LOCK(&cap->lock);
959 }
960
961 #endif /* THREADED_RTS */
962
963 /* ----------------------------------------------------------------------------
964 * tryGrabCapability
965 *
966 * Attempt to gain control of a Capability if it is free.
967 *
968 * ------------------------------------------------------------------------- */
969
970 #if defined (THREADED_RTS)
971
972 rtsBool
973 tryGrabCapability (Capability *cap, Task *task)
974 {
975 if (cap->running_task != NULL) return rtsFalse;
976 ACQUIRE_LOCK(&cap->lock);
977 if (cap->running_task != NULL) {
978 RELEASE_LOCK(&cap->lock);
979 return rtsFalse;
980 }
981 task->cap = cap;
982 cap->running_task = task;
983 RELEASE_LOCK(&cap->lock);
984 return rtsTrue;
985 }
986
987
988 #endif /* THREADED_RTS */
989
990 /* ----------------------------------------------------------------------------
991 * shutdownCapability
992 *
993 * At shutdown time, we want to let everything exit as cleanly as
994 * possible. For each capability, we let its run queue drain, and
995 * allow the workers to stop.
996 *
997 * This function should be called when interrupted and
998 * sched_state = SCHED_SHUTTING_DOWN, thus any worker that wakes up
999 * will exit the scheduler and call taskStop(), and any bound thread
1000 * that wakes up will return to its caller. Runnable threads are
1001 * killed.
1002 *
1003 * ------------------------------------------------------------------------- */
1004
1005 static void
1006 shutdownCapability (Capability *cap USED_IF_THREADS,
1007 Task *task USED_IF_THREADS,
1008 rtsBool safe USED_IF_THREADS)
1009 {
1010 #if defined(THREADED_RTS)
1011 uint32_t i;
1012
1013 task->cap = cap;
1014
1015 // Loop indefinitely until all the workers have exited and there
1016 // are no Haskell threads left. We used to bail out after 50
1017 // iterations of this loop, but that occasionally left a worker
1018 // running which caused problems later (the closeMutex() below
1019 // isn't safe, for one thing).
1020
1021 for (i = 0; /* i < 50 */; i++) {
1022 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
1023
1024 debugTrace(DEBUG_sched,
1025 "shutting down capability %d, attempt %d", cap->no, i);
1026 ACQUIRE_LOCK(&cap->lock);
1027 if (cap->running_task) {
1028 RELEASE_LOCK(&cap->lock);
1029 debugTrace(DEBUG_sched, "not owner, yielding");
1030 yieldThread();
1031 continue;
1032 }
1033 cap->running_task = task;
1034
1035 if (cap->spare_workers) {
1036 // Look for workers that have died without removing
1037 // themselves from the list; this could happen if the OS
1038 // summarily killed the thread, for example. This
1039 // actually happens on Windows when the system is
1040 // terminating the program, and the RTS is running in a
1041 // DLL.
1042 Task *t, *prev;
1043 prev = NULL;
1044 for (t = cap->spare_workers; t != NULL; t = t->next) {
1045 if (!osThreadIsAlive(t->id)) {
1046 debugTrace(DEBUG_sched,
1047 "worker thread %p has died unexpectedly", (void *)(size_t)t->id);
1048 cap->n_spare_workers--;
1049 if (!prev) {
1050 cap->spare_workers = t->next;
1051 } else {
1052 prev->next = t->next;
1053 }
1054 prev = t;
1055 }
1056 }
1057 }
1058
1059 if (!emptyRunQueue(cap) || cap->spare_workers) {
1060 debugTrace(DEBUG_sched,
1061 "runnable threads or workers still alive, yielding");
1062 releaseCapability_(cap,rtsFalse); // this will wake up a worker
1063 RELEASE_LOCK(&cap->lock);
1064 yieldThread();
1065 continue;
1066 }
1067
1068 // If "safe", then busy-wait for any threads currently doing
1069 // foreign calls. If we're about to unload this DLL, for
1070 // example, we need to be sure that there are no OS threads
1071 // that will try to return to code that has been unloaded.
1072 // We can be a bit more relaxed when this is a standalone
1073 // program that is about to terminate, and let safe=false.
1074 if (cap->suspended_ccalls && safe) {
1075 debugTrace(DEBUG_sched,
1076 "thread(s) are involved in foreign calls, yielding");
1077 cap->running_task = NULL;
1078 RELEASE_LOCK(&cap->lock);
1079 // The IO manager thread might have been slow to start up,
1080 // so the first attempt to kill it might not have
1081 // succeeded. Just in case, try again - the kill message
1082 // will only be sent once.
1083 //
1084 // To reproduce this deadlock: run ffi002(threaded1)
1085 // repeatedly on a loaded machine.
1086 ioManagerDie();
1087 yieldThread();
1088 continue;
1089 }
1090
1091 traceSparkCounters(cap);
1092 RELEASE_LOCK(&cap->lock);
1093 break;
1094 }
1095 // We now have the Capability; its run queue and spare_workers
1096 // list are both empty.
1097
1098 // ToDo: we can't drop this mutex, because there might still be
1099 // threads performing foreign calls that will eventually try to
1100 // return via resumeThread() and attempt to grab cap->lock.
1101 // closeMutex(&cap->lock);
1102 #endif
1103 }
1104
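/* ----------------------------------------------------------------------------
 * shutdownCapabilities
 *
 * Shut down each Capability in turn (see shutdownCapability() above) on
 * behalf of the given Task.
 * ------------------------------------------------------------------------- */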
1105 void
1106 shutdownCapabilities(Task *task, rtsBool safe)
1107 {
1108 uint32_t i;
1109 for (i=0; i < n_capabilities; i++) {
1110 ASSERT(task->incall->tso == NULL);
1111 shutdownCapability(capabilities[i], task, safe);
1112 }
1113 #if defined(THREADED_RTS)
1114 ASSERT(checkSparkCountInvariant());
1115 #endif
1116 }
1117
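/* ----------------------------------------------------------------------------
 * freeCapability / freeCapabilities
 *
 * At RTS exit, free the resources owned by each Capability (mutable lists,
 * spark pool) and emit the corresponding capset/capability delete events.
 * ------------------------------------------------------------------------- */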
1118 static void
1119 freeCapability (Capability *cap)
1120 {
1121 stgFree(cap->mut_lists);
1122 stgFree(cap->saved_mut_lists);
1123 #if defined(THREADED_RTS)
1124 freeSparkPool(cap->sparks);
1125 #endif
1126 traceCapsetRemoveCap(CAPSET_OSPROCESS_DEFAULT, cap->no);
1127 traceCapsetRemoveCap(CAPSET_CLOCKDOMAIN_DEFAULT, cap->no);
1128 traceCapDelete(cap);
1129 }
1130
1131 void
1132 freeCapabilities (void)
1133 {
1134 #if defined(THREADED_RTS)
1135 uint32_t i;
1136 for (i=0; i < n_capabilities; i++) {
1137 freeCapability(capabilities[i]);
1138 if (capabilities[i] != &MainCapability)
1139 stgFree(capabilities[i]);
1140 }
1141 #else
1142 freeCapability(&MainCapability);
1143 #endif
1144 stgFree(capabilities);
1145 traceCapsetDelete(CAPSET_OSPROCESS_DEFAULT);
1146 traceCapsetDelete(CAPSET_CLOCKDOMAIN_DEFAULT);
1147 }
1148
1149 /* ---------------------------------------------------------------------------
1150 Mark everything directly reachable from the Capabilities. When
1151 using multiple GC threads, GC thread i marks every Capability c
1152 for which c == i modulo the number of GC threads.
1153 ------------------------------------------------------------------------ */
1154
1155 void
1156 markCapability (evac_fn evac, void *user, Capability *cap,
1157 rtsBool no_mark_sparks USED_IF_THREADS)
1158 {
1159 InCall *incall;
1160
1161 // Each GC thread is responsible for following roots from the
1162 // Capability of the same number. There will usually be the same
1163 // or fewer Capabilities as GC threads, but just in case there
1164 // are more, we mark every Capability whose number is the GC
1165 // thread's index plus a multiple of the number of GC threads.
1166 evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
1167 evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
1168 #if defined(THREADED_RTS)
1169 evac(user, (StgClosure **)(void *)&cap->inbox);
1170 #endif
1171 for (incall = cap->suspended_ccalls; incall != NULL;
1172 incall=incall->next) {
1173 evac(user, (StgClosure **)(void *)&incall->suspended_tso);
1174 }
1175
1176 #if defined(THREADED_RTS)
1177 if (!no_mark_sparks) {
1178 traverseSparkQueue (evac, user, cap);
1179 }
1180 #endif
1181
1182 // Free STM structures for this Capability
1183 stmPreGCHook(cap);
1184 }
1185
1186 void
1187 markCapabilities (evac_fn evac, void *user)
1188 {
1189 uint32_t n;
1190 for (n = 0; n < n_capabilities; n++) {
1191 markCapability(evac, user, capabilities[n], rtsFalse);
1192 }
1193 }
1194
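/* ----------------------------------------------------------------------------
 * checkSparkCountInvariant
 *
 * Debug check that, summed over all Capabilities,
 *   created == converted + remaining-in-pools + gcd + fizzled.
 * ------------------------------------------------------------------------- */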
1195 #if defined(THREADED_RTS)
1196 rtsBool checkSparkCountInvariant (void)
1197 {
1198 SparkCounters sparks = { 0, 0, 0, 0, 0, 0 };
1199 StgWord64 remaining = 0;
1200 uint32_t i;
1201
1202 for (i = 0; i < n_capabilities; i++) {
1203 sparks.created += capabilities[i]->spark_stats.created;
1204 sparks.dud += capabilities[i]->spark_stats.dud;
1205 sparks.overflowed+= capabilities[i]->spark_stats.overflowed;
1206 sparks.converted += capabilities[i]->spark_stats.converted;
1207 sparks.gcd += capabilities[i]->spark_stats.gcd;
1208 sparks.fizzled += capabilities[i]->spark_stats.fizzled;
1209 remaining += sparkPoolSize(capabilities[i]->sparks);
1210 }
1211
1212 /* The invariant is
1213 * created = converted + remaining + gcd + fizzled
1214 */
1215 debugTrace(DEBUG_sparks,"spark invariant: %ld == %ld + %ld + %ld + %ld "
1216 "(created == converted + remaining + gcd + fizzled)",
1217 sparks.created, sparks.converted, remaining,
1218 sparks.gcd, sparks.fizzled);
1219
1220 return (sparks.created ==
1221 sparks.converted + remaining + sparks.gcd + sparks.fizzled);
1222
1223 }
1224 #endif
1225
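/* ----------------------------------------------------------------------------
 * setIOManagerControlFd
 *
 * Record the write end of the per-capability IO manager control pipe in the
 * corresponding Capability (threaded RTS only; a no-op elsewhere).
 * ------------------------------------------------------------------------- */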
1226 #if !defined(mingw32_HOST_OS)
1227 void
1228 setIOManagerControlFd(uint32_t cap_no USED_IF_THREADS, int fd USED_IF_THREADS) {
1229 #if defined(THREADED_RTS)
1230 if (cap_no < n_capabilities) {
1231 capabilities[cap_no]->io_manager_control_wr_fd = fd;
1232 } else {
1233 errorBelch("warning: setIOManagerControlFd called with illegal capability number.");
1234 }
1235 #endif
1236 }
1237 #endif