/* -----------------------------------------------------------------------------
 * (c) The GHC Team 1998-2005
 *
 * STM implementation.
 *
 * Overview
 * --------
 *
 * See the PPoPP 2005 paper "Composable memory transactions". In summary, each
 * transaction has a TRec (transaction record) holding entries for each of the
 * TVars (transactional variables) that it has accessed. Each entry records (a)
 * the TVar, (b) the expected value seen in the TVar, (c) the new value that the
 * transaction wants to write to the TVar, (d) during commit, the identity of
 * the TRec that wrote the expected value.
 *
 * Separate TRecs are used for each level in a nest of transactions. This
 * allows a nested transaction to be aborted without condemning its enclosing
 * transactions. This is needed in the implementation of catchRetry. Note that
 * the "expected value" in a nested transaction's TRec is the value expected to
 * be *held in memory* if the transaction commits -- not the "new value" stored
 * in one of the enclosing transactions. This means that validation can be done
 * without searching through a nest of TRecs.
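 *
 * As a point of reference, each entry corresponds to a TRecEntry (the real
 * definition lives in the RTS headers, not here) and holds roughly the fields
 * described above -- a sketch, with (d) realised as a version stamp under
 * fine-grained locking:
 *
 *   typedef struct {
 *     StgTVar    *tvar;            // (a) the TVar accessed
 *     StgClosure *expected_value;  // (b) the value we expect to see in it
 *     StgClosure *new_value;       // (c) the value we intend to write
 *     StgInt      num_updates;     // (d) version seen during commit
 *   } TRecEntry;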
 *
 * Concurrency control
 * -------------------
 *
 * Three different concurrency control schemes can be built according to the
 * settings in STM.h:
 *
 * STM_UNIPROC assumes that the caller serialises invocations on the STM
 * interface. In the Haskell RTS this means it is suitable only for
 * non-THREADED_RTS builds.
 *
 * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired
 * during an invocation on the STM interface. Note that this does not mean that
 * transactions are simply serialized -- the lock is only held *within* the
 * implementation of stmCommitTransaction, stmWait etc.
 *
 * STM_FG_LOCKS uses fine-grained locking -- locking is done on a per-TVar basis
 * and, when committing a transaction, no locks are acquired for TVars that have
 * been read but not updated.
 *
 * Concurrency control is implemented in the functions:
 *
 *    lock_stm
 *    unlock_stm
 *    lock_tvar / cond_lock_tvar
 *    unlock_tvar
 *
 * The choice between STM_UNIPROC / STM_CG_LOCK / STM_FG_LOCKS affects the
 * implementation of these functions.
 *
 * lock_stm & unlock_stm are straightforward: they acquire a simple spin-lock
 * using STM_CG_LOCK, and otherwise they are no-ops.
 *
 * lock_tvar / cond_lock_tvar and unlock_tvar are more complex because they have
 * other effects (present in STM_UNIPROC and STM_CG_LOCK builds) as well as the
 * actual business of manipulating a lock (present only in STM_FG_LOCKS builds).
 * This is because locking a TVar is implemented by writing the lock holder's
 * TRec into the TVar's current_value field:
 *
 *   lock_tvar - lock a specified TVar (STM_FG_LOCKS only), returning the value
 *               it contained.
 *
 *   cond_lock_tvar - lock a specified TVar (STM_FG_LOCKS only) if it
 *               contains a specified value. Return true if this succeeds,
 *               false otherwise.
 *
 *   unlock_tvar - release the lock on a specified TVar (STM_FG_LOCKS only),
 *               storing a specified value in place of the lock entry.
 *
 * Using these operations, the typical pattern of a commit/validate/wait
 * operation is to (a) lock the STM, (b) lock all the TVars being updated, (c)
 * check that the TVars that were only read from still contain their expected
 * values, (d) release the locks on the TVars, writing updates to them in the
 * case of a commit, (e) unlock the STM.
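 *
 * In terms of the functions in this file, the commit path is then roughly
 * (a simplified sketch; see stmCommitTransaction for the real sequence,
 * including the version checks made by the read phase):
 *
 *   lock_stm(trec);                                        // (a)
 *   ok = validate_and_acquire_ownership(cap, trec, ..);    // (b) and, for
 *                                                          // read-only
 *                                                          // entries, part
 *                                                          // of (c)
 *   if (ok && use_read_phase) ok = check_read_only(trec);  // rest of (c)
 *   FOR_EACH_ENTRY(trec, e, {                              // (d), updated
 *     unlock_tvar(cap, trec, e -> tvar, e -> new_value, true); // entries only
 *   });
 *   unlock_stm(trec);                                      // (e)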
 *
 * Queues of waiting threads hang off the first_watch_queue_entry field of each
 * TVar. This may only be manipulated when holding that TVar's lock. In
 * particular, when a thread is putting itself to sleep, it mustn't release the
 * TVar's lock until it has added itself to the wait queue and marked its TSO as
 * BlockedOnSTM -- this makes sure that other threads will know to wake it.
 *
 * ---------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "RtsUtils.h"
#include "Schedule.h"
#include "STM.h"
#include "Trace.h"
#include "Threads.h"
#include "sm/Storage.h"
#include "SMPClosureOps.h"

#include <stdio.h>

// ACQ_ASSERT is used for assertions which are only required for
// THREADED_RTS builds with fine-grained locking.

#if defined(STM_FG_LOCKS)
#define ACQ_ASSERT(_X) ASSERT(_X)
#define NACQ_ASSERT(_X) /*Nothing*/
#else
#define ACQ_ASSERT(_X) /*Nothing*/
#define NACQ_ASSERT(_X) ASSERT(_X)
#endif

/*......................................................................*/

#define TRACE(_x...) debugTrace(DEBUG_stm, "STM: " _x)

// If SHAKE is defined then validation will sometimes spuriously fail. This
// helps to exercise unusual code paths when genuine contention is rare.
#if defined(SHAKE)
static int shake_ctr = 0;
static int shake_lim = 1;

static int shake(void) {
  if (((shake_ctr++) % shake_lim) == 0) {
    shake_ctr = 1;
    shake_lim ++;
    return true;
  }
  return false;
}
#else
static int shake(void) {
  return false;
}
#endif

/*......................................................................*/

// Helper macros for iterating over entries within a transaction
// record

#define FOR_EACH_ENTRY(_t,_x,CODE) do {                                       \
  StgTRecHeader *__t = (_t);                                                  \
  StgTRecChunk *__c = __t -> current_chunk;                                   \
  StgWord __limit = __c -> next_entry_idx;                                    \
  TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%ld", __t, __c, __limit); \
  while (__c != END_STM_CHUNK_LIST) {                                         \
    StgWord __i;                                                              \
    for (__i = 0; __i < __limit; __i ++) {                                    \
      TRecEntry *_x = &(__c -> entries[__i]);                                 \
      do { CODE } while (0);                                                  \
    }                                                                         \
    __c = __c -> prev_chunk;                                                  \
    __limit = TREC_CHUNK_NUM_ENTRIES;                                         \
  }                                                                           \
 exit_for_each:                                                               \
  if (false) goto exit_for_each;                                              \
} while (0)

#define BREAK_FOR_EACH goto exit_for_each
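
// For example (an illustrative sketch, not code used elsewhere in this
// file), counting the update entries of a TRec with these macros looks
// like:
//
//   StgWord n_updates = 0;
//   FOR_EACH_ENTRY(trec, e, {
//     if (e -> expected_value != e -> new_value) {
//       n_updates ++;
//     }
//   });
//
// BREAK_FOR_EACH may be used within the CODE block to leave the iteration
// early, as the search loops later in this file do.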

/*......................................................................*/

// if REUSE_MEMORY is defined then attempt to re-use descriptors, log chunks,
// and wait queue entries without GC

#define REUSE_MEMORY

/*......................................................................*/

#define IF_STM_UNIPROC(__X)  do { } while (0)
#define IF_STM_CG_LOCK(__X)  do { } while (0)
#define IF_STM_FG_LOCKS(__X) do { } while (0)

#if defined(STM_UNIPROC)
#undef IF_STM_UNIPROC
#define IF_STM_UNIPROC(__X)  do { __X } while (0)
static const StgBool config_use_read_phase = false;

static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : lock_stm()", trec);
}

static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : unlock_stm()", trec);
}

static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED,
                             StgTVar *s STG_UNUSED) {
  StgClosure *result;
  TRACE("%p : lock_tvar(%p)", trec, s);
  result = s -> current_value;
  return result;
}

static void unlock_tvar(Capability *cap,
                        StgTRecHeader *trec STG_UNUSED,
                        StgTVar *s,
                        StgClosure *c,
                        StgBool force_update) {
  TRACE("%p : unlock_tvar(%p)", trec, s);
  if (force_update) {
    s -> current_value = c;
    dirty_TVAR(cap,s);
  }
}

static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED,
                              StgTVar *s STG_UNUSED,
                              StgClosure *expected) {
  StgClosure *result;
  TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
  result = s -> current_value;
  TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
  return (result == expected);
}

static StgBool lock_inv(StgAtomicInvariant *inv STG_UNUSED) {
  // Nothing -- uniproc
  return true;
}

static void unlock_inv(StgAtomicInvariant *inv STG_UNUSED) {
  // Nothing -- uniproc
}
#endif

#if defined(STM_CG_LOCK) /*........................................*/

#undef IF_STM_CG_LOCK
#define IF_STM_CG_LOCK(__X)  do { __X } while (0)
static const StgBool config_use_read_phase = false;
static volatile StgTRecHeader *smp_locked = NULL;

static void lock_stm(StgTRecHeader *trec) {
  while (cas(&smp_locked, NULL, trec) != NULL) { }
  TRACE("%p : lock_stm()", trec);
}

static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : unlock_stm()", trec);
  ASSERT(smp_locked == trec);
  smp_locked = 0;
}

static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED,
                             StgTVar *s STG_UNUSED) {
  StgClosure *result;
  TRACE("%p : lock_tvar(%p)", trec, s);
  ASSERT(smp_locked == trec);
  result = s -> current_value;
  return result;
}

static void unlock_tvar(Capability *cap,
                        StgTRecHeader *trec STG_UNUSED,
                        StgTVar *s,
                        StgClosure *c,
                        StgBool force_update) {
  TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
  ASSERT(smp_locked == trec);
  if (force_update) {
    s -> current_value = c;
    dirty_TVAR(cap,s);
  }
}

static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED,
                              StgTVar *s STG_UNUSED,
                              StgClosure *expected) {
  StgClosure *result;
  TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
  ASSERT(smp_locked == trec);
  result = s -> current_value;
  TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
  return (result == expected);
}

static StgBool lock_inv(StgAtomicInvariant *inv STG_UNUSED) {
  // Nothing -- protected by STM lock
  return true;
}

static void unlock_inv(StgAtomicInvariant *inv STG_UNUSED) {
  // Nothing -- protected by STM lock
}
#endif

#if defined(STM_FG_LOCKS) /*...................................*/

#undef IF_STM_FG_LOCKS
#define IF_STM_FG_LOCKS(__X) do { __X } while (0)
static const StgBool config_use_read_phase = true;

static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : lock_stm()", trec);
}

static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : unlock_stm()", trec);
}

static StgClosure *lock_tvar(StgTRecHeader *trec,
                             StgTVar *s STG_UNUSED) {
  StgClosure *result;
  TRACE("%p : lock_tvar(%p)", trec, s);
  do {
    do {
      result = s -> current_value;
    } while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info);
  } while (cas((void *)&(s -> current_value),
               (StgWord)result, (StgWord)trec) != (StgWord)result);
  return result;
}

static void unlock_tvar(Capability *cap,
                        StgTRecHeader *trec STG_UNUSED,
                        StgTVar *s,
                        StgClosure *c,
                        StgBool force_update STG_UNUSED) {
  TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
  ASSERT(s -> current_value == (StgClosure *)trec);
  s -> current_value = c;
  dirty_TVAR(cap,s);
}

static StgBool cond_lock_tvar(StgTRecHeader *trec,
                              StgTVar *s,
                              StgClosure *expected) {
  StgClosure *result;
  StgWord w;
  TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
  w = cas((void *)&(s -> current_value), (StgWord)expected, (StgWord)trec);
  result = (StgClosure *)w;
  TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
  return (result == expected);
}

static StgBool lock_inv(StgAtomicInvariant *inv) {
  return (cas(&(inv -> lock), 0, 1) == 0);
}

static void unlock_inv(StgAtomicInvariant *inv) {
  ASSERT(inv -> lock == 1);
  inv -> lock = 0;
}
#endif
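
// Under STM_FG_LOCKS a locked TVar is recognisable because its current_value
// points at the owning TRec rather than at an ordinary Haskell closure. A
// reader therefore spins until it sees a proper value, as lock_tvar above and
// read_current_value below do; the test itself is just (a minimal sketch):
//
//   StgClosure *v = s -> current_value;
//   if (GET_INFO(UNTAG_CLOSURE(v)) == &stg_TREC_HEADER_info) {
//     // v is really a TRec: the TVar is locked by a committing
//     // transaction, so re-read until the lock is released
//   }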

/*......................................................................*/

static StgBool watcher_is_tso(StgTVarWatchQueue *q) {
  StgClosure *c = q -> closure;
  const StgInfoTable *info = get_itbl(c);
  return (info -> type) == TSO;
}

static StgBool watcher_is_invariant(StgTVarWatchQueue *q) {
  StgClosure *c = q -> closure;
  return (c->header.info == &stg_ATOMIC_INVARIANT_info);
}

/*......................................................................*/

// Helper functions for thread blocking and unblocking

static void park_tso(StgTSO *tso) {
  ASSERT(tso -> why_blocked == NotBlocked);
  tso -> why_blocked = BlockedOnSTM;
  tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
  TRACE("park_tso on tso=%p", tso);
}

static void unpark_tso(Capability *cap, StgTSO *tso) {
  // We will continue unparking threads while they remain on one of the wait
  // queues: it's up to the thread itself to remove it from the wait queues
  // if it decides to do so when it is scheduled.

  // Unblocking a TSO from BlockedOnSTM is done under the TSO lock,
  // to avoid multiple CPUs unblocking the same TSO, and also to
  // synchronise with throwTo(). The first time the TSO is unblocked
  // we mark this fact by setting block_info.closure == STM_AWOKEN.
  // This way we can avoid sending further wakeup messages in the
  // future.
  lockTSO(tso);
  if (tso->why_blocked == BlockedOnSTM &&
      tso->block_info.closure == &stg_STM_AWOKEN_closure) {
    TRACE("unpark_tso already woken up tso=%p", tso);
  } else if (tso -> why_blocked == BlockedOnSTM) {
    TRACE("unpark_tso on tso=%p", tso);
    tso->block_info.closure = &stg_STM_AWOKEN_closure;
    tryWakeupThread(cap,tso);
  } else {
    TRACE("spurious unpark_tso on tso=%p", tso);
  }
  unlockTSO(tso);
}

static void unpark_waiters_on(Capability *cap, StgTVar *s) {
  StgTVarWatchQueue *q;
  StgTVarWatchQueue *trail;
  TRACE("unpark_waiters_on tvar=%p", s);
  // unblock TSOs in reverse order, to be a bit fairer (#2319)
  for (q = s -> first_watch_queue_entry, trail = q;
       q != END_STM_WATCH_QUEUE;
       q = q -> next_queue_entry) {
    trail = q;
  }
  q = trail;
  for (;
       q != END_STM_WATCH_QUEUE;
       q = q -> prev_queue_entry) {
    if (watcher_is_tso(q)) {
      unpark_tso(cap, (StgTSO *)(q -> closure));
    }
  }
}

/*......................................................................*/

// Helper functions for downstream allocation and initialization

static StgInvariantCheckQueue *new_stg_invariant_check_queue(Capability *cap,
                                                             StgAtomicInvariant *invariant) {
  StgInvariantCheckQueue *result;
  result = (StgInvariantCheckQueue *)allocate(cap, sizeofW(StgInvariantCheckQueue));
  SET_HDR (result, &stg_INVARIANT_CHECK_QUEUE_info, CCS_SYSTEM);
  result -> invariant = invariant;
  result -> my_execution = NO_TREC;
  return result;
}

static StgTVarWatchQueue *new_stg_tvar_watch_queue(Capability *cap,
                                                   StgClosure *closure) {
  StgTVarWatchQueue *result;
  result = (StgTVarWatchQueue *)allocate(cap, sizeofW(StgTVarWatchQueue));
  SET_HDR (result, &stg_TVAR_WATCH_QUEUE_info, CCS_SYSTEM);
  result -> closure = closure;
  return result;
}

static StgTRecChunk *new_stg_trec_chunk(Capability *cap) {
  StgTRecChunk *result;
  result = (StgTRecChunk *)allocate(cap, sizeofW(StgTRecChunk));
  SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
  result -> prev_chunk = END_STM_CHUNK_LIST;
  result -> next_entry_idx = 0;
  return result;
}

static StgTRecHeader *new_stg_trec_header(Capability *cap,
                                          StgTRecHeader *enclosing_trec) {
  StgTRecHeader *result;
  result = (StgTRecHeader *) allocate(cap, sizeofW(StgTRecHeader));
  SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);

  result -> enclosing_trec = enclosing_trec;
  result -> current_chunk = new_stg_trec_chunk(cap);
  result -> invariants_to_check = END_INVARIANT_CHECK_QUEUE;

  if (enclosing_trec == NO_TREC) {
    result -> state = TREC_ACTIVE;
  } else {
    ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
           enclosing_trec -> state == TREC_CONDEMNED);
    result -> state = enclosing_trec -> state;
  }

  return result;
}

/*......................................................................*/

// Allocation / deallocation functions that retain per-capability lists
// of closures that can be re-used

static StgInvariantCheckQueue *alloc_stg_invariant_check_queue(Capability *cap,
                                                               StgAtomicInvariant *invariant) {
  StgInvariantCheckQueue *result = NULL;
  if (cap -> free_invariant_check_queues == END_INVARIANT_CHECK_QUEUE) {
    result = new_stg_invariant_check_queue(cap, invariant);
  } else {
    result = cap -> free_invariant_check_queues;
    result -> invariant = invariant;
    result -> my_execution = NO_TREC;
    cap -> free_invariant_check_queues = result -> next_queue_entry;
  }
  return result;
}

static StgTVarWatchQueue *alloc_stg_tvar_watch_queue(Capability *cap,
                                                     StgClosure *closure) {
  StgTVarWatchQueue *result = NULL;
  if (cap -> free_tvar_watch_queues == END_STM_WATCH_QUEUE) {
    result = new_stg_tvar_watch_queue(cap, closure);
  } else {
    result = cap -> free_tvar_watch_queues;
    result -> closure = closure;
    cap -> free_tvar_watch_queues = result -> next_queue_entry;
  }
  return result;
}

static void free_stg_tvar_watch_queue(Capability *cap,
                                      StgTVarWatchQueue *wq) {
#if defined(REUSE_MEMORY)
  wq -> next_queue_entry = cap -> free_tvar_watch_queues;
  cap -> free_tvar_watch_queues = wq;
#endif
}

static StgTRecChunk *alloc_stg_trec_chunk(Capability *cap) {
  StgTRecChunk *result = NULL;
  if (cap -> free_trec_chunks == END_STM_CHUNK_LIST) {
    result = new_stg_trec_chunk(cap);
  } else {
    result = cap -> free_trec_chunks;
    cap -> free_trec_chunks = result -> prev_chunk;
    result -> prev_chunk = END_STM_CHUNK_LIST;
    result -> next_entry_idx = 0;
  }
  return result;
}

static void free_stg_trec_chunk(Capability *cap,
                                StgTRecChunk *c) {
#if defined(REUSE_MEMORY)
  c -> prev_chunk = cap -> free_trec_chunks;
  cap -> free_trec_chunks = c;
#endif
}

static StgTRecHeader *alloc_stg_trec_header(Capability *cap,
                                            StgTRecHeader *enclosing_trec) {
  StgTRecHeader *result = NULL;
  if (cap -> free_trec_headers == NO_TREC) {
    result = new_stg_trec_header(cap, enclosing_trec);
  } else {
    result = cap -> free_trec_headers;
    cap -> free_trec_headers = result -> enclosing_trec;
    result -> enclosing_trec = enclosing_trec;
    result -> current_chunk -> next_entry_idx = 0;
    result -> invariants_to_check = END_INVARIANT_CHECK_QUEUE;
    if (enclosing_trec == NO_TREC) {
      result -> state = TREC_ACTIVE;
    } else {
      ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
             enclosing_trec -> state == TREC_CONDEMNED);
      result -> state = enclosing_trec -> state;
    }
  }
  return result;
}

static void free_stg_trec_header(Capability *cap,
                                 StgTRecHeader *trec) {
#if defined(REUSE_MEMORY)
  StgTRecChunk *chunk = trec -> current_chunk -> prev_chunk;
  while (chunk != END_STM_CHUNK_LIST) {
    StgTRecChunk *prev_chunk = chunk -> prev_chunk;
    free_stg_trec_chunk(cap, chunk);
    chunk = prev_chunk;
  }
  trec -> current_chunk -> prev_chunk = END_STM_CHUNK_LIST;
  trec -> enclosing_trec = cap -> free_trec_headers;
  cap -> free_trec_headers = trec;
#endif
}

/*......................................................................*/

// Helper functions for managing waiting lists

static void build_watch_queue_entries_for_trec(Capability *cap,
                                               StgTSO *tso,
                                               StgTRecHeader *trec) {
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT(trec -> state == TREC_ACTIVE);

  TRACE("%p : build_watch_queue_entries_for_trec()", trec);

  FOR_EACH_ENTRY(trec, e, {
    StgTVar *s;
    StgTVarWatchQueue *q;
    StgTVarWatchQueue *fq;
    s = e -> tvar;
    TRACE("%p : adding tso=%p to watch queue for tvar=%p", trec, tso, s);
    ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
    NACQ_ASSERT(s -> current_value == e -> expected_value);
    fq = s -> first_watch_queue_entry;
    q = alloc_stg_tvar_watch_queue(cap, (StgClosure*) tso);
    q -> next_queue_entry = fq;
    q -> prev_queue_entry = END_STM_WATCH_QUEUE;
    if (fq != END_STM_WATCH_QUEUE) {
      fq -> prev_queue_entry = q;
    }
    s -> first_watch_queue_entry = q;
    e -> new_value = (StgClosure *) q;
    dirty_TVAR(cap,s); // we modified first_watch_queue_entry
  });
}

static void remove_watch_queue_entries_for_trec(Capability *cap,
                                                StgTRecHeader *trec) {
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT(trec -> state == TREC_WAITING ||
         trec -> state == TREC_CONDEMNED);

  TRACE("%p : remove_watch_queue_entries_for_trec()", trec);

  FOR_EACH_ENTRY(trec, e, {
    StgTVar *s;
    StgTVarWatchQueue *pq;
    StgTVarWatchQueue *nq;
    StgTVarWatchQueue *q;
    StgClosure *saw;
    s = e -> tvar;
    saw = lock_tvar(trec, s);
    q = (StgTVarWatchQueue *) (e -> new_value);
    TRACE("%p : removing tso=%p from watch queue for tvar=%p",
          trec,
          q -> closure,
          s);
    ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
    nq = q -> next_queue_entry;
    pq = q -> prev_queue_entry;
    if (nq != END_STM_WATCH_QUEUE) {
      nq -> prev_queue_entry = pq;
    }
    if (pq != END_STM_WATCH_QUEUE) {
      pq -> next_queue_entry = nq;
    } else {
      ASSERT(s -> first_watch_queue_entry == q);
      s -> first_watch_queue_entry = nq;
      dirty_TVAR(cap,s); // we modified first_watch_queue_entry
    }
    free_stg_tvar_watch_queue(cap, q);
    unlock_tvar(cap, trec, s, saw, false);
  });
}

/*......................................................................*/

static TRecEntry *get_new_entry(Capability *cap,
                                StgTRecHeader *t) {
  TRecEntry *result;
  StgTRecChunk *c;
  int i;

  c = t -> current_chunk;
  i = c -> next_entry_idx;
  ASSERT(c != END_STM_CHUNK_LIST);

  if (i < TREC_CHUNK_NUM_ENTRIES) {
    // Continue to use current chunk
    result = &(c -> entries[i]);
    c -> next_entry_idx ++;
  } else {
    // Current chunk is full: allocate a fresh one
    StgTRecChunk *nc;
    nc = alloc_stg_trec_chunk(cap);
    nc -> prev_chunk = c;
    nc -> next_entry_idx = 1;
    t -> current_chunk = nc;
    result = &(nc -> entries[0]);
  }

  return result;
}

/*......................................................................*/

static void merge_update_into(Capability *cap,
                              StgTRecHeader *t,
                              StgTVar *tvar,
                              StgClosure *expected_value,
                              StgClosure *new_value)
{
  // Look for an entry in this trec
  bool found = false;
  FOR_EACH_ENTRY(t, e, {
    StgTVar *s;
    s = e -> tvar;
    if (s == tvar) {
      found = true;
      if (e -> expected_value != expected_value) {
        // Must abort if the two entries start from different values
        TRACE("%p : update entries inconsistent at %p (%p vs %p)",
              t, tvar, e -> expected_value, expected_value);
        t -> state = TREC_CONDEMNED;
      }
      e -> new_value = new_value;
      BREAK_FOR_EACH;
    }
  });

  if (!found) {
    // No entry so far in this trec
    TRecEntry *ne;
    ne = get_new_entry(cap, t);
    ne -> tvar = tvar;
    ne -> expected_value = expected_value;
    ne -> new_value = new_value;
  }
}

/*......................................................................*/

static void merge_read_into(Capability *cap,
                            StgTRecHeader *trec,
                            StgTVar *tvar,
                            StgClosure *expected_value)
{
  StgTRecHeader *t;
  bool found = false;

  //
  // See #7493
  //
  // We need to look for an existing entry *anywhere* in the stack of
  // nested transactions. Otherwise, in stmCommitNestedTransaction()
  // we can't tell the difference between
  //
  //   (1) a read-only entry
  //   (2) an entry that writes back the original value
  //
  // Since in both cases e->new_value == e->expected_value. But in (1)
  // we want to do nothing, and in (2) we want to update e->new_value
  // in the outer transaction.
  //
  // Here we deal with the first possibility: we never create a
  // read-only entry in an inner transaction if there is an existing
  // outer entry; so we never have an inner read and an outer update.
  // So then in stmCommitNestedTransaction() we know we can always
  // write e->new_value over the outer entry, because the inner entry
  // is the most up to date.
  //
  for (t = trec; !found && t != NO_TREC; t = t -> enclosing_trec)
  {
    FOR_EACH_ENTRY(t, e, {
      if (e -> tvar == tvar) {
        found = true;
        if (e -> expected_value != expected_value) {
          // Must abort if the two entries start from different values
          TRACE("%p : read entries inconsistent at %p (%p vs %p)",
                t, tvar, e -> expected_value, expected_value);
          t -> state = TREC_CONDEMNED;
        }
        BREAK_FOR_EACH;
      }
    });
  }

  if (!found) {
    // No entry found
    TRecEntry *ne;
    ne = get_new_entry(cap, trec);
    ne -> tvar = tvar;
    ne -> expected_value = expected_value;
    ne -> new_value = expected_value;
  }
}

/*......................................................................*/

static StgBool entry_is_update(TRecEntry *e) {
  StgBool result;
  result = (e -> expected_value != e -> new_value);
  return result;
}

#if defined(STM_FG_LOCKS)
static StgBool entry_is_read_only(TRecEntry *e) {
  StgBool result;
  result = (e -> expected_value == e -> new_value);
  return result;
}

static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
  StgClosure *c;
  StgBool result;
  c = s -> current_value;
  result = (c == (StgClosure *) h);
  return result;
}
#endif

// revert_ownership : release a lock on a TVar, storing back
// the value that it held when the lock was acquired. "revert_all"
// is set in stmWait and stmReWait when we acquired locks on all of
// the TVars involved. "revert_all" is not set in commit operations
// where we don't lock TVars that have been read from but not updated.

static void revert_ownership(Capability *cap STG_UNUSED,
                             StgTRecHeader *trec STG_UNUSED,
                             StgBool revert_all STG_UNUSED) {
#if defined(STM_FG_LOCKS)
  FOR_EACH_ENTRY(trec, e, {
    if (revert_all || entry_is_update(e)) {
      StgTVar *s;
      s = e -> tvar;
      if (tvar_is_locked(s, trec)) {
        unlock_tvar(cap, trec, s, e -> expected_value, true);
      }
    }
  });
#endif
}

/*......................................................................*/

// validate_and_acquire_ownership : this performs the twin functions
// of checking that the TVars referred to by entries in trec hold the
// expected values and:
//
//   - locking the TVar (on updated TVars during commit, or all TVars
//     during wait)
//
//   - recording the identity of the TRec that wrote the value seen in the
//     TVar (on non-updated TVars during commit). These values are
//     stashed in the TRec entries and are then checked in check_read_only
//     to ensure that an atomic snapshot of all of these locations has been
//     seen.

static StgBool validate_and_acquire_ownership (Capability *cap,
                                               StgTRecHeader *trec,
                                               int acquire_all,
                                               int retain_ownership) {
  StgBool result;

  if (shake()) {
    TRACE("%p : shake, pretending trec is invalid when it may not be", trec);
    return false;
  }

  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));
  result = !((trec -> state) == TREC_CONDEMNED);
  if (result) {
    FOR_EACH_ENTRY(trec, e, {
      StgTVar *s;
      s = e -> tvar;
      if (acquire_all || entry_is_update(e)) {
        TRACE("%p : trying to acquire %p", trec, s);
        if (!cond_lock_tvar(trec, s, e -> expected_value)) {
          TRACE("%p : failed to acquire %p", trec, s);
          result = false;
          BREAK_FOR_EACH;
        }
      } else {
        ASSERT(config_use_read_phase);
        IF_STM_FG_LOCKS({
          TRACE("%p : will need to check %p", trec, s);
          if (s -> current_value != e -> expected_value) {
            TRACE("%p : doesn't match", trec);
            result = false;
            BREAK_FOR_EACH;
          }
          e -> num_updates = s -> num_updates;
          if (s -> current_value != e -> expected_value) {
            TRACE("%p : doesn't match (race)", trec);
            result = false;
            BREAK_FOR_EACH;
          } else {
            TRACE("%p : need to check version %ld", trec, e -> num_updates);
          }
        });
      }
    });
  }

  if ((!result) || (!retain_ownership)) {
    revert_ownership(cap, trec, acquire_all);
  }

  return result;
}

// check_read_only : check that we've seen an atomic snapshot of the
// non-updated TVars accessed by a trec. This checks that the last TRec to
// commit an update to the TVar is unchanged since the value was stashed in
// validate_and_acquire_ownership. If no update is seen to any TVar then
// all of them contained their expected values at the start of the call to
// check_read_only.
//
// The paper "Concurrent programming without locks" and Keir Fraser's PhD
// dissertation "Practical lock-free programming" discuss this kind of
// algorithm.

static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
  StgBool result = true;

  ASSERT(config_use_read_phase);
  IF_STM_FG_LOCKS({
    FOR_EACH_ENTRY(trec, e, {
      StgTVar *s;
      s = e -> tvar;
      if (entry_is_read_only(e)) {
        TRACE("%p : check_read_only for TVar %p, saw %ld", trec, s, e -> num_updates);

        // Note we need both checks and in this order as the TVar could be
        // locked by another transaction that is committing but has not yet
        // incremented `num_updates` (See #7815).
        if (s -> current_value != e -> expected_value ||
            s -> num_updates != e -> num_updates) {
          TRACE("%p : mismatch", trec);
          result = false;
          BREAK_FOR_EACH;
        }
      }
    });
  });

  return result;
}
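
// As an illustration of why the order matters, suppose the checks above ran
// in the opposite order against a TVar s whose lock is held by a committing
// transaction T that has not yet bumped the version (a sketch in the spirit
// of #7815):
//
//   us: read s -> num_updates        -- matches the stashed version
//   T:  s -> num_updates ++;
//   T:  unlock_tvar(..., new_value)  -- publishes its update
//   T2: commits expected_value back into s (bumping the version again)
//   us: read s -> current_value      -- matches e -> expected_value
//
// Both reads would pass even though s changed in between. Reading
// current_value first means any such update -- or the lock itself -- is
// observed before the version comparison is trusted.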


/************************************************************************/

void stmPreGCHook (Capability *cap) {
  lock_stm(NO_TREC);
  TRACE("stmPreGCHook");
  cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
  cap->free_trec_chunks = END_STM_CHUNK_LIST;
  cap->free_trec_headers = NO_TREC;
  unlock_stm(NO_TREC);
}

/************************************************************************/

// check_read_only relies on version numbers held in TVars' "num_updates"
// fields not wrapping around while a transaction is being committed. The
// version number is incremented each time an update is committed to the TVar.
// This is unlikely to wrap around when 32-bit integers are used for the
// counts, but to ensure correctness we maintain a shared count of the maximum
// number of commit operations that may occur and check that this has not
// increased by more than 2^32 during a commit.

#define TOKEN_BATCH_SIZE 1024

static volatile StgInt64 max_commits = 0;

#if defined(THREADED_RTS)
static volatile StgWord token_locked = false;

static void getTokenBatch(Capability *cap) {
  while (cas((void *)&token_locked, false, true) == true) { /* nothing */ }
  max_commits += TOKEN_BATCH_SIZE;
  TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, max_commits);
  cap -> transaction_tokens = TOKEN_BATCH_SIZE;
  token_locked = false;
}

static void getToken(Capability *cap) {
  if (cap -> transaction_tokens == 0) {
    getTokenBatch(cap);
  }
  cap -> transaction_tokens --;
}
#else
static void getToken(Capability *cap STG_UNUSED) {
  // Nothing
}
#endif
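
// A small worked example of the bound computed in stmCommitTransaction: with,
// say, 4 capabilities and TOKEN_BATCH_SIZE = 1024, a commit that sees
// max_commits advance by D while it runs concludes that at most
// D + 4 * 1024 commits can have overlapped with it, since each capability may
// also spend tokens from a batch it had already been granted. Only if that
// bound reaches 2^32 could a 32-bit num_updates counter have wrapped all the
// way around and aliased, and in that case the commit is conservatively
// failed.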

/*......................................................................*/

StgTRecHeader *stmStartTransaction(Capability *cap,
                                   StgTRecHeader *outer) {
  StgTRecHeader *t;
  TRACE("%p : stmStartTransaction with %d tokens",
        outer,
        cap -> transaction_tokens);

  getToken(cap);

  t = alloc_stg_trec_header(cap, outer);
  TRACE("%p : stmStartTransaction()=%p", outer, t);
  return t;
}

/*......................................................................*/

void stmAbortTransaction(Capability *cap,
                         StgTRecHeader *trec) {
  StgTRecHeader *et;
  TRACE("%p : stmAbortTransaction", trec);
  ASSERT(trec != NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);

  et = trec -> enclosing_trec;
  if (et == NO_TREC) {
    // We're a top-level transaction: remove any watch queue entries that
    // we may have.
    TRACE("%p : aborting top-level transaction", trec);

    if (trec -> state == TREC_WAITING) {
      ASSERT(trec -> enclosing_trec == NO_TREC);
      TRACE("%p : stmAbortTransaction aborting waiting transaction", trec);
      remove_watch_queue_entries_for_trec(cap, trec);
    }

  } else {
    // We're a nested transaction: merge our read set into our parent's
    TRACE("%p : retaining read-set into parent %p", trec, et);

    FOR_EACH_ENTRY(trec, e, {
      StgTVar *s = e -> tvar;
      merge_read_into(cap, et, s, e -> expected_value);
    });
  }

  trec -> state = TREC_ABORTED;
  unlock_stm(trec);

  TRACE("%p : stmAbortTransaction done", trec);
}

/*......................................................................*/

void stmFreeAbortedTRec(Capability *cap,
                        StgTRecHeader *trec) {
  TRACE("%p : stmFreeAbortedTRec", trec);
  ASSERT(trec != NO_TREC);
  ASSERT((trec -> state == TREC_CONDEMNED) ||
         (trec -> state == TREC_ABORTED));

  free_stg_trec_header(cap, trec);

  TRACE("%p : stmFreeAbortedTRec done", trec);
}

/*......................................................................*/

void stmCondemnTransaction(Capability *cap,
                           StgTRecHeader *trec) {
  TRACE("%p : stmCondemnTransaction", trec);
  ASSERT(trec != NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);
  if (trec -> state == TREC_WAITING) {
    ASSERT(trec -> enclosing_trec == NO_TREC);
    TRACE("%p : stmCondemnTransaction condemning waiting transaction", trec);
    remove_watch_queue_entries_for_trec(cap, trec);
  }
  trec -> state = TREC_CONDEMNED;
  unlock_stm(trec);

  TRACE("%p : stmCondemnTransaction done", trec);
}

/*......................................................................*/

StgBool stmValidateNestOfTransactions(Capability *cap, StgTRecHeader *trec) {
  StgTRecHeader *t;

  TRACE("%p : stmValidateNestOfTransactions", trec);
  ASSERT(trec != NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);

  t = trec;
  StgBool result = true;
  while (t != NO_TREC) {
    result &= validate_and_acquire_ownership(cap, t, true, false);
    t = t -> enclosing_trec;
  }

  if (!result && trec -> state != TREC_WAITING) {
    trec -> state = TREC_CONDEMNED;
  }

  unlock_stm(trec);

  TRACE("%p : stmValidateNestOfTransactions()=%d", trec, result);
  return result;
}

/*......................................................................*/

static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) {
  TRecEntry *result = NULL;

  TRACE("%p : get_entry_for TVar %p", trec, tvar);
  ASSERT(trec != NO_TREC);

  do {
    FOR_EACH_ENTRY(trec, e, {
      if (e -> tvar == tvar) {
        result = e;
        if (in != NULL) {
          *in = trec;
        }
        BREAK_FOR_EACH;
      }
    });
    trec = trec -> enclosing_trec;
  } while (result == NULL && trec != NO_TREC);

  return result;
}

/*......................................................................*/

/*
 * Add/remove links between an invariant and the TVars it accesses. The
 * caller must have locked the TVars involved and the invariant.
 */

static void disconnect_invariant(Capability *cap,
                                 StgAtomicInvariant *inv) {
  StgTRecHeader *last_execution = inv -> last_execution;

  TRACE("unhooking last execution inv=%p trec=%p", inv, last_execution);

  FOR_EACH_ENTRY(last_execution, e, {
    StgTVar *s = e -> tvar;
    StgTVarWatchQueue *q = s -> first_watch_queue_entry;
    DEBUG_ONLY( StgBool found = false );
    TRACE("  looking for trec on tvar=%p", s);
    for (q = s -> first_watch_queue_entry;
         q != END_STM_WATCH_QUEUE;
         q = q -> next_queue_entry) {
      if (q -> closure == (StgClosure*)inv) {
        StgTVarWatchQueue *pq;
        StgTVarWatchQueue *nq;
        nq = q -> next_queue_entry;
        pq = q -> prev_queue_entry;
        if (nq != END_STM_WATCH_QUEUE) {
          nq -> prev_queue_entry = pq;
        }
        if (pq != END_STM_WATCH_QUEUE) {
          pq -> next_queue_entry = nq;
        } else {
          ASSERT(s -> first_watch_queue_entry == q);
          s -> first_watch_queue_entry = nq;
          dirty_TVAR(cap,s); // we modified first_watch_queue_entry
        }
        TRACE("  found it in watch queue entry %p", q);
        free_stg_tvar_watch_queue(cap, q);
        DEBUG_ONLY( found = true );
        break;
      }
    }
    ASSERT(found);
  });
  inv -> last_execution = NO_TREC;
}

static void connect_invariant_to_trec(Capability *cap,
                                      StgAtomicInvariant *inv,
                                      StgTRecHeader *my_execution) {
  TRACE("connecting execution inv=%p trec=%p", inv, my_execution);

  ASSERT(inv -> last_execution == NO_TREC);

  FOR_EACH_ENTRY(my_execution, e, {
    StgTVar *s = e -> tvar;
    StgTVarWatchQueue *q = alloc_stg_tvar_watch_queue(cap, (StgClosure*)inv);
    StgTVarWatchQueue *fq = s -> first_watch_queue_entry;

    // We leave "last_execution" holding the values that will be
    // in the heap after the transaction we're in the process
    // of committing has finished.
    TRecEntry *entry = get_entry_for(my_execution -> enclosing_trec, s, NULL);
    if (entry != NULL) {
      e -> expected_value = entry -> new_value;
      e -> new_value = entry -> new_value;
    }

    TRACE("  linking trec on tvar=%p value=%p q=%p", s, e -> expected_value, q);
    q -> next_queue_entry = fq;
    q -> prev_queue_entry = END_STM_WATCH_QUEUE;
    if (fq != END_STM_WATCH_QUEUE) {
      fq -> prev_queue_entry = q;
    }
    s -> first_watch_queue_entry = q;
    dirty_TVAR(cap,s); // we modified first_watch_queue_entry
  });

  inv -> last_execution = my_execution;
}

/*
 * Add a new invariant to the trec's list of invariants to check on commit
 */
void stmAddInvariantToCheck(Capability *cap,
                            StgTRecHeader *trec,
                            StgClosure *code) {
  StgAtomicInvariant *invariant;
  StgInvariantCheckQueue *q;
  TRACE("%p : stmAddInvariantToCheck closure=%p", trec, code);
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> state == TREC_ACTIVE ||
         trec -> state == TREC_CONDEMNED);

  // 1. Allocate an StgAtomicInvariant, set last_execution to NO_TREC
  //    to signal that this is a new invariant in the current atomic block

  invariant = (StgAtomicInvariant *) allocate(cap, sizeofW(StgAtomicInvariant));
  TRACE("%p : stmAddInvariantToCheck allocated invariant=%p", trec, invariant);
  SET_HDR (invariant, &stg_ATOMIC_INVARIANT_info, CCS_SYSTEM);
  invariant -> code = code;
  invariant -> last_execution = NO_TREC;
  invariant -> lock = 0;

  // 2. Allocate an StgInvariantCheckQueue entry, link it to the current trec

  q = alloc_stg_invariant_check_queue(cap, invariant);
  TRACE("%p : stmAddInvariantToCheck allocated q=%p", trec, q);
  q -> invariant = invariant;
  q -> my_execution = NO_TREC;
  q -> next_queue_entry = trec -> invariants_to_check;
  trec -> invariants_to_check = q;

  TRACE("%p : stmAddInvariantToCheck done", trec);
}

/*
 * Fill in the trec's list of invariants that might be violated by the
 * current transaction.
 */

StgInvariantCheckQueue *stmGetInvariantsToCheck(Capability *cap, StgTRecHeader *trec) {
  StgTRecChunk *c;
  TRACE("%p : stmGetInvariantsToCheck, head was %p",
        trec,
        trec -> invariants_to_check);

  ASSERT(trec != NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));
  ASSERT(trec -> enclosing_trec == NO_TREC);

  lock_stm(trec);
  c = trec -> current_chunk;
  while (c != END_STM_CHUNK_LIST) {
    unsigned int i;
    for (i = 0; i < c -> next_entry_idx; i ++) {
      TRecEntry *e = &(c -> entries[i]);
      if (entry_is_update(e)) {
        StgTVar *s = e -> tvar;
        StgClosure *old = lock_tvar(trec, s);

        // Pick up any invariants on the TVar being updated
        // by entry "e"

        StgTVarWatchQueue *q;
        TRACE("%p : checking for invariants on %p", trec, s);
        for (q = s -> first_watch_queue_entry;
             q != END_STM_WATCH_QUEUE;
             q = q -> next_queue_entry) {
          if (watcher_is_invariant(q)) {
            StgBool found = false;
            StgInvariantCheckQueue *q2;
            TRACE("%p : Touching invariant %p", trec, q -> closure);
            for (q2 = trec -> invariants_to_check;
                 q2 != END_INVARIANT_CHECK_QUEUE;
                 q2 = q2 -> next_queue_entry) {
              if (q2 -> invariant == (StgAtomicInvariant*)(q -> closure)) {
                TRACE("%p : Already found %p", trec, q -> closure);
                found = true;
                break;
              }
            }

            if (!found) {
              StgInvariantCheckQueue *q3;
              TRACE("%p : Not already found %p", trec, q -> closure);
              q3 = alloc_stg_invariant_check_queue(cap,
                                                   (StgAtomicInvariant*) q -> closure);
              q3 -> next_queue_entry = trec -> invariants_to_check;
              trec -> invariants_to_check = q3;
            }
          }
        }

        unlock_tvar(cap, trec, s, old, false);
      }
    }
    c = c -> prev_chunk;
  }

  unlock_stm(trec);

  TRACE("%p : stmGetInvariantsToCheck, head now %p",
        trec,
        trec -> invariants_to_check);

  return (trec -> invariants_to_check);
}

/*......................................................................*/

StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
  StgInt64 max_commits_at_start = max_commits;
  StgBool touched_invariants;
  StgBool use_read_phase;

  TRACE("%p : stmCommitTransaction()", trec);
  ASSERT(trec != NO_TREC);

  lock_stm(trec);

  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_CONDEMNED));

  // touched_invariants is true if we've written to a TVar with invariants
  // attached to it, or if we're trying to add a new invariant to the system.

  touched_invariants = (trec -> invariants_to_check != END_INVARIANT_CHECK_QUEUE);

  // If we have touched invariants then (i) lock the invariant, and (ii) add
  // the invariant's read set to our own. Step (i) is needed to serialize
  // concurrent transactions that attempt to make conflicting updates
  // to the invariant's trec (suppose it read from t1 and t2, and that one
  // concurrent transaction writes only to t1, and a second writes only to
  // t2). Step (ii) is needed so that both transactions will lock t1 and t2
  // to gain access to their wait lists (and hence be able to unhook the
  // invariant from both tvars).

  if (touched_invariants) {
    StgInvariantCheckQueue *q = trec -> invariants_to_check;
    TRACE("%p : locking invariants", trec);
    while (q != END_INVARIANT_CHECK_QUEUE) {
      StgTRecHeader *inv_old_trec;
      StgAtomicInvariant *inv;
      TRACE("%p : locking invariant %p", trec, q -> invariant);
      inv = q -> invariant;
      if (!lock_inv(inv)) {
        TRACE("%p : failed to lock %p", trec, inv);
        trec -> state = TREC_CONDEMNED;
        break;
      }

      inv_old_trec = inv -> last_execution;
      if (inv_old_trec != NO_TREC) {
        StgTRecChunk *c = inv_old_trec -> current_chunk;
        while (c != END_STM_CHUNK_LIST) {
          unsigned int i;
          for (i = 0; i < c -> next_entry_idx; i ++) {
            TRecEntry *e = &(c -> entries[i]);
            TRACE("%p : ensuring we lock TVars for %p", trec, e -> tvar);
            merge_read_into (cap, trec, e -> tvar, e -> expected_value);
          }
          c = c -> prev_chunk;
        }
      }
      q = q -> next_queue_entry;
    }
    TRACE("%p : finished locking invariants", trec);
  }

  // Use a read-phase (i.e. don't lock TVars we've read but not updated) if
  // (i) the configuration lets us use a read phase, and (ii) we've not
  // touched or introduced any invariants.
  //
  // In principle we could extend the implementation to support a read-phase
  // and invariants, but it complicates the logic: the links between
  // invariants and TVars are managed by the TVar watch queues which are
  // protected by the TVar's locks.

  use_read_phase = ((config_use_read_phase) && (!touched_invariants));

  bool result = validate_and_acquire_ownership(cap, trec, (!use_read_phase), true);
  if (result) {
    // We now know that all the updated locations hold their expected values.
    ASSERT(trec -> state == TREC_ACTIVE);

    if (use_read_phase) {
      StgInt64 max_commits_at_end;
      StgInt64 max_concurrent_commits;
      TRACE("%p : doing read check", trec);
      result = check_read_only(trec);
      TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed");

      max_commits_at_end = max_commits;
      max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
                                (n_capabilities * TOKEN_BATCH_SIZE));
      if (((max_concurrent_commits >> 32) > 0) || shake()) {
        result = false;
      }
    }

    if (result) {
      // We now know that all of the read-only locations held their expected values
      // at the end of the call to validate_and_acquire_ownership. This forms the
      // linearization point of the commit.

      // 1. If we have touched or introduced any invariants then unhook them
      //    from the TVars they depended on last time they were executed
      //    and hook them on the TVars that they now depend on.
      if (touched_invariants) {
        StgInvariantCheckQueue *q = trec -> invariants_to_check;
        while (q != END_INVARIANT_CHECK_QUEUE) {
          StgAtomicInvariant *inv = q -> invariant;
          if (inv -> last_execution != NO_TREC) {
            disconnect_invariant(cap, inv);
          }

          TRACE("%p : hooking up new execution trec=%p", trec, q -> my_execution);
          connect_invariant_to_trec(cap, inv, q -> my_execution);

          TRACE("%p : unlocking invariant %p", trec, inv);
          unlock_inv(inv);

          q = q -> next_queue_entry;
        }
      }

      // 2. Make the updates required by the transaction
      FOR_EACH_ENTRY(trec, e, {
        StgTVar *s;
        s = e -> tvar;
        if ((!use_read_phase) || (e -> new_value != e -> expected_value)) {
          // Either the entry is an update or we're not using a read phase:
          // write the value back to the TVar, unlocking it if necessary.

          ACQ_ASSERT(tvar_is_locked(s, trec));
          TRACE("%p : writing %p to %p, waking waiters", trec, e -> new_value, s);
          unpark_waiters_on(cap,s);
          IF_STM_FG_LOCKS({
            s -> num_updates ++;
          });
          unlock_tvar(cap, trec, s, e -> new_value, true);
        }
        ACQ_ASSERT(!tvar_is_locked(s, trec));
      });
    } else {
      revert_ownership(cap, trec, false);
    }
  }

  unlock_stm(trec);

  free_stg_trec_header(cap, trec);

  TRACE("%p : stmCommitTransaction()=%d", trec, result);

  return result;
}

/*......................................................................*/

StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
  StgTRecHeader *et;
  ASSERT(trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
  TRACE("%p : stmCommitNestedTransaction() into %p", trec, trec -> enclosing_trec);
  ASSERT((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);

  et = trec -> enclosing_trec;
  bool result = validate_and_acquire_ownership(cap, trec, (!config_use_read_phase), true);
  if (result) {
    // We now know that all the updated locations hold their expected values.

    if (config_use_read_phase) {
      TRACE("%p : doing read check", trec);
      result = check_read_only(trec);
    }
    if (result) {
      // We now know that all of the read-only locations held their expected values
      // at the end of the call to validate_and_acquire_ownership. This forms the
      // linearization point of the commit.

      TRACE("%p : read-check succeeded", trec);
      FOR_EACH_ENTRY(trec, e, {
        // Merge each entry into the enclosing transaction record, release all
        // locks.

        StgTVar *s;
        s = e -> tvar;
        if (entry_is_update(e)) {
          unlock_tvar(cap, trec, s, e -> expected_value, false);
        }
        merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
        ACQ_ASSERT(s -> current_value != (StgClosure *)trec);
      });
    } else {
      revert_ownership(cap, trec, false);
    }
  }

  unlock_stm(trec);

  free_stg_trec_header(cap, trec);

  TRACE("%p : stmCommitNestedTransaction()=%d", trec, result);

  return result;
}

/*......................................................................*/

StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
  TRACE("%p : stmWait(%p)", trec, tso);
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);
  bool result = validate_and_acquire_ownership(cap, trec, true, true);
  if (result) {
    // The transaction is valid so far so we can actually start waiting.
    // (Otherwise the transaction was not valid and the thread will have to
    // retry it).

    // Put ourselves to sleep. We retain locks on all the TVars involved
    // until we are sound asleep: (a) on the wait queues, (b) BlockedOnSTM
    // in the TSO, (c) TREC_WAITING in the Trec.
    build_watch_queue_entries_for_trec(cap, tso, trec);
    park_tso(tso);
    trec -> state = TREC_WAITING;

    // We haven't released ownership of the transaction yet. The TSO
    // has been put on the wait queue for the TVars it is waiting for,
    // but we haven't yet tidied up the TSO's stack and made it safe
    // to wake up the TSO. Therefore, we must wait until the TSO is
    // safe to wake up before we release ownership - when all is well,
    // the runtime will call stmWaitUnlock() below, with the same
    // TRec.

  } else {
    unlock_stm(trec);
    free_stg_trec_header(cap, trec);
  }

  TRACE("%p : stmWait(%p)=%d", trec, tso, result);
  return result;
}


void
stmWaitUnlock(Capability *cap, StgTRecHeader *trec) {
  revert_ownership(cap, trec, true);
  unlock_stm(trec);
}

/*......................................................................*/

StgBool stmReWait(Capability *cap, StgTSO *tso) {
  StgTRecHeader *trec = tso->trec;

  TRACE("%p : stmReWait", trec);
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT((trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);
  bool result = validate_and_acquire_ownership(cap, trec, true, true);
  TRACE("%p : validation %s", trec, result ? "succeeded" : "failed");
  if (result) {
    // The transaction remains valid -- do nothing because it is already on
    // the wait queues
    ASSERT(trec -> state == TREC_WAITING);
    park_tso(tso);
    revert_ownership(cap, trec, true);
  } else {
    // The transaction has become invalid. We can now remove it from the wait
    // queues.
    if (trec -> state != TREC_CONDEMNED) {
      remove_watch_queue_entries_for_trec (cap, trec);
    }
    free_stg_trec_header(cap, trec);
  }
  unlock_stm(trec);

  TRACE("%p : stmReWait()=%d", trec, result);
  return result;
}

/*......................................................................*/

static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
  StgClosure *result;
  result = tvar -> current_value;

#if defined(STM_FG_LOCKS)
  while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info) {
    TRACE("%p : read_current_value(%p) saw %p", trec, tvar, result);
    result = tvar -> current_value;
  }
#endif

  TRACE("%p : read_current_value(%p)=%p", trec, tvar, result);
  return result;
}

/*......................................................................*/

StgClosure *stmReadTVar(Capability *cap,
                        StgTRecHeader *trec,
                        StgTVar *tvar) {
  StgTRecHeader *entry_in = NULL;
  StgClosure *result = NULL;
  TRecEntry *entry = NULL;
  TRACE("%p : stmReadTVar(%p)", trec, tvar);
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> state == TREC_ACTIVE ||
         trec -> state == TREC_CONDEMNED);

  entry = get_entry_for(trec, tvar, &entry_in);

  if (entry != NULL) {
    if (entry_in == trec) {
      // Entry found in our trec
      result = entry -> new_value;
    } else {
      // Entry found in another trec
      TRecEntry *new_entry = get_new_entry(cap, trec);
      new_entry -> tvar = tvar;
      new_entry -> expected_value = entry -> expected_value;
      new_entry -> new_value = entry -> new_value;
      result = new_entry -> new_value;
    }
  } else {
    // No entry found
    StgClosure *current_value = read_current_value(trec, tvar);
    TRecEntry *new_entry = get_new_entry(cap, trec);
    new_entry -> tvar = tvar;
    new_entry -> expected_value = current_value;
    new_entry -> new_value = current_value;
    result = current_value;
  }

  TRACE("%p : stmReadTVar(%p)=%p", trec, tvar, result);
  return result;
}

/*......................................................................*/

void stmWriteTVar(Capability *cap,
                  StgTRecHeader *trec,
                  StgTVar *tvar,
                  StgClosure *new_value) {

  StgTRecHeader *entry_in = NULL;
  TRecEntry *entry = NULL;
  TRACE("%p : stmWriteTVar(%p, %p)", trec, tvar, new_value);
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> state == TREC_ACTIVE ||
         trec -> state == TREC_CONDEMNED);

  entry = get_entry_for(trec, tvar, &entry_in);

  if (entry != NULL) {
    if (entry_in == trec) {
      // Entry found in our trec
      entry -> new_value = new_value;
    } else {
      // Entry found in another trec
      TRecEntry *new_entry = get_new_entry(cap, trec);
      new_entry -> tvar = tvar;
      new_entry -> expected_value = entry -> expected_value;
      new_entry -> new_value = new_value;
    }
  } else {
    // No entry found
    StgClosure *current_value = read_current_value(trec, tvar);
    TRecEntry *new_entry = get_new_entry(cap, trec);
    new_entry -> tvar = tvar;
    new_entry -> expected_value = current_value;
    new_entry -> new_value = new_value;
  }

  TRACE("%p : stmWriteTVar done", trec);
}
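
// As an illustrative consequence of the entry-merging logic above (a sketch,
// mirroring what get_entry_for and get_new_entry do): reading and then
// writing the same TVar within one transaction leaves a single entry whose
// expected_value is the value originally read and whose new_value is the
// value last written,
//
//   StgClosure *v0 = stmReadTVar(cap, trec, tvar);  // entry: (v0, v0)
//   stmWriteTVar(cap, trec, tvar, v1);              // entry: (v0, v1)
//
// which is exactly the shape entry_is_update tests for at commit time.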

/*......................................................................*/