Eventlog support for new event type: create spark.
[ghc.git] / rts / Capability.c
/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2006
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc.  During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in a THREADED_RTS build will there be multiple capabilities;
 * for non-threaded builds there is only one global capability, namely
 * MainCapability.
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "STM.h"
#include "OSThreads.h"
#include "Capability.h"
#include "Schedule.h"
#include "Sparks.h"
#include "Trace.h"
#include "GC.h"

// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
Capability MainCapability;

nat n_capabilities;
Capability *capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability;

/* GC indicator, in scope for the scheduler.  It is zero when no GC is
 * pending; while a GC is being requested it holds PENDING_GC_SEQ or
 * PENDING_GC_PAR (see releaseCapability_() and yieldCapability()). */
volatile StgWord waiting_for_gc = 0;

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
    return blackholes_need_checking
        || sched_state >= SCHED_INTERRUPTING
        ;
}
#endif

#if defined(THREADED_RTS)
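// findSpark: find a spark for the given Capability to run.  First try
// its own spark pool, then try to steal from the pools of the other
// Capabilities.  Returns NULL if no spark could be found.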
StgClosure *
findSpark (Capability *cap)
{
    Capability *robbed;
    StgClosurePtr spark;
    rtsBool retry;
    nat i = 0;

    if (!emptyRunQueue(cap)) {
        // If there are other threads, don't try to run any new
        // sparks: sparks might be speculative, we don't want to take
        // resources away from the main computation.
        return 0;
    }

    // first try to get a spark from our own pool.
    // We should be using reclaimSpark(), because it works without
    // needing any atomic instructions:
    //   spark = reclaimSpark(cap->sparks);
    // However, measurements show that this makes at least one benchmark
    // slower (prsa) and doesn't affect the others.
    spark = tryStealSpark(cap);
    if (spark != NULL) {
        cap->sparks_converted++;

        // Post event for running a spark from capability's own pool.
        postEvent(cap, EVENT_RUN_SPARK, cap->r.rCurrentTSO->id, 0);

        return spark;
    }

    if (n_capabilities == 1) { return NULL; } // makes no sense...

    debugTrace(DEBUG_sched,
               "cap %d: Trying to steal work from other capabilities",
               cap->no);

    do {
        retry = rtsFalse;

        /* visit capabilities 0..n-1 in sequence until a theft succeeds.
           We could start at a random place instead of 0 as well. */
        for ( i=0 ; i < n_capabilities ; i++ ) {
            robbed = &capabilities[i];
            if (cap == robbed)  // ourselves...
                continue;

            if (emptySparkPoolCap(robbed)) // nothing to steal here
                continue;

            spark = tryStealSpark(robbed);
            if (spark == NULL && !emptySparkPoolCap(robbed)) {
                // we conflicted with another thread while trying to steal;
                // try again later.
                retry = rtsTrue;
            }

            if (spark != NULL) {
                debugTrace(DEBUG_sched,
                           "cap %d: Stole a spark from capability %d",
                           cap->no, robbed->no);
                cap->sparks_converted++;

                postEvent(cap, EVENT_STEAL_SPARK,
                          cap->r.rCurrentTSO->id, robbed->no);

                return spark;
            }
            // otherwise: no success, try next one
        }
    } while (retry);

    debugTrace(DEBUG_sched, "No sparks stolen");
    return NULL;
}

// Returns rtsTrue if any spark pool is non-empty at this moment in time.
// The result is only valid for an instant, of course, so in a sense
// it is immediately invalid, and should not be relied upon for
// correctness.
rtsBool
anySparks (void)
{
    nat i;

    for (i=0; i < n_capabilities; i++) {
        if (!emptySparkPoolCap(&capabilities[i])) {
            return rtsTrue;
        }
    }
    return rtsFalse;
}
#endif

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
 *
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
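// The returning_tasks list is a FIFO queue: Tasks are linked through
// their return_link fields, with returning_tasks_hd as the head and
// returning_tasks_tl as the tail.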
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->return_link == NULL);
    if (cap->returning_tasks_hd) {
        ASSERT(cap->returning_tasks_tl->return_link == NULL);
        cap->returning_tasks_tl->return_link = task;
    } else {
        cap->returning_tasks_hd = task;
    }
    cap->returning_tasks_tl = task;
}

STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    ASSERT_LOCK_HELD(&cap->lock);
    Task *task;
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->return_link;
    if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
    }
    task->return_link = NULL;
    return task;
}
#endif

/* ----------------------------------------------------------------------------
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */

static void
initCapability( Capability *cap, nat i )
{
    nat g;

    cap->no = i;
    cap->in_haskell = rtsFalse;
    cap->in_gc = rtsFalse;

    cap->run_queue_hd = END_TSO_QUEUE;
    cap->run_queue_tl = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task = NULL; // indicates cap is free
    cap->spare_workers = NULL;
    cap->suspended_ccalling_tasks = NULL;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
    cap->wakeup_queue_hd = END_TSO_QUEUE;
    cap->wakeup_queue_tl = END_TSO_QUEUE;
    cap->sparks_created = 0;
    cap->sparks_converted = 0;
    cap->sparks_pruned = 0;
#endif

    cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
    cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
    cap->f.stgGCFun = (F_)__stg_gc_fun;

    cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
                                    RtsFlags.GcFlags.generations,
                                    "initCapability");
    cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
                                          RtsFlags.GcFlags.generations,
                                          "initCapability");

    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        cap->mut_lists[g] = NULL;
    }

    cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
    cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE;
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
    cap->free_trec_headers = NO_TREC;
    cap->transaction_tokens = 0;
    cap->context_switch = 0;
}

/* ---------------------------------------------------------------------------
 * Function:  initCapabilities()
 *
 * Purpose:   set up the Capability handling. For the THREADED_RTS build,
 *            we keep a table of them, the size of which is
 *            controlled by the user via the RTS flag -N.
 *
 * ------------------------------------------------------------------------- */
void
initCapabilities( void )
{
#if defined(THREADED_RTS)
    nat i;

#ifndef REG_Base
    // We can't support multiple CPUs if BaseReg is not a register
    if (RtsFlags.ParFlags.nNodes > 1) {
        errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
        RtsFlags.ParFlags.nNodes = 1;
    }
#endif

    n_capabilities = RtsFlags.ParFlags.nNodes;

    if (n_capabilities == 1) {
        capabilities = &MainCapability;
        // THREADED_RTS must work on builds that don't have a mutable
        // BaseReg (eg. unregisterised), so in this case
        // capabilities[0] must coincide with &MainCapability.
    } else {
        capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
                                      "initCapabilities");
    }

    for (i = 0; i < n_capabilities; i++) {
        initCapability(&capabilities[i], i);
    }

    debugTrace(DEBUG_sched, "allocated %d capabilities", n_capabilities);

#else /* !THREADED_RTS */

    n_capabilities = 1;
    capabilities = &MainCapability;
    initCapability(&MainCapability, 0);

#endif

    // There are no free capabilities to begin with.  We will start
    // a worker Task on each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    last_free_capability = &capabilities[0];
}

/* ----------------------------------------------------------------------------
 * setContextSwitches: cause all capabilities to context switch as
 * soon as possible.
 * ------------------------------------------------------------------------- */

void setContextSwitches(void)
{
    nat i;
    for (i=0; i < n_capabilities; i++) {
        contextSwitchCapability(&capabilities[i]);
    }
}

/* ----------------------------------------------------------------------------
 * Give a Capability to a Task.  The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrator must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrator should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    debugTrace(DEBUG_sched, "passing capability %d to %s %p",
               cap->no, task->tso ? "bound task" : "worker",
               (void *)task->id);
    ACQUIRE_LOCK(&task->lock);
    task->wakeup = rtsTrue;
    // the wakeup flag is needed because signalCondition() doesn't
    // flag the condition if the thread is already running, but we want
    // it to be sticky.
    signalCondition(&task->cond);
    RELEASE_LOCK(&task->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * Function:  releaseCapability(Capability*)
 *
 * Purpose:   Letting go of a capability.  Causes a
 *            'returning worker' thread or a 'waiting worker'
 *            to wake up, in that order.
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
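// releaseCapability_ passes the Capability on in the order implemented
// below: first to a Task returning from a foreign call; then, if a
// sequential GC is pending, it is simply left free for the GC; then to
// a bound Task at the head of the run queue; otherwise a worker is
// started or woken if there is work to do (or if always_wakeup is set).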
void
releaseCapability_ (Capability* cap,
                    rtsBool always_wakeup)
{
    Task *task;

    task = cap->running_task;

    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);

    cap->running_task = NULL;

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call.
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }

    if (waiting_for_gc == PENDING_GC_SEQ) {
        last_free_capability = cap; // needed?
        debugTrace(DEBUG_sched, "GC pending, set capability %d free", cap->no);
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
        // Make sure we're not about to try to wake ourselves up
        ASSERT(task != cap->run_queue_hd->bound);
        task = cap->run_queue_hd->bound;
        giveCapabilityToTask(cap,task);
        return;
    }

    if (!cap->spare_workers) {
        // Create a worker thread if we don't have one.  If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
        if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
            debugTrace(DEBUG_sched,
                       "starting new worker on capability %d", cap->no);
            startWorkerTask(cap, workerStart);
            return;
        }
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (always_wakeup ||
        !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
        !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue;
            return;
        }
    }

    last_free_capability = cap;
    debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
}

void
releaseCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, rtsFalse);
    RELEASE_LOCK(&cap->lock);
}

void
releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, rtsTrue);
    RELEASE_LOCK(&cap->lock);
}

static void
releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_worker queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    // Also, be careful to check that this task hasn't just exited
    // Haskell to do a foreign call (task->suspended_tso).
    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
    }
    // Bound tasks just float around attached to their TSOs.

    releaseCapability_(cap,rtsFalse);

    RELEASE_LOCK(&cap->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * waitForReturnCapability (Capability **pCap, Task *task)
 *
 * Purpose:  when an OS thread returns from an external call,
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * ------------------------------------------------------------------------- */
void
waitForReturnCapability (Capability **pCap, Task *task)
{
#if !defined(THREADED_RTS)

    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;

#else
    Capability *cap = *pCap;

    if (cap == NULL) {
        // Try last_free_capability first
        cap = last_free_capability;
        if (cap->running_task) {
            nat i;
            // otherwise, search for a free capability
            cap = NULL;
            for (i = 0; i < n_capabilities; i++) {
                if (!capabilities[i].running_task) {
                    cap = &capabilities[i];
                    break;
                }
            }
            if (cap == NULL) {
                // Can't find a free one, use last_free_capability.
                cap = last_free_capability;
            }
        }

        // record the Capability as the one this Task is now associated with.
        task->cap = cap;

    } else {
        ASSERT(task->cap == cap);
    }

    ACQUIRE_LOCK(&cap->lock);

    debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);

    if (!cap->running_task) {
        // It's free; just grab it
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
    } else {
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);

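        // Loop: wait to be woken, then check under cap->lock whether
        // the Capability is really intended for us; if another
        // returning Task is at the head of the queue, hand the
        // Capability on and wait again.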
        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            // now check whether we should wake up...
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task == NULL) {
                if (cap->returning_tasks_hd != task) {
                    giveCapabilityToTask(cap,cap->returning_tasks_hd);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->running_task = task;
                popReturningTask(cap);
                RELEASE_LOCK(&cap->lock);
                break;
            }
            RELEASE_LOCK(&cap->lock);
        }

    }

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);

    *pCap = cap;
#endif
}

#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
 * yieldCapability
 * ------------------------------------------------------------------------- */

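// Give up the Capability, and wait until we are given it back (the
// common case); if a parallel GC is pending, act as a GC worker thread
// on this Capability instead and return immediately afterwards.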
void
yieldCapability (Capability** pCap, Task *task)
{
    Capability *cap = *pCap;

    if (waiting_for_gc == PENDING_GC_PAR) {
        debugTrace(DEBUG_sched, "capability %d: becoming a GC thread", cap->no);
        postEvent(cap, EVENT_GC_START, 0, 0);
        gcWorkerThread(cap);
        postEvent(cap, EVENT_GC_END, 0, 0);
        return;
    }

    debugTrace(DEBUG_sched, "giving up capability %d", cap->no);

    // We must now release the capability and wait to be woken up
    // again.
    task->wakeup = rtsFalse;
    releaseCapabilityAndQueueWorker(cap);

    for (;;) {
        ACQUIRE_LOCK(&task->lock);
        // task->lock held, cap->lock not held
        if (!task->wakeup) waitCondition(&task->cond, &task->lock);
        cap = task->cap;
        task->wakeup = rtsFalse;
        RELEASE_LOCK(&task->lock);

        debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);

        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task != NULL) {
            debugTrace(DEBUG_sched,
                       "capability %d is owned by another task", cap->no);
            RELEASE_LOCK(&cap->lock);
            continue;
        }

        if (task->tso == NULL) {
            ASSERT(cap->spare_workers != NULL);
            // if we're not at the front of the queue, release it
            // again.  This is unlikely to happen.
            if (cap->spare_workers != task) {
                giveCapabilityToTask(cap,cap->spare_workers);
                RELEASE_LOCK(&cap->lock);
                continue;
            }
            cap->spare_workers = task->next;
            task->next = NULL;
        }
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
        break;
    }

    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
    ASSERT(cap->running_task == task);

    *pCap = cap;

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    return;
}

/* ----------------------------------------------------------------------------
 * Wake up a thread on a Capability.
 *
 * This is used when the current Task is running on a Capability and
 * wishes to wake up a thread on a different Capability.
 * ------------------------------------------------------------------------- */

void
wakeupThreadOnCapability (Capability *my_cap,
                          Capability *other_cap,
                          StgTSO *tso)
{
    ACQUIRE_LOCK(&other_cap->lock);

    // We hold other_cap->lock here (acquired just above), so it is
    // safe to migrate the TSO to other_cap.
    if (tso->bound) {
        ASSERT(tso->bound->cap == tso->cap);
        tso->bound->cap = other_cap;
    }
    tso->cap = other_cap;

    ASSERT(tso->bound ? tso->bound->cap == other_cap : 1);

    if (other_cap->running_task == NULL) {
        // nobody is running this Capability, we can add our thread
        // directly onto the run queue and start up a Task to run it.

        other_cap->running_task = myTask();
        // precond for releaseCapability_() and appendToRunQueue()

        appendToRunQueue(other_cap,tso);

        releaseCapability_(other_cap,rtsFalse);
    } else {
        appendToWakeupQueue(my_cap,other_cap,tso);
        other_cap->context_switch = 1;
        // someone is running on this Capability, so it cannot be
        // freed without first checking the wakeup queue (see
        // releaseCapability_).
    }

    RELEASE_LOCK(&other_cap->lock);
}

/* ----------------------------------------------------------------------------
 * prodCapability
 *
 * If a Capability is currently idle, wake up a Task on it.  Used to
 * get every Capability into the GC.
 * ------------------------------------------------------------------------- */

void
prodCapability (Capability *cap, Task *task)
{
    ACQUIRE_LOCK(&cap->lock);
    if (!cap->running_task) {
        cap->running_task = task;
        releaseCapability_(cap,rtsTrue);
    }
    RELEASE_LOCK(&cap->lock);
}

/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible.  For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted and
 * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
 * will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller.  Runnable threads are
 * killed.
 *
 * ------------------------------------------------------------------------- */

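// The loop below repeatedly tries to take ownership of the Capability,
// reaping any workers that died without removing themselves from the
// spare_workers list, waking the remaining workers so they can exit,
// and (if "safe") waiting for Tasks in foreign calls to return, until
// the Capability is quiet.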
void
shutdownCapability (Capability *cap, Task *task, rtsBool safe)
{
    nat i;

    task->cap = cap;

    // Loop indefinitely until all the workers have exited and there
    // are no Haskell threads left.  We used to bail out after 50
    // iterations of this loop, but that occasionally left a worker
    // running which caused problems later (the closeMutex() below
    // isn't safe, for one thing).

    for (i = 0; /* i < 50 */; i++) {
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);

        debugTrace(DEBUG_sched,
                   "shutting down capability %d, attempt %d", cap->no, i);
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            RELEASE_LOCK(&cap->lock);
            debugTrace(DEBUG_sched, "not owner, yielding");
            yieldThread();
            continue;
        }
        cap->running_task = task;

        if (cap->spare_workers) {
            // Look for workers that have died without removing
            // themselves from the list; this could happen if the OS
            // summarily killed the thread, for example.  This
            // actually happens on Windows when the system is
            // terminating the program, and the RTS is running in a
            // DLL.
            Task *t, *prev;
            prev = NULL;
            for (t = cap->spare_workers; t != NULL; t = t->next) {
                if (!osThreadIsAlive(t->id)) {
                    debugTrace(DEBUG_sched,
                               "worker thread %p has died unexpectedly", (void *)t->id);
                    if (!prev) {
                        cap->spare_workers = t->next;
                    } else {
                        prev->next = t->next;
                    }
                } else {
                    // advance prev only past Tasks we keep; otherwise
                    // unlinking a later dead Task would also splice out
                    // the live ones in between.
                    prev = t;
                }
            }
        }

        if (!emptyRunQueue(cap) || cap->spare_workers) {
            debugTrace(DEBUG_sched,
                       "runnable threads or workers still alive, yielding");
            releaseCapability_(cap,rtsFalse); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }

        // If "safe", then busy-wait for any threads currently doing
        // foreign calls.  If we're about to unload this DLL, for
        // example, we need to be sure that there are no OS threads
        // that will try to return to code that has been unloaded.
        // We can be a bit more relaxed when this is a standalone
        // program that is about to terminate, and let safe=false.
        if (cap->suspended_ccalling_tasks && safe) {
            debugTrace(DEBUG_sched,
                       "thread(s) are involved in foreign calls, yielding");
            cap->running_task = NULL;
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }

        postEvent(cap, EVENT_SHUTDOWN, 0, 0);
        debugTrace(DEBUG_sched, "capability %d is stopped.", cap->no);
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // We now have the Capability; its run queue and spare workers
    // list are both empty.

    // ToDo: we can't drop this mutex, because there might still be
    // threads performing foreign calls that will eventually try to
    // return via resumeThread() and attempt to grab cap->lock.
    // closeMutex(&cap->lock);
}

/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
 *
 * ------------------------------------------------------------------------- */

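// Note: cap->running_task is read without the lock first, as a cheap
// fast path for the common case where the Capability is busy; the
// test is repeated under cap->lock before the Capability is taken.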
rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
    if (cap->running_task != NULL) return rtsFalse;
    ACQUIRE_LOCK(&cap->lock);
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
        return rtsFalse;
    }
    task->cap = cap;
    cap->running_task = task;
    RELEASE_LOCK(&cap->lock);
    return rtsTrue;
}

#endif /* THREADED_RTS */

static void
freeCapability (Capability *cap)
{
    stgFree(cap->mut_lists);
#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
    freeSparkPool(cap->sparks);
#endif
}

void
freeCapabilities (void)
{
#if defined(THREADED_RTS)
    nat i;
    for (i=0; i < n_capabilities; i++) {
        freeCapability(&capabilities[i]);
    }
#else
    freeCapability(&MainCapability);
#endif
}

/* ---------------------------------------------------------------------------
   Mark everything directly reachable from the Capabilities.  When
   using multiple GC threads, GC thread i marks the Capabilities c for
   which (c `mod` n == i), where n is the number of GC threads.
   ------------------------------------------------------------------------ */

void
markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
                      rtsBool prune_sparks USED_IF_THREADS)
{
    nat i;
    Capability *cap;
    Task *task;

    // Each GC thread is responsible for following roots from the
    // Capability of the same number.  There will usually be the same
    // number of Capabilities as GC threads, or fewer; just in case
    // there are more, we mark every Capability whose number is the GC
    // thread's index plus a multiple of the number of GC threads.
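    // For example, with 2 GC threads and 5 Capabilities, thread 0
    // (i0 = 0, delta = 2) marks Capabilities 0, 2 and 4, while
    // thread 1 (i0 = 1, delta = 2) marks Capabilities 1 and 3.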
    for (i = i0; i < n_capabilities; i += delta) {
        cap = &capabilities[i];
        evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
        evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
#if defined(THREADED_RTS)
        evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
        evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
#endif
        for (task = cap->suspended_ccalling_tasks; task != NULL;
             task=task->next) {
            debugTrace(DEBUG_sched,
                       "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
            evac(user, (StgClosure **)(void *)&task->suspended_tso);
        }

#if defined(THREADED_RTS)
        if (prune_sparks) {
            pruneSparkQueue (evac, user, cap);
        } else {
            traverseSparkQueue (evac, user, cap);
        }
#endif
    }

#if !defined(THREADED_RTS)
    evac(user, (StgClosure **)(void *)&blocked_queue_hd);
    evac(user, (StgClosure **)(void *)&blocked_queue_tl);
    evac(user, (StgClosure **)(void *)&sleeping_queue);
#endif
}

void
markCapabilities (evac_fn evac, void *user)
{
    markSomeCapabilities(evac, user, 0, 1, rtsFalse);
}