1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 2003-2012
4 *
5 * Capabilities
6 *
7 * A Capability represents the token required to execute STG code,
8 * and all the state an OS thread/task needs to run Haskell code:
9 * its STG registers, a pointer to its TSO, a nursery etc. During
10 * STG execution, a pointer to the capability is kept in a
11 * register (BaseReg; actually it is a pointer to cap->r).
12 *
13 * Only in a THREADED_RTS build will there be multiple capabilities;
14 * in non-threaded builds there is only one global capability, namely
15 * MainCapability.
16 *
17 * --------------------------------------------------------------------------*/
18
19 #include "PosixSource.h"
20 #include "Rts.h"
21
22 #include "Capability.h"
23 #include "Schedule.h"
24 #include "Sparks.h"
25 #include "Trace.h"
26 #include "sm/GC.h" // for gcWorkerThread()
27 #include "STM.h"
28 #include "RtsUtils.h"
29 #include "sm/OSMem.h"
30
31 #if !defined(mingw32_HOST_OS)
32 #include "rts/IOManager.h" // for setIOManagerControlFd()
33 #endif
34
35 #include <string.h>
36
37 // One global capability: this is the Capability for non-threaded
38 // builds, and for +RTS -N1.
39 Capability MainCapability;
40
41 uint32_t n_capabilities = 0;
42 uint32_t enabled_capabilities = 0;
43
44 // The array of Capabilities. It's important that when we need
45 // to allocate more Capabilities we don't have to move the existing
46 // Capabilities, because there may be pointers to them in use
47 // (e.g. threads in waitForCapability(), see #8209), so this is
48 // an array of Capability* rather than an array of Capability.
49 Capability **capabilities = NULL;
50
51 // Holds the Capability which last became free. This is used so that
52 // an in-call has a chance of quickly finding a free Capability.
53 // Maintaining a global free list of Capabilities would require global
54 // locking, so we don't do that.
55 static Capability *last_free_capability[MAX_NUMA_NODES];
56
57 /*
58 * Indicates that the RTS wants to synchronise all the Capabilities
59 * for some reason. All Capabilities should yieldCapability().
60 */
61 PendingSync * volatile pending_sync = 0;
62
63 // Number of logical NUMA nodes
64 uint32_t n_numa_nodes;
65
66 // Map logical NUMA node to OS node numbers
67 uint32_t numa_map[MAX_NUMA_NODES];
68
69 /* Let foreign code get the current Capability -- assuming there is one!
70 * This is useful for unsafe foreign calls because they are called with
71 * the current Capability held, but they are not passed it. For example,
72 * see the integer-gmp package, which calls allocate() in its
73 * stgAllocForGMP() function (which gets called by gmp functions); there
74 * is an illustrative sketch after the function below. */
75 Capability * rts_unsafeGetMyCapability (void)
76 {
77 #if defined(THREADED_RTS)
78 return myTask()->cap;
79 #else
80 return &MainCapability;
81 #endif
82 }
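/* Illustrative sketch (not part of the RTS): an unsafe foreign call that
 * wants to allocate on the Haskell heap can recover the current
 * Capability like this.  The wrapper name exampleAllocBytes is made up;
 * allocate() is the heap allocator declared in the storage manager.
 *
 *   StgPtr exampleAllocBytes (size_t n_bytes)
 *   {
 *       Capability *cap = rts_unsafeGetMyCapability();
 *       return allocate(cap, ROUNDUP_BYTES_TO_WDS(n_bytes));
 *   }
 */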
83
84 #if defined(THREADED_RTS)
85 STATIC_INLINE bool
86 globalWorkToDo (void)
87 {
88 return sched_state >= SCHED_INTERRUPTING
89 || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
90 }
91 #endif
92
93 #if defined(THREADED_RTS)
94 StgClosure *
95 findSpark (Capability *cap)
96 {
97 Capability *robbed;
98 StgClosurePtr spark;
99 bool retry;
100 uint32_t i = 0;
101
102 if (!emptyRunQueue(cap) || cap->n_returning_tasks != 0) {
103 // If there are other threads, don't try to run any new
104 // sparks: sparks might be speculative, and we don't want to take
105 // resources away from the main computation.
106 return 0;
107 }
108
109 do {
110 retry = false;
111
112 // first try to get a spark from our own pool.
113 // We should be using reclaimSpark(), because it works without
114 // needing any atomic instructions:
115 // spark = reclaimSpark(cap->sparks);
116 // However, measurements show that this makes at least one benchmark
117 // slower (prsa) and doesn't affect the others.
118 spark = tryStealSpark(cap->sparks);
119 while (spark != NULL && fizzledSpark(spark)) {
120 cap->spark_stats.fizzled++;
121 traceEventSparkFizzle(cap);
122 spark = tryStealSpark(cap->sparks);
123 }
124 if (spark != NULL) {
125 cap->spark_stats.converted++;
126
127 // Post event for running a spark from capability's own pool.
128 traceEventSparkRun(cap);
129
130 return spark;
131 }
132 if (!emptySparkPoolCap(cap)) {
133 retry = true;
134 }
135
136 if (n_capabilities == 1) { return NULL; } // makes no sense...
137
138 debugTrace(DEBUG_sched,
139 "cap %d: Trying to steal work from other capabilities",
140 cap->no);
141
142 /* visit caps 0..n-1 in sequence until a theft succeeds. We could
143 just as well start at a random place instead of 0. */
144 for ( i=0 ; i < n_capabilities ; i++ ) {
145 robbed = capabilities[i];
146 if (cap == robbed) // ourselves...
147 continue;
148
149 if (emptySparkPoolCap(robbed)) // nothing to steal here
150 continue;
151
152 spark = tryStealSpark(robbed->sparks);
153 while (spark != NULL && fizzledSpark(spark)) {
154 cap->spark_stats.fizzled++;
155 traceEventSparkFizzle(cap);
156 spark = tryStealSpark(robbed->sparks);
157 }
158 if (spark == NULL && !emptySparkPoolCap(robbed)) {
159 // we conflicted with another thread while trying to steal;
160 // try again later.
161 retry = true;
162 }
163
164 if (spark != NULL) {
165 cap->spark_stats.converted++;
166 traceEventSparkSteal(cap, robbed->no);
167
168 return spark;
169 }
170 // otherwise: no success, try next one
171 }
172 } while (retry);
173
174 debugTrace(DEBUG_sched, "No sparks stolen");
175 return NULL;
176 }
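/* Illustrative sketch (not part of the RTS): a Capability that has run out
 * of local work can drain sparks with a loop of this shape, evaluating each
 * spark it obtains.  evaluateSpark() is a hypothetical helper standing in
 * for whatever the caller does with the returned closure.
 *
 *   StgClosure *spark;
 *   while ((spark = findSpark(cap)) != NULL) {
 *       evaluateSpark(cap, spark);
 *   }
 */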
177
178 // Returns true if any spark pool is non-empty at this moment in time.
179 // The result is only valid for an instant, of course, so in a sense
180 // it is immediately invalid, and should not be relied upon for
181 // correctness.
182 bool
183 anySparks (void)
184 {
185 uint32_t i;
186
187 for (i=0; i < n_capabilities; i++) {
188 if (!emptySparkPoolCap(capabilities[i])) {
189 return true;
190 }
191 }
192 return false;
193 }
194 #endif
195
196 /* -----------------------------------------------------------------------------
197 * Manage the returning_tasks lists.
198 *
199 * These functions require cap->lock
200 * -------------------------------------------------------------------------- */
201
202 #if defined(THREADED_RTS)
203 STATIC_INLINE void
204 newReturningTask (Capability *cap, Task *task)
205 {
206 ASSERT_LOCK_HELD(&cap->lock);
207 ASSERT(task->next == NULL);
208 if (cap->returning_tasks_hd) {
209 ASSERT(cap->returning_tasks_tl->next == NULL);
210 cap->returning_tasks_tl->next = task;
211 } else {
212 cap->returning_tasks_hd = task;
213 }
214 cap->returning_tasks_tl = task;
215 cap->n_returning_tasks++;
216 ASSERT_RETURNING_TASKS(cap,task);
217 }
218
219 STATIC_INLINE Task *
220 popReturningTask (Capability *cap)
221 {
222 ASSERT_LOCK_HELD(&cap->lock);
223 Task *task;
224 task = cap->returning_tasks_hd;
225 ASSERT(task);
226 cap->returning_tasks_hd = task->next;
227 if (!cap->returning_tasks_hd) {
228 cap->returning_tasks_tl = NULL;
229 }
230 task->next = NULL;
231 cap->n_returning_tasks--;
232 ASSERT_RETURNING_TASKS(cap,task);
233 return task;
234 }
235 #endif
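/* Worked example (illustration only): the returning_tasks list is an
 * intrusive FIFO threaded through Task.next.  After
 *
 *     newReturningTask(cap, t1);
 *     newReturningTask(cap, t2);
 *
 * we have cap->returning_tasks_hd == t1, t1->next == t2,
 * cap->returning_tasks_tl == t2, t2->next == NULL and
 * cap->n_returning_tasks == 2; popReturningTask(cap) then returns t1
 * and leaves t1->next == NULL.
 */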
236
237 /* ----------------------------------------------------------------------------
238 * Initialisation
239 *
240 * The Capability is initially marked not free.
241 * ------------------------------------------------------------------------- */
242
243 static void
244 initCapability (Capability *cap, uint32_t i)
245 {
246 uint32_t g;
247
248 cap->no = i;
249 cap->node = capNoToNumaNode(i);
250 cap->in_haskell = false;
251 cap->idle = 0;
252 cap->disabled = false;
253
254 cap->run_queue_hd = END_TSO_QUEUE;
255 cap->run_queue_tl = END_TSO_QUEUE;
256 cap->n_run_queue = 0;
257
258 #if defined(THREADED_RTS)
259 initMutex(&cap->lock);
260 cap->running_task = NULL; // indicates cap is free
261 cap->spare_workers = NULL;
262 cap->n_spare_workers = 0;
263 cap->suspended_ccalls = NULL;
264 cap->n_suspended_ccalls = 0;
265 cap->returning_tasks_hd = NULL;
266 cap->returning_tasks_tl = NULL;
267 cap->n_returning_tasks = 0;
268 cap->inbox = (Message*)END_TSO_QUEUE;
269 cap->putMVars = NULL;
270 cap->sparks = allocSparkPool();
271 cap->spark_stats.created = 0;
272 cap->spark_stats.dud = 0;
273 cap->spark_stats.overflowed = 0;
274 cap->spark_stats.converted = 0;
275 cap->spark_stats.gcd = 0;
276 cap->spark_stats.fizzled = 0;
277 #if !defined(mingw32_HOST_OS)
278 cap->io_manager_control_wr_fd = -1;
279 #endif
280 #endif
281 cap->total_allocated = 0;
282
283 cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
284 cap->f.stgGCEnter1 = (StgFunPtr)__stg_gc_enter_1;
285 cap->f.stgGCFun = (StgFunPtr)__stg_gc_fun;
286
287 cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
288 RtsFlags.GcFlags.generations,
289 "initCapability");
290 cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
291 RtsFlags.GcFlags.generations,
292 "initCapability");
293
294 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
295 cap->mut_lists[g] = NULL;
296 }
297
298 cap->weak_ptr_list_hd = NULL;
299 cap->weak_ptr_list_tl = NULL;
300 cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
301 cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE;
302 cap->free_trec_chunks = END_STM_CHUNK_LIST;
303 cap->free_trec_headers = NO_TREC;
304 cap->transaction_tokens = 0;
305 cap->context_switch = 0;
306 cap->pinned_object_block = NULL;
307 cap->pinned_object_blocks = NULL;
308
309 #if defined(PROFILING)
310 cap->r.rCCCS = CCS_SYSTEM;
311 #else
312 cap->r.rCCCS = NULL;
313 #endif
314
315 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
316 // don't want it set when not running a Haskell thread.
317 cap->r.rCurrentTSO = NULL;
318
319 traceCapCreate(cap);
320 traceCapsetAssignCap(CAPSET_OSPROCESS_DEFAULT, i);
321 traceCapsetAssignCap(CAPSET_CLOCKDOMAIN_DEFAULT, i);
322 #if defined(THREADED_RTS)
323 traceSparkCounters(cap);
324 #endif
325 }
326
327 /* ---------------------------------------------------------------------------
328 * Function: initCapabilities()
329 *
330 * Purpose: set up the Capability handling. For the THREADED_RTS build,
331 * we keep a table of them, the size of which is
332 * controlled by the user via the RTS flag -N.
333 *
334 * ------------------------------------------------------------------------- */
335 void initCapabilities (void)
336 {
337 uint32_t i;
338
339 /* Declare a couple of capability sets representing the process and
340 clock domain. Each capability will get added to these capsets. */
341 traceCapsetCreate(CAPSET_OSPROCESS_DEFAULT, CapsetTypeOsProcess);
342 traceCapsetCreate(CAPSET_CLOCKDOMAIN_DEFAULT, CapsetTypeClockdomain);
343
344 // Initialise NUMA
345 if (!RtsFlags.GcFlags.numa) {
346 n_numa_nodes = 1;
347 for (i = 0; i < MAX_NUMA_NODES; i++) {
348 numa_map[i] = 0;
349 }
350 } else {
351 uint32_t nNodes = osNumaNodes();
352 if (nNodes > MAX_NUMA_NODES) {
353 barf("Too many NUMA nodes (max %d)", MAX_NUMA_NODES);
354 }
355 StgWord mask = RtsFlags.GcFlags.numaMask & osNumaMask();
356 uint32_t logical = 0, physical = 0;
357 for (; physical < MAX_NUMA_NODES; physical++) {
358 if (mask & 1) {
359 numa_map[logical++] = physical;
360 }
361 mask = mask >> 1;
362 }
363 n_numa_nodes = logical;
364 if (logical == 0) {
365 barf("available NUMA node set is empty");
366 }
367 }
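/* Worked example (illustration only): with an effective mask of 0b1010
 * (OS nodes 1 and 3 selected), the loop above produces
 *     numa_map = { 1, 3 } and n_numa_nodes = 2,
 * i.e. logical nodes 0 and 1 stand for OS nodes 1 and 3 respectively.
 */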
368
369 #if defined(THREADED_RTS)
370
371 #if !defined(REG_Base)
372 // We can't support multiple CPUs if BaseReg is not a register
373 if (RtsFlags.ParFlags.nCapabilities > 1) {
374 errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
375 RtsFlags.ParFlags.nCapabilities = 1;
376 }
377 #endif
378
379 n_capabilities = 0;
380 moreCapabilities(0, RtsFlags.ParFlags.nCapabilities);
381 n_capabilities = RtsFlags.ParFlags.nCapabilities;
382
383 #else /* !THREADED_RTS */
384
385 n_capabilities = 1;
386 capabilities = stgMallocBytes(sizeof(Capability*), "initCapabilities");
387 capabilities[0] = &MainCapability;
388
389 initCapability(&MainCapability, 0);
390
391 #endif
392
393 enabled_capabilities = n_capabilities;
394
395 // There are no free capabilities to begin with. We will start
396 // a worker Task on each Capability, which will quickly put the
397 // Capability on the free list when it finds nothing to do.
398 for (i = 0; i < n_numa_nodes; i++) {
399 last_free_capability[i] = capabilities[0];
400 }
401 }
402
403 void
404 moreCapabilities (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
405 {
406 #if defined(THREADED_RTS)
407 uint32_t i;
408 Capability **old_capabilities = capabilities;
409
410 capabilities = stgMallocBytes(to * sizeof(Capability*), "moreCapabilities");
411
412 if (to == 1) {
413 // THREADED_RTS must work on builds that don't have a mutable
414 // BaseReg (eg. unregisterised), so in this case
415 // capabilities[0] must coincide with &MainCapability.
416 capabilities[0] = &MainCapability;
417 initCapability(&MainCapability, 0);
418 }
419 else
420 {
421 for (i = 0; i < to; i++) {
422 if (i < from) {
423 capabilities[i] = old_capabilities[i];
424 } else {
425 capabilities[i] = stgMallocBytes(sizeof(Capability),
426 "moreCapabilities");
427 initCapability(capabilities[i], i);
428 }
429 }
430 }
431
432 debugTrace(DEBUG_sched, "allocated %d more capabilities", to - from);
433
434 if (old_capabilities != NULL) {
435 stgFree(old_capabilities);
436 }
437 #endif
438 }
439
440 /* ----------------------------------------------------------------------------
441 * setContextSwitches: cause all capabilities to context switch as
442 * soon as possible.
443 * ------------------------------------------------------------------------- */
444
445 void contextSwitchAllCapabilities(void)
446 {
447 uint32_t i;
448 for (i=0; i < n_capabilities; i++) {
449 contextSwitchCapability(capabilities[i]);
450 }
451 }
452
453 void interruptAllCapabilities(void)
454 {
455 uint32_t i;
456 for (i=0; i < n_capabilities; i++) {
457 interruptCapability(capabilities[i]);
458 }
459 }
460
461 /* ----------------------------------------------------------------------------
462 * Give a Capability to a Task. The task must currently be sleeping
463 * on its condition variable.
464 *
465 * Requires cap->lock (modifies cap->running_task).
466 *
467 * When migrating a Task, the migrator must take task->lock before
468 * modifying task->cap, to synchronise with the waking up Task.
469 * Additionally, the migrator should own the Capability (when
470 * migrating the run queue), or cap->lock (when migrating
471 * returning_workers).
472 *
473 * ------------------------------------------------------------------------- */
474
475 #if defined(THREADED_RTS)
476 static void
477 giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
478 {
479 ASSERT_LOCK_HELD(&cap->lock);
480 ASSERT(task->cap == cap);
481 debugTrace(DEBUG_sched, "passing capability %d to %s %#" FMT_HexWord64,
482 cap->no, task->incall->tso ? "bound task" : "worker",
483 serialisableTaskId(task));
484 ACQUIRE_LOCK(&task->lock);
485 if (task->wakeup == false) {
486 task->wakeup = true;
487 // the wakeup flag is needed because signalCondition() doesn't
488 // flag the condition if the thread is already running, but we want
489 // it to be sticky.
490 signalCondition(&task->cond);
491 }
492 RELEASE_LOCK(&task->lock);
493 }
494 #endif
495
496 /* ----------------------------------------------------------------------------
497 * releaseCapability
498 *
499 * The current Task (cap->task) releases the Capability. The Capability is
500 * marked free, and if there is any work to do, an appropriate Task is woken up.
501 * ------------------------------------------------------------------------- */
502
503 #if defined(THREADED_RTS)
504 void
505 releaseCapability_ (Capability* cap,
506 bool always_wakeup)
507 {
508 Task *task;
509
510 task = cap->running_task;
511
512 ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
513 ASSERT_RETURNING_TASKS(cap,task);
514
515 cap->running_task = NULL;
516
517 // Check to see whether a worker thread can be given
518 // the go-ahead to return the result of an external call.
519 if (cap->n_returning_tasks != 0) {
520 giveCapabilityToTask(cap,cap->returning_tasks_hd);
521 // The Task pops itself from the queue (see waitForCapability())
522 return;
523 }
524
525 // If there is a pending sync, then we should just leave the Capability
526 // free. The thread trying to sync will be about to call
527 // waitForCapability().
528 //
529 // Note: this is *after* we check for a returning task above,
530 // because the task attempting to acquire all the capabilities may
531 // be currently in waitForCapability() waiting for this
532 // capability, in which case simply setting it as free would not
533 // wake up the waiting task.
534 PendingSync *sync = pending_sync;
535 if (sync && (sync->type != SYNC_GC_PAR || sync->idle[cap->no])) {
536 debugTrace(DEBUG_sched, "sync pending, freeing capability %d", cap->no);
537 return;
538 }
539
540 // If the next thread on the run queue is a bound thread,
541 // give this Capability to the appropriate Task.
542 if (!emptyRunQueue(cap) && peekRunQueue(cap)->bound) {
543 // Make sure we're not about to try to wake ourselves up
544 // ASSERT(task != cap->run_queue_hd->bound);
545 // assertion is false: in schedule() we force a yield after
546 // ThreadBlocked, but the thread may be back on the run queue
547 // by now.
548 task = peekRunQueue(cap)->bound->task;
549 giveCapabilityToTask(cap, task);
550 return;
551 }
552
553 if (!cap->spare_workers) {
554 // Create a worker thread if we don't have one. If the system
555 // is interrupted, we only create a worker task if there
556 // are threads that need to be completed. If the system is
557 // shutting down, we never create a new worker.
558 if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
559 debugTrace(DEBUG_sched,
560 "starting new worker on capability %d", cap->no);
561 startWorkerTask(cap);
562 return;
563 }
564 }
565
566 // If we have an unbound thread on the run queue, or if there's
567 // anything else to do, give the Capability to a worker thread.
568 if (always_wakeup ||
569 !emptyRunQueue(cap) || !emptyInbox(cap) ||
570 (!cap->disabled && !emptySparkPoolCap(cap)) || globalWorkToDo()) {
571 if (cap->spare_workers) {
572 giveCapabilityToTask(cap, cap->spare_workers);
573 // The worker Task pops itself from the queue;
574 return;
575 }
576 }
577
578 #if defined(PROFILING)
579 cap->r.rCCCS = CCS_IDLE;
580 #endif
581 last_free_capability[cap->node] = cap;
582 debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
583 }
584
585 void
586 releaseCapability (Capability* cap USED_IF_THREADS)
587 {
588 ACQUIRE_LOCK(&cap->lock);
589 releaseCapability_(cap, false);
590 RELEASE_LOCK(&cap->lock);
591 }
592
593 void
594 releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
595 {
596 ACQUIRE_LOCK(&cap->lock);
597 releaseCapability_(cap, true);
598 RELEASE_LOCK(&cap->lock);
599 }
600
601 static void
602 enqueueWorker (Capability* cap USED_IF_THREADS)
603 {
604 Task *task;
605
606 task = cap->running_task;
607
608 // If the Task is stopped, we shouldn't be yielding, we should
609 // be just exiting.
610 ASSERT(!task->stopped);
611 ASSERT(task->worker);
612
613 if (cap->n_spare_workers < MAX_SPARE_WORKERS)
614 {
615 task->next = cap->spare_workers;
616 cap->spare_workers = task;
617 cap->n_spare_workers++;
618 }
619 else
620 {
621 debugTrace(DEBUG_sched, "%d spare workers already, exiting",
622 cap->n_spare_workers);
623 releaseCapability_(cap,false);
624 // hold the lock until after workerTaskStop; c.f. scheduleWorker()
625 workerTaskStop(task);
626 RELEASE_LOCK(&cap->lock);
627 shutdownThread();
628 }
629 }
630
631 #endif
632
633 /* ----------------------------------------------------------------------------
634 * waitForWorkerCapability(task)
635 *
636 * waits to be given a Capability, and then returns the Capability. The task
637 * must be either a worker (and on a cap->spare_workers queue), or a bound Task.
638 * ------------------------------------------------------------------------- */
639
640 #if defined(THREADED_RTS)
641
642 static Capability * waitForWorkerCapability (Task *task)
643 {
644 Capability *cap;
645
646 for (;;) {
647 ACQUIRE_LOCK(&task->lock);
648 // task->lock held, cap->lock not held
649 if (!task->wakeup) waitCondition(&task->cond, &task->lock);
650 cap = task->cap;
651 task->wakeup = false;
652 RELEASE_LOCK(&task->lock);
653
654 debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);
655
656 ACQUIRE_LOCK(&cap->lock);
657 if (cap->running_task != NULL) {
658 debugTrace(DEBUG_sched,
659 "capability %d is owned by another task", cap->no);
660 RELEASE_LOCK(&cap->lock);
661 continue;
662 }
663
664 if (task->cap != cap) {
665 // see Note [migrated bound threads]
666 debugTrace(DEBUG_sched,
667 "task has been migrated to cap %d", task->cap->no);
668 RELEASE_LOCK(&cap->lock);
669 continue;
670 }
671
672 if (task->incall->tso == NULL) {
673 ASSERT(cap->spare_workers != NULL);
674 // if we're not at the front of the queue, release it
675 // again. This is unlikely to happen.
676 if (cap->spare_workers != task) {
677 giveCapabilityToTask(cap,cap->spare_workers);
678 RELEASE_LOCK(&cap->lock);
679 continue;
680 }
681 cap->spare_workers = task->next;
682 task->next = NULL;
683 cap->n_spare_workers--;
684 }
685
686 cap->running_task = task;
687 RELEASE_LOCK(&cap->lock);
688 break;
689 }
690
691 return cap;
692 }
693
694 #endif /* THREADED_RTS */
695
696 /* ----------------------------------------------------------------------------
697 * waitForReturnCapability (Task *task)
698 *
699 * The Task should be on the cap->returning_tasks queue of a Capability. This
700 * function waits for the Task to be woken up, and returns the Capability that
701 * it was woken up on.
702 *
703 * ------------------------------------------------------------------------- */
704
705 #if defined(THREADED_RTS)
706
707 static Capability * waitForReturnCapability (Task *task)
708 {
709 Capability *cap;
710
711 for (;;) {
712 ACQUIRE_LOCK(&task->lock);
713 // task->lock held, cap->lock not held
714 if (!task->wakeup) waitCondition(&task->cond, &task->lock);
715 cap = task->cap;
716 task->wakeup = false;
717 RELEASE_LOCK(&task->lock);
718
719 // now check whether we should wake up...
720 ACQUIRE_LOCK(&cap->lock);
721 if (cap->running_task == NULL) {
722 if (cap->returning_tasks_hd != task) {
723 giveCapabilityToTask(cap,cap->returning_tasks_hd);
724 RELEASE_LOCK(&cap->lock);
725 continue;
726 }
727 cap->running_task = task;
728 popReturningTask(cap);
729 RELEASE_LOCK(&cap->lock);
730 break;
731 }
732 RELEASE_LOCK(&cap->lock);
733 }
734
735 return cap;
736 }
737
738 #endif /* THREADED_RTS */
739
740 /* ----------------------------------------------------------------------------
741 * waitForCapability (Capability **pCap, Task *task)
742 *
743 * Purpose: when an OS thread returns from an external call,
744 * it calls waitForCapability() (via Schedule.resumeThread())
745 * to wait for permission to enter the RTS & communicate the
746 * result of the external call back to the Haskell thread that
747 * made it.
748 *
749 * ------------------------------------------------------------------------- */
750
751 void waitForCapability (Capability **pCap, Task *task)
752 {
753 #if !defined(THREADED_RTS)
754
755 MainCapability.running_task = task;
756 task->cap = &MainCapability;
757 *pCap = &MainCapability;
758
759 #else
760 uint32_t i;
761 Capability *cap = *pCap;
762
763 if (cap == NULL) {
764 if (task->preferred_capability != -1) {
765 cap = capabilities[task->preferred_capability %
766 enabled_capabilities];
767 } else {
768 // Try last_free_capability first
769 cap = last_free_capability[task->node];
770 if (cap->running_task) {
771 // Otherwise, search for a free capability on this node.
772 cap = NULL;
773 for (i = task->node; i < enabled_capabilities;
774 i += n_numa_nodes) {
775 // visits all the capabilities on this node, because
776 // capabilities[i]->node == i % n_numa_nodes
777 if (!capabilities[i]->running_task) {
778 cap = capabilities[i];
779 break;
780 }
781 }
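// Worked example (illustration only): with n_numa_nodes == 2,
// enabled_capabilities == 6 and task->node == 1, the loop above
// probes capabilities 1, 3 and 5, which are exactly the ones whose
// ->node is 1.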
782 if (cap == NULL) {
783 // Can't find a free one, use last_free_capability.
784 cap = last_free_capability[task->node];
785 }
786 }
787 }
788
789 // record the Capability as the one this Task is now associated with.
790 task->cap = cap;
791
792 } else {
793 ASSERT(task->cap == cap);
794 }
795
796 debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);
797
798 ACQUIRE_LOCK(&cap->lock);
799 if (!cap->running_task) {
800 // It's free; just grab it
801 cap->running_task = task;
802 RELEASE_LOCK(&cap->lock);
803 } else {
804 newReturningTask(cap,task);
805 RELEASE_LOCK(&cap->lock);
806 cap = waitForReturnCapability(task);
807 }
808
809 #if defined(PROFILING)
810 cap->r.rCCCS = CCS_SYSTEM;
811 #endif
812
813 ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
814
815 debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
816
817 *pCap = cap;
818 #endif
819 }
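/* Illustrative sketch (not part of the RTS): the doc comment above mentions
 * Schedule.c's resumeThread(), which re-enters the RTS after a safe foreign
 * call roughly like this (simplified, details elided):
 *
 *   Capability *cap = task->cap;
 *   waitForCapability(&cap, task);   // may block until a cap is available
 *   ... put the suspended TSO back on cap's run queue ...
 */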
820
821 /* ----------------------------------------------------------------------------
822 * yieldCapability
823 *
824 * Give up the Capability, and return when we have it again. This is called
825 * when either we know that the Capability should be given to another Task, or
826 * there is nothing to do right now. One of the following is true:
827 *
828 * - The current Task is a worker, and there's a bound thread at the head of
829 * the run queue (or vice versa)
830 *
831 * - The run queue is empty. We'll be woken up again when there's work to
832 * do.
833 *
834 * - Another Task is trying to do parallel GC (pending_sync == SYNC_GC_PAR).
835 * We should become a GC worker for a while.
836 *
837 * - Another Task is trying to acquire all the Capabilities (pending_sync !=
838 * SYNC_GC_PAR), either to do a sequential GC, forkProcess, or
839 * setNumCapabilities. We should give up the Capability temporarily.
840 *
841 * ------------------------------------------------------------------------- */
842
843 #if defined (THREADED_RTS)
844
845 /* See Note [GC livelock] in Schedule.c for why we have gcAllowed
846 and return the bool */
847 bool /* Did we GC? */
848 yieldCapability (Capability** pCap, Task *task, bool gcAllowed)
849 {
850 Capability *cap = *pCap;
851
852 if (gcAllowed)
853 {
854 PendingSync *sync = pending_sync;
855
856 if (sync && sync->type == SYNC_GC_PAR) {
857 if (! sync->idle[cap->no]) {
858 traceEventGcStart(cap);
859 gcWorkerThread(cap);
860 traceEventGcEnd(cap);
861 traceSparkCounters(cap);
862 // See Note [migrated bound threads 2]
863 if (task->cap == cap) {
864 return true;
865 }
866 }
867 }
868 }
869
870 debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
871
872 // We must now release the capability and wait to be woken up again.
873 task->wakeup = false;
874
875 ACQUIRE_LOCK(&cap->lock);
876
877 // If this is a worker thread, put it on the spare_workers queue
878 if (isWorker(task)) {
879 enqueueWorker(cap);
880 }
881
882 releaseCapability_(cap, false);
883
884 if (isWorker(task) || isBoundTask(task)) {
885 RELEASE_LOCK(&cap->lock);
886 cap = waitForWorkerCapability(task);
887 } else {
888 // Neither a worker Task nor a bound Task. The only way we can be woken
889 // up again is to put ourselves on the returning_tasks queue, so that's
890 // what we do. We still hold cap->lock at this point.
891 // The Task waiting for this Capability does not have it
892 // yet, so we can be sure to be woken up later. (see #10545)
893 newReturningTask(cap,task);
894 RELEASE_LOCK(&cap->lock);
895 cap = waitForReturnCapability(task);
896 }
897
898 debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
899 ASSERT(cap->running_task == task);
900
901 #if defined(PROFILING)
902 cap->r.rCCCS = CCS_SYSTEM;
903 #endif
904
905 *pCap = cap;
906
907 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
908
909 return false;
910 }
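/* Illustrative sketch (not part of the RTS): callers typically invoke
 * yieldCapability() in a loop until they hold a Capability they are happy
 * with, passing gcAllowed = false once a GC has already happened during
 * this yield (see Note [GC livelock] in Schedule.c).  shouldYield() below
 * is a hypothetical predicate, not a real RTS function.
 *
 *   bool did_gc = false;
 *   while (shouldYield(cap, task)) {
 *       did_gc |= yieldCapability(&cap, task, !did_gc);
 *   }
 */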
911
912 #endif /* THREADED_RTS */
913
914 // Note [migrated bound threads]
915 //
916 // There's a tricky case where:
917 // - cap A is running an unbound thread T1
918 // - there is a bound thread T2 at the head of the run queue on cap A
919 // - T1 makes a safe foreign call, the task bound to T2 is woken up on cap A
920 // - T1 returns quickly grabbing A again (T2 is still waking up on A)
921 // - T1 blocks, the scheduler migrates T2 to cap B
922 // - the task bound to T2 wakes up on cap B
923 //
924 // We take advantage of the following invariant:
925 //
926 // - A bound thread can only be migrated by the holder of the
927 // Capability on which the bound thread currently lives. So, if we
928 // hold Capability C, and task->cap == C, then task cannot be
929 // migrated under our feet.
930
931 // Note [migrated bound threads 2]
932 //
933 // Second tricky case:
934 // - A bound Task becomes a GC thread
935 // - scheduleDoGC() migrates the thread belonging to this Task,
936 // because the Capability it is on is disabled
937 // - after GC, gcWorkerThread() returns, but now we are
938 // holding a Capability that is not the same as task->cap
939 // - Hence we must check for this case and immediately give up the
940 // cap we hold.
941
942 /* ----------------------------------------------------------------------------
943 * prodCapability
944 *
945 * If a Capability is currently idle, wake up a Task on it. Used to
946 * get every Capability into the GC.
947 * ------------------------------------------------------------------------- */
948
949 #if defined (THREADED_RTS)
950
951 void
952 prodCapability (Capability *cap, Task *task)
953 {
954 ACQUIRE_LOCK(&cap->lock);
955 if (!cap->running_task) {
956 cap->running_task = task;
957 releaseCapability_(cap,true);
958 }
959 RELEASE_LOCK(&cap->lock);
960 }
961
962 #endif /* THREADED_RTS */
963
964 /* ----------------------------------------------------------------------------
965 * tryGrabCapability
966 *
967 * Attempt to gain control of a Capability if it is free.
968 *
969 * ------------------------------------------------------------------------- */
970
971 #if defined (THREADED_RTS)
972
973 bool
974 tryGrabCapability (Capability *cap, Task *task)
975 {
976 int r;
977 if (cap->running_task != NULL) return false;
978 r = TRY_ACQUIRE_LOCK(&cap->lock);
979 if (r != 0) return false;
980 if (cap->running_task != NULL) {
981 RELEASE_LOCK(&cap->lock);
982 return false;
983 }
984 task->cap = cap;
985 cap->running_task = task;
986 RELEASE_LOCK(&cap->lock);
987 return true;
988 }
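/* Illustrative sketch (not part of the RTS): a Task scanning for any free
 * Capability can combine tryGrabCapability() with a plain loop; checking
 * cap->running_task both before and after taking the lock keeps the common
 * "cap is busy" case cheap.
 *
 *   for (i = 0; i < n_capabilities; i++) {
 *       if (tryGrabCapability(capabilities[i], task)) {
 *           cap = capabilities[i];
 *           break;
 *       }
 *   }
 */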
989
990
991 #endif /* THREADED_RTS */
992
993 /* ----------------------------------------------------------------------------
994 * shutdownCapability
995 *
996 * At shutdown time, we want to let everything exit as cleanly as
997 * possible. For each capability, we let its run queue drain, and
998 * allow the workers to stop.
999 *
1000 * This function should be called when interrupted and
1001 * sched_state = SCHED_SHUTTING_DOWN, thus any worker that wakes up
1002 * will exit the scheduler and call taskStop(), and any bound thread
1003 * that wakes up will return to its caller. Runnable threads are
1004 * killed.
1005 *
1006 * ------------------------------------------------------------------------- */
1007
1008 static void
1009 shutdownCapability (Capability *cap USED_IF_THREADS,
1010 Task *task USED_IF_THREADS,
1011 bool safe USED_IF_THREADS)
1012 {
1013 #if defined(THREADED_RTS)
1014 uint32_t i;
1015
1016 task->cap = cap;
1017
1018 // Loop indefinitely until all the workers have exited and there
1019 // are no Haskell threads left. We used to bail out after 50
1020 // iterations of this loop, but that occasionally left a worker
1021 // running which caused problems later (the closeMutex() below
1022 // isn't safe, for one thing).
1023
1024 for (i = 0; /* i < 50 */; i++) {
1025 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
1026
1027 debugTrace(DEBUG_sched,
1028 "shutting down capability %d, attempt %d", cap->no, i);
1029 ACQUIRE_LOCK(&cap->lock);
1030 if (cap->running_task) {
1031 RELEASE_LOCK(&cap->lock);
1032 debugTrace(DEBUG_sched, "not owner, yielding");
1033 yieldThread();
1034 continue;
1035 }
1036 cap->running_task = task;
1037
1038 if (cap->spare_workers) {
1039 // Look for workers that have died without removing
1040 // themselves from the list; this could happen if the OS
1041 // summarily killed the thread, for example. This
1042 // actually happens on Windows when the system is
1043 // terminating the program, and the RTS is running in a
1044 // DLL.
1045 Task *t, *prev;
1046 prev = NULL;
1047 for (t = cap->spare_workers; t != NULL; t = t->next) {
1048 if (osThreadIsAlive(t->id)) { prev = t; continue; }
1049 // unlink the dead worker; prev stays at the last live worker,
1050 // so runs of consecutive dead workers are removed correctly
1051 debugTrace(DEBUG_sched,
1052 "worker thread %p has died unexpectedly", (void *)(size_t)t->id);
1053 cap->n_spare_workers--;
1054 if (!prev) {
1055 cap->spare_workers = t->next;
1056 } else {
1057 prev->next = t->next;
1058 }
1059 }
1060 }
1061
1062 if (!emptyRunQueue(cap) || cap->spare_workers) {
1063 debugTrace(DEBUG_sched,
1064 "runnable threads or workers still alive, yielding");
1065 releaseCapability_(cap,false); // this will wake up a worker
1066 RELEASE_LOCK(&cap->lock);
1067 yieldThread();
1068 continue;
1069 }
1070
1071 // If "safe", then busy-wait for any threads currently doing
1072 // foreign calls. If we're about to unload this DLL, for
1073 // example, we need to be sure that there are no OS threads
1074 // that will try to return to code that has been unloaded.
1075 // We can be a bit more relaxed when this is a standalone
1076 // program that is about to terminate, and let safe=false.
1077 if (cap->suspended_ccalls && safe) {
1078 debugTrace(DEBUG_sched,
1079 "thread(s) are involved in foreign calls, yielding");
1080 cap->running_task = NULL;
1081 RELEASE_LOCK(&cap->lock);
1082 // The IO manager thread might have been slow to start up,
1083 // so the first attempt to kill it might not have
1084 // succeeded. Just in case, try again - the kill message
1085 // will only be sent once.
1086 //
1087 // To reproduce this deadlock: run ffi002(threaded1)
1088 // repeatedly on a loaded machine.
1089 ioManagerDie();
1090 yieldThread();
1091 continue;
1092 }
1093
1094 traceSparkCounters(cap);
1095 RELEASE_LOCK(&cap->lock);
1096 break;
1097 }
1098 // We now have the Capability; its run queue and spare_workers
1099 // list are both empty.
1100
1101 // ToDo: we can't drop this mutex, because there might still be
1102 // threads performing foreign calls that will eventually try to
1103 // return via resumeThread() and attempt to grab cap->lock.
1104 // closeMutex(&cap->lock);
1105 #endif
1106 }
1107
1108 void
1109 shutdownCapabilities(Task *task, bool safe)
1110 {
1111 uint32_t i;
1112 for (i=0; i < n_capabilities; i++) {
1113 ASSERT(task->incall->tso == NULL);
1114 shutdownCapability(capabilities[i], task, safe);
1115 }
1116 #if defined(THREADED_RTS)
1117 ASSERT(checkSparkCountInvariant());
1118 #endif
1119 }
1120
1121 static void
1122 freeCapability (Capability *cap)
1123 {
1124 stgFree(cap->mut_lists);
1125 stgFree(cap->saved_mut_lists);
1126 #if defined(THREADED_RTS)
1127 freeSparkPool(cap->sparks);
1128 #endif
1129 traceCapsetRemoveCap(CAPSET_OSPROCESS_DEFAULT, cap->no);
1130 traceCapsetRemoveCap(CAPSET_CLOCKDOMAIN_DEFAULT, cap->no);
1131 traceCapDelete(cap);
1132 }
1133
1134 void
1135 freeCapabilities (void)
1136 {
1137 #if defined(THREADED_RTS)
1138 uint32_t i;
1139 for (i=0; i < n_capabilities; i++) {
1140 freeCapability(capabilities[i]);
1141 if (capabilities[i] != &MainCapability)
1142 stgFree(capabilities[i]);
1143 }
1144 #else
1145 freeCapability(&MainCapability);
1146 #endif
1147 stgFree(capabilities);
1148 traceCapsetDelete(CAPSET_OSPROCESS_DEFAULT);
1149 traceCapsetDelete(CAPSET_CLOCKDOMAIN_DEFAULT);
1150 }
1151
1152 /* ---------------------------------------------------------------------------
1153 Mark everything directly reachable from the Capabilities. When
1154 using multiple GC threads, GC thread t marks all Capabilities c
1155 for which (c `mod` n == t), where n is the number of GC threads.
1156 ------------------------------------------------------------------------ */
1157
1158 void
1159 markCapability (evac_fn evac, void *user, Capability *cap,
1160 bool no_mark_sparks USED_IF_THREADS)
1161 {
1162 InCall *incall;
1163
1164 // Each GC thread is responsible for following roots from the
1165 // Capability of the same number. There will usually be the same
1166 // or fewer Capabilities as GC threads, but just in case there
1167 // are more, we mark every Capability whose number is the GC
1168 // thread's index plus a multiple of the number of GC threads.
1169 evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
1170 evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
1171 #if defined(THREADED_RTS)
1172 evac(user, (StgClosure **)(void *)&cap->inbox);
1173 #endif
1174 for (incall = cap->suspended_ccalls; incall != NULL;
1175 incall=incall->next) {
1176 evac(user, (StgClosure **)(void *)&incall->suspended_tso);
1177 }
1178
1179 #if defined(THREADED_RTS)
1180 if (!no_mark_sparks) {
1181 traverseSparkQueue (evac, user, cap);
1182 }
1183 #endif
1184
1185 // Free STM structures for this Capability
1186 stmPreGCHook(cap);
1187 }
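/* Worked example (illustration only): with 3 GC threads and 5 Capabilities,
 * GC thread 1 marks Capabilities 1 and 4 (its index plus multiples of the
 * number of GC threads); the striding over Capabilities is done by the GC
 * code that calls markCapability().
 */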
1188
1189 void
1190 markCapabilities (evac_fn evac, void *user)
1191 {
1192 uint32_t n;
1193 for (n = 0; n < n_capabilities; n++) {
1194 markCapability(evac, user, capabilities[n], false);
1195 }
1196 }
1197
1198 #if defined(THREADED_RTS)
1199 bool checkSparkCountInvariant (void)
1200 {
1201 SparkCounters sparks = { 0, 0, 0, 0, 0, 0 };
1202 StgWord64 remaining = 0;
1203 uint32_t i;
1204
1205 for (i = 0; i < n_capabilities; i++) {
1206 sparks.created += capabilities[i]->spark_stats.created;
1207 sparks.dud += capabilities[i]->spark_stats.dud;
1208 sparks.overflowed+= capabilities[i]->spark_stats.overflowed;
1209 sparks.converted += capabilities[i]->spark_stats.converted;
1210 sparks.gcd += capabilities[i]->spark_stats.gcd;
1211 sparks.fizzled += capabilities[i]->spark_stats.fizzled;
1212 remaining += sparkPoolSize(capabilities[i]->sparks);
1213 }
1214
1215 /* The invariant is
1216 * created = converted + remaining + gcd + fizzled
1217 */
1218 debugTrace(DEBUG_sparks,"spark invariant: %ld == %ld + %ld + %ld + %ld "
1219 "(created == converted + remaining + gcd + fizzled)",
1220 sparks.created, sparks.converted, remaining,
1221 sparks.gcd, sparks.fizzled);
1222
1223 return (sparks.created ==
1224 sparks.converted + remaining + sparks.gcd + sparks.fizzled);
1225
1226 }
1227 #endif
1228
1229 #if !defined(mingw32_HOST_OS)
1230 void
1231 setIOManagerControlFd(uint32_t cap_no USED_IF_THREADS, int fd USED_IF_THREADS) {
1232 #if defined(THREADED_RTS)
1233 if (cap_no < n_capabilities) {
1234 capabilities[cap_no]->io_manager_control_wr_fd = fd;
1235 } else {
1236 errorBelch("warning: setIOManagerControlFd called with illegal capability number.");
1237 }
1238 #endif
1239 }
1240 #endif