Remove incorrect assertions in steal()
[ghc.git] / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 2000-2008
4 *
5 * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
6 *
7 * The implementation uses Double-Ended Queues with lock-free access
8 * (thereby often called "deque") as described in
9 *
10 * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque.
11 * SPAA'05, July 2005, Las Vegas, USA.
12 * ACM 1-58113-986-1/05/0007
13 *
14 * Author: Jost Berthold MSRC 07-09/2008
15 *
16 * The DeQue is held as a circular array with known length. Positions
17 * of top (read-end) and bottom (write-end) always increase, and the
18 * array is accessed with indices modulo array-size. While this bears
19 * the risk of overflow, we assume that (with 64 bit indices), a
20 * program must run very long to reach that point.
21 *
22 * The write end of the queue (position bottom) can only be used with
23 * mutual exclusion, i.e. by exactly one caller at a time. At this
24 * end, new items can be enqueued using pushBottom()/newSpark(), and
25 * removed using popBottom()/reclaimSpark() (the latter implying a cas
26 * synchronisation with potential concurrent readers for the case of
27 * just one element).
28 *
29 * Multiple readers can steal()/findSpark() from the read end
30 * (position top), and are synchronised without a lock, based on a cas
31 * of the top position. One reader wins, the others return NULL for a
32 * failure.
33 *
34 * Both popBottom and steal also return NULL when the queue is empty.
35 *
36 -------------------------------------------------------------------------*/
37
38 #include "PosixSource.h"
39 #include "Rts.h"
40 #include "Storage.h"
41 #include "Schedule.h"
42 #include "SchedAPI.h"
43 #include "RtsFlags.h"
44 #include "RtsUtils.h"
45 #include "ParTicky.h"
46 #include "Trace.h"
47 #include "Prelude.h"
48
49 #include "SMP.h" // for cas
50
51 #include "Sparks.h"
52
53 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
54
55 /* internal helpers ... */
56
57 static StgWord
58 roundUp2(StgWord val)
59 {
60 StgWord rounded = 1;
61
62 /* StgWord is unsigned anyway, only catch 0 */
63 if (val == 0) {
64 barf("DeQue,roundUp2: invalid size 0 requested");
65 }
66 /* at least 1 bit set, shift up to its place */
67 do {
68 rounded = rounded << 1;
69 } while (0 != (val = val>>1));
70 return rounded;
71 }
72
73 #define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
74
75 /* -----------------------------------------------------------------------------
76 *
77 * Initialising spark pools.
78 *
79 * -------------------------------------------------------------------------- */
80
81 /* constructor */
82 static SparkPool*
83 initPool(StgWord size)
84 {
85 StgWord realsize;
86 SparkPool *q;
87
88 realsize = roundUp2(size); /* to compute modulo as a bitwise & */
89
90 q = (SparkPool*) stgMallocBytes(sizeof(SparkPool), /* admin fields */
91 "newSparkPool");
92 q->elements = (StgClosurePtr*)
93 stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
94 "newSparkPool:data space");
95 q->top=0;
96 q->bottom=0;
97 q->topBound=0; /* read by writer, updated each time top is read */
98
99 q->size = realsize; /* power of 2 */
100 q->moduloSize = realsize - 1; /* n % size == n & moduloSize */
101
102 ASSERT_SPARK_POOL_INVARIANTS(q);
103 return q;
104 }
105
/* Create the spark pool(s) at RTS startup: one pool per capability in
 * the threaded RTS, otherwise a single pool on the main capability.
 * Pool capacity comes from RtsFlags.ParFlags.maxLocalSparks. */
void
initSparkPools( void )
{
#ifdef THREADED_RTS
    /* walk over the capabilities, allocating a spark pool for each one */
    nat i;
    for (i = 0; i < n_capabilities; i++) {
      capabilities[i].sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
    }
#else
    /* allocate a single spark pool */
    MainCapability->sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
#endif
}
120
/* Free a pool and its element array. NOTE(review): nothing here
 * synchronises with concurrent readers -- the caller must guarantee
 * no steal()/findSpark() is still using the pool. */
void
freeSparkPool (SparkPool *pool)
{
  /* should not interfere with concurrent findSpark() calls! And
     nobody should use the pointer any more. We cross our fingers...*/
  stgFree(pool->elements);
  stgFree(pool);
}
129
130 /* -----------------------------------------------------------------------------
131 *
132 * reclaimSpark: remove a spark from the write end of the queue.
133 * Returns the removed spark, and NULL if a race is lost or the pool
134 * empty.
135 *
136 * If only one spark is left in the pool, we synchronise with
137 * concurrently stealing threads by using cas to modify the top field.
138 * This routine should NEVER be called by a task which does not own
139 * the capability. Can this be checked here?
140 *
141 * -------------------------------------------------------------------------- */
142
/* reclaimSpark: the OWNER removes a spark from the write end (bottom)
 * of its own queue. Returns the spark, or NULL if the pool was empty
 * or a concurrent thief won the race for the last element. The
 * statement order below (decrement bottom first, then read top) is
 * what makes the race with steal() safe -- do not reorder. */
StgClosure *
reclaimSpark (SparkPool *deque)
{
  /* also a bit tricky, has to avoid concurrent steal() calls by
     accessing top with cas, when there is only one element left */
  StgWord t, b;
  StgClosurePtr* pos;
  long currSize;
  StgClosurePtr removed;

  ASSERT_SPARK_POOL_INVARIANTS(deque);

  b = deque->bottom;
  /* "decrement b as a test, see what happens" */
  deque->bottom = --b;
  pos = (deque->elements) + (b & (deque->moduloSize));
  t = deque->top; /* using topBound would give an *upper* bound, we
                     need a lower bound. We use the real top here, but
                     can update the topBound value */
  deque->topBound = t;
  /* NOTE(review): b - t is computed in unsigned arithmetic and then
     converted to signed long; this relies on the usual two's
     complement conversion to recover a negative size when t > b. */
  currSize = b - t;
  if (currSize < 0) { /* was empty before decrementing b, set b
                         consistently and abort */
    deque->bottom = t;
    return NULL;
  }
  removed = *pos;
  if (currSize > 0) { /* no danger, still elements in buffer after b-- */
    return removed;
  }
  /* otherwise, has someone meanwhile stolen the same (last) element?
     Check and increment top value to know */
  if ( !(CASTOP(&(deque->top),t,t+1)) ) {
    removed = NULL; /* no success, but continue adjusting bottom */
  }
  deque->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */
  deque->topBound = t+1; /* ...and cached top value as well */

  ASSERT_SPARK_POOL_INVARIANTS(deque);

  return removed;
}
185
186 /* -----------------------------------------------------------------------------
187 *
188 * tryStealSpark: try to steal a spark from a Capability.
189 *
190 * Returns a valid spark, or NULL if the pool was empty, and can
191 * occasionally return NULL if there was a race with another thread
192 * stealing from the same pool. In this case, try again later.
193 *
194 -------------------------------------------------------------------------- */
195
/* steal: a THIEF takes one element from the read end (top) of someone
 * else's pool. Returns the element, or NULL when the pool looked
 * empty or the cas on top was lost to a concurrent thief (or to
 * reclaimSpark() taking the last element). The element is read
 * BEFORE the cas: winning the cas is what validates the read. */
static StgClosurePtr
steal(SparkPool *deque)
{
  StgClosurePtr* pos;
  StgClosurePtr* arraybase;
  StgWord sz;
  StgClosurePtr stolen;
  StgWord b,t;

// Can't do this on someone else's spark pool:
// ASSERT_SPARK_POOL_INVARIANTS(deque);

  b = deque->bottom;
  t = deque->top;

  /* NOTE(review): b and t are unsigned, so this test is effectively
     "b == t"; a transiently decremented bottom (reclaimSpark) wraps
     around and is not caught here -- the cas below resolves it. */
  if (b - t <= 0 ) {
    return NULL; /* already looks empty, abort */
  }

  /* now access array, see pushBottom() */
  arraybase = deque->elements;
  sz = deque->moduloSize;
  pos = arraybase + (t & sz);
  stolen = *pos;

  /* now decide whether we have won */
  if ( !(CASTOP(&(deque->top),t,t+1)) ) {
    /* lost the race, someone else has changed top in the meantime */
    return NULL;
  } /* else: OK, top has been incremented by the cas call */

// Can't do this on someone else's spark pool:
// ASSERT_SPARK_POOL_INVARIANTS(deque);

  /* return stolen element */
  return stolen;
}
233
234 StgClosure *
235 tryStealSpark (Capability *cap)
236 {
237 SparkPool *pool = cap->sparks;
238 StgClosure *stolen;
239
240 do {
241 stolen = steal(pool);
242 } while (stolen != NULL && !closure_SHOULD_SPARK(stolen));
243
244 return stolen;
245 }
246
247
248 /* -----------------------------------------------------------------------------
249 *
250 * "guesses" whether a deque is empty. Can return false negatives in
251 * presence of concurrent steal() calls, and false positives in
252 * presence of a concurrent pushBottom().
253 *
254 * -------------------------------------------------------------------------- */
255
256 rtsBool
257 looksEmpty(SparkPool* deque)
258 {
259 StgWord t = deque->top;
260 StgWord b = deque->bottom;
261 /* try to prefer false negatives by reading top first */
262 return (b - t <= 0);
263 /* => array is *never* completely filled, always 1 place free! */
264 }
265
266 /* -----------------------------------------------------------------------------
267 *
268 * Turn a spark into a real thread
269 *
270 * -------------------------------------------------------------------------- */
271
/* Turn sparks into work: create a fresh I/O thread running the
 * spark-evaluation loop (base_GHCziConc_runSparks_closure) with the
 * default initial stack size, and append it to cap's run queue. */
void
createSparkThread (Capability *cap)
{
  StgTSO *tso;

  tso = createIOThread (cap, RtsFlags.GcFlags.initialStkSize,
                        &base_GHCziConc_runSparks_closure);
  appendToRunQueue(cap,tso);
}
281
282 /* -----------------------------------------------------------------------------
283 *
284 * Create a new spark
285 *
286 * -------------------------------------------------------------------------- */
287
288 #define DISCARD_NEW
289
290 /* enqueue an element. Should always succeed by resizing the array
291 (not implemented yet, silently fails in that case). */
/* pushBottom: the OWNER enqueues one element at the write end
 * (bottom). Single writer, so bottom is bumped with a plain
 * increment; only the full-check needs to look at the shared top.
 * On a full queue this currently fails silently (DISCARD_NEW). */
static void
pushBottom (SparkPool* deque, StgClosurePtr elem)
{
  StgWord t;
  StgClosurePtr* pos;
  StgWord sz = deque->moduloSize;
  StgWord b = deque->bottom;

  ASSERT_SPARK_POOL_INVARIANTS(deque);

  /* we try to avoid reading deque->top (accessed by all) and use
     deque->topBound (accessed only by writer) instead.
     This is why we do not just call empty(deque) here.
  */
  t = deque->topBound;
  if ( b - t >= sz ) { /* nota bene: sz == deque->size - 1, thus ">=" */
    /* could be full, check the real top value in this case */
    t = deque->top;
    deque->topBound = t;
    if (b - t >= sz) { /* really no space left :-( */
      /* reallocate the array, copying the values. Concurrent steal()s
         will in the meantime use the old one and modify only top.
         This means: we cannot safely free the old space! Can keep it
         on a free list internally here...

         Potential bug in combination with steal(): if array is
         replaced, it is unclear which one concurrent steal operations
         use. Must read the array base address in advance in steal().
      */
#if defined(DISCARD_NEW)
      ASSERT_SPARK_POOL_INVARIANTS(deque);
      return; /* for now, silently fail */
#else
      /* could make room by incrementing the top position here. In
       * this case, should use CASTOP. If this fails, someone else has
       * removed something, and new room will be available.
       */
      ASSERT_SPARK_POOL_INVARIANTS(deque);
#endif
    }
  }
  /* store the element, then publish it by bumping bottom */
  pos = (deque->elements) + (b & sz);
  *pos = elem;
  (deque->bottom)++;

  ASSERT_SPARK_POOL_INVARIANTS(deque);
  return;
}
340
341
342 /* --------------------------------------------------------------------------
343 * newSpark: create a new spark, as a result of calling "par"
344 * Called directly from STG.
345 * -------------------------------------------------------------------------- */
346
347 StgInt
348 newSpark (StgRegTable *reg, StgClosure *p)
349 {
350 Capability *cap = regTableToCapability(reg);
351 SparkPool *pool = cap->sparks;
352
353 /* I am not sure whether this is the right thing to do.
354 * Maybe it is better to exploit the tag information
355 * instead of throwing it away?
356 */
357 p = UNTAG_CLOSURE(p);
358
359 ASSERT_SPARK_POOL_INVARIANTS(pool);
360
361 if (closure_SHOULD_SPARK(p)) {
362 pushBottom(pool,p);
363 }
364
365 cap->sparks_created++;
366
367 ASSERT_SPARK_POOL_INVARIANTS(pool);
368 return 1;
369 }
370
371
372
373 /* --------------------------------------------------------------------------
374 * Remove all sparks from the spark queues which should not spark any
375 * more. Called after GC. We assume exclusive access to the structure
376 * and replace all sparks in the queue, see explanation below. At exit,
377 * the spark pool only contains sparkable closures.
378 * -------------------------------------------------------------------------- */
379
/* pruneSparkQueue: one pass over cap's pool during GC (exclusive
 * access assumed -- no concurrent stealing). Each surviving spark is
 * evacuated; sparks whose closure no longer needs evaluating are
 * dropped. Compaction is done in place, see the diagrams below. */
void
pruneSparkQueue (evac_fn evac, void *user, Capability *cap)
{
    SparkPool *pool;
    StgClosurePtr spark, tmp, *elements;
    nat n, pruned_sparks; // stats only
    StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
    const StgInfoTable *info;

    PAR_TICKY_MARK_SPARK_QUEUE_START();

    n = 0;
    pruned_sparks = 0;

    pool = cap->sparks;

    // it is possible that top > bottom, indicating an empty pool. We
    // fix that here; this is only necessary because the loop below
    // assumes it.
    if (pool->top > pool->bottom)
        pool->top = pool->bottom;

    // Take this opportunity to reset top/bottom modulo the size of
    // the array, to avoid overflow. This is only possible because no
    // stealing is happening during GC.
    pool->bottom  -= pool->top & ~pool->moduloSize;
    pool->top      = pool->top & pool->moduloSize;
    pool->topBound = pool->top;

    debugTrace(DEBUG_sched,
               "markSparkQueue: current spark queue len=%d; (hd=%ld; tl=%ld)",
               sparkPoolSize(pool), pool->bottom, pool->top);
    ASSERT_SPARK_POOL_INVARIANTS(pool);

    elements = pool->elements;

    /* We have exclusive access to the structure here, so we can reset
       bottom and top counters, and prune invalid sparks. Contents are
       copied in-place if they are valuable, otherwise discarded. The
       routine uses "real" indices t and b, starts by computing them
       as the modulus size of top and bottom,

       Copying:

       At the beginning, the pool structure can look like this:
       ( bottom % size >= top % size , no wrap-around)
                  t          b
       ___________***********_________________

       or like this ( bottom % size < top % size, wrap-around )
                  b         t
       ***********__________******************
       As we need to remove useless sparks anyway, we make one pass
       between t and b, moving valuable content to b and subsequent
       cells (wrapping around when the size is reached).

                     b      t
       ***********OOO_______XX_X__X?**********
                     ^____move?____/

       After this movement, botInd becomes the new bottom, and old
       bottom becomes the new top index, both as indices in the array
       size range.
    */
    // starting here
    currInd = (pool->top) & (pool->moduloSize); // mod

    // copies of evacuated closures go to space from botInd on
    // we keep oldBotInd to know when to stop
    oldBotInd = botInd = (pool->bottom) & (pool->moduloSize); // mod

    // on entry to loop, we are within the bounds
    ASSERT( currInd < pool->size && botInd  < pool->size );

    while (currInd != oldBotInd ) {
      /* must use != here, wrap-around at size
         subtle: loop not entered if queue empty
       */

      /* check element at currInd. if valuable, evacuate and move to
         botInd, otherwise move on */
      spark = elements[currInd];

      // We have to be careful here: in the parallel GC, another
      // thread might evacuate this closure while we're looking at it,
      // so grab the info pointer just once.
      info = spark->header.info;
      if (IS_FORWARDING_PTR(info)) {
          /* already evacuated by another GC thread: follow the
             forwarding pointer and decide on the new address */
          tmp = (StgClosure*)UN_FORWARDING_PTR(info);
          /* if valuable work: shift inside the pool */
          if (closure_SHOULD_SPARK(tmp)) {
              elements[botInd] = tmp; // keep entry (new address)
              botInd++;
              n++;
          } else {
              pruned_sparks++; // discard spark
              cap->sparks_pruned++;
          }
      } else {
          /* not yet evacuated: keep (and evacuate) unless the closure
             is already in normal form (_NS == "not sparkable") */
          if (!(closure_flags[INFO_PTR_TO_STRUCT(info)->type] & _NS)) {
              elements[botInd] = spark; // keep entry (new address)
              evac (user, &elements[botInd]);
              botInd++;
              n++;
          } else {
              pruned_sparks++; // discard spark
              cap->sparks_pruned++;
          }
      }
      currInd++;

      // in the loop, we may reach the bounds, and instantly wrap around
      ASSERT( currInd <= pool->size && botInd <= pool->size );
      if ( currInd == pool->size ) { currInd = 0; }
      if ( botInd == pool->size )  { botInd = 0;  }

    } // while-loop over spark pool elements

    ASSERT(currInd == oldBotInd);

    pool->top = oldBotInd; // where we started writing
    pool->topBound = pool->top;

    pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size);
    // first free place we did not use (corrected by wraparound)

    PAR_TICKY_MARK_SPARK_QUEUE_END(n);

    debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);

    debugTrace(DEBUG_sched,
               "new spark queue len=%d; (hd=%ld; tl=%ld)",
               sparkPoolSize(pool), pool->bottom, pool->top);

    ASSERT_SPARK_POOL_INVARIANTS(pool);
}
516
517 /* GC for the spark pool, called inside Capability.c for all
518 capabilities in turn. Blindly "evac"s complete spark pool. */
519 void
520 traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
521 {
522 StgClosure **sparkp;
523 SparkPool *pool;
524 StgWord top,bottom, modMask;
525
526 pool = cap->sparks;
527
528 ASSERT_SPARK_POOL_INVARIANTS(pool);
529
530 top = pool->top;
531 bottom = pool->bottom;
532 sparkp = pool->elements;
533 modMask = pool->moduloSize;
534
535 while (top < bottom) {
536 /* call evac for all closures in range (wrap-around via modulo)
537 * In GHC-6.10, evac takes an additional 1st argument to hold a
538 * GC-specific register, see rts/sm/GC.c::mark_root()
539 */
540 evac( user , sparkp + (top & modMask) );
541 top++;
542 }
543
544 debugTrace(DEBUG_sched,
545 "traversed spark queue, len=%d; (hd=%ld; tl=%ld)",
546 sparkPoolSize(pool), pool->bottom, pool->top);
547 }
548
549 /* ----------------------------------------------------------------------------
550 * balanceSparkPoolsCaps: takes an array of capabilities (usually: all
551 * capabilities) and its size. Accesses all spark pools and equally
552 * distributes the sparks among them.
553 *
554 * Could be called after GC, before Cap. release, from scheduler.
555 * -------------------------------------------------------------------------- */
/* Prototype repeated here to satisfy missing-prototype warnings. */
void balanceSparkPoolsCaps(nat n_caps, Capability caps[]);

/* Stub: redistribution of sparks across capabilities (see block
 * comment above) has never been implemented. */
void balanceSparkPoolsCaps(nat n_caps STG_UNUSED,
                           Capability caps[] STG_UNUSED) {
  barf("not implemented");
}
562
563 #else
564
/* Non-parallel build: "par" is a no-op that still claims success. */
StgInt
newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
{
  /* nothing */
  return 1;
}
571
572
573 #endif /* PARALLEL_HASKELL || THREADED_RTS */
574
575
576 /* -----------------------------------------------------------------------------
577 *
578 * GRAN & PARALLEL_HASKELL stuff beyond here.
579 *
580 * TODO "nuke" this!
581 *
582 * -------------------------------------------------------------------------- */
583
584 #if defined(PARALLEL_HASKELL) || defined(GRAN)
585
586 static void slide_spark_pool( StgSparkPool *pool );
587
/* Add a closure to the (old-style, array-based) spark pool if it is
 * worth evaluating in parallel. Compacts the pool first when the tail
 * has reached the limit. Returns rtsTrue iff the spark was added. */
rtsBool
add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
{
  if (pool->tl == pool->lim)
    /* no room at the tail: compact the pool in place */
    slide_spark_pool(pool);

  if (closure_SHOULD_SPARK(closure) &&
      pool->tl < pool->lim) {
    *(pool->tl++) = closure;

#if defined(PARALLEL_HASKELL)
    // collect parallel global statistics (currently done together with GC stats)
    if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
      // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime());
      globalParStats.tot_sparks_created++;
    }
#endif
    return rtsTrue;
  } else {
#if defined(PARALLEL_HASKELL)
    // collect parallel global statistics (currently done together with GC stats)
    if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
      //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime());
      globalParStats.tot_sparks_ignored++;
    }
#endif
    return rtsFalse;
  }
}
619
/* Compact the spark pool in place: copy every closure that is still
 * worth sparking down towards the base, dropping the rest, then reset
 * hd/tl. Called when the tail has hit the limit. */
static void
slide_spark_pool( StgSparkPool *pool )
{
  StgClosure **sparkp, **to_sparkp;

  sparkp = pool->hd;
  to_sparkp = pool->base;
  while (sparkp < pool->tl) {
    ASSERT(to_sparkp<=sparkp);
    ASSERT(*sparkp!=NULL);
    ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));

    if (closure_SHOULD_SPARK(*sparkp)) {
      /* keep: copy down and advance both pointers */
      *to_sparkp++ = *sparkp++;
    } else {
      /* drop: advance the read pointer only */
      sparkp++;
    }
  }
  pool->hd = pool->base;
  pool->tl = to_sparkp;
}
641
/* Dispose of a spark. In this build sparks are plain closure pointers
 * held in the pool, so there is nothing to free; we only sanity-check
 * the argument (and, single-threaded, the pool invariant). */
void
disposeSpark(spark)
StgClosure *spark;
{
#if !defined(THREADED_RTS)
  Capability *cap;
  StgSparkPool *pool;

  cap = &MainRegTable;
  pool = &(cap->rSparks);
  ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
#endif
  ASSERT(spark != (StgClosure *)NULL);
  /* Do nothing */
}
657
658
659 #elif defined(GRAN)
660
661 /*
662 Search the spark queue of the proc in event for a spark that's worth
663 turning into a thread
664 (was gimme_spark in the old RTS)
665 */
/* GranSim only: search proc's spark queue for a spark worth turning
 * into a thread (was gimme_spark in the old RTS). Prunes dead sparks
 * from the queue as a side effect. Results are returned through
 * *found_res / *spark_res; *spark_res may be NULL when !*found_res. */
void
findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
{
   PEs proc = event->proc,       /* proc to search for work */
       creator = event->creator; /* proc that requested work */
   StgClosure* node;
   rtsBool found;
   rtsSparkQ spark_of_non_local_node = NULL,
             spark_of_non_local_node_prev = NULL,
             low_priority_spark = NULL,
             low_priority_spark_prev = NULL,
             spark = NULL, prev = NULL;

   /* Choose a spark from the local spark queue */
   prev = (rtsSpark*)NULL;
   spark = pending_sparks_hds[proc];
   found = rtsFalse;

   // ToDo: check this code & implement local sparking !! -- HWL
   while (!found && spark != (rtsSpark*)NULL)
     {
       /* queue-linkage sanity: prev/next/prev pointers consistent */
       ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
              (prev==(rtsSpark*)NULL || prev->next==spark) &&
              (spark->prev==prev));
       node = spark->node;
       if (!closure_SHOULD_SPARK(node))
         {
           /* dead spark: log, unlink and continue with the next one */
           IF_GRAN_DEBUG(checkSparkQ,
                         debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
                               spark, node));

           if (RtsFlags.GranFlags.GranSimStats.Sparks)
             DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
                              spark->node, spark->name, spark_queue_len(proc));

           ASSERT(spark != (rtsSpark*)NULL);
           ASSERT(SparksAvail>0);
           --SparksAvail;

           ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
           spark = delete_from_sparkq (spark, proc, rtsTrue);
           if (spark != (rtsSpark*)NULL)
             prev = spark->prev;
           continue;
         }
       /* -- node should eventually be sparked */
       else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes &&
               !IS_LOCAL_TO(PROCS(node),CurrentProc))
         {
           /* NOTE(review): this branch barfs immediately, so the code
              after the barf below is currently unreachable. */
           barf("Local sparking not yet implemented");

           /* Remember first low priority spark */
           if (spark_of_non_local_node==(rtsSpark*)NULL) {
             spark_of_non_local_node_prev = prev;
             spark_of_non_local_node = spark;
           }

           if (spark->next == (rtsSpark*)NULL) {
             /* ASSERT(spark==SparkQueueTl);  just for testing */
             prev = spark_of_non_local_node_prev;
             spark = spark_of_non_local_node;
             found = rtsTrue;
             break;
           }

# if defined(GRAN) && defined(GRAN_CHECK)
           /* Should never happen; just for testing
           if (spark==pending_sparks_tl) {
             debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
             stg_exit(EXIT_FAILURE);
           } */
# endif
           prev = spark;
           spark = spark->next;
           ASSERT(SparksAvail>0);
           --SparksAvail;
           continue;
         }
       else if ( RtsFlags.GranFlags.DoPrioritySparking ||
                 (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
         {
           if (RtsFlags.GranFlags.DoPrioritySparking)
             barf("Priority sparking not yet implemented");

           found = rtsTrue;
         }
#if 0
       else /* only used if SparkPriority2 is defined */
         {
           /* ToDo: fix the code below and re-integrate it */
           /* Remember first low priority spark */
           if (low_priority_spark==(rtsSpark*)NULL) {
             low_priority_spark_prev = prev;
             low_priority_spark = spark;
           }

           if (spark->next == (rtsSpark*)NULL) {
             /* ASSERT(spark==spark_queue_tl);  just for testing */
             prev = low_priority_spark_prev;
             spark = low_priority_spark;
             found = rtsTrue;       /* take low pri spark => rc is 2  */
             break;
           }

           /* Should never happen; just for testing
           if (spark==pending_sparks_tl) {
             debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
             stg_exit(EXIT_FAILURE);
             break;
           } */
           prev = spark;
           spark = spark->next;

           IF_GRAN_DEBUG(pri,
                         debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n",
                               spark->gran_info, RtsFlags.GranFlags.SparkPriority,
                               spark->node, spark->name);)
         }
#endif
   }  /* while (spark!=NULL && !found) */

   *spark_res = spark;
   *found_res = found;
}
790
791 /*
792 Turn the spark into a thread.
793 In GranSim this basically means scheduling a StartThread event for the
794 node pointed to by the spark at some point in the future.
795 (was munch_spark in the old RTS)
796 */
/* GranSim only: turn a spark into a thread, i.e. schedule a
 * StartThread event for the spark's node (was munch_spark in the old
 * RTS). If the requesting PE differs from the PE holding the spark, a
 * MoveSpark event is scheduled instead (GUM-style fishing).
 * NOTE(review): declared rtsBool but falls off the end without a
 * return statement -- callers must not use the return value. */
rtsBool
activateSpark (rtsEvent *event, rtsSparkQ spark)
{
  PEs proc = event->proc,       /* proc to search for work */
      creator = event->creator; /* proc that requested work */
  StgTSO* tso;
  StgClosure* node;
  rtsTime spark_arrival_time;

  /*
     We've found a node on PE proc requested by PE creator.
     If proc==creator we can turn the spark into a thread immediately;
     otherwise we schedule a MoveSpark event on the requesting PE
  */

  /* DaH Qu' yIchen */
  if (proc!=creator) {

    /* only possible if we simulate GUM style fishing */
    ASSERT(RtsFlags.GranFlags.Fishing);

    /* Message packing costs for sending a Fish; qeq jabbI'ID */
    CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;

    if (RtsFlags.GranFlags.GranSimStats.Sparks)
      DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
                       (StgTSO*)NULL, spark->node,
                       spark->name, spark_queue_len(proc));

    /* time of the spark arrival on the remote PE */
    spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;

    new_event(creator, proc, spark_arrival_time,
              MoveSpark,
              (StgTSO*)NULL, spark->node, spark);

    CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;

  } else { /* proc==creator i.e. turn the spark into a thread */

    if ( RtsFlags.GranFlags.GranSimStats.Global &&
         spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {

      globalGranStats.tot_low_pri_sparks++;
      IF_GRAN_DEBUG(pri,
                    debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
                          spark->gran_info,
                          spark->node, spark->name));
    }

    CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;

    node = spark->node;

# if 0
    /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
    if (GARBAGE COLLECTION IS NECESSARY) {
      /* Some kind of backoff needed here in case there's too little heap */
#  if defined(GRAN_CHECK) && defined(GRAN)
      if (RtsFlags.GcFlags.giveStats)
        fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p;  name=%u\n",
                /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
                spark, node, spark->name);
#  endif
      new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
                FindWork,
                (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
      barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
      GarbageCollect(GetRoots, rtsFalse);
      // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
      // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
      spark = NULL;
      return; /* was: continue; */ /* to the next event, eventually */
    }
# endif

    if (RtsFlags.GranFlags.GranSimStats.Sparks)
      DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
                       spark->node, spark->name,
                       spark_queue_len(CurrentProc));

    new_event(proc, proc, CurrentTime[proc],
              StartThread,
              END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);

    procStatus[proc] = Starting;
  }
}
885
886 /* -------------------------------------------------------------------------
887 This is the main point where handling granularity information comes into
888 play.
889 ------------------------------------------------------------------------- */
890
891 #define MAX_RAND_PRI 100
892
893 /*
894 Granularity info transformers.
895 Applied to the GRAN_INFO field of a spark.
896 */
/* Granularity-info transformers, selected via RtsFlags in newSpark():
 * identity, negation, constant zero, or a random priority in
 * [1..MAX_RAND_PRI]. */
STATIC_INLINE nat  ID(nat x) { return(x); };
STATIC_INLINE nat  INV(nat x) { return(-x); };
STATIC_INLINE nat  IGNORE(nat x) { return (0); };
STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
901
902 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
/* GranSim: allocate a new spark record for 'node', transforming
 * gran_info into a priority according to the RTS flags. Returns NULL
 * when the priority falls below the SparkPriority threshold (spark is
 * ignored). Caller owns the result; free with disposeSpark(). */
rtsSpark *
newSpark(node,name,gran_info,size_info,par_info,local)
StgClosure *node;
nat name, gran_info, size_info, par_info, local;
{
  nat pri;
  rtsSpark *newspark;

  /* choose the effective priority transformer */
  pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
        RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
        RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
        ID(gran_info);

  if ( RtsFlags.GranFlags.SparkPriority!=0 &&
       pri<RtsFlags.GranFlags.SparkPriority ) {
    IF_GRAN_DEBUG(pri,
                  debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n",
                        pri, RtsFlags.GranFlags.SparkPriority, node, name));
    return ((rtsSpark*)NULL);
  }

  newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
  newspark->prev = newspark->next = (rtsSpark*)NULL;
  newspark->node = node;
  /* name==1 means "inherit the current TSO's spark name" */
  newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
  newspark->gran_info = pri;
  newspark->global = !local;      /* Check that with parAt, parAtAbs !!*/

  if (RtsFlags.GranFlags.GranSimStats.Global) {
    globalGranStats.tot_sparks_created++;
    globalGranStats.sparks_created_on_PE[CurrentProc]++;
  }

  return(newspark);
}
938
/* GranSim: free one heap-allocated spark record. */
void
disposeSpark(spark)
rtsSpark *spark;
{
  ASSERT(spark!=NULL);
  stgFree(spark);
}
946
/* GranSim: recursively free an entire spark queue.
 * NOTE(review): recursion depth equals queue length, so a very long
 * queue could exhaust the C stack. Also, the debug message below
 * prints &spark (the address of the local parameter); 'spark' itself
 * was probably intended. */
void
disposeSparkQ(spark)
rtsSparkQ spark;
{
  if (spark==NULL)
    return;

  disposeSparkQ(spark->next);

# ifdef GRAN_CHECK
  if (SparksAvail < 0) {
    debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
    print_spark(spark);
  }
# endif

  stgFree(spark);
}
965
966 /*
967 With PrioritySparking add_to_spark_queue performs an insert sort to keep
968 the spark queue sorted. Otherwise the spark is just added to the end of
969 the queue.
970 */
971
/* GranSim: add a spark to the pending queue of the current PE. With
 * PrioritySparking enabled this performs an insertion sort to keep
 * the queue ordered by descending gran_info; otherwise the spark is
 * appended at the tail. Updates SparksAvail and charges search costs
 * when priority sparking is on. */
void
add_to_spark_queue(spark)
rtsSpark *spark;
{
  rtsSpark *prev = NULL, *next = NULL;
  nat count = 0;
  rtsBool found = rtsFalse;

  if ( spark == (rtsSpark *)NULL ) {
    return;
  }

  if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
    /* Priority sparking is enabled i.e. spark queues must be sorted */

    /* scan for the first spark with a smaller priority */
    for (prev = NULL, next = pending_sparks_hd, count=0;
         (next != NULL) &&
         !(found = (spark->gran_info >= next->gran_info));
         prev = next, next = next->next, count++)
      {}

  } else {   /* 'utQo' */
    /* Priority sparking is disabled */

    found = rtsFalse;   /* to add it at the end */

  }

  if (found) {
    /* next points to the first spark with a gran_info smaller than that
       of spark; therefore, add spark before next into the spark queue */
    spark->next = next;
    if ( next == NULL ) {
      pending_sparks_tl = spark;
    } else {
      next->prev = spark;
    }
    spark->prev = prev;
    if ( prev == NULL ) {
      pending_sparks_hd = spark;
    } else {
      prev->next = spark;
    }
  } else { /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
    /* add the spark at the end of the spark queue */
    spark->next = NULL;
    spark->prev = pending_sparks_tl;
    if (pending_sparks_hd == NULL)
      pending_sparks_hd = spark;
    else
      pending_sparks_tl->next = spark;
    pending_sparks_tl = spark;
  }
  ++SparksAvail;

  /* add costs for search in priority sparking */
  if (RtsFlags.GranFlags.DoPrioritySparking) {
    CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
  }

  IF_GRAN_DEBUG(checkSparkQ,
                debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
                      spark, spark->node, CurrentProc);
                print_sparkq_stats());

# if defined(GRAN_CHECK)
  if (RtsFlags.GranFlags.Debug.checkSparkQ) {
    /* walk to the end of the queue and check tl really is the tail */
    for (prev = NULL, next =  pending_sparks_hd;
         (next != NULL);
         prev = next, next = next->next)
      {}
    if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
      debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
              spark,CurrentProc,
              pending_sparks_tl, prev);
  }
# endif

# if defined(GRAN_CHECK)
  /* Check if the sparkq is still sorted. Just for testing, really!  */
  if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
       RtsFlags.GranFlags.Debug.pri ) {
    rtsBool sorted = rtsTrue;
    rtsSpark *prev, *next;

    if (pending_sparks_hd == NULL ||
        pending_sparks_hd->next == NULL ) {
      /* just 1 elem => ok */
    } else {
      for (prev = pending_sparks_hd,
           next = pending_sparks_hd->next;
           (next != NULL) ;
           prev = next, next = next->next) {
        sorted = sorted &&
                 (prev->gran_info >= next->gran_info);
      }
    }
    if (!sorted) {
      debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
              CurrentProc);
      print_sparkq(CurrentProc);
    }
  }
# endif
}
1077
1078 nat
1079 spark_queue_len(proc)
1080 PEs proc;
1081 {
1082 rtsSpark *prev, *spark; /* prev only for testing !! */
1083 nat len;
1084
1085 for (len = 0, prev = NULL, spark = pending_sparks_hds[proc];
1086 spark != NULL;
1087 len++, prev = spark, spark = spark->next)
1088 {}
1089
1090 # if defined(GRAN_CHECK)
1091 if ( RtsFlags.GranFlags.Debug.checkSparkQ )
1092 if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
1093 debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
1094 proc, pending_sparks_tls[proc], prev);
1095 # endif
1096
1097 return (len);
1098 }
1099
1100 /*
1101 Take spark out of the spark queue on PE p and nuke the spark. Adjusts
1102 hd and tl pointers of the spark queue. Returns a pointer to the next
1103 spark in the queue.
1104 */
/*
 * Unlink `spark` from PE `p`'s spark queue, adjusting the array-held
 * head/tail pointers as needed, and optionally dispose of it.
 * Returns the next spark in the queue (NULL if `spark` was the last).
 * Barfs on a NULL argument.
 *
 * NOTE(review): the GRAN_CHECK traces below print the scalar
 * pending_sparks_hd/tl while the unlink code updates the per-PE arrays
 * pending_sparks_hds/tls[p] — presumably aliases for CurrentProc; verify.
 */
rtsSpark *
delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
rtsSpark *spark;
PEs p;
rtsBool dispose_too;
{
  rtsSpark *new_spark;

  if (spark==NULL)
    barf("delete_from_sparkq: trying to delete NULL spark\n");

# if defined(GRAN_CHECK)
  if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
    debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
	    pending_sparks_hd, pending_sparks_tl,
	    spark->prev, spark, spark->next,
	    (spark->next==NULL ? 0 : spark->next->prev));
  }
# endif

  if (spark->prev==NULL) {
    /* spark is first spark of queue => adjust hd pointer */
    ASSERT(pending_sparks_hds[p]==spark);
    pending_sparks_hds[p] = spark->next;
  } else {
    spark->prev->next = spark->next;
  }
  if (spark->next==NULL) {
    ASSERT(pending_sparks_tls[p]==spark);
    /* spark is last spark of queue => adjust tl pointer
       (the old comment said "first" — a copy-paste of the hd branch) */
    pending_sparks_tls[p] = spark->prev;
  } else {
    spark->next->prev = spark->prev;
  }
  new_spark = spark->next;      /* saved before spark may be freed below */

# if defined(GRAN_CHECK)
  if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
    debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
	    pending_sparks_hd, pending_sparks_tl,
	    spark->prev, spark, spark->next,
	    (spark->next==NULL ? 0 : spark->next->prev), spark);
  }
# endif

  if (dispose_too)
    disposeSpark(spark);

  return new_spark;
}
1155
1156 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
1157 void
1158 markSparkQueue(void)
1159 {
1160 StgClosure *MarkRoot(StgClosure *root); // prototype
1161 PEs p;
1162 rtsSpark *sp;
1163
1164 for (p=0; p<RtsFlags.GranFlags.proc; p++)
1165 for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
1166 ASSERT(sp->node!=NULL);
1167 ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
1168 // ToDo?: statistics gathering here (also for GUM!)
1169 sp->node = (StgClosure *)MarkRoot(sp->node);
1170 }
1171
1172 IF_DEBUG(gc,
1173 debugBelch("markSparkQueue: spark statistics at start of GC:");
1174 print_sparkq_stats());
1175 }
1176
1177 void
1178 print_spark(spark)
1179 rtsSpark *spark;
1180 {
1181 char str[16];
1182
1183 if (spark==NULL) {
1184 debugBelch("Spark: NIL\n");
1185 return;
1186 } else {
1187 sprintf(str,
1188 ((spark->node==NULL) ? "______" : "%#6lx"),
1189 stgCast(StgPtr,spark->node));
1190
1191 debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
1192 str, spark->name,
1193 ((spark->global)==rtsTrue?"True":"False"), spark->creator,
1194 spark->prev, spark->next);
1195 }
1196 }
1197
1198 void
1199 print_sparkq(proc)
1200 PEs proc;
1201 // rtsSpark *hd;
1202 {
1203 rtsSpark *x = pending_sparks_hds[proc];
1204
1205 debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
1206 for (; x!=(rtsSpark*)NULL; x=x->next) {
1207 print_spark(x);
1208 }
1209 }
1210
1211 /*
1212 Print a statistics of all spark queues.
1213 */
1214 void
1215 print_sparkq_stats(void)
1216 {
1217 PEs p;
1218
1219 debugBelch("SparkQs: [");
1220 for (p=0; p<RtsFlags.GranFlags.proc; p++)
1221 debugBelch(", PE %d: %d", p, spark_queue_len(p));
1222 debugBelch("\n");
1223 }
1224
1225 #endif