1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 2003-2012
4 *
5 * Capabilities
6 *
7 * A Capability represents the token required to execute STG code,
8 * and all the state an OS thread/task needs to run Haskell code:
9 * its STG registers, a pointer to its TSO, a nursery etc. During
 10  * STG execution, a pointer to the capability is kept in a
11 * register (BaseReg; actually it is a pointer to cap->r).
12 *
 13  * Only in a THREADED_RTS build will there be multiple capabilities;
 14  * in non-threaded builds there is only one global capability, namely
15 * MainCapability.
16 *
17 * --------------------------------------------------------------------------*/
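/* A minimal standalone sketch (guarded out of the build) of the idea
 * described above: BaseReg points at the embedded register table
 * (cap->r), and the enclosing Capability can be recovered by subtracting
 * the field offset; cf. regTableToCapability() in Capability.h. The
 * "Example" names below are illustrative only, not RTS types. */
#if 0
#include <stddef.h>

typedef struct { long rRet; } ExampleRegTable;

typedef struct {
    int no;
    ExampleRegTable r;   /* BaseReg points here while running STG code */
} ExampleCapability;

static ExampleCapability *
exampleRegTableToCapability (ExampleRegTable *reg)
{
    return (ExampleCapability *)((char *)reg - offsetof(ExampleCapability, r));
}
#endif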
18
19 #include "PosixSource.h"
20 #include "Rts.h"
21
22 #include "Capability.h"
23 #include "Schedule.h"
24 #include "Sparks.h"
25 #include "Trace.h"
26 #include "sm/GC.h" // for gcWorkerThread()
27 #include "STM.h"
28 #include "RtsUtils.h"
29 #include "sm/OSMem.h"
30
31 #if !defined(mingw32_HOST_OS)
32 #include "rts/IOManager.h" // for setIOManagerControlFd()
33 #endif
34
35 #include <string.h>
36
37 // one global capability, this is the Capability for non-threaded
38 // builds, and for +RTS -N1
39 Capability MainCapability;
40
41 uint32_t n_capabilities = 0;
42 uint32_t enabled_capabilities = 0;
43
44 // The array of Capabilities. It's important that when we need
45 // to allocate more Capabilities we don't have to move the existing
46 // Capabilities, because there may be pointers to them in use
47 // (e.g. threads in waitForCapability(), see #8209), so this is
48 // an array of Capability* rather than an array of Capability.
49 Capability **capabilities = NULL;
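// A hedged sketch (not compiled) of why this is an array of pointers:
// growing the outer table copies only the pointers, so the Capability
// objects never move and pointers handed out earlier (e.g. task->cap)
// stay valid. The helper name below is hypothetical; the real code is
// in moreCapabilities() further down.
#if 0
static Capability **
exampleGrowCapabilityTable (Capability **old, uint32_t from, uint32_t to)
{
    Capability **new = stgMallocBytes(to * sizeof(Capability*),
                                      "exampleGrowCapabilityTable");
    for (uint32_t i = 0; i < from; i++) {
        new[i] = old[i];     // same Capability objects, new table slots
    }
    stgFree(old);
    return new;
}
#endif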
50
51 // Holds the Capability which last became free. This is used so that
52 // an in-call has a chance of quickly finding a free Capability.
53 // Maintaining a global free list of Capabilities would require global
54 // locking, so we don't do that.
55 static Capability *last_free_capability[MAX_NUMA_NODES];
56
57 /*
58 * Indicates that the RTS wants to synchronise all the Capabilities
59 * for some reason. All Capabilities should yieldCapability().
60 */
61 PendingSync * volatile pending_sync = 0;
62
63 // Number of logical NUMA nodes
64 uint32_t n_numa_nodes;
65
66 // Map logical NUMA node to OS node numbers
67 uint32_t numa_map[MAX_NUMA_NODES];
68
69 /* Let foreign code get the current Capability -- assuming there is one!
70 * This is useful for unsafe foreign calls because they are called with
71 * the current Capability held, but they are not passed it. For example,
 72  * see the integer-gmp package which calls allocate() in its
73 * stgAllocForGMP() function (which gets called by gmp functions).
74 * */
75 Capability * rts_unsafeGetMyCapability (void)
76 {
77 #if defined(THREADED_RTS)
78 return myTask()->cap;
79 #else
80 return &MainCapability;
81 #endif
82 }
83
84 #if defined(THREADED_RTS)
85 STATIC_INLINE bool
86 globalWorkToDo (void)
87 {
88 return sched_state >= SCHED_INTERRUPTING
89 || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
90 }
91 #endif
92
93 #if defined(THREADED_RTS)
94 StgClosure *
95 findSpark (Capability *cap)
96 {
97 Capability *robbed;
98 StgClosurePtr spark;
99 bool retry;
100 uint32_t i = 0;
101
102 if (!emptyRunQueue(cap) || cap->n_returning_tasks != 0) {
103 // If there are other threads, don't try to run any new
104 // sparks: sparks might be speculative, we don't want to take
105 // resources away from the main computation.
106 return 0;
107 }
108
109 do {
110 retry = false;
111
112 // first try to get a spark from our own pool.
113 // We should be using reclaimSpark(), because it works without
114 // needing any atomic instructions:
115 // spark = reclaimSpark(cap->sparks);
116 // However, measurements show that this makes at least one benchmark
117 // slower (prsa) and doesn't affect the others.
118 spark = tryStealSpark(cap->sparks);
119 while (spark != NULL && fizzledSpark(spark)) {
120 cap->spark_stats.fizzled++;
121 traceEventSparkFizzle(cap);
122 spark = tryStealSpark(cap->sparks);
123 }
124 if (spark != NULL) {
125 cap->spark_stats.converted++;
126
127 // Post event for running a spark from capability's own pool.
128 traceEventSparkRun(cap);
129
130 return spark;
131 }
132 if (!emptySparkPoolCap(cap)) {
133 retry = true;
134 }
135
136 if (n_capabilities == 1) { return NULL; } // makes no sense...
137
138 debugTrace(DEBUG_sched,
139 "cap %d: Trying to steal work from other capabilities",
140 cap->no);
141
142         /* Visit capabilities 0..n-1 in sequence until a theft succeeds. We could
143 start at a random place instead of 0 as well. */
144 for ( i=0 ; i < n_capabilities ; i++ ) {
145 robbed = capabilities[i];
146 if (cap == robbed) // ourselves...
147 continue;
148
149 if (emptySparkPoolCap(robbed)) // nothing to steal here
150 continue;
151
152 spark = tryStealSpark(robbed->sparks);
153 while (spark != NULL && fizzledSpark(spark)) {
154 cap->spark_stats.fizzled++;
155 traceEventSparkFizzle(cap);
156 spark = tryStealSpark(robbed->sparks);
157 }
158 if (spark == NULL && !emptySparkPoolCap(robbed)) {
159 // we conflicted with another thread while trying to steal;
160 // try again later.
161 retry = true;
162 }
163
164 if (spark != NULL) {
165 cap->spark_stats.converted++;
166 traceEventSparkSteal(cap, robbed->no);
167
168 return spark;
169 }
170 // otherwise: no success, try next one
171 }
172 } while (retry);
173
174 debugTrace(DEBUG_sched, "No sparks stolen");
175 return NULL;
176 }
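/* A hedged sketch of the random starting point mentioned in the comment
 * inside findSpark() above: the victim scan could begin at a random
 * capability and wrap around instead of always starting at 0. Not
 * compiled; the use of rand() as the randomness source and the omission
 * of fizzled-spark handling are simplifying assumptions. */
#if 0
static StgClosure *
exampleStealFromRandomStart (Capability *cap)
{
    uint32_t start = (uint32_t)rand() % n_capabilities;
    for (uint32_t k = 0; k < n_capabilities; k++) {
        Capability *robbed = capabilities[(start + k) % n_capabilities];
        if (robbed == cap || emptySparkPoolCap(robbed)) continue;
        StgClosure *spark = tryStealSpark(robbed->sparks);
        if (spark != NULL) return spark;
    }
    return NULL;
}
#endif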
177
178 // Returns true if any spark pool is non-empty at this moment in time.
179 // The result is only valid for an instant, of course, so in a sense
180 // is immediately invalid, and should not be relied upon for
181 // correctness.
182 bool
183 anySparks (void)
184 {
185 uint32_t i;
186
187 for (i=0; i < n_capabilities; i++) {
188 if (!emptySparkPoolCap(capabilities[i])) {
189 return true;
190 }
191 }
192 return false;
193 }
194 #endif
195
196 /* -----------------------------------------------------------------------------
197 * Manage the returning_tasks lists.
198 *
199 * These functions require cap->lock
200 * -------------------------------------------------------------------------- */
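// The returning_tasks list is a singly-linked FIFO threaded through
// task->next: newReturningTask() appends at returning_tasks_tl,
// popReturningTask() removes from returning_tasks_hd, and
// n_returning_tasks tracks its length. For example, after pushing tasks
// A, B and C the list is hd -> A -> B -> C <- tl, and the next pop
// returns A.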
201
202 #if defined(THREADED_RTS)
203 STATIC_INLINE void
204 newReturningTask (Capability *cap, Task *task)
205 {
206 ASSERT_LOCK_HELD(&cap->lock);
207 ASSERT(task->next == NULL);
208 if (cap->returning_tasks_hd) {
209 ASSERT(cap->returning_tasks_tl->next == NULL);
210 cap->returning_tasks_tl->next = task;
211 } else {
212 cap->returning_tasks_hd = task;
213 }
214 cap->returning_tasks_tl = task;
215 cap->n_returning_tasks++;
216 ASSERT_RETURNING_TASKS(cap,task);
217 }
218
219 STATIC_INLINE Task *
220 popReturningTask (Capability *cap)
221 {
222 ASSERT_LOCK_HELD(&cap->lock);
223 Task *task;
224 task = cap->returning_tasks_hd;
225 ASSERT(task);
226 cap->returning_tasks_hd = task->next;
227 if (!cap->returning_tasks_hd) {
228 cap->returning_tasks_tl = NULL;
229 }
230 task->next = NULL;
231 cap->n_returning_tasks--;
232 ASSERT_RETURNING_TASKS(cap,task);
233 return task;
234 }
235 #endif
236
237 /* ----------------------------------------------------------------------------
238 * Initialisation
239 *
240 * The Capability is initially marked not free.
241 * ------------------------------------------------------------------------- */
242
243 static void
244 initCapability (Capability *cap, uint32_t i)
245 {
246 uint32_t g;
247
248 cap->no = i;
249 cap->node = capNoToNumaNode(i);
250 cap->in_haskell = false;
251 cap->idle = 0;
252 cap->disabled = false;
253
254 cap->run_queue_hd = END_TSO_QUEUE;
255 cap->run_queue_tl = END_TSO_QUEUE;
256 cap->n_run_queue = 0;
257
258 #if defined(THREADED_RTS)
259 initMutex(&cap->lock);
260 cap->running_task = NULL; // indicates cap is free
261 cap->spare_workers = NULL;
262 cap->n_spare_workers = 0;
263 cap->suspended_ccalls = NULL;
264 cap->n_suspended_ccalls = 0;
265 cap->returning_tasks_hd = NULL;
266 cap->returning_tasks_tl = NULL;
267 cap->n_returning_tasks = 0;
268 cap->inbox = (Message*)END_TSO_QUEUE;
269 cap->putMVars = NULL;
270 cap->sparks = allocSparkPool();
271 cap->spark_stats.created = 0;
272 cap->spark_stats.dud = 0;
273 cap->spark_stats.overflowed = 0;
274 cap->spark_stats.converted = 0;
275 cap->spark_stats.gcd = 0;
276 cap->spark_stats.fizzled = 0;
277 #if !defined(mingw32_HOST_OS)
278 cap->io_manager_control_wr_fd = -1;
279 #endif
280 #endif
281 cap->total_allocated = 0;
282
283 cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
284 cap->f.stgGCEnter1 = (StgFunPtr)__stg_gc_enter_1;
285 cap->f.stgGCFun = (StgFunPtr)__stg_gc_fun;
286
287 cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
288 RtsFlags.GcFlags.generations,
289 "initCapability");
290 cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
291 RtsFlags.GcFlags.generations,
292 "initCapability");
293
294 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
295 cap->mut_lists[g] = NULL;
296 }
297
298 cap->weak_ptr_list_hd = NULL;
299 cap->weak_ptr_list_tl = NULL;
300 cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
301 cap->free_trec_chunks = END_STM_CHUNK_LIST;
302 cap->free_trec_headers = NO_TREC;
303 cap->transaction_tokens = 0;
304 cap->context_switch = 0;
305 cap->pinned_object_block = NULL;
306 cap->pinned_object_blocks = NULL;
307
308 #if defined(PROFILING)
309 cap->r.rCCCS = CCS_SYSTEM;
310 #else
311 cap->r.rCCCS = NULL;
312 #endif
313
314 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
315 // don't want it set when not running a Haskell thread.
316 cap->r.rCurrentTSO = NULL;
317
318 traceCapCreate(cap);
319 traceCapsetAssignCap(CAPSET_OSPROCESS_DEFAULT, i);
320 traceCapsetAssignCap(CAPSET_CLOCKDOMAIN_DEFAULT, i);
321 #if defined(THREADED_RTS)
322 traceSparkCounters(cap);
323 #endif
324 }
325
326 /* ---------------------------------------------------------------------------
327 * Function: initCapabilities()
328 *
329 * Purpose: set up the Capability handling. For the THREADED_RTS build,
330 * we keep a table of them, the size of which is
331 * controlled by the user via the RTS flag -N.
332 *
333 * ------------------------------------------------------------------------- */
334 void initCapabilities (void)
335 {
336 uint32_t i;
337
338     /* Declare a couple of capability sets representing the process and
339 clock domain. Each capability will get added to these capsets. */
340 traceCapsetCreate(CAPSET_OSPROCESS_DEFAULT, CapsetTypeOsProcess);
341 traceCapsetCreate(CAPSET_CLOCKDOMAIN_DEFAULT, CapsetTypeClockdomain);
342
343 // Initialise NUMA
344 if (!RtsFlags.GcFlags.numa) {
345 n_numa_nodes = 1;
346 for (i = 0; i < MAX_NUMA_NODES; i++) {
347 numa_map[i] = 0;
348 }
349 } else {
350 uint32_t nNodes = osNumaNodes();
351 if (nNodes > MAX_NUMA_NODES) {
352 barf("Too many NUMA nodes (max %d)", MAX_NUMA_NODES);
353 }
354 StgWord mask = RtsFlags.GcFlags.numaMask & osNumaMask();
355 uint32_t logical = 0, physical = 0;
356 for (; physical < MAX_NUMA_NODES; physical++) {
357 if (mask & 1) {
358 numa_map[logical++] = physical;
359 }
360 mask = mask >> 1;
361 }
362 n_numa_nodes = logical;
363 if (logical == 0) {
364 barf("available NUMA node set is empty");
365 }
366 }
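        // Worked example: if numaMask and osNumaMask() intersect in OS
        // nodes 0 and 2, then mask == 0b0101 above, so numa_map[0] == 0,
        // numa_map[1] == 2 and n_numa_nodes == 2. Logical node numbers
        // are therefore always dense, 0 .. n_numa_nodes-1.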
367
368 #if defined(THREADED_RTS)
369
370 #if !defined(REG_Base)
371 // We can't support multiple CPUs if BaseReg is not a register
372 if (RtsFlags.ParFlags.nCapabilities > 1) {
373 errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
374 RtsFlags.ParFlags.nCapabilities = 1;
375 }
376 #endif
377
378 n_capabilities = 0;
379 moreCapabilities(0, RtsFlags.ParFlags.nCapabilities);
380 n_capabilities = RtsFlags.ParFlags.nCapabilities;
381
382 #else /* !THREADED_RTS */
383
384 n_capabilities = 1;
385 capabilities = stgMallocBytes(sizeof(Capability*), "initCapabilities");
386 capabilities[0] = &MainCapability;
387
388 initCapability(&MainCapability, 0);
389
390 #endif
391
392 enabled_capabilities = n_capabilities;
393
394 // There are no free capabilities to begin with. We will start
395     // a worker Task on each Capability, which will quickly put the
396 // Capability on the free list when it finds nothing to do.
397 for (i = 0; i < n_numa_nodes; i++) {
398 last_free_capability[i] = capabilities[0];
399 }
400 }
401
402 void
403 moreCapabilities (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
404 {
405 #if defined(THREADED_RTS)
406 uint32_t i;
407 Capability **old_capabilities = capabilities;
408
409 capabilities = stgMallocBytes(to * sizeof(Capability*), "moreCapabilities");
410
411 if (to == 1) {
412 // THREADED_RTS must work on builds that don't have a mutable
413         // BaseReg (e.g. unregisterised), so in this case
414 // capabilities[0] must coincide with &MainCapability.
415 capabilities[0] = &MainCapability;
416 initCapability(&MainCapability, 0);
417 }
418 else
419 {
420 for (i = 0; i < to; i++) {
421 if (i < from) {
422 capabilities[i] = old_capabilities[i];
423 } else {
424 capabilities[i] = stgMallocBytes(sizeof(Capability),
425 "moreCapabilities");
426 initCapability(capabilities[i], i);
427 }
428 }
429 }
430
431 debugTrace(DEBUG_sched, "allocated %d more capabilities", to - from);
432
433 if (old_capabilities != NULL) {
434 stgFree(old_capabilities);
435 }
436 #endif
437 }
438
439 /* ----------------------------------------------------------------------------
440  * contextSwitchAllCapabilities: cause all capabilities to context switch as
441 * soon as possible.
442 * ------------------------------------------------------------------------- */
443
444 void contextSwitchAllCapabilities(void)
445 {
446 uint32_t i;
447 for (i=0; i < n_capabilities; i++) {
448 contextSwitchCapability(capabilities[i]);
449 }
450 }
451
452 void interruptAllCapabilities(void)
453 {
454 uint32_t i;
455 for (i=0; i < n_capabilities; i++) {
456 interruptCapability(capabilities[i]);
457 }
458 }
459
460 /* ----------------------------------------------------------------------------
461 * Give a Capability to a Task. The task must currently be sleeping
462 * on its condition variable.
463 *
464 * Requires cap->lock (modifies cap->running_task).
465 *
466  * When migrating a Task, the migrator must take task->lock before
467 * modifying task->cap, to synchronise with the waking up Task.
468  * Additionally, the migrator should own the Capability (when
469 * migrating the run queue), or cap->lock (when migrating
470 * returning_workers).
471 *
472 * ------------------------------------------------------------------------- */
473
474 #if defined(THREADED_RTS)
475 static void
476 giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
477 {
478 ASSERT_LOCK_HELD(&cap->lock);
479 ASSERT(task->cap == cap);
480 debugTrace(DEBUG_sched, "passing capability %d to %s %#" FMT_HexWord64,
481 cap->no, task->incall->tso ? "bound task" : "worker",
482 serialisableTaskId(task));
483 ACQUIRE_LOCK(&task->lock);
484 if (task->wakeup == false) {
485 task->wakeup = true;
486 // the wakeup flag is needed because signalCondition() doesn't
487         // flag the condition if the thread is already running, but we want
488 // it to be sticky.
489 signalCondition(&task->cond);
490 }
491 RELEASE_LOCK(&task->lock);
492 }
493 #endif
494
495 /* ----------------------------------------------------------------------------
496 * releaseCapability
497 *
498  * The current Task (cap->running_task) releases the Capability. The Capability is
499 * marked free, and if there is any work to do, an appropriate Task is woken up.
500 *
501 * N.B. May need to take all_tasks_mutex.
502 *
503 * ------------------------------------------------------------------------- */
504
505 #if defined(THREADED_RTS)
506 void
507 releaseCapability_ (Capability* cap,
508 bool always_wakeup)
509 {
510 Task *task;
511
512 task = cap->running_task;
513
514 ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
515 ASSERT_RETURNING_TASKS(cap,task);
516
517 cap->running_task = NULL;
518
519 // Check to see whether a worker thread can be given
520     // the go-ahead to return the result of an external call.
521 if (cap->n_returning_tasks != 0) {
522 giveCapabilityToTask(cap,cap->returning_tasks_hd);
523 // The Task pops itself from the queue (see waitForCapability())
524 return;
525 }
526
527 // If there is a pending sync, then we should just leave the Capability
528 // free. The thread trying to sync will be about to call
529 // waitForCapability().
530 //
531 // Note: this is *after* we check for a returning task above,
532 // because the task attempting to acquire all the capabilities may
533 // be currently in waitForCapability() waiting for this
534 // capability, in which case simply setting it as free would not
535 // wake up the waiting task.
536 PendingSync *sync = pending_sync;
537 if (sync && (sync->type != SYNC_GC_PAR || sync->idle[cap->no])) {
538 debugTrace(DEBUG_sched, "sync pending, freeing capability %d", cap->no);
539 return;
540 }
541
542 // If the next thread on the run queue is a bound thread,
543 // give this Capability to the appropriate Task.
544 if (!emptyRunQueue(cap) && peekRunQueue(cap)->bound) {
545 // Make sure we're not about to try to wake ourselves up
546 // ASSERT(task != cap->run_queue_hd->bound);
547 // assertion is false: in schedule() we force a yield after
548 // ThreadBlocked, but the thread may be back on the run queue
549 // by now.
550 task = peekRunQueue(cap)->bound->task;
551 giveCapabilityToTask(cap, task);
552 return;
553 }
554
555 if (!cap->spare_workers) {
556 // Create a worker thread if we don't have one. If the system
557 // is interrupted, we only create a worker task if there
558 // are threads that need to be completed. If the system is
559 // shutting down, we never create a new worker.
560 if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
561 debugTrace(DEBUG_sched,
562 "starting new worker on capability %d", cap->no);
563 startWorkerTask(cap);
564 return;
565 }
566 }
567
568 // If we have an unbound thread on the run queue, or if there's
569 // anything else to do, give the Capability to a worker thread.
570 if (always_wakeup ||
571 !emptyRunQueue(cap) || !emptyInbox(cap) ||
572 (!cap->disabled && !emptySparkPoolCap(cap)) || globalWorkToDo()) {
573 if (cap->spare_workers) {
574 giveCapabilityToTask(cap, cap->spare_workers);
575 // The worker Task pops itself from the queue;
576 return;
577 }
578 }
579
580 #if defined(PROFILING)
581 cap->r.rCCCS = CCS_IDLE;
582 #endif
583 last_free_capability[cap->node] = cap;
584 debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
585 }
586
587 void
588 releaseCapability (Capability* cap USED_IF_THREADS)
589 {
590 ACQUIRE_LOCK(&cap->lock);
591 releaseCapability_(cap, false);
592 RELEASE_LOCK(&cap->lock);
593 }
594
595 void
596 releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
597 {
598 ACQUIRE_LOCK(&cap->lock);
599 releaseCapability_(cap, true);
600 RELEASE_LOCK(&cap->lock);
601 }
602
603 static void
604 enqueueWorker (Capability* cap USED_IF_THREADS)
605 {
606 Task *task;
607
608 task = cap->running_task;
609
610 // If the Task is stopped, we shouldn't be yielding, we should
611 // be just exiting.
612 ASSERT(!task->stopped);
613 ASSERT(task->worker);
614
615 if (cap->n_spare_workers < MAX_SPARE_WORKERS)
616 {
617 task->next = cap->spare_workers;
618 cap->spare_workers = task;
619 cap->n_spare_workers++;
620 }
621 else
622 {
623 debugTrace(DEBUG_sched, "%d spare workers already, exiting",
624 cap->n_spare_workers);
625 releaseCapability_(cap,false);
626 // hold the lock until after workerTaskStop; c.f. scheduleWorker()
627 workerTaskStop(task);
628 RELEASE_LOCK(&cap->lock);
629 shutdownThread();
630 }
631 }
632
633 #endif
634
635 /* ----------------------------------------------------------------------------
636 * waitForWorkerCapability(task)
637 *
638 * waits to be given a Capability, and then returns the Capability. The task
639 * must be either a worker (and on a cap->spare_workers queue), or a bound Task.
640 * ------------------------------------------------------------------------- */
641
642 #if defined(THREADED_RTS)
643
644 static Capability * waitForWorkerCapability (Task *task)
645 {
646 Capability *cap;
647
648 for (;;) {
649 ACQUIRE_LOCK(&task->lock);
650 // task->lock held, cap->lock not held
651 if (!task->wakeup) waitCondition(&task->cond, &task->lock);
652 cap = task->cap;
653 task->wakeup = false;
654 RELEASE_LOCK(&task->lock);
655
656 debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);
657
658 ACQUIRE_LOCK(&cap->lock);
659 if (cap->running_task != NULL) {
660 debugTrace(DEBUG_sched,
661 "capability %d is owned by another task", cap->no);
662 RELEASE_LOCK(&cap->lock);
663 continue;
664 }
665
666 if (task->cap != cap) {
667 // see Note [migrated bound threads]
668 debugTrace(DEBUG_sched,
669 "task has been migrated to cap %d", task->cap->no);
670 RELEASE_LOCK(&cap->lock);
671 continue;
672 }
673
674 if (task->incall->tso == NULL) {
675 ASSERT(cap->spare_workers != NULL);
676 // if we're not at the front of the queue, release it
677 // again. This is unlikely to happen.
678 if (cap->spare_workers != task) {
679 giveCapabilityToTask(cap,cap->spare_workers);
680 RELEASE_LOCK(&cap->lock);
681 continue;
682 }
683 cap->spare_workers = task->next;
684 task->next = NULL;
685 cap->n_spare_workers--;
686 }
687
688 cap->running_task = task;
689 RELEASE_LOCK(&cap->lock);
690 break;
691 }
692
693 return cap;
694 }
695
696 #endif /* THREADED_RTS */
697
698 /* ----------------------------------------------------------------------------
699 * waitForReturnCapability (Task *task)
700 *
701 * The Task should be on the cap->returning_tasks queue of a Capability. This
702 * function waits for the Task to be woken up, and returns the Capability that
703 * it was woken up on.
704 *
705 * ------------------------------------------------------------------------- */
706
707 #if defined(THREADED_RTS)
708
709 static Capability * waitForReturnCapability (Task *task)
710 {
711 Capability *cap;
712
713 for (;;) {
714 ACQUIRE_LOCK(&task->lock);
715 // task->lock held, cap->lock not held
716 if (!task->wakeup) waitCondition(&task->cond, &task->lock);
717 cap = task->cap;
718 task->wakeup = false;
719 RELEASE_LOCK(&task->lock);
720
721 // now check whether we should wake up...
722 ACQUIRE_LOCK(&cap->lock);
723 if (cap->running_task == NULL) {
724 if (cap->returning_tasks_hd != task) {
725 giveCapabilityToTask(cap,cap->returning_tasks_hd);
726 RELEASE_LOCK(&cap->lock);
727 continue;
728 }
729 cap->running_task = task;
730 popReturningTask(cap);
731 RELEASE_LOCK(&cap->lock);
732 break;
733 }
734 RELEASE_LOCK(&cap->lock);
735 }
736
737 return cap;
738 }
739
740 #endif /* THREADED_RTS */
741
742 /* ----------------------------------------------------------------------------
743 * waitForCapability (Capability **pCap, Task *task)
744 *
745 * Purpose: when an OS thread returns from an external call,
746 * it calls waitForCapability() (via Schedule.resumeThread())
747 * to wait for permission to enter the RTS & communicate the
748 * result of the external call back to the Haskell thread that
749 * made it.
750 *
751 * ------------------------------------------------------------------------- */
752
753 void waitForCapability (Capability **pCap, Task *task)
754 {
755 #if !defined(THREADED_RTS)
756
757 MainCapability.running_task = task;
758 task->cap = &MainCapability;
759 *pCap = &MainCapability;
760
761 #else
762 uint32_t i;
763 Capability *cap = *pCap;
764
765 if (cap == NULL) {
766 if (task->preferred_capability != -1) {
767 cap = capabilities[task->preferred_capability %
768 enabled_capabilities];
769 } else {
770 // Try last_free_capability first
771 cap = last_free_capability[task->node];
772 if (cap->running_task) {
773 // Otherwise, search for a free capability on this node.
774 cap = NULL;
775 for (i = task->node; i < enabled_capabilities;
776 i += n_numa_nodes) {
777 // visits all the capabilities on this node, because
778 // cap[i]->node == i % n_numa_nodes
779 if (!capabilities[i]->running_task) {
780 cap = capabilities[i];
781 break;
782 }
783 }
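            // Worked example: with n_numa_nodes == 2 and
            // enabled_capabilities == 6, a task on node 1 inspects
            // capabilities 1, 3 and 5 here -- exactly the ones whose
            // number maps to its node.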
784 if (cap == NULL) {
785 // Can't find a free one, use last_free_capability.
786 cap = last_free_capability[task->node];
787 }
788 }
789 }
790
791     // record the Capability as the one this Task is now associated with.
792 task->cap = cap;
793
794 } else {
795 ASSERT(task->cap == cap);
796 }
797
798 debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);
799
800 ACQUIRE_LOCK(&cap->lock);
801 if (!cap->running_task) {
802 // It's free; just grab it
803 cap->running_task = task;
804 RELEASE_LOCK(&cap->lock);
805 } else {
806 newReturningTask(cap,task);
807 RELEASE_LOCK(&cap->lock);
808 cap = waitForReturnCapability(task);
809 }
810
811 #if defined(PROFILING)
812 cap->r.rCCCS = CCS_SYSTEM;
813 #endif
814
815 ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
816
817 debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
818
819 *pCap = cap;
820 #endif
821 }
822
823 /* ----------------------------------------------------------------------------
824 * yieldCapability
825 *
826 * Give up the Capability, and return when we have it again. This is called
827 * when either we know that the Capability should be given to another Task, or
828 * there is nothing to do right now. One of the following is true:
829 *
830 * - The current Task is a worker, and there's a bound thread at the head of
831 * the run queue (or vice versa)
832 *
833 * - The run queue is empty. We'll be woken up again when there's work to
834 * do.
835 *
836 * - Another Task is trying to do parallel GC (pending_sync == SYNC_GC_PAR).
837 * We should become a GC worker for a while.
838 *
839 * - Another Task is trying to acquire all the Capabilities (pending_sync !=
840 * SYNC_GC_PAR), either to do a sequential GC, forkProcess, or
841 * setNumCapabilities. We should give up the Capability temporarily.
842 *
843 * ------------------------------------------------------------------------- */
844
845 #if defined (THREADED_RTS)
846
847 /* See Note [GC livelock] in Schedule.c for why we have gcAllowed
848 and return the bool */
849 bool /* Did we GC? */
850 yieldCapability (Capability** pCap, Task *task, bool gcAllowed)
851 {
852 Capability *cap = *pCap;
853
854 if (gcAllowed)
855 {
856 PendingSync *sync = pending_sync;
857
858 if (sync && sync->type == SYNC_GC_PAR) {
859 if (! sync->idle[cap->no]) {
860 traceEventGcStart(cap);
861 gcWorkerThread(cap);
862 traceEventGcEnd(cap);
863 traceSparkCounters(cap);
864 // See Note [migrated bound threads 2]
865 if (task->cap == cap) {
866 return true;
867 }
868 }
869 }
870 }
871
872 debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
873
874 // We must now release the capability and wait to be woken up again.
875 task->wakeup = false;
876
877 ACQUIRE_LOCK(&cap->lock);
878
879 // If this is a worker thread, put it on the spare_workers queue
880 if (isWorker(task)) {
881 enqueueWorker(cap);
882 }
883
884 releaseCapability_(cap, false);
885
886 if (isWorker(task) || isBoundTask(task)) {
887 RELEASE_LOCK(&cap->lock);
888 cap = waitForWorkerCapability(task);
889 } else {
890         // Neither a worker Task nor a bound Task. The only way we can be woken
891         // up again is to put ourselves on the returning_tasks queue, so that's
892         // what we do. We still hold cap->lock at this point.
893 // The Task waiting for this Capability does not have it
894 // yet, so we can be sure to be woken up later. (see #10545)
895 newReturningTask(cap,task);
896 RELEASE_LOCK(&cap->lock);
897 cap = waitForReturnCapability(task);
898 }
899
900 debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
901 ASSERT(cap->running_task == task);
902
903 #if defined(PROFILING)
904 cap->r.rCCCS = CCS_SYSTEM;
905 #endif
906
907 *pCap = cap;
908
909 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
910
911 return false;
912 }
913
914 #endif /* THREADED_RTS */
915
916 // Note [migrated bound threads]
917 //
918 // There's a tricky case where:
919 // - cap A is running an unbound thread T1
920 // - there is a bound thread T2 at the head of the run queue on cap A
921 // - T1 makes a safe foreign call, the task bound to T2 is woken up on cap A
922 // - T1 returns quickly grabbing A again (T2 is still waking up on A)
923 // - T1 blocks, the scheduler migrates T2 to cap B
924 // - the task bound to T2 wakes up on cap B
925 //
926 // We take advantage of the following invariant:
927 //
928 // - A bound thread can only be migrated by the holder of the
929 // Capability on which the bound thread currently lives. So, if we
930 // hold Capability C, and task->cap == C, then task cannot be
931 // migrated under our feet.
932
933 // Note [migrated bound threads 2]
934 //
935 // Second tricky case:
936 // - A bound Task becomes a GC thread
937 // - scheduleDoGC() migrates the thread belonging to this Task,
938 // because the Capability it is on is disabled
939 // - after GC, gcWorkerThread() returns, but now we are
940 // holding a Capability that is not the same as task->cap
941 // - Hence we must check for this case and immediately give up the
942 // cap we hold.
943
944 /* ----------------------------------------------------------------------------
945 * prodCapability
946 *
947 * If a Capability is currently idle, wake up a Task on it. Used to
948 * get every Capability into the GC.
949 * ------------------------------------------------------------------------- */
950
951 #if defined (THREADED_RTS)
952
953 void
954 prodCapability (Capability *cap, Task *task)
955 {
956 ACQUIRE_LOCK(&cap->lock);
957 if (!cap->running_task) {
958 cap->running_task = task;
959 releaseCapability_(cap,true);
960 }
961 RELEASE_LOCK(&cap->lock);
962 }
963
964 #endif /* THREADED_RTS */
965
966 /* ----------------------------------------------------------------------------
967 * tryGrabCapability
968 *
969 * Attempt to gain control of a Capability if it is free.
970 *
971 * ------------------------------------------------------------------------- */
972
973 #if defined (THREADED_RTS)
974
975 bool
976 tryGrabCapability (Capability *cap, Task *task)
977 {
978 int r;
979 if (cap->running_task != NULL) return false;
980 r = TRY_ACQUIRE_LOCK(&cap->lock);
981 if (r != 0) return false;
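    // Re-check under the lock: another Task may have claimed the
    // Capability between the unlocked test above and TRY_ACQUIRE_LOCK
    // succeeding.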
982 if (cap->running_task != NULL) {
983 RELEASE_LOCK(&cap->lock);
984 return false;
985 }
986 task->cap = cap;
987 cap->running_task = task;
988 RELEASE_LOCK(&cap->lock);
989 return true;
990 }
991
992
993 #endif /* THREADED_RTS */
994
995 /* ----------------------------------------------------------------------------
996 * shutdownCapability
997 *
998 * At shutdown time, we want to let everything exit as cleanly as
999 * possible. For each capability, we let its run queue drain, and
1000 * allow the workers to stop.
1001 *
1002 * This function should be called when interrupted and
1003  * sched_state == SCHED_SHUTTING_DOWN, so that any worker that wakes up
1004 * will exit the scheduler and call taskStop(), and any bound thread
1005 * that wakes up will return to its caller. Runnable threads are
1006 * killed.
1007 *
1008 * ------------------------------------------------------------------------- */
1009
1010 static void
1011 shutdownCapability (Capability *cap USED_IF_THREADS,
1012 Task *task USED_IF_THREADS,
1013 bool safe USED_IF_THREADS)
1014 {
1015 #if defined(THREADED_RTS)
1016 uint32_t i;
1017
1018 task->cap = cap;
1019
1020 // Loop indefinitely until all the workers have exited and there
1021 // are no Haskell threads left. We used to bail out after 50
1022 // iterations of this loop, but that occasionally left a worker
1023 // running which caused problems later (the closeMutex() below
1024 // isn't safe, for one thing).
1025
1026 for (i = 0; /* i < 50 */; i++) {
1027 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
1028
1029 debugTrace(DEBUG_sched,
1030 "shutting down capability %d, attempt %d", cap->no, i);
1031 ACQUIRE_LOCK(&cap->lock);
1032 if (cap->running_task) {
1033 RELEASE_LOCK(&cap->lock);
1034 debugTrace(DEBUG_sched, "not owner, yielding");
1035 yieldThread();
1036 continue;
1037 }
1038 cap->running_task = task;
1039
1040 if (cap->spare_workers) {
1041 // Look for workers that have died without removing
1042 // themselves from the list; this could happen if the OS
1043 // summarily killed the thread, for example. This
1044 // actually happens on Windows when the system is
1045 // terminating the program, and the RTS is running in a
1046 // DLL.
1047 Task *t, *prev;
1048 prev = NULL;
1049 for (t = cap->spare_workers; t != NULL; t = t->next) {
1050 if (!osThreadIsAlive(t->id)) {
1051 debugTrace(DEBUG_sched,
1052 "worker thread %p has died unexpectedly", (void *)(size_t)t->id);
1053 cap->n_spare_workers--;
1054 if (!prev) {
1055 cap->spare_workers = t->next;
1056 } else {
1057 prev->next = t->next;
1058 }
1059 prev = t;
1060 }
1061 }
1062 }
1063
1064 if (!emptyRunQueue(cap) || cap->spare_workers) {
1065 debugTrace(DEBUG_sched,
1066 "runnable threads or workers still alive, yielding");
1067 releaseCapability_(cap,false); // this will wake up a worker
1068 RELEASE_LOCK(&cap->lock);
1069 yieldThread();
1070 continue;
1071 }
1072
1073 // If "safe", then busy-wait for any threads currently doing
1074 // foreign calls. If we're about to unload this DLL, for
1075 // example, we need to be sure that there are no OS threads
1076 // that will try to return to code that has been unloaded.
1077 // We can be a bit more relaxed when this is a standalone
1078 // program that is about to terminate, and let safe=false.
1079 if (cap->suspended_ccalls && safe) {
1080 debugTrace(DEBUG_sched,
1081 "thread(s) are involved in foreign calls, yielding");
1082 cap->running_task = NULL;
1083 RELEASE_LOCK(&cap->lock);
1084 // The IO manager thread might have been slow to start up,
1085 // so the first attempt to kill it might not have
1086 // succeeded. Just in case, try again - the kill message
1087 // will only be sent once.
1088 //
1089 // To reproduce this deadlock: run ffi002(threaded1)
1090 // repeatedly on a loaded machine.
1091 ioManagerDie();
1092 yieldThread();
1093 continue;
1094 }
1095
1096 traceSparkCounters(cap);
1097 RELEASE_LOCK(&cap->lock);
1098 break;
1099 }
1100     // We now have the Capability; its run queue and spare_workers
1101     // list are both empty.
1102
1103 // ToDo: we can't drop this mutex, because there might still be
1104 // threads performing foreign calls that will eventually try to
1105 // return via resumeThread() and attempt to grab cap->lock.
1106 // closeMutex(&cap->lock);
1107 #endif
1108 }
1109
1110 void
1111 shutdownCapabilities(Task *task, bool safe)
1112 {
1113 uint32_t i;
1114 for (i=0; i < n_capabilities; i++) {
1115 ASSERT(task->incall->tso == NULL);
1116 shutdownCapability(capabilities[i], task, safe);
1117 }
1118 #if defined(THREADED_RTS)
1119 ASSERT(checkSparkCountInvariant());
1120 #endif
1121 }
1122
1123 static void
1124 freeCapability (Capability *cap)
1125 {
1126 stgFree(cap->mut_lists);
1127 stgFree(cap->saved_mut_lists);
1128 #if defined(THREADED_RTS)
1129 freeSparkPool(cap->sparks);
1130 #endif
1131 traceCapsetRemoveCap(CAPSET_OSPROCESS_DEFAULT, cap->no);
1132 traceCapsetRemoveCap(CAPSET_CLOCKDOMAIN_DEFAULT, cap->no);
1133 traceCapDelete(cap);
1134 }
1135
1136 void
1137 freeCapabilities (void)
1138 {
1139 #if defined(THREADED_RTS)
1140 uint32_t i;
1141 for (i=0; i < n_capabilities; i++) {
1142 freeCapability(capabilities[i]);
1143 if (capabilities[i] != &MainCapability)
1144 stgFree(capabilities[i]);
1145 }
1146 #else
1147 freeCapability(&MainCapability);
1148 #endif
1149 stgFree(capabilities);
1150 traceCapsetDelete(CAPSET_OSPROCESS_DEFAULT);
1151 traceCapsetDelete(CAPSET_CLOCKDOMAIN_DEFAULT);
1152 }
1153
1154 /* ---------------------------------------------------------------------------
1155 Mark everything directly reachable from the Capabilities. When
1156    using multiple GC threads, each GC thread t marks the Capabilities
1157    c for which (c `mod` <number of GC threads>) == t.
1158 ------------------------------------------------------------------------ */
1159
1160 void
1161 markCapability (evac_fn evac, void *user, Capability *cap,
1162 bool no_mark_sparks USED_IF_THREADS)
1163 {
1164 InCall *incall;
1165
1166 // Each GC thread is responsible for following roots from the
1167 // Capability of the same number. There will usually be the same
1168     // number of Capabilities as GC threads or fewer, but just in case there
1169 // are more, we mark every Capability whose number is the GC
1170 // thread's index plus a multiple of the number of GC threads.
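    // For example, with 2 GC threads and 5 Capabilities, GC thread 0
    // marks Capabilities 0, 2 and 4, and GC thread 1 marks 1 and 3.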
1171 evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
1172 evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
1173 #if defined(THREADED_RTS)
1174 evac(user, (StgClosure **)(void *)&cap->inbox);
1175 #endif
1176 for (incall = cap->suspended_ccalls; incall != NULL;
1177 incall=incall->next) {
1178 evac(user, (StgClosure **)(void *)&incall->suspended_tso);
1179 }
1180
1181 #if defined(THREADED_RTS)
1182 if (!no_mark_sparks) {
1183 traverseSparkQueue (evac, user, cap);
1184 }
1185 #endif
1186
1187 // Free STM structures for this Capability
1188 stmPreGCHook(cap);
1189 }
1190
1191 void
1192 markCapabilities (evac_fn evac, void *user)
1193 {
1194 uint32_t n;
1195 for (n = 0; n < n_capabilities; n++) {
1196 markCapability(evac, user, capabilities[n], false);
1197 }
1198 }
1199
1200 #if defined(THREADED_RTS)
1201 bool checkSparkCountInvariant (void)
1202 {
1203 SparkCounters sparks = { 0, 0, 0, 0, 0, 0 };
1204 StgWord64 remaining = 0;
1205 uint32_t i;
1206
1207 for (i = 0; i < n_capabilities; i++) {
1208 sparks.created += capabilities[i]->spark_stats.created;
1209 sparks.dud += capabilities[i]->spark_stats.dud;
1210 sparks.overflowed+= capabilities[i]->spark_stats.overflowed;
1211 sparks.converted += capabilities[i]->spark_stats.converted;
1212 sparks.gcd += capabilities[i]->spark_stats.gcd;
1213 sparks.fizzled += capabilities[i]->spark_stats.fizzled;
1214 remaining += sparkPoolSize(capabilities[i]->sparks);
1215 }
1216
1217 /* The invariant is
1218 * created = converted + remaining + gcd + fizzled
1219 */
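    // For example, if 10 sparks were created in total and 4 were
    // converted, 1 GC'd and 3 fizzled, then 2 must still be sitting in
    // the spark pools for the invariant to hold.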
1220 debugTrace(DEBUG_sparks,"spark invariant: %ld == %ld + %ld + %ld + %ld "
1221 "(created == converted + remaining + gcd + fizzled)",
1222 sparks.created, sparks.converted, remaining,
1223 sparks.gcd, sparks.fizzled);
1224
1225 return (sparks.created ==
1226 sparks.converted + remaining + sparks.gcd + sparks.fizzled);
1227
1228 }
1229 #endif
1230
1231 #if !defined(mingw32_HOST_OS)
1232 void
1233 setIOManagerControlFd(uint32_t cap_no USED_IF_THREADS, int fd USED_IF_THREADS) {
1234 #if defined(THREADED_RTS)
1235 if (cap_no < n_capabilities) {
1236 capabilities[cap_no]->io_manager_control_wr_fd = fd;
1237 } else {
1238 errorBelch("warning: setIOManagerControlFd called with illegal capability number.");
1239 }
1240 #endif
1241 }
1242 #endif