ff4beb7fd58db9897de1aa14ba8b463a2074b58d
[ghc.git] / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 2000-2008
4 *
5 * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
6 *
7 -------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "Schedule.h"
13 #include "RtsUtils.h"
14 #include "Trace.h"
15 #include "Prelude.h"
16 #include "Sparks.h"
17
18 #if defined(THREADED_RTS)
19
20 void
21 initSparkPools( void )
22 {
23 /* walk over the capabilities, allocating a spark pool for each one */
24 nat i;
25 for (i = 0; i < n_capabilities; i++) {
26 capabilities[i].sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks);
27 }
28 }
29
30 void
31 freeSparkPool (SparkPool *pool)
32 {
33 freeWSDeque(pool);
34 }
35
36 /* -----------------------------------------------------------------------------
37 *
38 * Turn a spark into a real thread
39 *
40 * -------------------------------------------------------------------------- */
41
42 void
43 createSparkThread (Capability *cap)
44 {
45 StgTSO *tso;
46
47 tso = createIOThread (cap, RtsFlags.GcFlags.initialStkSize,
48 &base_GHCziConc_runSparks_closure);
49
50 postEvent(cap, EVENT_CREATE_SPARK_THREAD, 0, tso->id);
51
52 appendToRunQueue(cap,tso);
53 }
54
55 /* --------------------------------------------------------------------------
56 * newSpark: create a new spark, as a result of calling "par"
57 * Called directly from STG.
58 * -------------------------------------------------------------------------- */
59
60 StgInt
61 newSpark (StgRegTable *reg, StgClosure *p)
62 {
63 Capability *cap = regTableToCapability(reg);
64 SparkPool *pool = cap->sparks;
65
66 /* I am not sure whether this is the right thing to do.
67 * Maybe it is better to exploit the tag information
68 * instead of throwing it away?
69 */
70 p = UNTAG_CLOSURE(p);
71
72 if (closure_SHOULD_SPARK(p)) {
73 pushWSDeque(pool,p);
74 }
75
76 cap->sparks_created++;
77
78 postEvent(cap, EVENT_CREATE_SPARK, cap->r.rCurrentTSO->id, 0);
79
80 return 1;
81 }
82
83 /* -----------------------------------------------------------------------------
84 *
85 * tryStealSpark: try to steal a spark from a Capability.
86 *
87 * Returns a valid spark, or NULL if the pool was empty, and can
88 * occasionally return NULL if there was a race with another thread
89 * stealing from the same pool. In this case, try again later.
90 *
91 -------------------------------------------------------------------------- */
92
93 StgClosure *
94 tryStealSpark (Capability *cap)
95 {
96 SparkPool *pool = cap->sparks;
97 StgClosure *stolen;
98
99 do {
100 stolen = stealWSDeque_(pool);
101 // use the no-loopy version, stealWSDeque_(), since if we get a
102 // spurious NULL here the caller may want to try stealing from
103 // other pools before trying again.
104 } while (stolen != NULL && !closure_SHOULD_SPARK(stolen));
105
106 return stolen;
107 }
108
109 /* --------------------------------------------------------------------------
110 * Remove all sparks from the spark queues which should not spark any
111 * more. Called after GC. We assume exclusive access to the structure
112 * and replace all sparks in the queue, see explanation below. At exit,
113 * the spark pool only contains sparkable closures.
114 * -------------------------------------------------------------------------- */
115
116 void
117 pruneSparkQueue (evac_fn evac, void *user, Capability *cap)
118 {
119 SparkPool *pool;
120 StgClosurePtr spark, tmp, *elements;
121 nat n, pruned_sparks; // stats only
122 StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
123 const StgInfoTable *info;
124
125 n = 0;
126 pruned_sparks = 0;
127
128 pool = cap->sparks;
129
130 // it is possible that top > bottom, indicating an empty pool. We
131 // fix that here; this is only necessary because the loop below
132 // assumes it.
133 if (pool->top > pool->bottom)
134 pool->top = pool->bottom;
135
136 // Take this opportunity to reset top/bottom modulo the size of
137 // the array, to avoid overflow. This is only possible because no
138 // stealing is happening during GC.
139 pool->bottom -= pool->top & ~pool->moduloSize;
140 pool->top &= pool->moduloSize;
141 pool->topBound = pool->top;
142
143 debugTrace(DEBUG_sched,
144 "markSparkQueue: current spark queue len=%ld; (hd=%ld; tl=%ld)",
145 sparkPoolSize(pool), pool->bottom, pool->top);
146
147 ASSERT_WSDEQUE_INVARIANTS(pool);
148
149 elements = (StgClosurePtr *)pool->elements;
150
151 /* We have exclusive access to the structure here, so we can reset
152 bottom and top counters, and prune invalid sparks. Contents are
153 copied in-place if they are valuable, otherwise discarded. The
154 routine uses "real" indices t and b, starts by computing them
155 as the modulus size of top and bottom,
156
157 Copying:
158
159 At the beginning, the pool structure can look like this:
160 ( bottom % size >= top % size , no wrap-around)
161 t b
162 ___________***********_________________
163
164 or like this ( bottom % size < top % size, wrap-around )
165 b t
166 ***********__________******************
167 As we need to remove useless sparks anyway, we make one pass
168 between t and b, moving valuable content to b and subsequent
169 cells (wrapping around when the size is reached).
170
171 b t
172 ***********OOO_______XX_X__X?**********
173 ^____move?____/
174
175 After this movement, botInd becomes the new bottom, and old
176 bottom becomes the new top index, both as indices in the array
177 size range.
178 */
179 // starting here
180 currInd = (pool->top) & (pool->moduloSize); // mod
181
182 // copies of evacuated closures go to space from botInd on
183 // we keep oldBotInd to know when to stop
184 oldBotInd = botInd = (pool->bottom) & (pool->moduloSize); // mod
185
186 // on entry to loop, we are within the bounds
187 ASSERT( currInd < pool->size && botInd < pool->size );
188
189 while (currInd != oldBotInd ) {
190 /* must use != here, wrap-around at size
191 subtle: loop not entered if queue empty
192 */
193
194 /* check element at currInd. if valuable, evacuate and move to
195 botInd, otherwise move on */
196 spark = elements[currInd];
197
198 // We have to be careful here: in the parallel GC, another
199 // thread might evacuate this closure while we're looking at it,
200 // so grab the info pointer just once.
201 info = spark->header.info;
202 if (IS_FORWARDING_PTR(info)) {
203 tmp = (StgClosure*)UN_FORWARDING_PTR(info);
204 /* if valuable work: shift inside the pool */
205 if (closure_SHOULD_SPARK(tmp)) {
206 elements[botInd] = tmp; // keep entry (new address)
207 botInd++;
208 n++;
209 } else {
210 pruned_sparks++; // discard spark
211 cap->sparks_pruned++;
212 }
213 } else {
214 if (!(closure_flags[INFO_PTR_TO_STRUCT(info)->type] & _NS)) {
215 elements[botInd] = spark; // keep entry (new address)
216 evac (user, &elements[botInd]);
217 botInd++;
218 n++;
219 } else {
220 pruned_sparks++; // discard spark
221 cap->sparks_pruned++;
222 }
223 }
224 currInd++;
225
226 // in the loop, we may reach the bounds, and instantly wrap around
227 ASSERT( currInd <= pool->size && botInd <= pool->size );
228 if ( currInd == pool->size ) { currInd = 0; }
229 if ( botInd == pool->size ) { botInd = 0; }
230
231 } // while-loop over spark pool elements
232
233 ASSERT(currInd == oldBotInd);
234
235 pool->top = oldBotInd; // where we started writing
236 pool->topBound = pool->top;
237
238 pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size);
239 // first free place we did not use (corrected by wraparound)
240
241 debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);
242
243 debugTrace(DEBUG_sched,
244 "new spark queue len=%ld; (hd=%ld; tl=%ld)",
245 sparkPoolSize(pool), pool->bottom, pool->top);
246
247 ASSERT_WSDEQUE_INVARIANTS(pool);
248 }
249
250 /* GC for the spark pool, called inside Capability.c for all
251 capabilities in turn. Blindly "evac"s complete spark pool. */
252 void
253 traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
254 {
255 StgClosure **sparkp;
256 SparkPool *pool;
257 StgWord top,bottom, modMask;
258
259 pool = cap->sparks;
260
261 ASSERT_WSDEQUE_INVARIANTS(pool);
262
263 top = pool->top;
264 bottom = pool->bottom;
265 sparkp = (StgClosurePtr*)pool->elements;
266 modMask = pool->moduloSize;
267
268 while (top < bottom) {
269 /* call evac for all closures in range (wrap-around via modulo)
270 * In GHC-6.10, evac takes an additional 1st argument to hold a
271 * GC-specific register, see rts/sm/GC.c::mark_root()
272 */
273 evac( user , sparkp + (top & modMask) );
274 top++;
275 }
276
277 debugTrace(DEBUG_sched,
278 "traversed spark queue, len=%ld; (hd=%ld; tl=%ld)",
279 sparkPoolSize(pool), pool->bottom, pool->top);
280 }
281
282 /* ----------------------------------------------------------------------------
283 * balanceSparkPoolsCaps: takes an array of capabilities (usually: all
284 * capabilities) and its size. Accesses all spark pools and equally
285 * distributes the sparks among them.
286 *
287 * Could be called after GC, before Cap. release, from scheduler.
288 * -------------------------------------------------------------------------- */
289 void balanceSparkPoolsCaps(nat n_caps, Capability caps[])
290 GNUC3_ATTRIBUTE(__noreturn__);
291
292 void balanceSparkPoolsCaps(nat n_caps STG_UNUSED,
293 Capability caps[] STG_UNUSED) {
294 barf("not implemented");
295 }
296
297 #else
298
299 StgInt
300 newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
301 {
302 /* nothing */
303 return 1;
304 }
305
306 #endif /* THREADED_RTS */