1 /* -----------------------------------------------------------------------------
2 * (c) The GHC Team 1998-2005
3 *
4 * STM implementation.
5 *
6 * Overview
7 * --------
8 *
9 * See the PPoPP 2005 paper "Composable memory transactions". In summary,
10 * each transaction has a TRec (transaction record) holding entries for each of the
11 * TVars (transactional variables) that it has accessed. Each entry records
12 * (a) the TVar, (b) the expected value seen in the TVar, (c) the new value that
13 * the transaction wants to write to the TVar, (d) during commit, the identity of
14 * the TRec that wrote the expected value.
15 *
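 * For reference, each entry has roughly the following shape (the real
 * definition is TRecEntry in the RTS headers; the version field is only
 * present in THREADED_RTS builds):
 *
 *   struct {
 *     StgTVar    *tvar;            // (a) the TVar accessed
 *     StgClosure *expected_value;  // (b) value seen on first access
 *     StgClosure *new_value;       // (c) value to write on commit
 *     StgInt      num_updates;     // (d) version stamp used during commit
 *   };
 *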
16 * Separate TRecs are used for each level in a nest of transactions. This allows
17 * a nested transaction to be aborted without condemning its enclosing transactions.
18 * This is needed in the implementation of catchRetry. Note that the "expected value"
19 * in a nested transaction's TRec is the value expected to be *held in memory* if
20 * the transaction commits -- not the "new value" stored in one of the enclosing
21 * transactions. This means that validation can be done without searching through
22 * a nest of TRecs.
23 *
24 * Concurrency control
25 * -------------------
26 *
27 * Three different concurrency control schemes can be built according to the settings
28 * in STM.h:
29 *
30 * STM_UNIPROC assumes that the caller serialises invocations on the STM interface.
31 * In the Haskell RTS this means it is suitable only for non-THREADED_RTS builds.
32 *
33 * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired during
34 * an invocation on the STM interface. Note that this does not mean that
35 * transactions are simply serialized -- the lock is only held *within* the
36 * implementation of stmCommitTransaction, stmWait etc.
37 *
38 * STM_FG_LOCKS uses fine-grained locking -- locking is done on a per-TVar basis
39 * and, when committing a transaction, no locks are acquired for TVars that have
40 * been read but not updated.
41 *
42 * Concurrency control is implemented in the functions:
43 *
44 * lock_stm
45 * unlock_stm
46 * lock_tvar / cond_lock_tvar
47 * unlock_tvar
48 *
49 * The choice between STM_UNIPROC / STM_CG_LOCK / STM_FG_LOCKS affects the
50 * implementation of these functions.
51 *
52 * lock_stm & unlock_stm are straightforward: they acquire a simple spin-lock
53 * in STM_CG_LOCK builds, and otherwise they are no-ops.
54 *
55 * lock_tvar / cond_lock_tvar and unlock_tvar are more complex because they
56 * have other effects (present in STM_UNIPROC and STM_CG_LOCK builds) as well
57 * as the actual business of manipulating a lock (present only in STM_FG_LOCKS
58 * builds). This is because locking a TVar is implemented by writing the lock
59 * holder's TRec into the TVar's current_value field:
60 *
61 * lock_tvar - lock a specified TVar (STM_FG_LOCKS only), returning the value
62 * it contained.
63 *
64 * cond_lock_tvar - lock a specified TVar (STM_FG_LOCKS only) if it
65 * contains a specified value. Return TRUE if this succeeds,
66 * FALSE otherwise.
67 *
68 * unlock_tvar - release the lock on a specified TVar (STM_FG_LOCKS only),
69 * storing a specified value in place of the lock entry.
70 *
71 * Using these operations, the typical pattern of a commit/validate/wait operation
72 * is to (a) lock the STM, (b) lock all the TVars being updated, (c) check that
73 * the TVars that were only read from still contain their expected values,
74 * (d) release the locks on the TVars, writing updates to them in the case of a
75 * commit, (e) unlock the STM. A pseudocode sketch of this pattern follows.
76 *
77 * Queues of waiting threads hang off the first_watch_queue_entry
78 * field of each TVar. This may only be manipulated when holding that
79 * TVar's lock. In particular, when a thread is putting itself to
80 * sleep, it mustn't release the TVar's lock until it has added itself
81 * to the wait queue and marked its TSO as BlockedOnSTM -- this makes
82 * sure that other threads will know to wake it.
83 *
84 * ---------------------------------------------------------------------------*/
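
/* ---------------------------------------------------------------------------
 * Illustrative sketch only (pseudocode, not compiled): the typical commit
 * pattern described above, expressed over the helper operations defined in
 * this file. The real logic lives in validate_and_acquire_ownership(),
 * check_read_only() and stmCommitTransaction().
 *
 *   lock_stm(trec);                                        // (a)
 *   for (each entry e in trec)                             // (b)
 *     if (e is an update)
 *       ok &= cond_lock_tvar(trec, e->tvar, e->expected_value);
 *   if (ok) ok &= check_read_only(trec);                   // (c)
 *   for (each locked entry e in trec)                      // (d)
 *     unlock_tvar(cap, trec, e->tvar,
 *                 ok ? e->new_value : e->expected_value, TRUE);
 *   unlock_stm(trec);                                      // (e)
 * ---------------------------------------------------------------------------*/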
85
86 #include "PosixSource.h"
87 #include "Rts.h"
88
89 #include "RtsUtils.h"
90 #include "Schedule.h"
91 #include "STM.h"
92 #include "Trace.h"
93 #include "Threads.h"
94 #include "sm/Storage.h"
95
96 #include <stdio.h>
97
98 #define TRUE 1
99 #define FALSE 0
100
101 // ACQ_ASSERT is used for assertions which are only required for
102 // THREADED_RTS builds with fine-grained locking.
103
104 #if defined(STM_FG_LOCKS)
105 #define ACQ_ASSERT(_X) ASSERT(_X)
106 #define NACQ_ASSERT(_X) /*Nothing*/
107 #else
108 #define ACQ_ASSERT(_X) /*Nothing*/
109 #define NACQ_ASSERT(_X) ASSERT(_X)
110 #endif
111
112 /*......................................................................*/
113
114 // If SHAKE is defined then validation will sometimes spuriously fail. This helps
115 // test unusual code paths when genuine contention is rare.
116
117 #define TRACE(_x...) debugTrace(DEBUG_stm, "STM: " _x)
118
119 #ifdef SHAKE
120 static const int do_shake = TRUE;
121 #else
122 static const int do_shake = FALSE;
123 #endif
124 static int shake_ctr = 0;
125 static int shake_lim = 1;
126
127 static int shake(void) {
128 if (do_shake) {
129 if (((shake_ctr++) % shake_lim) == 0) {
130 shake_ctr = 1;
131 shake_lim ++;
132 return TRUE;
133 }
134 return FALSE;
135 } else {
136 return FALSE;
137 }
138 }
139
140 /*......................................................................*/
141
142 // Helper macros for iterating over entries within a transaction
143 // record
144
145 #define FOR_EACH_ENTRY(_t,_x,CODE) do { \
146 StgTRecHeader *__t = (_t); \
147 StgTRecChunk *__c = __t -> current_chunk; \
148 StgWord __limit = __c -> next_entry_idx; \
149 TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%ld", __t, __c, __limit); \
150 while (__c != END_STM_CHUNK_LIST) { \
151 StgWord __i; \
152 for (__i = 0; __i < __limit; __i ++) { \
153 TRecEntry *_x = &(__c -> entries[__i]); \
154 do { CODE } while (0); \
155 } \
156 __c = __c -> prev_chunk; \
157 __limit = TREC_CHUNK_NUM_ENTRIES; \
158 } \
159 exit_for_each: \
160 if (FALSE) goto exit_for_each; \
161 } while (0)
162
163 #define BREAK_FOR_EACH goto exit_for_each
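
// Usage sketch (illustrative only): FOR_EACH_ENTRY binds the given name to
// each TRecEntry in turn; BREAK_FOR_EACH exits the whole traversal early.
// For example, to test whether a trec contains any update entry:
//
//   StgBool found = FALSE;
//   FOR_EACH_ENTRY(trec, e, {
//     if (e -> expected_value != e -> new_value) {
//       found = TRUE;
//       BREAK_FOR_EACH;
//     }
//   });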
164
165 /*......................................................................*/
166
167 // if REUSE_MEMORY is defined then attempt to re-use descriptors, log chunks,
168 // and wait queue entries without GC
169
170 #define REUSE_MEMORY
171
172 /*......................................................................*/
173
174 #define IF_STM_UNIPROC(__X) do { } while (0)
175 #define IF_STM_CG_LOCK(__X) do { } while (0)
176 #define IF_STM_FG_LOCKS(__X) do { } while (0)
177
178 #if defined(STM_UNIPROC)
179 #undef IF_STM_UNIPROC
180 #define IF_STM_UNIPROC(__X) do { __X } while (0)
181 static const StgBool config_use_read_phase = FALSE;
182
183 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
184 TRACE("%p : lock_stm()", trec);
185 }
186
187 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
188 TRACE("%p : unlock_stm()", trec);
189 }
190
191 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED,
192 StgTVar *s STG_UNUSED) {
193 StgClosure *result;
194 TRACE("%p : lock_tvar(%p)", trec, s);
195 result = s -> current_value;
196 return result;
197 }
198
199 static void unlock_tvar(Capability *cap,
200 StgTRecHeader *trec STG_UNUSED,
201 StgTVar *s,
202 StgClosure *c,
203 StgBool force_update) {
204 TRACE("%p : unlock_tvar(%p)", trec, s);
205 if (force_update) {
206 s -> current_value = c;
207 dirty_TVAR(cap,s);
208 }
209 }
210
211 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED,
212 StgTVar *s STG_UNUSED,
213 StgClosure *expected) {
214 StgClosure *result;
215 TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
216 result = s -> current_value;
217 TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
218 return (result == expected);
219 }
220
221 static StgBool lock_inv(StgAtomicInvariant *inv STG_UNUSED) {
222 // Nothing -- uniproc
223 return TRUE;
224 }
225
226 static void unlock_inv(StgAtomicInvariant *inv STG_UNUSED) {
227 // Nothing -- uniproc
228 }
229 #endif
230
231 #if defined(STM_CG_LOCK) /*........................................*/
232
233 #undef IF_STM_CG_LOCK
234 #define IF_STM_CG_LOCK(__X) do { __X } while (0)
235 static const StgBool config_use_read_phase = FALSE;
236 static volatile StgTRecHeader *smp_locked = NULL;
237
238 static void lock_stm(StgTRecHeader *trec) {
239 while (cas(&smp_locked, NULL, trec) != NULL) { }
240 TRACE("%p : lock_stm()", trec);
241 }
242
243 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
244 TRACE("%p : unlock_stm()", trec);
245 ASSERT (smp_locked == trec);
246 smp_locked = 0;
247 }
248
249 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED,
250 StgTVar *s STG_UNUSED) {
251 StgClosure *result;
252 TRACE("%p : lock_tvar(%p)", trec, s);
253 ASSERT (smp_locked == trec);
254 result = s -> current_value;
255 return result;
256 }
257
258 static void unlock_tvar(Capability *cap,
259 StgTRecHeader *trec STG_UNUSED,
260 StgTVar *s,
261 StgClosure *c,
262 StgBool force_update) {
263 TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
264 ASSERT (smp_locked == trec);
265 if (force_update) {
266 s -> current_value = c;
267 dirty_TVAR(cap,s);
268 }
269 }
270
271 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED,
272 StgTVar *s STG_UNUSED,
273 StgClosure *expected) {
274 StgClosure *result;
275 TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
276 ASSERT (smp_locked == trec);
277 result = s -> current_value;
278 TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
279 return (result == expected);
280 }
281
282 static StgBool lock_inv(StgAtomicInvariant *inv STG_UNUSED) {
283 // Nothing -- protected by STM lock
284 return TRUE;
285 }
286
287 static void unlock_inv(StgAtomicInvariant *inv STG_UNUSED) {
288 // Nothing -- protected by STM lock
289 }
290 #endif
291
292 #if defined(STM_FG_LOCKS) /*...................................*/
293
294 #undef IF_STM_FG_LOCKS
295 #define IF_STM_FG_LOCKS(__X) do { __X } while (0)
296 static const StgBool config_use_read_phase = TRUE;
297
298 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
299 TRACE("%p : lock_stm()", trec);
300 }
301
302 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
303 TRACE("%p : unlock_stm()", trec);
304 }
305
306 static StgClosure *lock_tvar(StgTRecHeader *trec,
307 StgTVar *s STG_UNUSED) {
308 StgClosure *result;
309 TRACE("%p : lock_tvar(%p)", trec, s);
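// Spin while another transaction holds the lock (a locked TVar's
// current_value points at the locking TRec header), then try to install
// our own TRec with cas(); start over if we lose the race.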
310 do {
311 do {
312 result = s -> current_value;
313 } while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info);
314 } while (cas((void *)&(s -> current_value),
315 (StgWord)result, (StgWord)trec) != (StgWord)result);
316 return result;
317 }
318
319 static void unlock_tvar(Capability *cap,
320 StgTRecHeader *trec STG_UNUSED,
321 StgTVar *s,
322 StgClosure *c,
323 StgBool force_update STG_UNUSED) {
324 TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
325 ASSERT(s -> current_value == (StgClosure *)trec);
326 s -> current_value = c;
327 dirty_TVAR(cap,s);
328 }
329
330 static StgBool cond_lock_tvar(StgTRecHeader *trec,
331 StgTVar *s,
332 StgClosure *expected) {
333 StgClosure *result;
334 StgWord w;
335 TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
336 w = cas((void *)&(s -> current_value), (StgWord)expected, (StgWord)trec);
337 result = (StgClosure *)w;
338 TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
339 return (result == expected);
340 }
341
342 static StgBool lock_inv(StgAtomicInvariant *inv) {
343 return (cas(&(inv -> lock), 0, 1) == 0);
344 }
345
346 static void unlock_inv(StgAtomicInvariant *inv) {
347 ASSERT(inv -> lock == 1);
348 inv -> lock = 0;
349 }
350 #endif
351
352 /*......................................................................*/
353
354 static StgBool watcher_is_tso(StgTVarWatchQueue *q) {
355 StgClosure *c = q -> closure;
356 StgInfoTable *info = get_itbl(c);
357 return (info -> type) == TSO;
358 }
359
360 static StgBool watcher_is_invariant(StgTVarWatchQueue *q) {
361 StgClosure *c = q -> closure;
362 return (c->header.info == &stg_ATOMIC_INVARIANT_info);
363 }
364
365 /*......................................................................*/
366
367 // Helper functions for thread blocking and unblocking
368
369 static void park_tso(StgTSO *tso) {
370 ASSERT(tso -> why_blocked == NotBlocked);
371 tso -> why_blocked = BlockedOnSTM;
372 tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
373 TRACE("park_tso on tso=%p", tso);
374 }
375
376 static void unpark_tso(Capability *cap, StgTSO *tso) {
377 // We will continue unparking threads while they remain on one of the wait
378 // queues: it's up to the thread itself to remove itself from the wait queues
379 // if it decides to do so when it is scheduled.
380
381 // Unblocking a TSO from BlockedOnSTM is done under the TSO lock,
382 // to avoid multiple CPUs unblocking the same TSO, and also to
383 // synchronise with throwTo(). The first time the TSO is unblocked
384 // we mark this fact by setting block_info.closure == STM_AWOKEN.
385 // This way we can avoid sending further wakeup messages in the
386 // future.
387 lockTSO(tso);
388 if (tso->why_blocked == BlockedOnSTM && tso->block_info.closure == STM_AWOKEN) {
389 TRACE("unpark_tso already woken up tso=%p", tso);
390 } else if (tso -> why_blocked == BlockedOnSTM) {
391 TRACE("unpark_tso on tso=%p", tso);
392 tso->block_info.closure = STM_AWOKEN;
393 tryWakeupThread(cap,tso);
394 } else {
395 TRACE("spurious unpark_tso on tso=%p", tso);
396 }
397 unlockTSO(tso);
398 }
399
400 static void unpark_waiters_on(Capability *cap, StgTVar *s) {
401 StgTVarWatchQueue *q;
402 StgTVarWatchQueue *trail;
403 TRACE("unpark_waiters_on tvar=%p", s);
404 // unblock TSOs in reverse order, to be a bit fairer (#2319)
405 for (q = s -> first_watch_queue_entry, trail = q;
406 q != END_STM_WATCH_QUEUE;
407 q = q -> next_queue_entry) {
408 trail = q;
409 }
410 q = trail;
411 for (;
412 q != END_STM_WATCH_QUEUE;
413 q = q -> prev_queue_entry) {
414 if (watcher_is_tso(q)) {
415 unpark_tso(cap, (StgTSO *)(q -> closure));
416 }
417 }
418 }
419
420 /*......................................................................*/
421
422 // Helper functions for downstream allocation and initialization
423
424 static StgInvariantCheckQueue *new_stg_invariant_check_queue(Capability *cap,
425 StgAtomicInvariant *invariant) {
426 StgInvariantCheckQueue *result;
427 result = (StgInvariantCheckQueue *)allocate(cap, sizeofW(StgInvariantCheckQueue));
428 SET_HDR (result, &stg_INVARIANT_CHECK_QUEUE_info, CCS_SYSTEM);
429 result -> invariant = invariant;
430 result -> my_execution = NO_TREC;
431 return result;
432 }
433
434 static StgTVarWatchQueue *new_stg_tvar_watch_queue(Capability *cap,
435 StgClosure *closure) {
436 StgTVarWatchQueue *result;
437 result = (StgTVarWatchQueue *)allocate(cap, sizeofW(StgTVarWatchQueue));
438 SET_HDR (result, &stg_TVAR_WATCH_QUEUE_info, CCS_SYSTEM);
439 result -> closure = closure;
440 return result;
441 }
442
443 static StgTRecChunk *new_stg_trec_chunk(Capability *cap) {
444 StgTRecChunk *result;
445 result = (StgTRecChunk *)allocate(cap, sizeofW(StgTRecChunk));
446 SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
447 result -> prev_chunk = END_STM_CHUNK_LIST;
448 result -> next_entry_idx = 0;
449 return result;
450 }
451
452 static StgTRecHeader *new_stg_trec_header(Capability *cap,
453 StgTRecHeader *enclosing_trec) {
454 StgTRecHeader *result;
455 result = (StgTRecHeader *) allocate(cap, sizeofW(StgTRecHeader));
456 SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);
457
458 result -> enclosing_trec = enclosing_trec;
459 result -> current_chunk = new_stg_trec_chunk(cap);
460 result -> invariants_to_check = END_INVARIANT_CHECK_QUEUE;
461
462 if (enclosing_trec == NO_TREC) {
463 result -> state = TREC_ACTIVE;
464 } else {
465 ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
466 enclosing_trec -> state == TREC_CONDEMNED);
467 result -> state = enclosing_trec -> state;
468 }
469
470 return result;
471 }
472
473 /*......................................................................*/
474
475 // Allocation / deallocation functions that retain per-capability lists
476 // of closures that can be re-used
477
478 static StgInvariantCheckQueue *alloc_stg_invariant_check_queue(Capability *cap,
479 StgAtomicInvariant *invariant) {
480 StgInvariantCheckQueue *result = NULL;
481 if (cap -> free_invariant_check_queues == END_INVARIANT_CHECK_QUEUE) {
482 result = new_stg_invariant_check_queue(cap, invariant);
483 } else {
484 result = cap -> free_invariant_check_queues;
485 result -> invariant = invariant;
486 result -> my_execution = NO_TREC;
487 cap -> free_invariant_check_queues = result -> next_queue_entry;
488 }
489 return result;
490 }
491
492 static StgTVarWatchQueue *alloc_stg_tvar_watch_queue(Capability *cap,
493 StgClosure *closure) {
494 StgTVarWatchQueue *result = NULL;
495 if (cap -> free_tvar_watch_queues == END_STM_WATCH_QUEUE) {
496 result = new_stg_tvar_watch_queue(cap, closure);
497 } else {
498 result = cap -> free_tvar_watch_queues;
499 result -> closure = closure;
500 cap -> free_tvar_watch_queues = result -> next_queue_entry;
501 }
502 return result;
503 }
504
505 static void free_stg_tvar_watch_queue(Capability *cap,
506 StgTVarWatchQueue *wq) {
507 #if defined(REUSE_MEMORY)
508 wq -> next_queue_entry = cap -> free_tvar_watch_queues;
509 cap -> free_tvar_watch_queues = wq;
510 #endif
511 }
512
513 static StgTRecChunk *alloc_stg_trec_chunk(Capability *cap) {
514 StgTRecChunk *result = NULL;
515 if (cap -> free_trec_chunks == END_STM_CHUNK_LIST) {
516 result = new_stg_trec_chunk(cap);
517 } else {
518 result = cap -> free_trec_chunks;
519 cap -> free_trec_chunks = result -> prev_chunk;
520 result -> prev_chunk = END_STM_CHUNK_LIST;
521 result -> next_entry_idx = 0;
522 }
523 return result;
524 }
525
526 static void free_stg_trec_chunk(Capability *cap,
527 StgTRecChunk *c) {
528 #if defined(REUSE_MEMORY)
529 c -> prev_chunk = cap -> free_trec_chunks;
530 cap -> free_trec_chunks = c;
531 #endif
532 }
533
534 static StgTRecHeader *alloc_stg_trec_header(Capability *cap,
535 StgTRecHeader *enclosing_trec) {
536 StgTRecHeader *result = NULL;
537 if (cap -> free_trec_headers == NO_TREC) {
538 result = new_stg_trec_header(cap, enclosing_trec);
539 } else {
540 result = cap -> free_trec_headers;
541 cap -> free_trec_headers = result -> enclosing_trec;
542 result -> enclosing_trec = enclosing_trec;
543 result -> current_chunk -> next_entry_idx = 0;
544 result -> invariants_to_check = END_INVARIANT_CHECK_QUEUE;
545 if (enclosing_trec == NO_TREC) {
546 result -> state = TREC_ACTIVE;
547 } else {
548 ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
549 enclosing_trec -> state == TREC_CONDEMNED);
550 result -> state = enclosing_trec -> state;
551 }
552 }
553 return result;
554 }
555
556 static void free_stg_trec_header(Capability *cap,
557 StgTRecHeader *trec) {
558 #if defined(REUSE_MEMORY)
559 StgTRecChunk *chunk = trec -> current_chunk -> prev_chunk;
560 while (chunk != END_STM_CHUNK_LIST) {
561 StgTRecChunk *prev_chunk = chunk -> prev_chunk;
562 free_stg_trec_chunk(cap, chunk);
563 chunk = prev_chunk;
564 }
565 trec -> current_chunk -> prev_chunk = END_STM_CHUNK_LIST;
566 trec -> enclosing_trec = cap -> free_trec_headers;
567 cap -> free_trec_headers = trec;
568 #endif
569 }
570
571 /*......................................................................*/
572
573 // Helper functions for managing waiting lists
574
575 static void build_watch_queue_entries_for_trec(Capability *cap,
576 StgTSO *tso,
577 StgTRecHeader *trec) {
578 ASSERT(trec != NO_TREC);
579 ASSERT(trec -> enclosing_trec == NO_TREC);
580 ASSERT(trec -> state == TREC_ACTIVE);
581
582 TRACE("%p : build_watch_queue_entries_for_trec()", trec);
583
584 FOR_EACH_ENTRY(trec, e, {
585 StgTVar *s;
586 StgTVarWatchQueue *q;
587 StgTVarWatchQueue *fq;
588 s = e -> tvar;
589 TRACE("%p : adding tso=%p to watch queue for tvar=%p", trec, tso, s);
590 ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
591 NACQ_ASSERT(s -> current_value == e -> expected_value);
592 fq = s -> first_watch_queue_entry;
593 q = alloc_stg_tvar_watch_queue(cap, (StgClosure*) tso);
594 q -> next_queue_entry = fq;
595 q -> prev_queue_entry = END_STM_WATCH_QUEUE;
596 if (fq != END_STM_WATCH_QUEUE) {
597 fq -> prev_queue_entry = q;
598 }
599 s -> first_watch_queue_entry = q;
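// Reuse the entry's new_value field to remember our watch queue entry,
// so that remove_watch_queue_entries_for_trec() can find and unlink it.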
600 e -> new_value = (StgClosure *) q;
601 dirty_TVAR(cap,s); // we modified first_watch_queue_entry
602 });
603 }
604
605 static void remove_watch_queue_entries_for_trec(Capability *cap,
606 StgTRecHeader *trec) {
607 ASSERT(trec != NO_TREC);
608 ASSERT(trec -> enclosing_trec == NO_TREC);
609 ASSERT(trec -> state == TREC_WAITING ||
610 trec -> state == TREC_CONDEMNED);
611
612 TRACE("%p : remove_watch_queue_entries_for_trec()", trec);
613
614 FOR_EACH_ENTRY(trec, e, {
615 StgTVar *s;
616 StgTVarWatchQueue *pq;
617 StgTVarWatchQueue *nq;
618 StgTVarWatchQueue *q;
619 StgClosure *saw;
620 s = e -> tvar;
621 saw = lock_tvar(trec, s);
622 q = (StgTVarWatchQueue *) (e -> new_value);
623 TRACE("%p : removing tso=%p from watch queue for tvar=%p",
624 trec,
625 q -> closure,
626 s);
627 ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
628 nq = q -> next_queue_entry;
629 pq = q -> prev_queue_entry;
630 if (nq != END_STM_WATCH_QUEUE) {
631 nq -> prev_queue_entry = pq;
632 }
633 if (pq != END_STM_WATCH_QUEUE) {
634 pq -> next_queue_entry = nq;
635 } else {
636 ASSERT (s -> first_watch_queue_entry == q);
637 s -> first_watch_queue_entry = nq;
638 dirty_TVAR(cap,s); // we modified first_watch_queue_entry
639 }
640 free_stg_tvar_watch_queue(cap, q);
641 unlock_tvar(cap, trec, s, saw, FALSE);
642 });
643 }
644
645 /*......................................................................*/
646
647 static TRecEntry *get_new_entry(Capability *cap,
648 StgTRecHeader *t) {
649 TRecEntry *result;
650 StgTRecChunk *c;
651 int i;
652
653 c = t -> current_chunk;
654 i = c -> next_entry_idx;
655 ASSERT(c != END_STM_CHUNK_LIST);
656
657 if (i < TREC_CHUNK_NUM_ENTRIES) {
658 // Continue to use current chunk
659 result = &(c -> entries[i]);
660 c -> next_entry_idx ++;
661 } else {
662 // Current chunk is full: allocate a fresh one
663 StgTRecChunk *nc;
664 nc = alloc_stg_trec_chunk(cap);
665 nc -> prev_chunk = c;
666 nc -> next_entry_idx = 1;
667 t -> current_chunk = nc;
668 result = &(nc -> entries[0]);
669 }
670
671 return result;
672 }
673
674 /*......................................................................*/
675
676 static void merge_update_into(Capability *cap,
677 StgTRecHeader *t,
678 StgTVar *tvar,
679 StgClosure *expected_value,
680 StgClosure *new_value) {
681 int found;
682
683 // Look for an entry in this trec
684 found = FALSE;
685 FOR_EACH_ENTRY(t, e, {
686 StgTVar *s;
687 s = e -> tvar;
688 if (s == tvar) {
689 found = TRUE;
690 if (e -> expected_value != expected_value) {
691 // Must abort if the two entries start from different values
692 TRACE("%p : update entries inconsistent at %p (%p vs %p)",
693 t, tvar, e -> expected_value, expected_value);
694 t -> state = TREC_CONDEMNED;
695 }
696 e -> new_value = new_value;
697 BREAK_FOR_EACH;
698 }
699 });
700
701 if (!found) {
702 // No entry so far in this trec
703 TRecEntry *ne;
704 ne = get_new_entry(cap, t);
705 ne -> tvar = tvar;
706 ne -> expected_value = expected_value;
707 ne -> new_value = new_value;
708 }
709 }
710
711 /*......................................................................*/
712
713 static void merge_read_into(Capability *cap,
714 StgTRecHeader *trec,
715 StgTVar *tvar,
716 StgClosure *expected_value)
717 {
718 int found;
719 StgTRecHeader *t;
720
721 found = FALSE;
722
723 //
724 // See #7493
725 //
726 // We need to look for an existing entry *anywhere* in the stack of
727 // nested transactions. Otherwise, in stmCommitNestedTransaction()
728 // we can't tell the difference between
729 //
730 // (1) a read-only entry
731 // (2) an entry that writes back the original value
732 //
733 // This is because in both cases e->new_value == e->expected_value. In (1)
734 // we want to do nothing, and in (2) we want to update e->new_value
735 // in the outer transaction.
736 //
737 // Here we deal with the first possibility: we never create a
738 // read-only entry in an inner transaction if there is an existing
739 // outer entry; so we never have an inner read and an outer update.
740 // So then in stmCommitNestedTransaction() we know we can always
741 // write e->new_value over the outer entry, because the inner entry
742 // is the most up to date.
743 //
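// Example (illustrative): after running the nested transaction, the two
// cases produce identical-looking entries,
//
//   { tvar = t, expected_value = v, new_value = v }   // (1) plain read
//   { tvar = t, expected_value = v, new_value = v }   // (2) wrote v back
//
// yet on nested commit (2) must overwrite the outer entry's new_value
// while (1) must not. Never creating entry (1) when an enclosing entry
// exists removes the ambiguity.
//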
744 for (t = trec; !found && t != NO_TREC; t = t -> enclosing_trec)
745 {
746 FOR_EACH_ENTRY(t, e, {
747 if (e -> tvar == tvar) {
748 found = TRUE;
749 if (e -> expected_value != expected_value) {
750 // Must abort if the two entries start from different values
751 TRACE("%p : read entries inconsistent at %p (%p vs %p)",
752 t, tvar, e -> expected_value, expected_value);
753 t -> state = TREC_CONDEMNED;
754 }
755 BREAK_FOR_EACH;
756 }
757 });
758 }
759
760 if (!found) {
761 // No entry found
762 TRecEntry *ne;
763 ne = get_new_entry(cap, trec);
764 ne -> tvar = tvar;
765 ne -> expected_value = expected_value;
766 ne -> new_value = expected_value;
767 }
768 }
769
770 /*......................................................................*/
771
772 static StgBool entry_is_update(TRecEntry *e) {
773 StgBool result;
774 result = (e -> expected_value != e -> new_value);
775 return result;
776 }
777
778 #if defined(STM_FG_LOCKS)
779 static StgBool entry_is_read_only(TRecEntry *e) {
780 StgBool result;
781 result = (e -> expected_value == e -> new_value);
782 return result;
783 }
784
785 static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
786 StgClosure *c;
787 StgBool result;
788 c = s -> current_value;
789 result = (c == (StgClosure *) h);
790 return result;
791 }
792 #endif
793
794 // revert_ownership : release a lock on a TVar, storing back
795 // the value that it held when the lock was acquired. "revert_all"
796 // is set in stmWait and stmReWait when we acquired locks on all of
797 // the TVars involved. "revert_all" is not set in commit operations
798 // where we don't lock TVars that have been read from but not updated.
799
800 static void revert_ownership(Capability *cap STG_UNUSED,
801 StgTRecHeader *trec STG_UNUSED,
802 StgBool revert_all STG_UNUSED) {
803 #if defined(STM_FG_LOCKS)
804 FOR_EACH_ENTRY(trec, e, {
805 if (revert_all || entry_is_update(e)) {
806 StgTVar *s;
807 s = e -> tvar;
808 if (tvar_is_locked(s, trec)) {
809 unlock_tvar(cap, trec, s, e -> expected_value, TRUE);
810 }
811 }
812 });
813 #endif
814 }
815
816 /*......................................................................*/
817
818 // validate_and_acquire_ownership : this performs the twin functions
819 // of checking that the TVars referred to by entries in trec hold the
820 // expected values and:
821 //
822 // - locking the TVar (on updated TVars during commit, or all TVars
823 // during wait)
824 //
825 // - recording the identity of the TRec that wrote the value seen in the
826 // TVar (on non-updated TVars during commit). These values are
827 // stashed in the TRec entries and are then checked in check_read_only
828 // to ensure that an atomic snapshot of all of these locations has been
829 // seen.
830
831 static StgBool validate_and_acquire_ownership (Capability *cap,
832 StgTRecHeader *trec,
833 int acquire_all,
834 int retain_ownership) {
835 StgBool result;
836
837 if (shake()) {
838 TRACE("%p : shake, pretending trec is invalid when it may not be", trec);
839 return FALSE;
840 }
841
842 ASSERT ((trec -> state == TREC_ACTIVE) ||
843 (trec -> state == TREC_WAITING) ||
844 (trec -> state == TREC_CONDEMNED));
845 result = !((trec -> state) == TREC_CONDEMNED);
846 if (result) {
847 FOR_EACH_ENTRY(trec, e, {
848 StgTVar *s;
849 s = e -> tvar;
850 if (acquire_all || entry_is_update(e)) {
851 TRACE("%p : trying to acquire %p", trec, s);
852 if (!cond_lock_tvar(trec, s, e -> expected_value)) {
853 TRACE("%p : failed to acquire %p", trec, s);
854 result = FALSE;
855 BREAK_FOR_EACH;
856 }
857 } else {
858 ASSERT(config_use_read_phase);
859 IF_STM_FG_LOCKS({
860 TRACE("%p : will need to check %p", trec, s);
861 if (s -> current_value != e -> expected_value) {
862 TRACE("%p : doesn't match", trec);
863 result = FALSE;
864 BREAK_FOR_EACH;
865 }
866 e -> num_updates = s -> num_updates;
867 if (s -> current_value != e -> expected_value) {
868 TRACE("%p : doesn't match (race)", trec);
869 result = FALSE;
870 BREAK_FOR_EACH;
871 } else {
872 TRACE("%p : need to check version %ld", trec, e -> num_updates);
873 }
874 });
875 }
876 });
877 }
878
879 if ((!result) || (!retain_ownership)) {
880 revert_ownership(cap, trec, acquire_all);
881 }
882
883 return result;
884 }
885
886 // check_read_only : check that we've seen an atomic snapshot of the
887 // non-updated TVars accessed by a trec. This checks that the last TRec to
888 // commit an update to the TVar is unchanged since the value was stashed in
889 // validate_and_acquire_ownership. If no update is seen to any TVar then
890 // all of them contained their expected values at the start of the call to
891 // check_read_only.
892 //
893 // The paper "Concurrent programming without locks" (under submission) and
894 // Keir Fraser's PhD dissertation "Practical lock-free programming" discuss
895 // this kind of algorithm.
896
897 static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
898 StgBool result = TRUE;
899
900 ASSERT (config_use_read_phase);
901 IF_STM_FG_LOCKS({
902 FOR_EACH_ENTRY(trec, e, {
903 StgTVar *s;
904 s = e -> tvar;
905 if (entry_is_read_only(e)) {
906 TRACE("%p : check_read_only for TVar %p, saw %ld", trec, s, e -> num_updates);
907 if (s -> num_updates != e -> num_updates) {
908 // ||s -> current_value != e -> expected_value) {
909 TRACE("%p : mismatch", trec);
910 result = FALSE;
911 BREAK_FOR_EACH;
912 }
913 }
914 });
915 });
916
917 return result;
918 }
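
// Informal argument for why this works: for each read-only entry,
// validate_and_acquire_ownership read the TVar's value, stashed its
// num_updates version, and re-read the value. If the version is still
// unchanged here then no update was committed to the TVar in the interim,
// so every read-only TVar held its expected value at a single instant.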
919
920
921 /************************************************************************/
922
923 void stmPreGCHook (Capability *cap) {
924 lock_stm(NO_TREC);
925 TRACE("stmPreGCHook");
926 cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
927 cap->free_trec_chunks = END_STM_CHUNK_LIST;
928 cap->free_trec_headers = NO_TREC;
929 unlock_stm(NO_TREC);
930 }
931
932 /************************************************************************/
933
934 // check_read_only relies on version numbers held in TVars' "num_updates"
935 // fields not wrapping around while a transaction is being committed. The
936 // version number is incremented each time an update is committed to the TVar.
937 // This is unlikely to wrap around when 32-bit integers are used for the counts,
938 // but to ensure correctness we maintain a shared count on the maximum
939 // number of commit operations that may occur and check that this has
940 // not increased by more than 2^32 during a commit.
941
942 #define TOKEN_BATCH_SIZE 1024
943
944 static volatile StgInt64 max_commits = 0;
945
946 #if defined(THREADED_RTS)
947 static volatile StgBool token_locked = FALSE;
948
949 static void getTokenBatch(Capability *cap) {
950 while (cas((void *)&token_locked, FALSE, TRUE) == TRUE) { /* nothing */ }
951 max_commits += TOKEN_BATCH_SIZE;
952 TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, max_commits);
953 cap -> transaction_tokens = TOKEN_BATCH_SIZE;
954 token_locked = FALSE;
955 }
956
957 static void getToken(Capability *cap) {
958 if (cap -> transaction_tokens == 0) {
959 getTokenBatch(cap);
960 }
961 cap -> transaction_tokens --;
962 }
963 #else
964 static void getToken(Capability *cap STG_UNUSED) {
965 // Nothing
966 }
967 #endif
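
// Worked example (illustrative): tokens are consumed one per transaction
// started and handed out in batches of TOKEN_BATCH_SIZE = 1024, so at most
// n_capabilities * 1024 already-issued tokens are unaccounted for in
// max_commits. stmCommitTransaction() therefore bounds the commits that can
// have run concurrently with ours by
//
//   (max_commits_at_end - max_commits_at_start) + n_capabilities * TOKEN_BATCH_SIZE
//
// and fails the commit if that bound could reach 2^32, the point at which a
// 32-bit num_updates counter could wrap back to its stashed value.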
968
969 /*......................................................................*/
970
971 StgTRecHeader *stmStartTransaction(Capability *cap,
972 StgTRecHeader *outer) {
973 StgTRecHeader *t;
974 TRACE("%p : stmStartTransaction with %d tokens",
975 outer,
976 cap -> transaction_tokens);
977
978 getToken(cap);
979
980 t = alloc_stg_trec_header(cap, outer);
981 TRACE("%p : stmStartTransaction()=%p", outer, t);
982 return t;
983 }
984
985 /*......................................................................*/
986
987 void stmAbortTransaction(Capability *cap,
988 StgTRecHeader *trec) {
989 StgTRecHeader *et;
990 TRACE("%p : stmAbortTransaction", trec);
991 ASSERT (trec != NO_TREC);
992 ASSERT ((trec -> state == TREC_ACTIVE) ||
993 (trec -> state == TREC_WAITING) ||
994 (trec -> state == TREC_CONDEMNED));
995
996 lock_stm(trec);
997
998 et = trec -> enclosing_trec;
999 if (et == NO_TREC) {
1000 // We're a top-level transaction: remove any watch queue entries that
1001 // we may have.
1002 TRACE("%p : aborting top-level transaction", trec);
1003
1004 if (trec -> state == TREC_WAITING) {
1005 ASSERT (trec -> enclosing_trec == NO_TREC);
1006 TRACE("%p : stmAbortTransaction aborting waiting transaction", trec);
1007 remove_watch_queue_entries_for_trec(cap, trec);
1008 }
1009
1010 } else {
1011 // We're a nested transaction: merge our read set into our parent's
1012 TRACE("%p : retaining read-set into parent %p", trec, et);
1013
1014 FOR_EACH_ENTRY(trec, e, {
1015 StgTVar *s = e -> tvar;
1016 merge_read_into(cap, et, s, e -> expected_value);
1017 });
1018 }
1019
1020 trec -> state = TREC_ABORTED;
1021 unlock_stm(trec);
1022
1023 TRACE("%p : stmAbortTransaction done", trec);
1024 }
1025
1026 /*......................................................................*/
1027
1028 void stmFreeAbortedTRec(Capability *cap,
1029 StgTRecHeader *trec) {
1030 TRACE("%p : stmFreeAbortedTRec", trec);
1031 ASSERT (trec != NO_TREC);
1032 ASSERT ((trec -> state == TREC_CONDEMNED) ||
1033 (trec -> state == TREC_ABORTED));
1034
1035 free_stg_trec_header(cap, trec);
1036
1037 TRACE("%p : stmFreeAbortedTRec done", trec);
1038 }
1039
1040 /*......................................................................*/
1041
1042 void stmCondemnTransaction(Capability *cap,
1043 StgTRecHeader *trec) {
1044 TRACE("%p : stmCondemnTransaction", trec);
1045 ASSERT (trec != NO_TREC);
1046 ASSERT ((trec -> state == TREC_ACTIVE) ||
1047 (trec -> state == TREC_WAITING) ||
1048 (trec -> state == TREC_CONDEMNED));
1049
1050 lock_stm(trec);
1051 if (trec -> state == TREC_WAITING) {
1052 ASSERT (trec -> enclosing_trec == NO_TREC);
1053 TRACE("%p : stmCondemnTransaction condemning waiting transaction", trec);
1054 remove_watch_queue_entries_for_trec(cap, trec);
1055 }
1056 trec -> state = TREC_CONDEMNED;
1057 unlock_stm(trec);
1058
1059 TRACE("%p : stmCondemnTransaction done", trec);
1060 }
1061
1062 /*......................................................................*/
1063
1064 StgBool stmValidateNestOfTransactions(Capability *cap, StgTRecHeader *trec) {
1065 StgTRecHeader *t;
1066 StgBool result;
1067
1068 TRACE("%p : stmValidateNestOfTransactions", trec);
1069 ASSERT(trec != NO_TREC);
1070 ASSERT((trec -> state == TREC_ACTIVE) ||
1071 (trec -> state == TREC_WAITING) ||
1072 (trec -> state == TREC_CONDEMNED));
1073
1074 lock_stm(trec);
1075
1076 t = trec;
1077 result = TRUE;
1078 while (t != NO_TREC) {
1079 result &= validate_and_acquire_ownership(cap, t, TRUE, FALSE);
1080 t = t -> enclosing_trec;
1081 }
1082
1083 if (!result && trec -> state != TREC_WAITING) {
1084 trec -> state = TREC_CONDEMNED;
1085 }
1086
1087 unlock_stm(trec);
1088
1089 TRACE("%p : stmValidateNestOfTransactions()=%d", trec, result);
1090 return result;
1091 }
1092
1093 /*......................................................................*/
1094
1095 static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) {
1096 TRecEntry *result = NULL;
1097
1098 TRACE("%p : get_entry_for TVar %p", trec, tvar);
1099 ASSERT(trec != NO_TREC);
1100
1101 do {
1102 FOR_EACH_ENTRY(trec, e, {
1103 if (e -> tvar == tvar) {
1104 result = e;
1105 if (in != NULL) {
1106 *in = trec;
1107 }
1108 BREAK_FOR_EACH;
1109 }
1110 });
1111 trec = trec -> enclosing_trec;
1112 } while (result == NULL && trec != NO_TREC);
1113
1114 return result;
1115 }
1116
1117 /*......................................................................*/
1118
1119 /*
1120 * Add/remove links between an invariant and the TVars it reads. The caller
1121 * must have locked the TVars involved and the invariant.
1122 */
1123
1124 static void disconnect_invariant(Capability *cap,
1125 StgAtomicInvariant *inv) {
1126 StgTRecHeader *last_execution = inv -> last_execution;
1127
1128 TRACE("unhooking last execution inv=%p trec=%p", inv, last_execution);
1129
1130 FOR_EACH_ENTRY(last_execution, e, {
1131 StgTVar *s = e -> tvar;
1132 StgTVarWatchQueue *q = s -> first_watch_queue_entry;
1133 DEBUG_ONLY( StgBool found = FALSE );
1134 TRACE(" looking for trec on tvar=%p", s);
1135 for (q = s -> first_watch_queue_entry;
1136 q != END_STM_WATCH_QUEUE;
1137 q = q -> next_queue_entry) {
1138 if (q -> closure == (StgClosure*)inv) {
1139 StgTVarWatchQueue *pq;
1140 StgTVarWatchQueue *nq;
1141 nq = q -> next_queue_entry;
1142 pq = q -> prev_queue_entry;
1143 if (nq != END_STM_WATCH_QUEUE) {
1144 nq -> prev_queue_entry = pq;
1145 }
1146 if (pq != END_STM_WATCH_QUEUE) {
1147 pq -> next_queue_entry = nq;
1148 } else {
1149 ASSERT (s -> first_watch_queue_entry == q);
1150 s -> first_watch_queue_entry = nq;
1151 dirty_TVAR(cap,s); // we modified first_watch_queue_entry
1152 }
1153 TRACE(" found it in watch queue entry %p", q);
1154 free_stg_tvar_watch_queue(cap, q);
1155 DEBUG_ONLY( found = TRUE );
1156 break;
1157 }
1158 }
1159 ASSERT(found);
1160 });
1161 inv -> last_execution = NO_TREC;
1162 }
1163
1164 static void connect_invariant_to_trec(Capability *cap,
1165 StgAtomicInvariant *inv,
1166 StgTRecHeader *my_execution) {
1167 TRACE("connecting execution inv=%p trec=%p", inv, my_execution);
1168
1169 ASSERT(inv -> last_execution == NO_TREC);
1170
1171 FOR_EACH_ENTRY(my_execution, e, {
1172 StgTVar *s = e -> tvar;
1173 StgTVarWatchQueue *q = alloc_stg_tvar_watch_queue(cap, (StgClosure*)inv);
1174 StgTVarWatchQueue *fq = s -> first_watch_queue_entry;
1175
1176 // We leave "last_execution" holding the values that will be
1177 // in the heap after the transaction we're in the process
1178 // of committing has finished.
1179 TRecEntry *entry = get_entry_for(my_execution -> enclosing_trec, s, NULL);
1180 if (entry != NULL) {
1181 e -> expected_value = entry -> new_value;
1182 e -> new_value = entry -> new_value;
1183 }
1184
1185 TRACE(" linking trec on tvar=%p value=%p q=%p", s, e -> expected_value, q);
1186 q -> next_queue_entry = fq;
1187 q -> prev_queue_entry = END_STM_WATCH_QUEUE;
1188 if (fq != END_STM_WATCH_QUEUE) {
1189 fq -> prev_queue_entry = q;
1190 }
1191 s -> first_watch_queue_entry = q;
1192 dirty_TVAR(cap,s); // we modified first_watch_queue_entry
1193 });
1194
1195 inv -> last_execution = my_execution;
1196 }
1197
1198 /*
1199 * Add a new invariant to the trec's list of invariants to check on commit
1200 */
1201 void stmAddInvariantToCheck(Capability *cap,
1202 StgTRecHeader *trec,
1203 StgClosure *code) {
1204 StgAtomicInvariant *invariant;
1205 StgInvariantCheckQueue *q;
1206 TRACE("%p : stmAddInvariantToCheck closure=%p", trec, code);
1207 ASSERT(trec != NO_TREC);
1208 ASSERT(trec -> state == TREC_ACTIVE ||
1209 trec -> state == TREC_CONDEMNED);
1210
1211
1212 // 1. Allocate an StgAtomicInvariant, set last_execution to NO_TREC
1213 // to signal that this is a new invariant in the current atomic block
1214
1215 invariant = (StgAtomicInvariant *) allocate(cap, sizeofW(StgAtomicInvariant));
1216 TRACE("%p : stmAddInvariantToCheck allocated invariant=%p", trec, invariant);
1217 SET_HDR (invariant, &stg_ATOMIC_INVARIANT_info, CCS_SYSTEM);
1218 invariant -> code = code;
1219 invariant -> last_execution = NO_TREC;
1220 invariant -> lock = 0;
1221
1222 // 2. Allocate an StgInvariantCheckQueue entry, link it to the current trec
1223
1224 q = alloc_stg_invariant_check_queue(cap, invariant);
1225 TRACE("%p : stmAddInvariantToCheck allocated q=%p", trec, q);
1226 q -> invariant = invariant;
1227 q -> my_execution = NO_TREC;
1228 q -> next_queue_entry = trec -> invariants_to_check;
1229 trec -> invariants_to_check = q;
1230
1231 TRACE("%p : stmAddInvariantToCheck done", trec);
1232 }
1233
1234 /*
1235 * Fill in the trec's list of invariants that might be violated by the
1236 * current transaction.
1237 */
1238
1239 StgInvariantCheckQueue *stmGetInvariantsToCheck(Capability *cap, StgTRecHeader *trec) {
1240 StgTRecChunk *c;
1241 TRACE("%p : stmGetInvariantsToCheck, head was %p",
1242 trec,
1243 trec -> invariants_to_check);
1244
1245 ASSERT(trec != NO_TREC);
1246 ASSERT ((trec -> state == TREC_ACTIVE) ||
1247 (trec -> state == TREC_WAITING) ||
1248 (trec -> state == TREC_CONDEMNED));
1249 ASSERT(trec -> enclosing_trec == NO_TREC);
1250
1251 lock_stm(trec);
1252 c = trec -> current_chunk;
1253 while (c != END_STM_CHUNK_LIST) {
1254 unsigned int i;
1255 for (i = 0; i < c -> next_entry_idx; i ++) {
1256 TRecEntry *e = &(c -> entries[i]);
1257 if (entry_is_update(e)) {
1258 StgTVar *s = e -> tvar;
1259 StgClosure *old = lock_tvar(trec, s);
1260
1261 // Pick up any invariants on the TVar being updated
1262 // by entry "e"
1263
1264 StgTVarWatchQueue *q;
1265 TRACE("%p : checking for invariants on %p", trec, s);
1266 for (q = s -> first_watch_queue_entry;
1267 q != END_STM_WATCH_QUEUE;
1268 q = q -> next_queue_entry) {
1269 if (watcher_is_invariant(q)) {
1270 StgBool found = FALSE;
1271 StgInvariantCheckQueue *q2;
1272 TRACE("%p : Touching invariant %p", trec, q -> closure);
1273 for (q2 = trec -> invariants_to_check;
1274 q2 != END_INVARIANT_CHECK_QUEUE;
1275 q2 = q2 -> next_queue_entry) {
1276 if (q2 -> invariant == (StgAtomicInvariant*)(q -> closure)) {
1277 TRACE("%p : Already found %p", trec, q -> closure);
1278 found = TRUE;
1279 break;
1280 }
1281 }
1282
1283 if (!found) {
1284 StgInvariantCheckQueue *q3;
1285 TRACE("%p : Not already found %p", trec, q -> closure);
1286 q3 = alloc_stg_invariant_check_queue(cap,
1287 (StgAtomicInvariant*) q -> closure);
1288 q3 -> next_queue_entry = trec -> invariants_to_check;
1289 trec -> invariants_to_check = q3;
1290 }
1291 }
1292 }
1293
1294 unlock_tvar(cap, trec, s, old, FALSE);
1295 }
1296 }
1297 c = c -> prev_chunk;
1298 }
1299
1300 unlock_stm(trec);
1301
1302 TRACE("%p : stmGetInvariantsToCheck, head now %p",
1303 trec,
1304 trec -> invariants_to_check);
1305
1306 return (trec -> invariants_to_check);
1307 }
1308
1309 /*......................................................................*/
1310
1311 StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
1312 int result;
1313 StgInt64 max_commits_at_start = max_commits;
1314 StgBool touched_invariants;
1315 StgBool use_read_phase;
1316
1317 TRACE("%p : stmCommitTransaction()", trec);
1318 ASSERT (trec != NO_TREC);
1319
1320 lock_stm(trec);
1321
1322 ASSERT (trec -> enclosing_trec == NO_TREC);
1323 ASSERT ((trec -> state == TREC_ACTIVE) ||
1324 (trec -> state == TREC_CONDEMNED));
1325
1326 // touched_invariants is true if we've written to a TVar with invariants
1327 // attached to it, or if we're trying to add a new invariant to the system.
1328
1329 touched_invariants = (trec -> invariants_to_check != END_INVARIANT_CHECK_QUEUE);
1330
1331 // If we have touched invariants then (i) lock the invariant, and (ii) add
1332 // the invariant's read set to our own. Step (i) is needed to serialize
1333 // concurrent transactions that attempt to make conflicting updates
1334 // to the invariant's trec (suppose it read from t1 and t2, and that one
1335 // concurrent transaction writes only to t1, and a second writes only to
1336 // t2). Step (ii) is needed so that both transactions will lock t1 and t2
1337 // to gain access to their wait lists (and hence be able to unhook the
1338 // invariant from both tvars).
1339
1340 if (touched_invariants) {
1341 StgInvariantCheckQueue *q = trec -> invariants_to_check;
1342 TRACE("%p : locking invariants", trec);
1343 while (q != END_INVARIANT_CHECK_QUEUE) {
1344 StgTRecHeader *inv_old_trec;
1345 StgAtomicInvariant *inv;
1346 TRACE("%p : locking invariant %p", trec, q -> invariant);
1347 inv = q -> invariant;
1348 if (!lock_inv(inv)) {
1349 TRACE("%p : failed to lock %p", trec, inv);
1350 trec -> state = TREC_CONDEMNED;
1351 break;
1352 }
1353
1354 inv_old_trec = inv -> last_execution;
1355 if (inv_old_trec != NO_TREC) {
1356 StgTRecChunk *c = inv_old_trec -> current_chunk;
1357 while (c != END_STM_CHUNK_LIST) {
1358 unsigned int i;
1359 for (i = 0; i < c -> next_entry_idx; i ++) {
1360 TRecEntry *e = &(c -> entries[i]);
1361 TRACE("%p : ensuring we lock TVars for %p", trec, e -> tvar);
1362 merge_read_into (cap, trec, e -> tvar, e -> expected_value);
1363 }
1364 c = c -> prev_chunk;
1365 }
1366 }
1367 q = q -> next_queue_entry;
1368 }
1369 TRACE("%p : finished locking invariants", trec);
1370 }
1371
1372 // Use a read-phase (i.e. don't lock TVars we've read but not updated) if
1373 // (i) the configuration lets us use a read phase, and (ii) we've not
1374 // touched or introduced any invariants.
1375 //
1376 // In principle we could extend the implementation to support a read-phase
1377 // and invariants, but it complicates the logic: the links between
1378 // invariants and TVars are managed by the TVar watch queues which are
1379 // protected by the TVar's locks.
1380
1381 use_read_phase = ((config_use_read_phase) && (!touched_invariants));
1382
1383 result = validate_and_acquire_ownership(cap, trec, (!use_read_phase), TRUE);
1384 if (result) {
1385 // We now know that all the updated locations hold their expected values.
1386 ASSERT (trec -> state == TREC_ACTIVE);
1387
1388 if (use_read_phase) {
1389 StgInt64 max_commits_at_end;
1390 StgInt64 max_concurrent_commits;
1391 TRACE("%p : doing read check", trec);
1392 result = check_read_only(trec);
1393 TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed");
1394
1395 max_commits_at_end = max_commits;
1396 max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
1397 (n_capabilities * TOKEN_BATCH_SIZE));
1398 if (((max_concurrent_commits >> 32) > 0) || shake()) {
1399 result = FALSE;
1400 }
1401 }
1402
1403 if (result) {
1404 // We now know that all of the read-only locations held their expected values
1405 // at the end of the call to validate_and_acquire_ownership. This forms the
1406 // linearization point of the commit.
1407
1408 // 1. If we have touched or introduced any invariants then unhook them
1409 // from the TVars they depended on last time they were executed
1410 // and hook them on the TVars that they now depend on.
1411 if (touched_invariants) {
1412 StgInvariantCheckQueue *q = trec -> invariants_to_check;
1413 while (q != END_INVARIANT_CHECK_QUEUE) {
1414 StgAtomicInvariant *inv = q -> invariant;
1415 if (inv -> last_execution != NO_TREC) {
1416 disconnect_invariant(cap, inv);
1417 }
1418
1419 TRACE("%p : hooking up new execution trec=%p", trec, q -> my_execution);
1420 connect_invariant_to_trec(cap, inv, q -> my_execution);
1421
1422 TRACE("%p : unlocking invariant %p", trec, inv);
1423 unlock_inv(inv);
1424
1425 q = q -> next_queue_entry;
1426 }
1427 }
1428
1429 // 2. Make the updates required by the transaction
1430 FOR_EACH_ENTRY(trec, e, {
1431 StgTVar *s;
1432 s = e -> tvar;
1433 if ((!use_read_phase) || (e -> new_value != e -> expected_value)) {
1434 // Either the entry is an update or we're not using a read phase:
1435 // write the value back to the TVar, unlocking it if necessary.
1436
1437 ACQ_ASSERT(tvar_is_locked(s, trec));
1438 TRACE("%p : writing %p to %p, waking waiters", trec, e -> new_value, s);
1439 unpark_waiters_on(cap,s);
1440 IF_STM_FG_LOCKS({
1441 s -> num_updates ++;
1442 });
1443 unlock_tvar(cap, trec, s, e -> new_value, TRUE);
1444 }
1445 ACQ_ASSERT(!tvar_is_locked(s, trec));
1446 });
1447 } else {
1448 revert_ownership(cap, trec, FALSE);
1449 }
1450 }
1451
1452 unlock_stm(trec);
1453
1454 free_stg_trec_header(cap, trec);
1455
1456 TRACE("%p : stmCommitTransaction()=%d", trec, result);
1457
1458 return result;
1459 }
1460
1461 /*......................................................................*/
1462
1463 StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
1464 StgTRecHeader *et;
1465 int result;
1466 ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
1467 TRACE("%p : stmCommitNestedTransaction() into %p", trec, trec -> enclosing_trec);
1468 ASSERT ((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));
1469
1470 lock_stm(trec);
1471
1472 et = trec -> enclosing_trec;
1473 result = validate_and_acquire_ownership(cap, trec, (!config_use_read_phase), TRUE);
1474 if (result) {
1475 // We now know that all the updated locations hold their expected values.
1476
1477 if (config_use_read_phase) {
1478 TRACE("%p : doing read check", trec);
1479 result = check_read_only(trec);
1480 }
1481 if (result) {
1482 // We now know that all of the read-only locations held their expected values
1483 // at the end of the call to validate_and_acquire_ownership. This forms the
1484 // linearization point of the commit.
1485
1486 TRACE("%p : read-check succeeded", trec);
1487 FOR_EACH_ENTRY(trec, e, {
1488 // Merge each entry into the enclosing transaction record, release all
1489 // locks.
1490
1491 StgTVar *s;
1492 s = e -> tvar;
1493 if (entry_is_update(e)) {
1494 unlock_tvar(cap, trec, s, e -> expected_value, FALSE);
1495 }
1496 merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
1497 ACQ_ASSERT(s -> current_value != (StgClosure *)trec);
1498 });
1499 } else {
1500 revert_ownership(cap, trec, FALSE);
1501 }
1502 }
1503
1504 unlock_stm(trec);
1505
1506 free_stg_trec_header(cap, trec);
1507
1508 TRACE("%p : stmCommitNestedTransaction()=%d", trec, result);
1509
1510 return result;
1511 }
1512
1513 /*......................................................................*/
1514
1515 StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
1516 int result;
1517 TRACE("%p : stmWait(%p)", trec, tso);
1518 ASSERT (trec != NO_TREC);
1519 ASSERT (trec -> enclosing_trec == NO_TREC);
1520 ASSERT ((trec -> state == TREC_ACTIVE) ||
1521 (trec -> state == TREC_CONDEMNED));
1522
1523 lock_stm(trec);
1524 result = validate_and_acquire_ownership(cap, trec, TRUE, TRUE);
1525 if (result) {
1526 // The transaction is valid so far so we can actually start waiting.
1527 // (Otherwise the transaction was not valid and the thread will have to
1528 // retry it).
1529
1530 // Put ourselves to sleep. We retain locks on all the TVars involved
1531 // until we are sound asleep: (a) on the wait queues, (b) BlockedOnSTM
1532 // in the TSO, (c) TREC_WAITING in the Trec.
1533 build_watch_queue_entries_for_trec(cap, tso, trec);
1534 park_tso(tso);
1535 trec -> state = TREC_WAITING;
1536
1537 // We haven't released ownership of the transaction yet. The TSO
1538 // has been put on the wait queue for the TVars it is waiting for,
1539 // but we haven't yet tidied up the TSO's stack and made it safe
1540 // to wake up the TSO. Therefore, we must wait until the TSO is
1541 // safe to wake up before we release ownership - when all is well,
1542 // the runtime will call stmWaitUnlock() below, with the same
1543 // TRec.
1544
1545 } else {
1546 unlock_stm(trec);
1547 free_stg_trec_header(cap, trec);
1548 }
1549
1550 TRACE("%p : stmWait(%p)=%d", trec, tso, result);
1551 return result;
1552 }
1553
1554
1555 void
1556 stmWaitUnlock(Capability *cap, StgTRecHeader *trec) {
1557 revert_ownership(cap, trec, TRUE);
1558 unlock_stm(trec);
1559 }
1560
1561 /*......................................................................*/
1562
1563 StgBool stmReWait(Capability *cap, StgTSO *tso) {
1564 int result;
1565 StgTRecHeader *trec = tso->trec;
1566
1567 TRACE("%p : stmReWait", trec);
1568 ASSERT (trec != NO_TREC);
1569 ASSERT (trec -> enclosing_trec == NO_TREC);
1570 ASSERT ((trec -> state == TREC_WAITING) ||
1571 (trec -> state == TREC_CONDEMNED));
1572
1573 lock_stm(trec);
1574 result = validate_and_acquire_ownership(cap, trec, TRUE, TRUE);
1575 TRACE("%p : validation %s", trec, result ? "succeeded" : "failed");
1576 if (result) {
1577 // The transaction remains valid -- do nothing because it is already on
1578 // the wait queues
1579 ASSERT (trec -> state == TREC_WAITING);
1580 park_tso(tso);
1581 revert_ownership(cap, trec, TRUE);
1582 } else {
1583 // The transaction has become invalid. We can now remove it from the wait
1584 // queues.
1585 if (trec -> state != TREC_CONDEMNED) {
1586 remove_watch_queue_entries_for_trec (cap, trec);
1587 }
1588 free_stg_trec_header(cap, trec);
1589 }
1590 unlock_stm(trec);
1591
1592 TRACE("%p : stmReWait()=%d", trec, result);
1593 return result;
1594 }
1595
1596 /*......................................................................*/
1597
1598 static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
1599 StgClosure *result;
1600 result = tvar -> current_value;
1601
1602 #if defined(STM_FG_LOCKS)
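// Under fine-grained locking the TVar may transiently hold the locking
// TRec header instead of a proper value: spin until a real value appears.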
1603 while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info) {
1604 TRACE("%p : read_current_value(%p) saw %p", trec, tvar, result);
1605 result = tvar -> current_value;
1606 }
1607 #endif
1608
1609 TRACE("%p : read_current_value(%p)=%p", trec, tvar, result);
1610 return result;
1611 }
1612
1613 /*......................................................................*/
1614
1615 StgClosure *stmReadTVar(Capability *cap,
1616 StgTRecHeader *trec,
1617 StgTVar *tvar) {
1618 StgTRecHeader *entry_in = NULL;
1619 StgClosure *result = NULL;
1620 TRecEntry *entry = NULL;
1621 TRACE("%p : stmReadTVar(%p)", trec, tvar);
1622 ASSERT (trec != NO_TREC);
1623 ASSERT (trec -> state == TREC_ACTIVE ||
1624 trec -> state == TREC_CONDEMNED);
1625
1626 entry = get_entry_for(trec, tvar, &entry_in);
1627
1628 if (entry != NULL) {
1629 if (entry_in == trec) {
1630 // Entry found in our trec
1631 result = entry -> new_value;
1632 } else {
1633 // Entry found in another trec
1634 TRecEntry *new_entry = get_new_entry(cap, trec);
1635 new_entry -> tvar = tvar;
1636 new_entry -> expected_value = entry -> expected_value;
1637 new_entry -> new_value = entry -> new_value;
1638 result = new_entry -> new_value;
1639 }
1640 } else {
1641 // No entry found
1642 StgClosure *current_value = read_current_value(trec, tvar);
1643 TRecEntry *new_entry = get_new_entry(cap, trec);
1644 new_entry -> tvar = tvar;
1645 new_entry -> expected_value = current_value;
1646 new_entry -> new_value = current_value;
1647 result = current_value;
1648 }
1649
1650 TRACE("%p : stmReadTVar(%p)=%p", trec, tvar, result);
1651 return result;
1652 }
1653
1654 /*......................................................................*/
1655
1656 void stmWriteTVar(Capability *cap,
1657 StgTRecHeader *trec,
1658 StgTVar *tvar,
1659 StgClosure *new_value) {
1660
1661 StgTRecHeader *entry_in = NULL;
1662 TRecEntry *entry = NULL;
1663 TRACE("%p : stmWriteTVar(%p, %p)", trec, tvar, new_value);
1664 ASSERT (trec != NO_TREC);
1665 ASSERT (trec -> state == TREC_ACTIVE ||
1666 trec -> state == TREC_CONDEMNED);
1667
1668 entry = get_entry_for(trec, tvar, &entry_in);
1669
1670 if (entry != NULL) {
1671 if (entry_in == trec) {
1672 // Entry found in our trec
1673 entry -> new_value = new_value;
1674 } else {
1675 // Entry found in another trec
1676 TRecEntry *new_entry = get_new_entry(cap, trec);
1677 new_entry -> tvar = tvar;
1678 new_entry -> expected_value = entry -> expected_value;
1679 new_entry -> new_value = new_value;
1680 }
1681 } else {
1682 // No entry found
1683 StgClosure *current_value = read_current_value(trec, tvar);
1684 TRecEntry *new_entry = get_new_entry(cap, trec);
1685 new_entry -> tvar = tvar;
1686 new_entry -> expected_value = current_value;
1687 new_entry -> new_value = new_value;
1688 }
1689
1690 TRACE("%p : stmWriteTVar done", trec);
1691 }
1692
1693 /*......................................................................*/
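
/* ---------------------------------------------------------------------------
 * Usage sketch (illustrative only; the real drivers of this interface are
 * the atomically#/retry# primops in the RTS): a client runs a transaction
 * roughly as follows.
 *
 *   StgTRecHeader *trec = stmStartTransaction(cap, NO_TREC);
 *   StgClosure *v = stmReadTVar(cap, trec, tvar);
 *   stmWriteTVar(cap, trec, tvar, new_value);
 *   if (!stmCommitTransaction(cap, trec)) {
 *     // conflict: restart from stmStartTransaction
 *   }
 *
 * A blocked "retry" instead parks the thread via stmWait()/stmWaitUnlock()
 * and, once woken, revalidates with stmReWait().
 * ---------------------------------------------------------------------------*/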