1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 2003-2012
4 *
5 * Capabilities
6 *
7 * A Capability represents the token required to execute STG code,
8 * and all the state an OS thread/task needs to run Haskell code:
9 * its STG registers, a pointer to its TSO, a nursery etc. During
10 * STG execution, a pointer to the capability is kept in a
11 * register (BaseReg; actually it is a pointer to cap->r).
12 *
13 * Only in a THREADED_RTS build will there be multiple capabilities;
14 * non-threaded builds have only one global capability, namely
15 * MainCapability.
16 *
17 * --------------------------------------------------------------------------*/
18
19 #include "PosixSource.h"
20 #include "Rts.h"
21
22 #include "Capability.h"
23 #include "Schedule.h"
24 #include "Sparks.h"
25 #include "Trace.h"
26 #include "sm/GC.h" // for gcWorkerThread()
27 #include "STM.h"
28 #include "RtsUtils.h"
29 #include "sm/OSMem.h"
30
31 #if !defined(mingw32_HOST_OS)
32 #include "rts/IOManager.h" // for setIOManagerControlFd()
33 #endif
34
35 #include <string.h>
36
37 // one global capability, this is the Capability for non-threaded
38 // builds, and for +RTS -N1
39 Capability MainCapability;
40
41 uint32_t n_capabilities = 0;
42 uint32_t enabled_capabilities = 0;
43
44 // The array of Capabilities. It's important that when we need
45 // to allocate more Capabilities we don't have to move the existing
46 // Capabilities, because there may be pointers to them in use
47 // (e.g. threads in waitForCapability(), see #8209), so this is
48 // an array of Capability* rather than an array of Capability.
49 Capability **capabilities = NULL;
50
51 // Holds the Capability which last became free. This is used so that
52 // an in-call has a chance of quickly finding a free Capability.
53 // Maintaining a global free list of Capabilities would require global
54 // locking, so we don't do that.
55 static Capability *last_free_capability[MAX_NUMA_NODES];
56
57 /*
58 * Indicates that the RTS wants to synchronise all the Capabilities
59 * for some reason. All Capabilities should yieldCapability().
60 */
61 PendingSync * volatile pending_sync = 0;
62
63 // Number of logical NUMA nodes
64 uint32_t n_numa_nodes;
65
66 // Maps logical NUMA node numbers to OS node numbers
67 uint32_t numa_map[MAX_NUMA_NODES];
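// For example (an illustrative mask, not a measured configuration): if
// RtsFlags.GcFlags.numaMask & osNumaMask() == 0b1010, then only OS nodes 1
// and 3 are usable, and initCapabilities() below produces
//     n_numa_nodes = 2;   numa_map[0] = 1;   numa_map[1] = 3;
// A Capability's logical node (capabilities[i]->node == i % n_numa_nodes)
// can then be translated back to an OS node as numa_map[cap->node].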
68
69 /* Let foreign code get the current Capability -- assuming there is one!
70 * This is useful for unsafe foreign calls because they are called with
71 * the current Capability held, but they are not passed it. For example,
72 * see the integer-gmp package, which calls allocate() in its
73 * stgAllocForGMP() function (which gets called by gmp functions). A sketch of this pattern follows the function below.
74 * */
75 Capability * rts_unsafeGetMyCapability (void)
76 {
77 #if defined(THREADED_RTS)
78 return myTask()->cap;
79 #else
80 return &MainCapability;
81 #endif
82 }
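
/* A minimal sketch of the pattern described above. The helper name
 * exampleAllocFromUnsafeForeignCall is hypothetical; rts_unsafeGetMyCapability()
 * and allocate() are the RTS entry points it relies on:
 *
 *     StgPtr exampleAllocFromUnsafeForeignCall (W_ n_words)
 *     {
 *         // Valid only because an unsafe foreign call runs while the
 *         // calling Haskell thread still holds its Capability.
 *         Capability *cap = rts_unsafeGetMyCapability();
 *         return allocate(cap, n_words);
 *     }
 */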
83
84 #if defined(THREADED_RTS)
85 STATIC_INLINE bool
86 globalWorkToDo (void)
87 {
88 return sched_state >= SCHED_INTERRUPTING
89 || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
90 }
91 #endif
92
93 #if defined(THREADED_RTS)
94 StgClosure *
95 findSpark (Capability *cap)
96 {
97 Capability *robbed;
98 StgClosurePtr spark;
99 bool retry;
100 uint32_t i = 0;
101
102 if (!emptyRunQueue(cap) || cap->n_returning_tasks != 0) {
103 // If there are other threads, don't try to run any new
104 // sparks: sparks might be speculative, and we don't want to take
105 // resources away from the main computation.
106 return 0;
107 }
108
109 do {
110 retry = false;
111
112 // first try to get a spark from our own pool.
113 // We should be using reclaimSpark(), because it works without
114 // needing any atomic instructions:
115 // spark = reclaimSpark(cap->sparks);
116 // However, measurements show that this makes at least one benchmark
117 // slower (prsa) and doesn't affect the others.
118 spark = tryStealSpark(cap->sparks);
119 while (spark != NULL && fizzledSpark(spark)) {
120 cap->spark_stats.fizzled++;
121 traceEventSparkFizzle(cap);
122 spark = tryStealSpark(cap->sparks);
123 }
124 if (spark != NULL) {
125 cap->spark_stats.converted++;
126
127 // Post event for running a spark from capability's own pool.
128 traceEventSparkRun(cap);
129
130 return spark;
131 }
132 if (!emptySparkPoolCap(cap)) {
133 retry = true;
134 }
135
136 if (n_capabilities == 1) { return NULL; } // makes no sense...
137
138 debugTrace(DEBUG_sched,
139 "cap %d: Trying to steal work from other capabilities",
140 cap->no);
141
142 /* visit Capabilities 0..n-1 in sequence until a theft succeeds. We could
143 start at a random place instead of 0 as well. */
144 for ( i=0 ; i < n_capabilities ; i++ ) {
145 robbed = capabilities[i];
146 if (cap == robbed) // ourselves...
147 continue;
148
149 if (emptySparkPoolCap(robbed)) // nothing to steal here
150 continue;
151
152 spark = tryStealSpark(robbed->sparks);
153 while (spark != NULL && fizzledSpark(spark)) {
154 cap->spark_stats.fizzled++;
155 traceEventSparkFizzle(cap);
156 spark = tryStealSpark(robbed->sparks);
157 }
158 if (spark == NULL && !emptySparkPoolCap(robbed)) {
159 // we conflicted with another thread while trying to steal;
160 // try again later.
161 retry = true;
162 }
163
164 if (spark != NULL) {
165 cap->spark_stats.converted++;
166 traceEventSparkSteal(cap, robbed->no);
167
168 return spark;
169 }
170 // otherwise: no success, try next one
171 }
172 } while (retry);
173
174 debugTrace(DEBUG_sched, "No sparks stolen");
175 return NULL;
176 }
177
178 // Returns true if any spark pool is non-empty at this moment in time.
179 // The result is only valid for an instant, of course, so in a sense
180 // is immediately invalid, and should not be relied upon for
181 // correctness.
182 bool
183 anySparks (void)
184 {
185 uint32_t i;
186
187 for (i=0; i < n_capabilities; i++) {
188 if (!emptySparkPoolCap(capabilities[i])) {
189 return true;
190 }
191 }
192 return false;
193 }
194 #endif
195
196 /* -----------------------------------------------------------------------------
197 * Manage the returning_tasks lists.
198 *
199 * These functions require cap->lock
200 * -------------------------------------------------------------------------- */
201
202 #if defined(THREADED_RTS)
203 STATIC_INLINE void
204 newReturningTask (Capability *cap, Task *task)
205 {
206 ASSERT_LOCK_HELD(&cap->lock);
207 ASSERT(task->next == NULL);
208 if (cap->returning_tasks_hd) {
209 ASSERT(cap->returning_tasks_tl->next == NULL);
210 cap->returning_tasks_tl->next = task;
211 } else {
212 cap->returning_tasks_hd = task;
213 }
214 cap->returning_tasks_tl = task;
215 cap->n_returning_tasks++;
216 ASSERT_RETURNING_TASKS(cap,task);
217 }
218
219 STATIC_INLINE Task *
220 popReturningTask (Capability *cap)
221 {
222 ASSERT_LOCK_HELD(&cap->lock);
223 Task *task;
224 task = cap->returning_tasks_hd;
225 ASSERT(task);
226 cap->returning_tasks_hd = task->next;
227 if (!cap->returning_tasks_hd) {
228 cap->returning_tasks_tl = NULL;
229 }
230 task->next = NULL;
231 cap->n_returning_tasks--;
232 ASSERT_RETURNING_TASKS(cap,task);
233 return task;
234 }
235 #endif
236
237 /* ----------------------------------------------------------------------------
238 * Initialisation
239 *
240 * The Capability is initially marked not free.
241 * ------------------------------------------------------------------------- */
242
243 static void
244 initCapability (Capability *cap, uint32_t i)
245 {
246 uint32_t g;
247
248 cap->no = i;
249 cap->node = capNoToNumaNode(i);
250 cap->in_haskell = false;
251 cap->idle = 0;
252 cap->disabled = false;
253
254 cap->run_queue_hd = END_TSO_QUEUE;
255 cap->run_queue_tl = END_TSO_QUEUE;
256 cap->n_run_queue = 0;
257
258 #if defined(THREADED_RTS)
259 initMutex(&cap->lock);
260 cap->running_task = NULL; // indicates cap is free
261 cap->spare_workers = NULL;
262 cap->n_spare_workers = 0;
263 cap->suspended_ccalls = NULL;
264 cap->n_suspended_ccalls = 0;
265 cap->returning_tasks_hd = NULL;
266 cap->returning_tasks_tl = NULL;
267 cap->n_returning_tasks = 0;
268 cap->inbox = (Message*)END_TSO_QUEUE;
269 cap->putMVars = NULL;
270 cap->sparks = allocSparkPool();
271 cap->spark_stats.created = 0;
272 cap->spark_stats.dud = 0;
273 cap->spark_stats.overflowed = 0;
274 cap->spark_stats.converted = 0;
275 cap->spark_stats.gcd = 0;
276 cap->spark_stats.fizzled = 0;
277 #if !defined(mingw32_HOST_OS)
278 cap->io_manager_control_wr_fd = -1;
279 #endif
280 #endif
281 cap->total_allocated = 0;
282
283 cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
284 cap->f.stgGCEnter1 = (StgFunPtr)__stg_gc_enter_1;
285 cap->f.stgGCFun = (StgFunPtr)__stg_gc_fun;
286
287 cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
288 RtsFlags.GcFlags.generations,
289 "initCapability");
290 cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
291 RtsFlags.GcFlags.generations,
292 "initCapability");
293
294 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
295 cap->mut_lists[g] = NULL;
296 }
297
298 cap->weak_ptr_list_hd = NULL;
299 cap->weak_ptr_list_tl = NULL;
300 cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
301 cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE;
302 cap->free_trec_chunks = END_STM_CHUNK_LIST;
303 cap->free_trec_headers = NO_TREC;
304 cap->transaction_tokens = 0;
305 cap->context_switch = 0;
306 cap->pinned_object_block = NULL;
307 cap->pinned_object_blocks = NULL;
308
309 #if defined(PROFILING)
310 cap->r.rCCCS = CCS_SYSTEM;
311 #else
312 cap->r.rCCCS = NULL;
313 #endif
314
315 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
316 // don't want it set when not running a Haskell thread.
317 cap->r.rCurrentTSO = NULL;
318
319 traceCapCreate(cap);
320 traceCapsetAssignCap(CAPSET_OSPROCESS_DEFAULT, i);
321 traceCapsetAssignCap(CAPSET_CLOCKDOMAIN_DEFAULT, i);
322 #if defined(THREADED_RTS)
323 traceSparkCounters(cap);
324 #endif
325 }
326
327 /* ---------------------------------------------------------------------------
328 * Function: initCapabilities()
329 *
330 * Purpose: set up the Capability handling. For the THREADED_RTS build,
331 * we keep a table of them, the size of which is
332 * controlled by the user via the RTS flag -N.
333 *
334 * ------------------------------------------------------------------------- */
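// For example, running a program with "+RTS -N4" requests four Capabilities;
// in the THREADED_RTS case below this arrives here as
// RtsFlags.ParFlags.nCapabilities == 4.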
335 void initCapabilities (void)
336 {
337 uint32_t i;
338
339 /* Declare a couple of capability sets representing the process and
340 clock domain. Each capability will get added to these capsets. */
341 traceCapsetCreate(CAPSET_OSPROCESS_DEFAULT, CapsetTypeOsProcess);
342 traceCapsetCreate(CAPSET_CLOCKDOMAIN_DEFAULT, CapsetTypeClockdomain);
343
344 // Initialise NUMA
345 if (!RtsFlags.GcFlags.numa) {
346 n_numa_nodes = 1;
347 for (i = 0; i < MAX_NUMA_NODES; i++) {
348 numa_map[i] = 0;
349 }
350 } else {
351 uint32_t nNodes = osNumaNodes();
352 if (nNodes > MAX_NUMA_NODES) {
353 barf("Too many NUMA nodes (max %d)", MAX_NUMA_NODES);
354 }
355 StgWord mask = RtsFlags.GcFlags.numaMask & osNumaMask();
356 uint32_t logical = 0, physical = 0;
357 for (; physical < MAX_NUMA_NODES; physical++) {
358 if (mask & 1) {
359 numa_map[logical++] = physical;
360 }
361 mask = mask >> 1;
362 }
363 n_numa_nodes = logical;
364 if (logical == 0) {
365 barf("available NUMA node set is empty");
366 }
367 }
368
369 #if defined(THREADED_RTS)
370
371 #if !defined(REG_Base)
372 // We can't support multiple CPUs if BaseReg is not a register
373 if (RtsFlags.ParFlags.nCapabilities > 1) {
374 errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
375 RtsFlags.ParFlags.nCapabilities = 1;
376 }
377 #endif
378
379 n_capabilities = 0;
380 moreCapabilities(0, RtsFlags.ParFlags.nCapabilities);
381 n_capabilities = RtsFlags.ParFlags.nCapabilities;
382
383 #else /* !THREADED_RTS */
384
385 n_capabilities = 1;
386 capabilities = stgMallocBytes(sizeof(Capability*), "initCapabilities");
387 capabilities[0] = &MainCapability;
388
389 initCapability(&MainCapability, 0);
390
391 #endif
392
393 enabled_capabilities = n_capabilities;
394
395 // There are no free capabilities to begin with. We will start
396 // a worker Task on each Capability, which will quickly put the
397 // Capability on the free list when it finds nothing to do.
398 for (i = 0; i < n_numa_nodes; i++) {
399 last_free_capability[i] = capabilities[0];
400 }
401 }
402
403 void
404 moreCapabilities (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
405 {
406 #if defined(THREADED_RTS)
407 uint32_t i;
408 Capability **old_capabilities = capabilities;
409
410 capabilities = stgMallocBytes(to * sizeof(Capability*), "moreCapabilities");
411
412 if (to == 1) {
413 // THREADED_RTS must work on builds that don't have a mutable
414 // BaseReg (eg. unregisterised), so in this case
415 // capabilities[0] must coincide with &MainCapability.
416 capabilities[0] = &MainCapability;
417 initCapability(&MainCapability, 0);
418 }
419 else
420 {
421 for (i = 0; i < to; i++) {
422 if (i < from) {
423 capabilities[i] = old_capabilities[i];
424 } else {
425 capabilities[i] = stgMallocBytes(sizeof(Capability),
426 "moreCapabilities");
427 initCapability(capabilities[i], i);
428 }
429 }
430 }
431
432 debugTrace(DEBUG_sched, "allocated %d more capabilities", to - from);
433
434 if (old_capabilities != NULL) {
435 stgFree(old_capabilities);
436 }
437 #endif
438 }
439
440 /* ----------------------------------------------------------------------------
441 * setContextSwitches: cause all capabilities to context switch as
442 * soon as possible.
443 * ------------------------------------------------------------------------- */
444
445 void contextSwitchAllCapabilities(void)
446 {
447 uint32_t i;
448 for (i=0; i < n_capabilities; i++) {
449 contextSwitchCapability(capabilities[i]);
450 }
451 }
452
453 void interruptAllCapabilities(void)
454 {
455 uint32_t i;
456 for (i=0; i < n_capabilities; i++) {
457 interruptCapability(capabilities[i]);
458 }
459 }
460
461 /* ----------------------------------------------------------------------------
462 * Give a Capability to a Task. The task must currently be sleeping
463 * on its condition variable.
464 *
465 * Requires cap->lock (modifies cap->running_task).
466 *
467 * When migrating a Task, the migrator must take task->lock before
468 * modifying task->cap, to synchronise with the waking up Task.
469 * Additionally, the migrator should own the Capability (when
470 * migrating the run queue), or cap->lock (when migrating
471 * returning_workers).
472 *
473 * ------------------------------------------------------------------------- */
474
475 #if defined(THREADED_RTS)
476 static void
477 giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
478 {
479 ASSERT_LOCK_HELD(&cap->lock);
480 ASSERT(task->cap == cap);
481 debugTrace(DEBUG_sched, "passing capability %d to %s %#" FMT_HexWord64,
482 cap->no, task->incall->tso ? "bound task" : "worker",
483 serialisableTaskId(task));
484 ACQUIRE_LOCK(&task->lock);
485 if (task->wakeup == false) {
486 task->wakeup = true;
487 // the wakeup flag is needed because signalCondition() doesn't
488 // flag the condition if the thread is already running, but we want
489 // it to be sticky.
490 signalCondition(&task->cond);
491 }
492 RELEASE_LOCK(&task->lock);
493 }
494 #endif
495
496 /* ----------------------------------------------------------------------------
497 * releaseCapability
498 *
499 * The current Task (cap->task) releases the Capability. The Capability is
500 * marked free, and if there is any work to do, an appropriate Task is woken up.
501 *
502 * N.B. May need to take all_tasks_mutex.
503 *
504 * ------------------------------------------------------------------------- */
505
506 #if defined(THREADED_RTS)
507 void
508 releaseCapability_ (Capability* cap,
509 bool always_wakeup)
510 {
511 Task *task;
512
513 task = cap->running_task;
514
515 ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
516 ASSERT_RETURNING_TASKS(cap,task);
517
518 cap->running_task = NULL;
519
520 // Check to see whether a worker thread can be given
521 // the go-ahead to return the result of an external call.
522 if (cap->n_returning_tasks != 0) {
523 giveCapabilityToTask(cap,cap->returning_tasks_hd);
524 // The Task pops itself from the queue (see waitForCapability())
525 return;
526 }
527
528 // If there is a pending sync, then we should just leave the Capability
529 // free. The thread trying to sync will be about to call
530 // waitForCapability().
531 //
532 // Note: this is *after* we check for a returning task above,
533 // because the task attempting to acquire all the capabilities may
534 // be currently in waitForCapability() waiting for this
535 // capability, in which case simply setting it as free would not
536 // wake up the waiting task.
537 PendingSync *sync = pending_sync;
538 if (sync && (sync->type != SYNC_GC_PAR || sync->idle[cap->no])) {
539 debugTrace(DEBUG_sched, "sync pending, freeing capability %d", cap->no);
540 return;
541 }
542
543 // If the next thread on the run queue is a bound thread,
544 // give this Capability to the appropriate Task.
545 if (!emptyRunQueue(cap) && peekRunQueue(cap)->bound) {
546 // Make sure we're not about to try to wake ourselves up
547 // ASSERT(task != cap->run_queue_hd->bound);
548 // assertion is false: in schedule() we force a yield after
549 // ThreadBlocked, but the thread may be back on the run queue
550 // by now.
551 task = peekRunQueue(cap)->bound->task;
552 giveCapabilityToTask(cap, task);
553 return;
554 }
555
556 if (!cap->spare_workers) {
557 // Create a worker thread if we don't have one. If the system
558 // is interrupted, we only create a worker task if there
559 // are threads that need to be completed. If the system is
560 // shutting down, we never create a new worker.
561 if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
562 debugTrace(DEBUG_sched,
563 "starting new worker on capability %d", cap->no);
564 startWorkerTask(cap);
565 return;
566 }
567 }
568
569 // If we have an unbound thread on the run queue, or if there's
570 // anything else to do, give the Capability to a worker thread.
571 if (always_wakeup ||
572 !emptyRunQueue(cap) || !emptyInbox(cap) ||
573 (!cap->disabled && !emptySparkPoolCap(cap)) || globalWorkToDo()) {
574 if (cap->spare_workers) {
575 giveCapabilityToTask(cap, cap->spare_workers);
576 // The worker Task pops itself from the queue;
577 return;
578 }
579 }
580
581 #if defined(PROFILING)
582 cap->r.rCCCS = CCS_IDLE;
583 #endif
584 last_free_capability[cap->node] = cap;
585 debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
586 }
587
588 void
589 releaseCapability (Capability* cap USED_IF_THREADS)
590 {
591 ACQUIRE_LOCK(&cap->lock);
592 releaseCapability_(cap, false);
593 RELEASE_LOCK(&cap->lock);
594 }
595
596 void
597 releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
598 {
599 ACQUIRE_LOCK(&cap->lock);
600 releaseCapability_(cap, true);
601 RELEASE_LOCK(&cap->lock);
602 }
603
604 static void
605 enqueueWorker (Capability* cap USED_IF_THREADS)
606 {
607 Task *task;
608
609 task = cap->running_task;
610
611 // If the Task is stopped, we shouldn't be yielding; we should
612 // be just exiting.
613 ASSERT(!task->stopped);
614 ASSERT(task->worker);
615
616 if (cap->n_spare_workers < MAX_SPARE_WORKERS)
617 {
618 task->next = cap->spare_workers;
619 cap->spare_workers = task;
620 cap->n_spare_workers++;
621 }
622 else
623 {
624 debugTrace(DEBUG_sched, "%d spare workers already, exiting",
625 cap->n_spare_workers);
626 releaseCapability_(cap,false);
627 // hold the lock until after workerTaskStop; c.f. scheduleWorker()
628 workerTaskStop(task);
629 RELEASE_LOCK(&cap->lock);
630 shutdownThread();
631 }
632 }
633
634 #endif
635
636 /* ----------------------------------------------------------------------------
637 * waitForWorkerCapability(task)
638 *
639 * waits to be given a Capability, and then returns the Capability. The task
640 * must be either a worker (and on a cap->spare_workers queue), or a bound Task.
641 * ------------------------------------------------------------------------- */
642
643 #if defined(THREADED_RTS)
644
645 static Capability * waitForWorkerCapability (Task *task)
646 {
647 Capability *cap;
648
649 for (;;) {
650 ACQUIRE_LOCK(&task->lock);
651 // task->lock held, cap->lock not held
652 if (!task->wakeup) waitCondition(&task->cond, &task->lock);
653 cap = task->cap;
654 task->wakeup = false;
655 RELEASE_LOCK(&task->lock);
656
657 debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);
658
659 ACQUIRE_LOCK(&cap->lock);
660 if (cap->running_task != NULL) {
661 debugTrace(DEBUG_sched,
662 "capability %d is owned by another task", cap->no);
663 RELEASE_LOCK(&cap->lock);
664 continue;
665 }
666
667 if (task->cap != cap) {
668 // see Note [migrated bound threads]
669 debugTrace(DEBUG_sched,
670 "task has been migrated to cap %d", task->cap->no);
671 RELEASE_LOCK(&cap->lock);
672 continue;
673 }
674
675 if (task->incall->tso == NULL) {
676 ASSERT(cap->spare_workers != NULL);
677 // if we're not at the front of the queue, release it
678 // again. This is unlikely to happen.
679 if (cap->spare_workers != task) {
680 giveCapabilityToTask(cap,cap->spare_workers);
681 RELEASE_LOCK(&cap->lock);
682 continue;
683 }
684 cap->spare_workers = task->next;
685 task->next = NULL;
686 cap->n_spare_workers--;
687 }
688
689 cap->running_task = task;
690 RELEASE_LOCK(&cap->lock);
691 break;
692 }
693
694 return cap;
695 }
696
697 #endif /* THREADED_RTS */
698
699 /* ----------------------------------------------------------------------------
700 * waitForReturnCapability (Task *task)
701 *
702 * The Task should be on the cap->returning_tasks queue of a Capability. This
703 * function waits for the Task to be woken up, and returns the Capability that
704 * it was woken up on.
705 *
706 * ------------------------------------------------------------------------- */
707
708 #if defined(THREADED_RTS)
709
710 static Capability * waitForReturnCapability (Task *task)
711 {
712 Capability *cap;
713
714 for (;;) {
715 ACQUIRE_LOCK(&task->lock);
716 // task->lock held, cap->lock not held
717 if (!task->wakeup) waitCondition(&task->cond, &task->lock);
718 cap = task->cap;
719 task->wakeup = false;
720 RELEASE_LOCK(&task->lock);
721
722 // now check whether we should wake up...
723 ACQUIRE_LOCK(&cap->lock);
724 if (cap->running_task == NULL) {
725 if (cap->returning_tasks_hd != task) {
726 giveCapabilityToTask(cap,cap->returning_tasks_hd);
727 RELEASE_LOCK(&cap->lock);
728 continue;
729 }
730 cap->running_task = task;
731 popReturningTask(cap);
732 RELEASE_LOCK(&cap->lock);
733 break;
734 }
735 RELEASE_LOCK(&cap->lock);
736 }
737
738 return cap;
739 }
740
741 #endif /* THREADED_RTS */
742
743 /* ----------------------------------------------------------------------------
744 * waitForCapability (Capability **pCap, Task *task)
745 *
746 * Purpose: when an OS thread returns from an external call,
747 * it calls waitForCapability() (via Schedule.resumeThread())
748 * to wait for permission to enter the RTS & communicate the
749 * result of the external call back to the Haskell thread that
750 * made it.
751 *
752 * ------------------------------------------------------------------------- */
753
754 void waitForCapability (Capability **pCap, Task *task)
755 {
756 #if !defined(THREADED_RTS)
757
758 MainCapability.running_task = task;
759 task->cap = &MainCapability;
760 *pCap = &MainCapability;
761
762 #else
763 uint32_t i;
764 Capability *cap = *pCap;
765
766 if (cap == NULL) {
767 if (task->preferred_capability != -1) {
768 cap = capabilities[task->preferred_capability %
769 enabled_capabilities];
770 } else {
771 // Try last_free_capability first
772 cap = last_free_capability[task->node];
773 if (cap->running_task) {
774 // Otherwise, search for a free capability on this node.
775 cap = NULL;
776 for (i = task->node; i < enabled_capabilities;
777 i += n_numa_nodes) {
778 // visits all the capabilities on this node, because
779 // capabilities[i]->node == i % n_numa_nodes
780 if (!capabilities[i]->running_task) {
781 cap = capabilities[i];
782 break;
783 }
784 }
785 if (cap == NULL) {
786 // Can't find a free one, use last_free_capability.
787 cap = last_free_capability[task->node];
788 }
789 }
790 }
791
792 // record the Capability as the one this Task is now associated with.
793 task->cap = cap;
794
795 } else {
796 ASSERT(task->cap == cap);
797 }
798
799 debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);
800
801 ACQUIRE_LOCK(&cap->lock);
802 if (!cap->running_task) {
803 // It's free; just grab it
804 cap->running_task = task;
805 RELEASE_LOCK(&cap->lock);
806 } else {
807 newReturningTask(cap,task);
808 RELEASE_LOCK(&cap->lock);
809 cap = waitForReturnCapability(task);
810 }
811
812 #if defined(PROFILING)
813 cap->r.rCCCS = CCS_SYSTEM;
814 #endif
815
816 ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
817
818 debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
819
820 *pCap = cap;
821 #endif
822 }
823
824 /* ----------------------------------------------------------------------------
825 * yieldCapability
826 *
827 * Give up the Capability, and return when we have it again. This is called
828 * when either we know that the Capability should be given to another Task, or
829 * there is nothing to do right now. One of the following is true:
830 *
831 * - The current Task is a worker, and there's a bound thread at the head of
832 * the run queue (or vice versa)
833 *
834 * - The run queue is empty. We'll be woken up again when there's work to
835 * do.
836 *
837 * - Another Task is trying to do parallel GC (pending_sync == SYNC_GC_PAR).
838 * We should become a GC worker for a while.
839 *
840 * - Another Task is trying to acquire all the Capabilities (pending_sync !=
841 * SYNC_GC_PAR), either to do a sequential GC, forkProcess, or
842 * setNumCapabilities. We should give up the Capability temporarily.
843 *
844 * ------------------------------------------------------------------------- */
845
846 #if defined (THREADED_RTS)
847
848 /* See Note [GC livelock] in Schedule.c for why we have gcAllowed
849 and return the bool */
850 bool /* Did we GC? */
851 yieldCapability (Capability** pCap, Task *task, bool gcAllowed)
852 {
853 Capability *cap = *pCap;
854
855 if (gcAllowed)
856 {
857 PendingSync *sync = pending_sync;
858
859 if (sync && sync->type == SYNC_GC_PAR) {
860 if (! sync->idle[cap->no]) {
861 traceEventGcStart(cap);
862 gcWorkerThread(cap);
863 traceEventGcEnd(cap);
864 traceSparkCounters(cap);
865 // See Note [migrated bound threads 2]
866 if (task->cap == cap) {
867 return true;
868 }
869 }
870 }
871 }
872
873 debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
874
875 // We must now release the capability and wait to be woken up again.
876 task->wakeup = false;
877
878 ACQUIRE_LOCK(&cap->lock);
879
880 // If this is a worker thread, put it on the spare_workers queue
881 if (isWorker(task)) {
882 enqueueWorker(cap);
883 }
884
885 releaseCapability_(cap, false);
886
887 if (isWorker(task) || isBoundTask(task)) {
888 RELEASE_LOCK(&cap->lock);
889 cap = waitForWorkerCapability(task);
890 } else {
891 // Neither a worker Task nor a bound Task. The only way we can be woken up
892 // again is to put ourselves on the returning_tasks queue, so that's
893 // what we do. We still hold cap->lock at this point.
894 // The Task waiting for this Capability does not have it
895 // yet, so we can be sure to be woken up later. (see #10545)
896 newReturningTask(cap,task);
897 RELEASE_LOCK(&cap->lock);
898 cap = waitForReturnCapability(task);
899 }
900
901 debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
902 ASSERT(cap->running_task == task);
903
904 #if defined(PROFILING)
905 cap->r.rCCCS = CCS_SYSTEM;
906 #endif
907
908 *pCap = cap;
909
910 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
911
912 return false;
913 }
914
915 #endif /* THREADED_RTS */
916
917 // Note [migrated bound threads]
918 //
919 // There's a tricky case where:
920 // - cap A is running an unbound thread T1
921 // - there is a bound thread T2 at the head of the run queue on cap A
922 // - T1 makes a safe foreign call, the task bound to T2 is woken up on cap A
923 // - T1 returns quickly grabbing A again (T2 is still waking up on A)
924 // - T1 blocks, the scheduler migrates T2 to cap B
925 // - the task bound to T2 wakes up on cap B
926 //
927 // We take advantage of the following invariant:
928 //
929 // - A bound thread can only be migrated by the holder of the
930 // Capability on which the bound thread currently lives. So, if we
931 // hold Capability C, and task->cap == C, then task cannot be
932 // migrated under our feet.
933
934 // Note [migrated bound threads 2]
935 //
936 // Second tricky case:
937 // - A bound Task becomes a GC thread
938 // - scheduleDoGC() migrates the thread belonging to this Task,
939 // because the Capability it is on is disabled
940 // - after GC, gcWorkerThread() returns, but now we are
941 // holding a Capability that is not the same as task->cap
942 // - Hence we must check for this case and immediately give up the
943 // cap we hold.
944
945 /* ----------------------------------------------------------------------------
946 * prodCapability
947 *
948 * If a Capability is currently idle, wake up a Task on it. Used to
949 * get every Capability into the GC.
950 * ------------------------------------------------------------------------- */
951
952 #if defined (THREADED_RTS)
953
954 void
955 prodCapability (Capability *cap, Task *task)
956 {
957 ACQUIRE_LOCK(&cap->lock);
958 if (!cap->running_task) {
959 cap->running_task = task;
960 releaseCapability_(cap,true);
961 }
962 RELEASE_LOCK(&cap->lock);
963 }
964
965 #endif /* THREADED_RTS */
966
967 /* ----------------------------------------------------------------------------
968 * tryGrabCapability
969 *
970 * Attempt to gain control of a Capability if it is free.
971 *
972 * ------------------------------------------------------------------------- */
973
974 #if defined (THREADED_RTS)
975
976 bool
977 tryGrabCapability (Capability *cap, Task *task)
978 {
979 int r;
980 if (cap->running_task != NULL) return false;
981 r = TRY_ACQUIRE_LOCK(&cap->lock);
982 if (r != 0) return false;
983 if (cap->running_task != NULL) {
984 RELEASE_LOCK(&cap->lock);
985 return false;
986 }
987 task->cap = cap;
988 cap->running_task = task;
989 RELEASE_LOCK(&cap->lock);
990 return true;
991 }
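
/* A minimal sketch of how a caller might use this (illustrative only; the
 * real callers live in the scheduler). Scan for a free Capability without
 * ever blocking on a lock:
 *
 *     for (i = 0; i < n_capabilities; i++) {
 *         if (tryGrabCapability(capabilities[i], task)) {
 *             // we now own capabilities[i]; when done:
 *             // releaseCapability(capabilities[i]);
 *             break;
 *         }
 *     }
 */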
992
993
994 #endif /* THREADED_RTS */
995
996 /* ----------------------------------------------------------------------------
997 * shutdownCapability
998 *
999 * At shutdown time, we want to let everything exit as cleanly as
1000 * possible. For each capability, we let its run queue drain, and
1001 * allow the workers to stop.
1002 *
1003 * This function should be called when interrupted and
1004 * sched_state == SCHED_SHUTTING_DOWN; thus any worker that wakes up
1005 * will exit the scheduler and call taskStop(), and any bound thread
1006 * that wakes up will return to its caller. Runnable threads are
1007 * killed.
1008 *
1009 * ------------------------------------------------------------------------- */
1010
1011 static void
1012 shutdownCapability (Capability *cap USED_IF_THREADS,
1013 Task *task USED_IF_THREADS,
1014 bool safe USED_IF_THREADS)
1015 {
1016 #if defined(THREADED_RTS)
1017 uint32_t i;
1018
1019 task->cap = cap;
1020
1021 // Loop indefinitely until all the workers have exited and there
1022 // are no Haskell threads left. We used to bail out after 50
1023 // iterations of this loop, but that occasionally left a worker
1024 // running which caused problems later (the closeMutex() below
1025 // isn't safe, for one thing).
1026
1027 for (i = 0; /* i < 50 */; i++) {
1028 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
1029
1030 debugTrace(DEBUG_sched,
1031 "shutting down capability %d, attempt %d", cap->no, i);
1032 ACQUIRE_LOCK(&cap->lock);
1033 if (cap->running_task) {
1034 RELEASE_LOCK(&cap->lock);
1035 debugTrace(DEBUG_sched, "not owner, yielding");
1036 yieldThread();
1037 continue;
1038 }
1039 cap->running_task = task;
1040
1041 if (cap->spare_workers) {
1042 // Look for workers that have died without removing
1043 // themselves from the list; this could happen if the OS
1044 // summarily killed the thread, for example. This
1045 // actually happens on Windows when the system is
1046 // terminating the program, and the RTS is running in a
1047 // DLL.
1048 Task *t, *prev;
1049 prev = NULL;
1050 for (t = cap->spare_workers; t != NULL; t = t->next) {
1051 if (!osThreadIsAlive(t->id)) {
1052 debugTrace(DEBUG_sched,
1053 "worker thread %p has died unexpectedly", (void *)(size_t)t->id);
1054 cap->n_spare_workers--;
1055 if (!prev) {
1056 cap->spare_workers = t->next;
1057 } else {
1058 prev->next = t->next;
1059 }
1060 prev = t;
1061 }
1062 }
1063 }
1064
1065 if (!emptyRunQueue(cap) || cap->spare_workers) {
1066 debugTrace(DEBUG_sched,
1067 "runnable threads or workers still alive, yielding");
1068 releaseCapability_(cap,false); // this will wake up a worker
1069 RELEASE_LOCK(&cap->lock);
1070 yieldThread();
1071 continue;
1072 }
1073
1074 // If "safe", then busy-wait for any threads currently doing
1075 // foreign calls. If we're about to unload this DLL, for
1076 // example, we need to be sure that there are no OS threads
1077 // that will try to return to code that has been unloaded.
1078 // We can be a bit more relaxed when this is a standalone
1079 // program that is about to terminate, and let safe=false.
1080 if (cap->suspended_ccalls && safe) {
1081 debugTrace(DEBUG_sched,
1082 "thread(s) are involved in foreign calls, yielding");
1083 cap->running_task = NULL;
1084 RELEASE_LOCK(&cap->lock);
1085 // The IO manager thread might have been slow to start up,
1086 // so the first attempt to kill it might not have
1087 // succeeded. Just in case, try again - the kill message
1088 // will only be sent once.
1089 //
1090 // To reproduce this deadlock: run ffi002(threaded1)
1091 // repeatedly on a loaded machine.
1092 ioManagerDie();
1093 yieldThread();
1094 continue;
1095 }
1096
1097 traceSparkCounters(cap);
1098 RELEASE_LOCK(&cap->lock);
1099 break;
1100 }
1101 // we now have the Capability; its run queue and spare workers
1102 // list are both empty.
1103
1104 // ToDo: we can't drop this mutex, because there might still be
1105 // threads performing foreign calls that will eventually try to
1106 // return via resumeThread() and attempt to grab cap->lock.
1107 // closeMutex(&cap->lock);
1108 #endif
1109 }
1110
1111 void
1112 shutdownCapabilities(Task *task, bool safe)
1113 {
1114 uint32_t i;
1115 for (i=0; i < n_capabilities; i++) {
1116 ASSERT(task->incall->tso == NULL);
1117 shutdownCapability(capabilities[i], task, safe);
1118 }
1119 #if defined(THREADED_RTS)
1120 ASSERT(checkSparkCountInvariant());
1121 #endif
1122 }
1123
1124 static void
1125 freeCapability (Capability *cap)
1126 {
1127 stgFree(cap->mut_lists);
1128 stgFree(cap->saved_mut_lists);
1129 #if defined(THREADED_RTS)
1130 freeSparkPool(cap->sparks);
1131 #endif
1132 traceCapsetRemoveCap(CAPSET_OSPROCESS_DEFAULT, cap->no);
1133 traceCapsetRemoveCap(CAPSET_CLOCKDOMAIN_DEFAULT, cap->no);
1134 traceCapDelete(cap);
1135 }
1136
1137 void
1138 freeCapabilities (void)
1139 {
1140 #if defined(THREADED_RTS)
1141 uint32_t i;
1142 for (i=0; i < n_capabilities; i++) {
1143 freeCapability(capabilities[i]);
1144 if (capabilities[i] != &MainCapability)
1145 stgFree(capabilities[i]);
1146 }
1147 #else
1148 freeCapability(&MainCapability);
1149 #endif
1150 stgFree(capabilities);
1151 traceCapsetDelete(CAPSET_OSPROCESS_DEFAULT);
1152 traceCapsetDelete(CAPSET_CLOCKDOMAIN_DEFAULT);
1153 }
1154
1155 /* ---------------------------------------------------------------------------
1156 Mark everything directly reachable from the Capabilities. When
1157 using multiple GC threads, GC thread n marks every Capability c
1158 for which (c `mod` n_gc_threads == n).
1159 ------------------------------------------------------------------------ */
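// For example (arithmetic only): with 3 GC threads and 8 Capabilities,
// GC thread 1 marks Capabilities 1, 4 and 7.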
1160
1161 void
1162 markCapability (evac_fn evac, void *user, Capability *cap,
1163 bool no_mark_sparks USED_IF_THREADS)
1164 {
1165 InCall *incall;
1166
1167 // Each GC thread is responsible for following roots from the
1168 // Capability of the same number. There will usually be the same
1169 // or fewer Capabilities as GC threads, but just in case there
1170 // are more, we mark every Capability whose number is the GC
1171 // thread's index plus a multiple of the number of GC threads.
1172 evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
1173 evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
1174 #if defined(THREADED_RTS)
1175 evac(user, (StgClosure **)(void *)&cap->inbox);
1176 #endif
1177 for (incall = cap->suspended_ccalls; incall != NULL;
1178 incall=incall->next) {
1179 evac(user, (StgClosure **)(void *)&incall->suspended_tso);
1180 }
1181
1182 #if defined(THREADED_RTS)
1183 if (!no_mark_sparks) {
1184 traverseSparkQueue (evac, user, cap);
1185 }
1186 #endif
1187
1188 // Free STM structures for this Capability
1189 stmPreGCHook(cap);
1190 }
1191
1192 void
1193 markCapabilities (evac_fn evac, void *user)
1194 {
1195 uint32_t n;
1196 for (n = 0; n < n_capabilities; n++) {
1197 markCapability(evac, user, capabilities[n], false);
1198 }
1199 }
1200
1201 #if defined(THREADED_RTS)
1202 bool checkSparkCountInvariant (void)
1203 {
1204 SparkCounters sparks = { 0, 0, 0, 0, 0, 0 };
1205 StgWord64 remaining = 0;
1206 uint32_t i;
1207
1208 for (i = 0; i < n_capabilities; i++) {
1209 sparks.created += capabilities[i]->spark_stats.created;
1210 sparks.dud += capabilities[i]->spark_stats.dud;
1211 sparks.overflowed+= capabilities[i]->spark_stats.overflowed;
1212 sparks.converted += capabilities[i]->spark_stats.converted;
1213 sparks.gcd += capabilities[i]->spark_stats.gcd;
1214 sparks.fizzled += capabilities[i]->spark_stats.fizzled;
1215 remaining += sparkPoolSize(capabilities[i]->sparks);
1216 }
1217
1218 /* The invariant is
1219 * created = converted + remaining + gcd + fizzled
1220 */
1221 debugTrace(DEBUG_sparks,"spark invariant: %ld == %ld + %ld + %ld + %ld "
1222 "(created == converted + remaining + gcd + fizzled)",
1223 sparks.created, sparks.converted, remaining,
1224 sparks.gcd, sparks.fizzled);
1225
1226 return (sparks.created ==
1227 sparks.converted + remaining + sparks.gcd + sparks.fizzled);
1228
1229 }
1230 #endif
1231
1232 #if !defined(mingw32_HOST_OS)
1233 void
1234 setIOManagerControlFd(uint32_t cap_no USED_IF_THREADS, int fd USED_IF_THREADS) {
1235 #if defined(THREADED_RTS)
1236 if (cap_no < n_capabilities) {
1237 capabilities[cap_no]->io_manager_control_wr_fd = fd;
1238 } else {
1239 errorBelch("warning: setIOManagerControlFd called with illegal capability number.");
1240 }
1241 #endif
1242 }
1243 #endif