1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 2003-2012
4 *
5 * Capabilities
6 *
7 * A Capability represents the token required to execute STG code,
8 * and all the state an OS thread/task needs to run Haskell code:
9 * its STG registers, a pointer to its TSO, a nursery etc. During
10 * STG execution, a pointer to the capability is kept in a
11 * register (BaseReg; actually it is a pointer to cap->r).
12 *
13 * Only in a THREADED_RTS build will there be multiple capabilities;
14 * non-threaded builds have only one global capability, namely
15 * MainCapability.
16 *
17 * --------------------------------------------------------------------------*/
18
19 #include "PosixSource.h"
20 #include "Rts.h"
21
22 #include "Capability.h"
23 #include "Schedule.h"
24 #include "Sparks.h"
25 #include "Trace.h"
26 #include "sm/GC.h" // for gcWorkerThread()
27 #include "STM.h"
28 #include "RtsUtils.h"
29 #include "sm/OSMem.h"
30
31 #if !defined(mingw32_HOST_OS)
32 #include "rts/IOManager.h" // for setIOManagerControlFd()
33 #endif
34
35 #include <string.h>
36
37 // one global capability, this is the Capability for non-threaded
38 // builds, and for +RTS -N1
39 Capability MainCapability;
40
41 uint32_t n_capabilities = 0;
42 uint32_t enabled_capabilities = 0;
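// Note: enabled_capabilities may be smaller than n_capabilities. The
// Capabilities beyond enabled_capabilities still exist, but are marked
// disabled (cap->disabled) so that no new work is scheduled on them.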
43
44 // The array of Capabilities. It's important that when we need
45 // to allocate more Capabilities we don't have to move the existing
46 // Capabilities, because there may be pointers to them in use
47 // (e.g. threads in waitForCapability(), see #8209), so this is
48 // an array of Capability* rather than an array of Capability.
49 Capability **capabilities = NULL;
50
51 // Holds the Capability which last became free. This is used so that
52 // an in-call has a chance of quickly finding a free Capability.
53 // Maintaining a global free list of Capabilities would require global
54 // locking, so we don't do that.
55 static Capability *last_free_capability[MAX_NUMA_NODES];
56
57 /*
58 * Indicates that the RTS wants to synchronise all the Capabilities
59 * for some reason. All Capabilities should call yieldCapability().
60 */
61 PendingSync * volatile pending_sync = 0;
62
63 // Number of logical NUMA nodes
64 uint32_t n_numa_nodes;
65
66 // Map logical NUMA node to OS node numbers
67 uint32_t numa_map[MAX_NUMA_NODES];
68
69 /* Let foreign code get the current Capability -- assuming there is one!
70 * This is useful for unsafe foreign calls because they are called with
71 * the current Capability held, but they are not passed it. For example,
72 * see the integer-gmp package, which calls allocate() in its
73 * stgAllocForGMP() function (which gets called by gmp functions).
74 * */
75 Capability * rts_unsafeGetMyCapability (void)
76 {
77 #if defined(THREADED_RTS)
78 return myTask()->cap;
79 #else
80 return &MainCapability;
81 #endif
82 }
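/* As a rough sketch (not the actual integer-gmp code), an allocation hook
 * invoked during an unsafe foreign call might recover the Capability like
 * this (exampleAllocHook is hypothetical):
 *
 *     static void * exampleAllocHook (size_t bytes)
 *     {
 *         Capability *cap = rts_unsafeGetMyCapability();
 *         return allocate(cap, ROUNDUP_BYTES_TO_WDS(bytes));
 *     }
 *
 * i.e. the Capability is recovered from the current Task rather than being
 * passed in as an argument.
 */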
83
84 #if defined(THREADED_RTS)
85 STATIC_INLINE rtsBool
86 globalWorkToDo (void)
87 {
88 return sched_state >= SCHED_INTERRUPTING
89 || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
90 }
91 #endif
92
93 #if defined(THREADED_RTS)
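/* findSpark: called by the scheduler when a capability has no other work.
 * Returns a spark from our own pool if possible, otherwise tries to steal
 * one from another capability's pool; returns NULL if no spark was found.
 */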
94 StgClosure *
95 findSpark (Capability *cap)
96 {
97 Capability *robbed;
98 StgClosurePtr spark;
99 rtsBool retry;
100 uint32_t i = 0;
101
102 if (!emptyRunQueue(cap) || cap->returning_tasks_hd != NULL) {
103 // If there are other threads, don't try to run any new
104 // sparks: sparks might be speculative, we don't want to take
105 // resources away from the main computation.
106 return 0;
107 }
108
109 do {
110 retry = rtsFalse;
111
112 // first try to get a spark from our own pool.
113 // We should be using reclaimSpark(), because it works without
114 // needing any atomic instructions:
115 // spark = reclaimSpark(cap->sparks);
116 // However, measurements show that this makes at least one benchmark
117 // slower (prsa) and doesn't affect the others.
118 spark = tryStealSpark(cap->sparks);
119 while (spark != NULL && fizzledSpark(spark)) {
120 cap->spark_stats.fizzled++;
121 traceEventSparkFizzle(cap);
122 spark = tryStealSpark(cap->sparks);
123 }
124 if (spark != NULL) {
125 cap->spark_stats.converted++;
126
127 // Post event for running a spark from capability's own pool.
128 traceEventSparkRun(cap);
129
130 return spark;
131 }
132 if (!emptySparkPoolCap(cap)) {
133 retry = rtsTrue;
134 }
135
136 if (n_capabilities == 1) { return NULL; } // makes no sense...
137
138 debugTrace(DEBUG_sched,
139 "cap %d: Trying to steal work from other capabilities",
140 cap->no);
141
142 /* visit capabilities 0..n-1 in sequence until a theft succeeds. We could
143 just as well start at a random place instead of 0. */
144 for ( i=0 ; i < n_capabilities ; i++ ) {
145 robbed = capabilities[i];
146 if (cap == robbed) // ourselves...
147 continue;
148
149 if (emptySparkPoolCap(robbed)) // nothing to steal here
150 continue;
151
152 spark = tryStealSpark(robbed->sparks);
153 while (spark != NULL && fizzledSpark(spark)) {
154 cap->spark_stats.fizzled++;
155 traceEventSparkFizzle(cap);
156 spark = tryStealSpark(robbed->sparks);
157 }
158 if (spark == NULL && !emptySparkPoolCap(robbed)) {
159 // we conflicted with another thread while trying to steal;
160 // try again later.
161 retry = rtsTrue;
162 }
163
164 if (spark != NULL) {
165 cap->spark_stats.converted++;
166 traceEventSparkSteal(cap, robbed->no);
167
168 return spark;
169 }
170 // otherwise: no success, try next one
171 }
172 } while (retry);
173
174 debugTrace(DEBUG_sched, "No sparks stolen");
175 return NULL;
176 }
177
178 // Returns True if any spark pool is non-empty at this moment in time
179 // The result is only valid for an instant, of course, so in a sense
180 // is immediately invalid, and should not be relied upon for
181 // correctness.
182 rtsBool
183 anySparks (void)
184 {
185 uint32_t i;
186
187 for (i=0; i < n_capabilities; i++) {
188 if (!emptySparkPoolCap(capabilities[i])) {
189 return rtsTrue;
190 }
191 }
192 return rtsFalse;
193 }
194 #endif
195
196 /* -----------------------------------------------------------------------------
197 * Manage the returning_tasks lists.
198 *
199 * These functions require cap->lock
200 * -------------------------------------------------------------------------- */
201
202 #if defined(THREADED_RTS)
203 STATIC_INLINE void
204 newReturningTask (Capability *cap, Task *task)
205 {
206 ASSERT_LOCK_HELD(&cap->lock);
207 ASSERT(task->next == NULL);
208 if (cap->returning_tasks_hd) {
209 ASSERT(cap->returning_tasks_tl->next == NULL);
210 cap->returning_tasks_tl->next = task;
211 } else {
212 cap->returning_tasks_hd = task;
213 }
214 cap->returning_tasks_tl = task;
215 }
216
217 STATIC_INLINE Task *
218 popReturningTask (Capability *cap)
219 {
220 ASSERT_LOCK_HELD(&cap->lock);
221 Task *task;
222 task = cap->returning_tasks_hd;
223 ASSERT(task);
224 cap->returning_tasks_hd = task->next;
225 if (!cap->returning_tasks_hd) {
226 cap->returning_tasks_tl = NULL;
227 }
228 task->next = NULL;
229 return task;
230 }
231 #endif
232
233 /* ----------------------------------------------------------------------------
234 * Initialisation
235 *
236 * The Capability is initially marked not free.
237 * ------------------------------------------------------------------------- */
238
239 static void
240 initCapability (Capability *cap, uint32_t i)
241 {
242 uint32_t g;
243
244 cap->no = i;
245 cap->node = capNoToNumaNode(i);
246 cap->in_haskell = rtsFalse;
247 cap->idle = 0;
248 cap->disabled = rtsFalse;
249
250 cap->run_queue_hd = END_TSO_QUEUE;
251 cap->run_queue_tl = END_TSO_QUEUE;
252
253 #if defined(THREADED_RTS)
254 initMutex(&cap->lock);
255 cap->running_task = NULL; // indicates cap is free
256 cap->spare_workers = NULL;
257 cap->n_spare_workers = 0;
258 cap->suspended_ccalls = NULL;
259 cap->returning_tasks_hd = NULL;
260 cap->returning_tasks_tl = NULL;
261 cap->inbox = (Message*)END_TSO_QUEUE;
262 cap->sparks = allocSparkPool();
263 cap->spark_stats.created = 0;
264 cap->spark_stats.dud = 0;
265 cap->spark_stats.overflowed = 0;
266 cap->spark_stats.converted = 0;
267 cap->spark_stats.gcd = 0;
268 cap->spark_stats.fizzled = 0;
269 #if !defined(mingw32_HOST_OS)
270 cap->io_manager_control_wr_fd = -1;
271 #endif
272 #endif
273 cap->total_allocated = 0;
274
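// cap->f is the StgFunTable that compiled code reaches via BaseReg; these
// entries are used for heap-check failures and eager blackholing.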
275 cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
276 cap->f.stgGCEnter1 = (StgFunPtr)__stg_gc_enter_1;
277 cap->f.stgGCFun = (StgFunPtr)__stg_gc_fun;
278
279 cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
280 RtsFlags.GcFlags.generations,
281 "initCapability");
282 cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
283 RtsFlags.GcFlags.generations,
284 "initCapability");
285
286 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
287 cap->mut_lists[g] = NULL;
288 }
289
290 cap->weak_ptr_list_hd = NULL;
291 cap->weak_ptr_list_tl = NULL;
292 cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
293 cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE;
294 cap->free_trec_chunks = END_STM_CHUNK_LIST;
295 cap->free_trec_headers = NO_TREC;
296 cap->transaction_tokens = 0;
297 cap->context_switch = 0;
298 cap->pinned_object_block = NULL;
299 cap->pinned_object_blocks = NULL;
300
301 #ifdef PROFILING
302 cap->r.rCCCS = CCS_SYSTEM;
303 #else
304 cap->r.rCCCS = NULL;
305 #endif
306
307 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
308 // don't want it set when not running a Haskell thread.
309 cap->r.rCurrentTSO = NULL;
310
311 traceCapCreate(cap);
312 traceCapsetAssignCap(CAPSET_OSPROCESS_DEFAULT, i);
313 traceCapsetAssignCap(CAPSET_CLOCKDOMAIN_DEFAULT, i);
314 #if defined(THREADED_RTS)
315 traceSparkCounters(cap);
316 #endif
317 }
318
319 /* ---------------------------------------------------------------------------
320 * Function: initCapabilities()
321 *
322 * Purpose: set up the Capability handling. For the THREADED_RTS build,
323 * we keep a table of them, the size of which is
324 * controlled by the user via the RTS flag -N.
325 *
326 * ------------------------------------------------------------------------- */
327 void initCapabilities (void)
328 {
329 uint32_t i;
330
331 /* Declare a couple of capability sets representing the process and
332 clock domain. Each capability will get added to these capsets. */
333 traceCapsetCreate(CAPSET_OSPROCESS_DEFAULT, CapsetTypeOsProcess);
334 traceCapsetCreate(CAPSET_CLOCKDOMAIN_DEFAULT, CapsetTypeClockdomain);
335
336 // Initialise NUMA
337 if (!RtsFlags.GcFlags.numa) {
338 n_numa_nodes = 1;
339 for (i = 0; i < MAX_NUMA_NODES; i++) {
340 numa_map[i] = 0;
341 }
342 } else {
343 uint32_t nNodes = osNumaNodes();
344 if (nNodes > MAX_NUMA_NODES) {
345 barf("Too many NUMA nodes (max %d)", MAX_NUMA_NODES);
346 }
347 StgWord mask = RtsFlags.GcFlags.numaMask & osNumaMask();
348 uint32_t logical = 0, physical = 0;
349 for (; physical < MAX_NUMA_NODES; physical++) {
350 if (mask & 1) {
351 numa_map[logical++] = physical;
352 }
353 mask = mask >> 1;
354 }
355 n_numa_nodes = logical;
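// For example, an effective mask of 0x5 (physical nodes 0 and 2
// available) gives numa_map = {0, 2} and n_numa_nodes = 2.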
356 if (logical == 0) {
357 barf("available NUMA node set is empty");
358 }
359 }
360
361 #if defined(THREADED_RTS)
362
363 #ifndef REG_Base
364 // We can't support multiple CPUs if BaseReg is not a register
365 if (RtsFlags.ParFlags.nCapabilities > 1) {
366 errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
367 RtsFlags.ParFlags.nCapabilities = 1;
368 }
369 #endif
370
371 n_capabilities = 0;
372 moreCapabilities(0, RtsFlags.ParFlags.nCapabilities);
373 n_capabilities = RtsFlags.ParFlags.nCapabilities;
374
375 #else /* !THREADED_RTS */
376
377 n_capabilities = 1;
378 capabilities = stgMallocBytes(sizeof(Capability*), "initCapabilities");
379 capabilities[0] = &MainCapability;
380
381 initCapability(&MainCapability, 0);
382
383 #endif
384
385 enabled_capabilities = n_capabilities;
386
387 // There are no free capabilities to begin with. We will start
388 // a worker Task on each Capability, which will quickly put the
389 // Capability on the free list when it finds nothing to do.
390 for (i = 0; i < n_numa_nodes; i++) {
391 last_free_capability[i] = capabilities[0];
392 }
393 }
394
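/* moreCapabilities: grow the capabilities[] array to 'to' entries, reusing
 * the first 'from' Capability structures and allocating and initialising the
 * rest. Only the array of pointers is reallocated; existing Capability
 * structures never move (see the comment on 'capabilities' above).
 */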
395 void
396 moreCapabilities (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
397 {
398 #if defined(THREADED_RTS)
399 uint32_t i;
400 Capability **old_capabilities = capabilities;
401
402 capabilities = stgMallocBytes(to * sizeof(Capability*), "moreCapabilities");
403
404 if (to == 1) {
405 // THREADED_RTS must work on builds that don't have a mutable
406 // BaseReg (eg. unregisterised), so in this case
407 // capabilities[0] must coincide with &MainCapability.
408 capabilities[0] = &MainCapability;
409 initCapability(&MainCapability, 0);
410 }
411 else
412 {
413 for (i = 0; i < to; i++) {
414 if (i < from) {
415 capabilities[i] = old_capabilities[i];
416 } else {
417 capabilities[i] = stgMallocBytes(sizeof(Capability),
418 "moreCapabilities");
419 initCapability(capabilities[i], i);
420 }
421 }
422 }
423
424 debugTrace(DEBUG_sched, "allocated %d more capabilities", to - from);
425
426 if (old_capabilities != NULL) {
427 stgFree(old_capabilities);
428 }
429 #endif
430 }
431
432 /* ----------------------------------------------------------------------------
433 * contextSwitchAllCapabilities: cause all capabilities to context switch as
434 * soon as possible.
435 * ------------------------------------------------------------------------- */
436
437 void contextSwitchAllCapabilities(void)
438 {
439 uint32_t i;
440 for (i=0; i < n_capabilities; i++) {
441 contextSwitchCapability(capabilities[i]);
442 }
443 }
444
445 void interruptAllCapabilities(void)
446 {
447 uint32_t i;
448 for (i=0; i < n_capabilities; i++) {
449 interruptCapability(capabilities[i]);
450 }
451 }
452
453 /* ----------------------------------------------------------------------------
454 * Give a Capability to a Task. The task must currently be sleeping
455 * on its condition variable.
456 *
457 * Requires cap->lock (modifies cap->running_task).
458 *
459 * When migrating a Task, the migrator must take task->lock before
460 * modifying task->cap, to synchronise with the waking up Task.
461 * Additionally, the migrator should own the Capability (when
462 * migrating the run queue), or cap->lock (when migrating
463 * returning_workers).
464 *
465 * ------------------------------------------------------------------------- */
466
467 #if defined(THREADED_RTS)
468 static void
469 giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
470 {
471 ASSERT_LOCK_HELD(&cap->lock);
472 ASSERT(task->cap == cap);
473 debugTrace(DEBUG_sched, "passing capability %d to %s %#" FMT_HexWord64,
474 cap->no, task->incall->tso ? "bound task" : "worker",
475 serialisableTaskId(task));
476 ACQUIRE_LOCK(&task->lock);
477 if (task->wakeup == rtsFalse) {
478 task->wakeup = rtsTrue;
479 // the wakeup flag is needed because signalCondition() doesn't
480 // flag the condition if the thread is already running, but we want
481 // it to be sticky.
482 signalCondition(&task->cond);
483 }
484 RELEASE_LOCK(&task->lock);
485 }
486 #endif
487
488 /* ----------------------------------------------------------------------------
489 * releaseCapability
490 *
491 * The current Task (cap->task) releases the Capability. The Capability is
492 * marked free, and if there is any work to do, an appropriate Task is woken up.
493 * ------------------------------------------------------------------------- */
494
495 #if defined(THREADED_RTS)
496 void
497 releaseCapability_ (Capability* cap,
498 rtsBool always_wakeup)
499 {
500 Task *task;
501
502 task = cap->running_task;
503
504 ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
505
506 cap->running_task = NULL;
507
508 // Check to see whether a worker thread can be given
509 // the go-ahead to return the result of an external call.
510 if (cap->returning_tasks_hd != NULL) {
511 giveCapabilityToTask(cap,cap->returning_tasks_hd);
512 // The Task pops itself from the queue (see waitForCapability())
513 return;
514 }
515
516 // If there is a pending sync, then we should just leave the Capability
517 // free. The thread trying to sync will be about to call
518 // waitForCapability().
519 //
520 // Note: this is *after* we check for a returning task above,
521 // because the task attempting to acquire all the capabilities may
522 // be currently in waitForCapability() waiting for this
523 // capability, in which case simply setting it as free would not
524 // wake up the waiting task.
525 PendingSync *sync = pending_sync;
526 if (sync && (sync->type != SYNC_GC_PAR || sync->idle[cap->no])) {
527 debugTrace(DEBUG_sched, "sync pending, freeing capability %d", cap->no);
528 return;
529 }
530
531 // If the next thread on the run queue is a bound thread,
532 // give this Capability to the appropriate Task.
533 if (!emptyRunQueue(cap) && peekRunQueue(cap)->bound) {
534 // Make sure we're not about to try to wake ourselves up
535 // ASSERT(task != cap->run_queue_hd->bound);
536 // assertion is false: in schedule() we force a yield after
537 // ThreadBlocked, but the thread may be back on the run queue
538 // by now.
539 task = peekRunQueue(cap)->bound->task;
540 giveCapabilityToTask(cap, task);
541 return;
542 }
543
544 if (!cap->spare_workers) {
545 // Create a worker thread if we don't have one. If the system
546 // is interrupted, we only create a worker task if there
547 // are threads that need to be completed. If the system is
548 // shutting down, we never create a new worker.
549 if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
550 debugTrace(DEBUG_sched,
551 "starting new worker on capability %d", cap->no);
552 startWorkerTask(cap);
553 return;
554 }
555 }
556
557 // If we have an unbound thread on the run queue, or if there's
558 // anything else to do, give the Capability to a worker thread.
559 if (always_wakeup ||
560 !emptyRunQueue(cap) || !emptyInbox(cap) ||
561 (!cap->disabled && !emptySparkPoolCap(cap)) || globalWorkToDo()) {
562 if (cap->spare_workers) {
563 giveCapabilityToTask(cap, cap->spare_workers);
564 // The worker Task pops itself from the queue;
565 return;
566 }
567 }
568
569 #ifdef PROFILING
570 cap->r.rCCCS = CCS_IDLE;
571 #endif
572 last_free_capability[cap->node] = cap;
573 debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
574 }
575
576 void
577 releaseCapability (Capability* cap USED_IF_THREADS)
578 {
579 ACQUIRE_LOCK(&cap->lock);
580 releaseCapability_(cap, rtsFalse);
581 RELEASE_LOCK(&cap->lock);
582 }
583
584 void
585 releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
586 {
587 ACQUIRE_LOCK(&cap->lock);
588 releaseCapability_(cap, rtsTrue);
589 RELEASE_LOCK(&cap->lock);
590 }
591
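/* enqueueWorker: put the current worker Task on this Capability's
 * spare_workers list, unless we already have MAX_SPARE_WORKERS spare
 * workers, in which case the worker releases the Capability and exits.
 * Called with cap->lock held.
 */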
592 static void
593 enqueueWorker (Capability* cap USED_IF_THREADS)
594 {
595 Task *task;
596
597 task = cap->running_task;
598
599 // If the Task is stopped, we shouldn't be yielding, we should
600 // be just exiting.
601 ASSERT(!task->stopped);
602 ASSERT(task->worker);
603
604 if (cap->n_spare_workers < MAX_SPARE_WORKERS)
605 {
606 task->next = cap->spare_workers;
607 cap->spare_workers = task;
608 cap->n_spare_workers++;
609 }
610 else
611 {
612 debugTrace(DEBUG_sched, "%d spare workers already, exiting",
613 cap->n_spare_workers);
614 releaseCapability_(cap,rtsFalse);
615 // hold the lock until after workerTaskStop; c.f. scheduleWorker()
616 workerTaskStop(task);
617 RELEASE_LOCK(&cap->lock);
618 shutdownThread();
619 }
620 }
621
622 #endif
623
624 /* ----------------------------------------------------------------------------
625 * waitForWorkerCapability(task)
626 *
627 * waits to be given a Capability, and then returns the Capability. The task
628 * must be either a worker (and on a cap->spare_workers queue), or a bound Task.
629 * ------------------------------------------------------------------------- */
630
631 #if defined(THREADED_RTS)
632
633 static Capability * waitForWorkerCapability (Task *task)
634 {
635 Capability *cap;
636
637 for (;;) {
638 ACQUIRE_LOCK(&task->lock);
639 // task->lock held, cap->lock not held
640 if (!task->wakeup) waitCondition(&task->cond, &task->lock);
641 cap = task->cap;
642 task->wakeup = rtsFalse;
643 RELEASE_LOCK(&task->lock);
644
645 debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);
646
647 ACQUIRE_LOCK(&cap->lock);
648 if (cap->running_task != NULL) {
649 debugTrace(DEBUG_sched,
650 "capability %d is owned by another task", cap->no);
651 RELEASE_LOCK(&cap->lock);
652 continue;
653 }
654
655 if (task->cap != cap) {
656 // see Note [migrated bound threads]
657 debugTrace(DEBUG_sched,
658 "task has been migrated to cap %d", task->cap->no);
659 RELEASE_LOCK(&cap->lock);
660 continue;
661 }
662
663 if (task->incall->tso == NULL) {
664 ASSERT(cap->spare_workers != NULL);
665 // if we're not at the front of the queue, give the Capability
666 // to the Task that is, and wait again. This is unlikely to happen.
667 if (cap->spare_workers != task) {
668 giveCapabilityToTask(cap,cap->spare_workers);
669 RELEASE_LOCK(&cap->lock);
670 continue;
671 }
672 cap->spare_workers = task->next;
673 task->next = NULL;
674 cap->n_spare_workers--;
675 }
676
677 cap->running_task = task;
678 RELEASE_LOCK(&cap->lock);
679 break;
680 }
681
682 return cap;
683 }
684
685 #endif /* THREADED_RTS */
686
687 /* ----------------------------------------------------------------------------
688 * waitForReturnCapability (Task *task)
689 *
690 * The Task should be on the cap->returning_tasks queue of a Capability. This
691 * function waits for the Task to be woken up, and returns the Capability that
692 * it was woken up on.
693 *
694 * ------------------------------------------------------------------------- */
695
696 #if defined(THREADED_RTS)
697
698 static Capability * waitForReturnCapability (Task *task)
699 {
700 Capability *cap;
701
702 for (;;) {
703 ACQUIRE_LOCK(&task->lock);
704 // task->lock held, cap->lock not held
705 if (!task->wakeup) waitCondition(&task->cond, &task->lock);
706 cap = task->cap;
707 task->wakeup = rtsFalse;
708 RELEASE_LOCK(&task->lock);
709
710 // now check whether we should wake up...
711 ACQUIRE_LOCK(&cap->lock);
712 if (cap->running_task == NULL) {
713 if (cap->returning_tasks_hd != task) {
714 giveCapabilityToTask(cap,cap->returning_tasks_hd);
715 RELEASE_LOCK(&cap->lock);
716 continue;
717 }
718 cap->running_task = task;
719 popReturningTask(cap);
720 RELEASE_LOCK(&cap->lock);
721 break;
722 }
723 RELEASE_LOCK(&cap->lock);
724 }
725
726 return cap;
727 }
728
729 #endif /* THREADED_RTS */
730
731 /* ----------------------------------------------------------------------------
732 * waitForCapability (Capability **pCap, Task *task)
733 *
734 * Purpose: when an OS thread returns from an external call,
735 * it calls waitForCapability() (via Schedule.resumeThread())
736 * to wait for permission to enter the RTS & communicate the
737 * result of the external call back to the Haskell thread that
738 * made it.
739 *
740 * ------------------------------------------------------------------------- */
741
742 void waitForCapability (Capability **pCap, Task *task)
743 {
744 #if !defined(THREADED_RTS)
745
746 MainCapability.running_task = task;
747 task->cap = &MainCapability;
748 *pCap = &MainCapability;
749
750 #else
751 uint32_t i;
752 Capability *cap = *pCap;
753
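// If the Task doesn't already have a Capability, choose one: honour its
// preferred capability if it has one, otherwise look for a free Capability
// on the Task's NUMA node, falling back to last_free_capability.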
754 if (cap == NULL) {
755 if (task->preferred_capability != -1) {
756 cap = capabilities[task->preferred_capability %
757 enabled_capabilities];
758 } else {
759 // Try last_free_capability first
760 cap = last_free_capability[task->node];
761 if (cap->running_task) {
762 // Otherwise, search for a free capability on this node.
763 cap = NULL;
764 for (i = task->node; i < enabled_capabilities;
765 i += n_numa_nodes) {
766 // visits all the capabilities on this node, because
767 // capabilities[i]->node == i % n_numa_nodes
768 if (!capabilities[i]->running_task) {
769 cap = capabilities[i];
770 break;
771 }
772 }
773 if (cap == NULL) {
774 // Can't find a free one, use last_free_capability.
775 cap = last_free_capability[task->node];
776 }
777 }
778 }
779
780 // record the Capability as the one this Task is now associated with.
781 task->cap = cap;
782
783 } else {
784 ASSERT(task->cap == cap);
785 }
786
787 debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);
788
789 ACQUIRE_LOCK(&cap->lock);
790 if (!cap->running_task) {
791 // It's free; just grab it
792 cap->running_task = task;
793 RELEASE_LOCK(&cap->lock);
794 } else {
795 newReturningTask(cap,task);
796 RELEASE_LOCK(&cap->lock);
797 cap = waitForReturnCapability(task);
798 }
799
800 #ifdef PROFILING
801 cap->r.rCCCS = CCS_SYSTEM;
802 #endif
803
804 ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
805
806 debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
807
808 *pCap = cap;
809 #endif
810 }
811
812 /* ----------------------------------------------------------------------------
813 * yieldCapability
814 *
815 * Give up the Capability, and return when we have it again. This is called
816 * when either we know that the Capability should be given to another Task, or
817 * there is nothing to do right now. One of the following is true:
818 *
819 * - The current Task is a worker, and there's a bound thread at the head of
820 * the run queue (or vice versa)
821 *
822 * - The run queue is empty. We'll be woken up again when there's work to
823 * do.
824 *
825 * - Another Task is trying to do parallel GC (pending_sync == SYNC_GC_PAR).
826 * We should become a GC worker for a while.
827 *
828 * - Another Task is trying to acquire all the Capabilities (pending_sync !=
829 * SYNC_GC_PAR), either to do a sequential GC, forkProcess, or
830 * setNumCapabilities. We should give up the Capability temporarily.
831 *
832 * ------------------------------------------------------------------------- */
833
834 #if defined (THREADED_RTS)
835
836 /* See Note [GC livelock] in Schedule.c for why we have gcAllowed
837 and return the rtsBool */
838 rtsBool /* Did we GC? */
839 yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed)
840 {
841 Capability *cap = *pCap;
842
843 if (gcAllowed)
844 {
845 PendingSync *sync = pending_sync;
846
847 if (sync && sync->type == SYNC_GC_PAR) {
848 if (! sync->idle[cap->no]) {
849 traceEventGcStart(cap);
850 gcWorkerThread(cap);
851 traceEventGcEnd(cap);
852 traceSparkCounters(cap);
853 // See Note [migrated bound threads 2]
854 if (task->cap == cap) {
855 return rtsTrue;
856 }
857 }
858 }
859 }
860
861 debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
862
863 // We must now release the capability and wait to be woken up again.
864 task->wakeup = rtsFalse;
865
866 ACQUIRE_LOCK(&cap->lock);
867
868 // If this is a worker thread, put it on the spare_workers queue
869 if (isWorker(task)) {
870 enqueueWorker(cap);
871 }
872
873 releaseCapability_(cap, rtsFalse);
874
875 if (isWorker(task) || isBoundTask(task)) {
876 RELEASE_LOCK(&cap->lock);
877 cap = waitForWorkerCapability(task);
878 } else {
879 // Neither a worker Task nor a bound Task. The only way we can be woken up
880 // again is to put ourselves on the returning_tasks queue, so that's
881 // what we do. We still hold cap->lock at this point.
882 // The Task waiting for this Capability does not have it
883 // yet, so we can be sure to be woken up later. (see #10545)
884 newReturningTask(cap,task);
885 RELEASE_LOCK(&cap->lock);
886 cap = waitForReturnCapability(task);
887 }
888
889 debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
890 ASSERT(cap->running_task == task);
891
892 #ifdef PROFILING
893 cap->r.rCCCS = CCS_SYSTEM;
894 #endif
895
896 *pCap = cap;
897
898 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
899
900 return rtsFalse;
901 }
902
903 #endif /* THREADED_RTS */
904
905 // Note [migrated bound threads]
906 //
907 // There's a tricky case where:
908 // - cap A is running an unbound thread T1
909 // - there is a bound thread T2 at the head of the run queue on cap A
910 // - T1 makes a safe foreign call, the task bound to T2 is woken up on cap A
911 // - T1 returns quickly, grabbing A again (T2 is still waking up on A)
912 // - T1 blocks, the scheduler migrates T2 to cap B
913 // - the task bound to T2 wakes up on cap B
914 //
915 // We take advantage of the following invariant:
916 //
917 // - A bound thread can only be migrated by the holder of the
918 // Capability on which the bound thread currently lives. So, if we
919 // hold Capability C, and task->cap == C, then task cannot be
920 // migrated under our feet.
921
922 // Note [migrated bound threads 2]
923 //
924 // Second tricky case:
925 // - A bound Task becomes a GC thread
926 // - scheduleDoGC() migrates the thread belonging to this Task,
927 // because the Capability it is on is disabled
928 // - after GC, gcWorkerThread() returns, but now we are
929 // holding a Capability that is not the same as task->cap
930 // - Hence we must check for this case and immediately give up the
931 // cap we hold.
932
933 /* ----------------------------------------------------------------------------
934 * prodCapability
935 *
936 * If a Capability is currently idle, wake up a Task on it. Used to
937 * get every Capability into the GC.
938 * ------------------------------------------------------------------------- */
939
940 #if defined (THREADED_RTS)
941
942 void
943 prodCapability (Capability *cap, Task *task)
944 {
945 ACQUIRE_LOCK(&cap->lock);
946 if (!cap->running_task) {
947 cap->running_task = task;
948 releaseCapability_(cap,rtsTrue);
949 }
950 RELEASE_LOCK(&cap->lock);
951 }
952
953 #endif /* THREADED_RTS */
954
955 /* ----------------------------------------------------------------------------
956 * tryGrabCapability
957 *
958 * Attempt to gain control of a Capability if it is free.
959 *
960 * ------------------------------------------------------------------------- */
961
962 #if defined (THREADED_RTS)
963
964 rtsBool
965 tryGrabCapability (Capability *cap, Task *task)
966 {
967 if (cap->running_task != NULL) return rtsFalse;
968 ACQUIRE_LOCK(&cap->lock);
969 if (cap->running_task != NULL) {
970 RELEASE_LOCK(&cap->lock);
971 return rtsFalse;
972 }
973 task->cap = cap;
974 cap->running_task = task;
975 RELEASE_LOCK(&cap->lock);
976 return rtsTrue;
977 }
978
979
980 #endif /* THREADED_RTS */
981
982 /* ----------------------------------------------------------------------------
983 * shutdownCapability
984 *
985 * At shutdown time, we want to let everything exit as cleanly as
986 * possible. For each capability, we let its run queue drain, and
987 * allow the workers to stop.
988 *
989 * This function should be called when interrupted and
990 * sched_state = SCHED_SHUTTING_DOWN, thus any worker that wakes up
991 * will exit the scheduler and call taskStop(), and any bound thread
992 * that wakes up will return to its caller. Runnable threads are
993 * killed.
994 *
995 * ------------------------------------------------------------------------- */
996
997 static void
998 shutdownCapability (Capability *cap USED_IF_THREADS,
999 Task *task USED_IF_THREADS,
1000 rtsBool safe USED_IF_THREADS)
1001 {
1002 #if defined(THREADED_RTS)
1003 uint32_t i;
1004
1005 task->cap = cap;
1006
1007 // Loop indefinitely until all the workers have exited and there
1008 // are no Haskell threads left. We used to bail out after 50
1009 // iterations of this loop, but that occasionally left a worker
1010 // running which caused problems later (the closeMutex() below
1011 // isn't safe, for one thing).
1012
1013 for (i = 0; /* i < 50 */; i++) {
1014 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
1015
1016 debugTrace(DEBUG_sched,
1017 "shutting down capability %d, attempt %d", cap->no, i);
1018 ACQUIRE_LOCK(&cap->lock);
1019 if (cap->running_task) {
1020 RELEASE_LOCK(&cap->lock);
1021 debugTrace(DEBUG_sched, "not owner, yielding");
1022 yieldThread();
1023 continue;
1024 }
1025 cap->running_task = task;
1026
1027 if (cap->spare_workers) {
1028 // Look for workers that have died without removing
1029 // themselves from the list; this could happen if the OS
1030 // summarily killed the thread, for example. This
1031 // actually happens on Windows when the system is
1032 // terminating the program, and the RTS is running in a
1033 // DLL.
1034 Task *t, *prev;
1035 prev = NULL;
1036 for (t = cap->spare_workers; t != NULL; t = t->next) {
1037 if (!osThreadIsAlive(t->id)) {
1038 debugTrace(DEBUG_sched,
1039 "worker thread %p has died unexpectedly", (void *)(size_t)t->id);
1040 cap->n_spare_workers--;
1041 if (!prev) {
1042 cap->spare_workers = t->next;
1043 } else {
1044 prev->next = t->next;
1045 }
1046 } else {
1047 prev = t; // only advance prev past workers that stay on the list
}
1048 }
1049 }
1050
1051 if (!emptyRunQueue(cap) || cap->spare_workers) {
1052 debugTrace(DEBUG_sched,
1053 "runnable threads or workers still alive, yielding");
1054 releaseCapability_(cap,rtsFalse); // this will wake up a worker
1055 RELEASE_LOCK(&cap->lock);
1056 yieldThread();
1057 continue;
1058 }
1059
1060 // If "safe", then busy-wait for any threads currently doing
1061 // foreign calls. If we're about to unload this DLL, for
1062 // example, we need to be sure that there are no OS threads
1063 // that will try to return to code that has been unloaded.
1064 // We can be a bit more relaxed when this is a standalone
1065 // program that is about to terminate, and let safe=false.
1066 if (cap->suspended_ccalls && safe) {
1067 debugTrace(DEBUG_sched,
1068 "thread(s) are involved in foreign calls, yielding");
1069 cap->running_task = NULL;
1070 RELEASE_LOCK(&cap->lock);
1071 // The IO manager thread might have been slow to start up,
1072 // so the first attempt to kill it might not have
1073 // succeeded. Just in case, try again - the kill message
1074 // will only be sent once.
1075 //
1076 // To reproduce this deadlock: run ffi002(threaded1)
1077 // repeatedly on a loaded machine.
1078 ioManagerDie();
1079 yieldThread();
1080 continue;
1081 }
1082
1083 traceSparkCounters(cap);
1084 RELEASE_LOCK(&cap->lock);
1085 break;
1086 }
1087 // We now have the Capability; its run queue and spare_workers
1088 // list are both empty.
1089
1090 // ToDo: we can't drop this mutex, because there might still be
1091 // threads performing foreign calls that will eventually try to
1092 // return via resumeThread() and attempt to grab cap->lock.
1093 // closeMutex(&cap->lock);
1094 #endif
1095 }
1096
1097 void
1098 shutdownCapabilities(Task *task, rtsBool safe)
1099 {
1100 uint32_t i;
1101 for (i=0; i < n_capabilities; i++) {
1102 ASSERT(task->incall->tso == NULL);
1103 shutdownCapability(capabilities[i], task, safe);
1104 }
1105 #if defined(THREADED_RTS)
1106 ASSERT(checkSparkCountInvariant());
1107 #endif
1108 }
1109
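/* freeCapability: free the memory owned by a single Capability (its mutable
 * lists and, in the threaded RTS, its spark pool) and notify the tracing
 * subsystem that it is being removed from the default capsets.
 */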
1110 static void
1111 freeCapability (Capability *cap)
1112 {
1113 stgFree(cap->mut_lists);
1114 stgFree(cap->saved_mut_lists);
1115 #if defined(THREADED_RTS)
1116 freeSparkPool(cap->sparks);
1117 #endif
1118 traceCapsetRemoveCap(CAPSET_OSPROCESS_DEFAULT, cap->no);
1119 traceCapsetRemoveCap(CAPSET_CLOCKDOMAIN_DEFAULT, cap->no);
1120 traceCapDelete(cap);
1121 }
1122
1123 void
1124 freeCapabilities (void)
1125 {
1126 #if defined(THREADED_RTS)
1127 uint32_t i;
1128 for (i=0; i < n_capabilities; i++) {
1129 freeCapability(capabilities[i]);
1130 if (capabilities[i] != &MainCapability)
1131 stgFree(capabilities[i]);
1132 }
1133 #else
1134 freeCapability(&MainCapability);
1135 #endif
1136 stgFree(capabilities);
1137 traceCapsetDelete(CAPSET_OSPROCESS_DEFAULT);
1138 traceCapsetDelete(CAPSET_CLOCKDOMAIN_DEFAULT);
1139 }
1140
1141 /* ---------------------------------------------------------------------------
1142 Mark everything directly reachable from the Capabilities. When
1143 using multiple GC threads, GC thread t marks all Capabilities c
1144 for which (c `mod` n == t), where n is the number of GC threads.
1145 ------------------------------------------------------------------------ */
1146
1147 void
1148 markCapability (evac_fn evac, void *user, Capability *cap,
1149 rtsBool no_mark_sparks USED_IF_THREADS)
1150 {
1151 InCall *incall;
1152
1153 // Each GC thread is responsible for following roots from the
1154 // Capability of the same number. There will usually be the same
1155 // or fewer Capabilities as GC threads, but just in case there
1156 // are more, we mark every Capability whose number is the GC
1157 // thread's index plus a multiple of the number of GC threads.
1158 evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
1159 evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
1160 #if defined(THREADED_RTS)
1161 evac(user, (StgClosure **)(void *)&cap->inbox);
1162 #endif
1163 for (incall = cap->suspended_ccalls; incall != NULL;
1164 incall=incall->next) {
1165 evac(user, (StgClosure **)(void *)&incall->suspended_tso);
1166 }
1167
1168 #if defined(THREADED_RTS)
1169 if (!no_mark_sparks) {
1170 traverseSparkQueue (evac, user, cap);
1171 }
1172 #endif
1173
1174 // Free STM structures for this Capability
1175 stmPreGCHook(cap);
1176 }
1177
1178 void
1179 markCapabilities (evac_fn evac, void *user)
1180 {
1181 uint32_t n;
1182 for (n = 0; n < n_capabilities; n++) {
1183 markCapability(evac, user, capabilities[n], rtsFalse);
1184 }
1185 }
1186
1187 #if defined(THREADED_RTS)
1188 rtsBool checkSparkCountInvariant (void)
1189 {
1190 SparkCounters sparks = { 0, 0, 0, 0, 0, 0 };
1191 StgWord64 remaining = 0;
1192 uint32_t i;
1193
1194 for (i = 0; i < n_capabilities; i++) {
1195 sparks.created += capabilities[i]->spark_stats.created;
1196 sparks.dud += capabilities[i]->spark_stats.dud;
1197 sparks.overflowed+= capabilities[i]->spark_stats.overflowed;
1198 sparks.converted += capabilities[i]->spark_stats.converted;
1199 sparks.gcd += capabilities[i]->spark_stats.gcd;
1200 sparks.fizzled += capabilities[i]->spark_stats.fizzled;
1201 remaining += sparkPoolSize(capabilities[i]->sparks);
1202 }
1203
1204 /* The invariant is
1205 * created = converted + remaining + gcd + fizzled
1206 */
1207 debugTrace(DEBUG_sparks,"spark invariant: %ld == %ld + %ld + %ld + %ld "
1208 "(created == converted + remaining + gcd + fizzled)",
1209 sparks.created, sparks.converted, remaining,
1210 sparks.gcd, sparks.fizzled);
1211
1212 return (sparks.created ==
1213 sparks.converted + remaining + sparks.gcd + sparks.fizzled);
1214
1215 }
1216 #endif
1217
1218 #if !defined(mingw32_HOST_OS)
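/* setIOManagerControlFd: record the write end of the control pipe for the
 * given capability's IO manager, so that the RTS can later write control
 * messages (e.g. wakeup requests) to it.
 */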
1219 void
1220 setIOManagerControlFd(uint32_t cap_no USED_IF_THREADS, int fd USED_IF_THREADS) {
1221 #if defined(THREADED_RTS)
1222 if (cap_no < n_capabilities) {
1223 capabilities[cap_no]->io_manager_control_wr_fd = fd;
1224 } else {
1225 errorBelch("warning: setIOManagerControlFd called with illegal capability number.");
1226 }
1227 #endif
1228 }
1229 #endif