/* -----------------------------------------------------------------------------
 * (c) The GHC Team 1998-2005
 *
 * STM implementation.
 *
 * Overview
 * --------
 *
 * See the PPoPP 2005 paper "Composable memory transactions".  In summary, each
 * transaction has a TRec (transaction record) holding entries for each of the
 * TVars (transactional variables) that it has accessed.  Each entry records (a)
 * the TVar, (b) the expected value seen in the TVar, (c) the new value that the
 * transaction wants to write to the TVar, (d) during commit, the identity of
 * the TRec that wrote the expected value.
 *
 * Separate TRecs are used for each level in a nest of transactions.  This
 * allows a nested transaction to be aborted without condemning its enclosing
 * transactions.  This is needed in the implementation of catchRetry.  Note that
 * the "expected value" in a nested transaction's TRec is the value expected to
 * be *held in memory* if the transaction commits -- not the "new value" stored
 * in one of the enclosing transactions.  This means that validation can be done
 * without searching through a nest of TRecs.
 *
 * Concurrency control
 * -------------------
 *
 * Three different concurrency control schemes can be built according to the
 * settings in STM.h:
 *
 * STM_UNIPROC assumes that the caller serialises invocations on the STM
 * interface.  In the Haskell RTS this means it is suitable only for
 * non-THREADED_RTS builds.
 *
 * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired
 * during an invocation on the STM interface.  Note that this does not mean that
 * transactions are simply serialized -- the lock is only held *within* the
 * implementation of stmCommitTransaction, stmWait etc.
 *
 * STM_FG_LOCKS uses fine-grained locking -- locking is done on a per-TVar basis
 * and, when committing a transaction, no locks are acquired for TVars that have
 * been read but not updated.
 *
 * Concurrency control is implemented in the functions:
 *
 *    lock_stm
 *    unlock_stm
 *    lock_tvar / cond_lock_tvar
 *    unlock_tvar
 *
 * The choice between STM_UNIPROC / STM_CG_LOCK / STM_FG_LOCKS affects the
 * implementation of these functions.
 *
 * lock_stm & unlock_stm are straightforward: under STM_CG_LOCK they acquire a
 * simple spin-lock, and otherwise they are no-ops.
 *
 * lock_tvar / cond_lock_tvar and unlock_tvar are more complex because they have
 * other effects (present in STM_UNIPROC and STM_CG_LOCK builds) as well as the
 * actual business of manipulating a lock (present only in STM_FG_LOCKS builds).
 * This is because locking a TVar is implemented by writing the lock holder's
 * TRec into the TVar's current_value field:
 *
 *   lock_tvar - lock a specified TVar (STM_FG_LOCKS only), returning the value
 *               it contained.
 *
 *   cond_lock_tvar - lock a specified TVar (STM_FG_LOCKS only) if it
 *               contains a specified value.  Return true if this succeeds,
 *               false otherwise.
 *
 *   unlock_tvar - release the lock on a specified TVar (STM_FG_LOCKS only),
 *               storing a specified value in place of the lock entry.
 *
 * Using these operations, the typical pattern of a commit/validate/wait
 * operation is to (a) lock the STM, (b) lock all the TVars being updated, (c)
 * check that the TVars that were only read from still contain their expected
 * values, (d) release the locks on the TVars, writing updates to them in the
 * case of a commit, (e) unlock the STM.
 *
 * Queues of waiting threads hang off the first_watch_queue_entry field of each
 * TVar.  This may only be manipulated when holding that TVar's lock.  In
 * particular, when a thread is putting itself to sleep, it mustn't release the
 * TVar's lock until it has added itself to the wait queue and marked its TSO as
 * BlockedOnSTM -- this makes sure that other threads will know to wake it.
 *
 * ---------------------------------------------------------------------------*/
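
/* As an illustration only (not part of the build): the (a)-(e) pattern above
 * corresponds roughly to the shape of stmCommitTransaction below.  The helper
 * names are real; this sketch elides the read-phase configuration and the
 * commit-token machinery:
 *
 *   lock_stm(trec);                                              // (a)
 *   ok = validate_and_acquire_ownership(cap, trec, ..., ...);    // (b) + (c)
 *   if (ok && config_use_read_phase) ok = check_read_only(trec); // (c)
 *   if (ok) FOR_EACH_ENTRY(trec, e, {
 *     unlock_tvar(cap, trec, e -> tvar, e -> new_value, true);   // (d) commit
 *   });
 *   else revert_ownership(cap, trec, false);                     // (d) back out
 *   unlock_stm(trec);                                            // (e)
 */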

#include "PosixSource.h"
#include "Rts.h"

#include "RtsUtils.h"
#include "Schedule.h"
#include "STM.h"
#include "Trace.h"
#include "Threads.h"
#include "sm/Storage.h"
#include "SMPClosureOps.h"

#include <stdio.h>

// ACQ_ASSERT is used for assertions which are only required for
// THREADED_RTS builds with fine-grained locking.

#if defined(STM_FG_LOCKS)
#define ACQ_ASSERT(_X) ASSERT(_X)
#define NACQ_ASSERT(_X) /*Nothing*/
#else
#define ACQ_ASSERT(_X) /*Nothing*/
#define NACQ_ASSERT(_X) ASSERT(_X)
#endif

/*......................................................................*/

#define TRACE(_x...) debugTrace(DEBUG_stm, "STM: " _x)

// If SHAKE is defined then validation will sometimes spuriously fail.  This
// helps to test unusual code paths when genuine contention is rare.
#if defined(SHAKE)
static int shake_ctr = 0;
static int shake_lim = 1;

static int shake(void) {
  if (((shake_ctr++) % shake_lim) == 0) {
    shake_ctr = 1;
    shake_lim ++;
    return true;
  }
  return false;
}
#else
static int shake(void) {
  return false;
}
#endif
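
// Note (illustrative): with the counters above, shake() returns true on its
// 1st, 3rd, 6th, 10th, ... calls -- the gap between spurious failures grows
// by one each time, so they become progressively rarer in long runs.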

/*......................................................................*/

// Helper macros for iterating over entries within a transaction
// record

#define FOR_EACH_ENTRY(_t,_x,CODE) do {                                        \
  StgTRecHeader *__t = (_t);                                                   \
  StgTRecChunk *__c = __t -> current_chunk;                                    \
  StgWord __limit = __c -> next_entry_idx;                                     \
  TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%ld", __t, __c, __limit); \
  while (__c != END_STM_CHUNK_LIST) {                                          \
    StgWord __i;                                                               \
    for (__i = 0; __i < __limit; __i ++) {                                     \
      TRecEntry *_x = &(__c -> entries[__i]);                                  \
      do { CODE } while (0);                                                   \
    }                                                                          \
    __c = __c -> prev_chunk;                                                   \
    __limit = TREC_CHUNK_NUM_ENTRIES;                                          \
  }                                                                            \
 exit_for_each:                                                                \
  if (false) goto exit_for_each;                                               \
} while (0)

#define BREAK_FOR_EACH goto exit_for_each
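
// A minimal usage sketch of the iteration macros (illustrative only; the
// helper name trec_mentions_tvar is hypothetical, not part of the RTS):
//
//   static StgBool trec_mentions_tvar(StgTRecHeader *t, StgTVar *tv) {
//     StgBool found = false;
//     FOR_EACH_ENTRY(t, e, {
//       if (e -> tvar == tv) { found = true; BREAK_FOR_EACH; }
//     });
//     return found;
//   }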

/*......................................................................*/

// if REUSE_MEMORY is defined then attempt to re-use descriptors, log chunks,
// and wait queue entries without GC

#define REUSE_MEMORY

/*......................................................................*/

#define IF_STM_UNIPROC(__X) do { } while (0)
#define IF_STM_CG_LOCK(__X) do { } while (0)
#define IF_STM_FG_LOCKS(__X) do { } while (0)

#if defined(STM_UNIPROC)
#undef IF_STM_UNIPROC
#define IF_STM_UNIPROC(__X) do { __X } while (0)
static const StgBool config_use_read_phase = false;

static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : lock_stm()", trec);
}

static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : unlock_stm()", trec);
}

static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED,
                             StgTVar *s STG_UNUSED) {
  StgClosure *result;
  TRACE("%p : lock_tvar(%p)", trec, s);
  result = s -> current_value;
  return result;
}

static void unlock_tvar(Capability *cap,
                        StgTRecHeader *trec STG_UNUSED,
                        StgTVar *s,
                        StgClosure *c,
                        StgBool force_update) {
  TRACE("%p : unlock_tvar(%p)", trec, s);
  if (force_update) {
    s -> current_value = c;
    dirty_TVAR(cap,s);
  }
}

static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED,
                              StgTVar *s STG_UNUSED,
                              StgClosure *expected) {
  StgClosure *result;
  TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
  result = s -> current_value;
  TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
  return (result == expected);
}
#endif

#if defined(STM_CG_LOCK) /*........................................*/

#undef IF_STM_CG_LOCK
#define IF_STM_CG_LOCK(__X) do { __X } while (0)
static const StgBool config_use_read_phase = false;
static volatile StgTRecHeader *smp_locked = NULL;

static void lock_stm(StgTRecHeader *trec) {
  while (cas(&smp_locked, NULL, trec) != NULL) { }
  TRACE("%p : lock_stm()", trec);
}

static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : unlock_stm()", trec);
  ASSERT(smp_locked == trec);
  smp_locked = 0;
}

static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED,
                             StgTVar *s STG_UNUSED) {
  StgClosure *result;
  TRACE("%p : lock_tvar(%p)", trec, s);
  ASSERT(smp_locked == trec);
  result = s -> current_value;
  return result;
}

static void unlock_tvar(Capability *cap,
                        StgTRecHeader *trec STG_UNUSED,
                        StgTVar *s,
                        StgClosure *c,
                        StgBool force_update) {
  TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
  ASSERT(smp_locked == trec);
  if (force_update) {
    s -> current_value = c;
    dirty_TVAR(cap,s);
  }
}

static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED,
                              StgTVar *s STG_UNUSED,
                              StgClosure *expected) {
  StgClosure *result;
  TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
  ASSERT(smp_locked == trec);
  result = s -> current_value;
  TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
  return (result == expected);
}
#endif

#if defined(STM_FG_LOCKS) /*...................................*/

#undef IF_STM_FG_LOCKS
#define IF_STM_FG_LOCKS(__X) do { __X } while (0)
static const StgBool config_use_read_phase = true;

static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : lock_stm()", trec);
}

static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : unlock_stm()", trec);
}

static StgClosure *lock_tvar(StgTRecHeader *trec,
                             StgTVar *s STG_UNUSED) {
  StgClosure *result;
  TRACE("%p : lock_tvar(%p)", trec, s);
  do {
    // Spin while another TRec holds the lock (a locked TVar's current_value
    // points at the owning TRec's header), then try to install our own TRec
    // with a CAS.
    do {
      result = s -> current_value;
    } while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info);
  } while (cas((void *)&(s -> current_value),
               (StgWord)result, (StgWord)trec) != (StgWord)result);
  return result;
}

static void unlock_tvar(Capability *cap,
                        StgTRecHeader *trec STG_UNUSED,
                        StgTVar *s,
                        StgClosure *c,
                        StgBool force_update STG_UNUSED) {
  TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
  ASSERT(s -> current_value == (StgClosure *)trec);
  s -> current_value = c;
  dirty_TVAR(cap,s);
}

static StgBool cond_lock_tvar(StgTRecHeader *trec,
                              StgTVar *s,
                              StgClosure *expected) {
  StgClosure *result;
  StgWord w;
  TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
  w = cas((void *)&(s -> current_value), (StgWord)expected, (StgWord)trec);
  result = (StgClosure *)w;
  TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
  return (result == expected);
}
#endif
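
// Illustrative pairing (not part of the build): under STM_FG_LOCKS a TVar is
// "locked" whenever its current_value points at a TRec header, so the usual
// discipline is
//
//   StgClosure *saw = lock_tvar(trec, s);   // s -> current_value == trec
//   ... inspect or update the watch queue / value ...
//   unlock_tvar(cap, trec, s, saw, false);  // restore the saved value
//
// as in remove_watch_queue_entries_for_trec below.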

/*......................................................................*/

// Helper functions for thread blocking and unblocking

static void park_tso(StgTSO *tso) {
  ASSERT(tso -> why_blocked == NotBlocked);
  tso -> why_blocked = BlockedOnSTM;
  tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
  TRACE("park_tso on tso=%p", tso);
}

static void unpark_tso(Capability *cap, StgTSO *tso) {
  // We will continue unparking threads while they remain on one of the wait
  // queues: it's up to the thread itself to remove it from the wait queues
  // if it decides to do so when it is scheduled.

  // Unblocking a TSO from BlockedOnSTM is done under the TSO lock,
  // to avoid multiple CPUs unblocking the same TSO, and also to
  // synchronise with throwTo(). The first time the TSO is unblocked
  // we mark this fact by setting block_info.closure == STM_AWOKEN.
  // This way we can avoid sending further wakeup messages in the
  // future.
  lockTSO(tso);
  if (tso->why_blocked == BlockedOnSTM &&
      tso->block_info.closure == &stg_STM_AWOKEN_closure) {
      TRACE("unpark_tso already woken up tso=%p", tso);
  } else if (tso -> why_blocked == BlockedOnSTM) {
      TRACE("unpark_tso on tso=%p", tso);
      tso->block_info.closure = &stg_STM_AWOKEN_closure;
      tryWakeupThread(cap,tso);
  } else {
      TRACE("spurious unpark_tso on tso=%p", tso);
  }
  unlockTSO(tso);
}

static void unpark_waiters_on(Capability *cap, StgTVar *s) {
  StgTVarWatchQueue *q;
  StgTVarWatchQueue *trail;
  TRACE("unpark_waiters_on tvar=%p", s);
  // unblock TSOs in reverse order, to be a bit fairer (#2319)
  for (q = s -> first_watch_queue_entry, trail = q;
       q != END_STM_WATCH_QUEUE;
       q = q -> next_queue_entry) {
    trail = q;
  }
  q = trail;
  for (;
       q != END_STM_WATCH_QUEUE;
       q = q -> prev_queue_entry) {
    unpark_tso(cap, (StgTSO *)(q -> closure));
  }
}

/*......................................................................*/

// Helper functions for downstream allocation and initialization

static StgTVarWatchQueue *new_stg_tvar_watch_queue(Capability *cap,
                                                   StgClosure *closure) {
  StgTVarWatchQueue *result;
  result = (StgTVarWatchQueue *)allocate(cap, sizeofW(StgTVarWatchQueue));
  SET_HDR (result, &stg_TVAR_WATCH_QUEUE_info, CCS_SYSTEM);
  result -> closure = closure;
  return result;
}

static StgTRecChunk *new_stg_trec_chunk(Capability *cap) {
  StgTRecChunk *result;
  result = (StgTRecChunk *)allocate(cap, sizeofW(StgTRecChunk));
  SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
  result -> prev_chunk = END_STM_CHUNK_LIST;
  result -> next_entry_idx = 0;
  return result;
}

static StgTRecHeader *new_stg_trec_header(Capability *cap,
                                          StgTRecHeader *enclosing_trec) {
  StgTRecHeader *result;
  result = (StgTRecHeader *) allocate(cap, sizeofW(StgTRecHeader));
  SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);

  result -> enclosing_trec = enclosing_trec;
  result -> current_chunk = new_stg_trec_chunk(cap);

  if (enclosing_trec == NO_TREC) {
    result -> state = TREC_ACTIVE;
  } else {
    ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
           enclosing_trec -> state == TREC_CONDEMNED);
    result -> state = enclosing_trec -> state;
  }

  return result;
}

/*......................................................................*/

// Allocation / deallocation functions that retain per-capability lists
// of closures that can be re-used

static StgTVarWatchQueue *alloc_stg_tvar_watch_queue(Capability *cap,
                                                     StgClosure *closure) {
  StgTVarWatchQueue *result = NULL;
  if (cap -> free_tvar_watch_queues == END_STM_WATCH_QUEUE) {
    result = new_stg_tvar_watch_queue(cap, closure);
  } else {
    result = cap -> free_tvar_watch_queues;
    result -> closure = closure;
    cap -> free_tvar_watch_queues = result -> next_queue_entry;
  }
  return result;
}

static void free_stg_tvar_watch_queue(Capability *cap,
                                      StgTVarWatchQueue *wq) {
#if defined(REUSE_MEMORY)
  wq -> next_queue_entry = cap -> free_tvar_watch_queues;
  cap -> free_tvar_watch_queues = wq;
#endif
}

static StgTRecChunk *alloc_stg_trec_chunk(Capability *cap) {
  StgTRecChunk *result = NULL;
  if (cap -> free_trec_chunks == END_STM_CHUNK_LIST) {
    result = new_stg_trec_chunk(cap);
  } else {
    result = cap -> free_trec_chunks;
    cap -> free_trec_chunks = result -> prev_chunk;
    result -> prev_chunk = END_STM_CHUNK_LIST;
    result -> next_entry_idx = 0;
  }
  return result;
}

static void free_stg_trec_chunk(Capability *cap,
                                StgTRecChunk *c) {
#if defined(REUSE_MEMORY)
  c -> prev_chunk = cap -> free_trec_chunks;
  cap -> free_trec_chunks = c;
#endif
}

static StgTRecHeader *alloc_stg_trec_header(Capability *cap,
                                            StgTRecHeader *enclosing_trec) {
  StgTRecHeader *result = NULL;
  if (cap -> free_trec_headers == NO_TREC) {
    result = new_stg_trec_header(cap, enclosing_trec);
  } else {
    result = cap -> free_trec_headers;
    cap -> free_trec_headers = result -> enclosing_trec;
    result -> enclosing_trec = enclosing_trec;
    result -> current_chunk -> next_entry_idx = 0;
    if (enclosing_trec == NO_TREC) {
      result -> state = TREC_ACTIVE;
    } else {
      ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
             enclosing_trec -> state == TREC_CONDEMNED);
      result -> state = enclosing_trec -> state;
    }
  }
  return result;
}

static void free_stg_trec_header(Capability *cap,
                                 StgTRecHeader *trec) {
#if defined(REUSE_MEMORY)
  StgTRecChunk *chunk = trec -> current_chunk -> prev_chunk;
  while (chunk != END_STM_CHUNK_LIST) {
    StgTRecChunk *prev_chunk = chunk -> prev_chunk;
    free_stg_trec_chunk(cap, chunk);
    chunk = prev_chunk;
  }
  trec -> current_chunk -> prev_chunk = END_STM_CHUNK_LIST;
  trec -> enclosing_trec = cap -> free_trec_headers;
  cap -> free_trec_headers = trec;
#endif
}
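
// Note (descriptive): the free lists above are threaded through fields of the
// freed objects themselves -- next_queue_entry for watch queue entries,
// prev_chunk for chunks, and enclosing_trec for headers -- so the
// per-capability caches need no extra memory.  stmPreGCHook below simply
// resets these lists so that GC can reclaim the cached closures.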

/*......................................................................*/

// Helper functions for managing waiting lists

static void build_watch_queue_entries_for_trec(Capability *cap,
                                               StgTSO *tso,
                                               StgTRecHeader *trec) {
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT(trec -> state == TREC_ACTIVE);

  TRACE("%p : build_watch_queue_entries_for_trec()", trec);

  FOR_EACH_ENTRY(trec, e, {
    StgTVar *s;
    StgTVarWatchQueue *q;
    StgTVarWatchQueue *fq;
    s = e -> tvar;
    TRACE("%p : adding tso=%p to watch queue for tvar=%p", trec, tso, s);
    ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
    NACQ_ASSERT(s -> current_value == e -> expected_value);
    fq = s -> first_watch_queue_entry;
    q = alloc_stg_tvar_watch_queue(cap, (StgClosure*) tso);
    q -> next_queue_entry = fq;
    q -> prev_queue_entry = END_STM_WATCH_QUEUE;
    if (fq != END_STM_WATCH_QUEUE) {
      fq -> prev_queue_entry = q;
    }
    s -> first_watch_queue_entry = q;
    e -> new_value = (StgClosure *) q;
    dirty_TVAR(cap,s); // we modified first_watch_queue_entry
  });
}

static void remove_watch_queue_entries_for_trec(Capability *cap,
                                                StgTRecHeader *trec) {
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT(trec -> state == TREC_WAITING ||
         trec -> state == TREC_CONDEMNED);

  TRACE("%p : remove_watch_queue_entries_for_trec()", trec);

  FOR_EACH_ENTRY(trec, e, {
    StgTVar *s;
    StgTVarWatchQueue *pq;
    StgTVarWatchQueue *nq;
    StgTVarWatchQueue *q;
    StgClosure *saw;
    s = e -> tvar;
    saw = lock_tvar(trec, s);
    q = (StgTVarWatchQueue *) (e -> new_value);
    TRACE("%p : removing tso=%p from watch queue for tvar=%p",
          trec,
          q -> closure,
          s);
    ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
    nq = q -> next_queue_entry;
    pq = q -> prev_queue_entry;
    if (nq != END_STM_WATCH_QUEUE) {
      nq -> prev_queue_entry = pq;
    }
    if (pq != END_STM_WATCH_QUEUE) {
      pq -> next_queue_entry = nq;
    } else {
      ASSERT(s -> first_watch_queue_entry == q);
      s -> first_watch_queue_entry = nq;
      dirty_TVAR(cap,s); // we modified first_watch_queue_entry
    }
    free_stg_tvar_watch_queue(cap, q);
    unlock_tvar(cap, trec, s, saw, false);
  });
}

/*......................................................................*/

static TRecEntry *get_new_entry(Capability *cap,
                                StgTRecHeader *t) {
  TRecEntry *result;
  StgTRecChunk *c;
  int i;

  c = t -> current_chunk;
  i = c -> next_entry_idx;
  ASSERT(c != END_STM_CHUNK_LIST);

  if (i < TREC_CHUNK_NUM_ENTRIES) {
    // Continue to use current chunk
    result = &(c -> entries[i]);
    c -> next_entry_idx ++;
  } else {
    // Current chunk is full: allocate a fresh one
    StgTRecChunk *nc;
    nc = alloc_stg_trec_chunk(cap);
    nc -> prev_chunk = c;
    nc -> next_entry_idx = 1;
    t -> current_chunk = nc;
    result = &(nc -> entries[0]);
  }

  return result;
}

/*......................................................................*/

static void merge_update_into(Capability *cap,
                              StgTRecHeader *t,
                              StgTVar *tvar,
                              StgClosure *expected_value,
                              StgClosure *new_value)
{
  // Look for an entry in this trec
  bool found = false;
  FOR_EACH_ENTRY(t, e, {
    StgTVar *s;
    s = e -> tvar;
    if (s == tvar) {
      found = true;
      if (e -> expected_value != expected_value) {
        // Must abort if the two entries start from different values
        TRACE("%p : update entries inconsistent at %p (%p vs %p)",
              t, tvar, e -> expected_value, expected_value);
        t -> state = TREC_CONDEMNED;
      }
      e -> new_value = new_value;
      BREAK_FOR_EACH;
    }
  });

  if (!found) {
    // No entry so far in this trec
    TRecEntry *ne;
    ne = get_new_entry(cap, t);
    ne -> tvar = tvar;
    ne -> expected_value = expected_value;
    ne -> new_value = new_value;
  }
}

/*......................................................................*/

static void merge_read_into(Capability *cap,
                            StgTRecHeader *trec,
                            StgTVar *tvar,
                            StgClosure *expected_value)
{
  StgTRecHeader *t;
  bool found = false;

  //
  // See #7493
  //
  // We need to look for an existing entry *anywhere* in the stack of
  // nested transactions.  Otherwise, in stmCommitNestedTransaction()
  // we can't tell the difference between
  //
  //   (1) a read-only entry
  //   (2) an entry that writes back the original value
  //
  // since in both cases e->new_value == e->expected_value.  But in (1)
  // we want to do nothing, and in (2) we want to update e->new_value
  // in the outer transaction.
  //
  // Here we deal with the first possibility: we never create a
  // read-only entry in an inner transaction if there is an existing
  // outer entry; so we never have an inner read and an outer update.
  // So then in stmCommitNestedTransaction() we know we can always
  // write e->new_value over the outer entry, because the inner entry
  // is the most up to date.
  //
  for (t = trec; !found && t != NO_TREC; t = t -> enclosing_trec)
  {
    FOR_EACH_ENTRY(t, e, {
      if (e -> tvar == tvar) {
        found = true;
        if (e -> expected_value != expected_value) {
          // Must abort if the two entries start from different values
          TRACE("%p : read entries inconsistent at %p (%p vs %p)",
                t, tvar, e -> expected_value, expected_value);
          t -> state = TREC_CONDEMNED;
        }
        BREAK_FOR_EACH;
      }
    });
  }

  if (!found) {
    // No entry found
    TRecEntry *ne;
    ne = get_new_entry(cap, trec);
    ne -> tvar = tvar;
    ne -> expected_value = expected_value;
    ne -> new_value = expected_value;
  }
}
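
// A concrete illustration of the hazard above (doubly-nested case): suppose
// the outermost TRec holds an update entry for x (expected 0, new 1) and,
// inside two levels of orElse, an innermost branch reads x and then retries.
// Merging that read into the middle TRec without searching the whole chain
// would create a (0, 0) entry there, indistinguishable from a write-back of
// 0; when the middle TRec later committed, merge_update_into would overwrite
// the outer entry's new_value 1 with 0, losing the update.  Searching every
// enclosing TRec (as above) finds the outer entry and adds nothing.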

/*......................................................................*/

static StgBool entry_is_update(TRecEntry *e) {
  StgBool result;
  result = (e -> expected_value != e -> new_value);
  return result;
}

#if defined(STM_FG_LOCKS)
static StgBool entry_is_read_only(TRecEntry *e) {
  StgBool result;
  result = (e -> expected_value == e -> new_value);
  return result;
}

static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
  StgClosure *c;
  StgBool result;
  c = s -> current_value;
  result = (c == (StgClosure *) h);
  return result;
}
#endif

// revert_ownership : release a lock on a TVar, storing back
// the value that it held when the lock was acquired.  "revert_all"
// is set in stmWait and stmReWait when we acquired locks on all of
// the TVars involved.  "revert_all" is not set in commit operations
// where we don't lock TVars that have been read from but not updated.

static void revert_ownership(Capability *cap STG_UNUSED,
                             StgTRecHeader *trec STG_UNUSED,
                             StgBool revert_all STG_UNUSED) {
#if defined(STM_FG_LOCKS)
  FOR_EACH_ENTRY(trec, e, {
    if (revert_all || entry_is_update(e)) {
      StgTVar *s;
      s = e -> tvar;
      if (tvar_is_locked(s, trec)) {
        unlock_tvar(cap, trec, s, e -> expected_value, true);
      }
    }
  });
#endif
}

/*......................................................................*/

// validate_and_acquire_ownership : this performs the twin functions
// of checking that the TVars referred to by entries in trec hold the
// expected values and:
//
//   - locking the TVar (on updated TVars during commit, or all TVars
//     during wait)
//
//   - recording the identity of the TRec that wrote the value seen in the
//     TVar (on non-updated TVars during commit).  These values are
//     stashed in the TRec entries and are then checked in check_read_only
//     to ensure that an atomic snapshot of all of these locations has been
//     seen.

static StgBool validate_and_acquire_ownership (Capability *cap,
                                               StgTRecHeader *trec,
                                               int acquire_all,
                                               int retain_ownership) {
  StgBool result;

  if (shake()) {
    TRACE("%p : shake, pretending trec is invalid when it may not be", trec);
    return false;
  }

  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));
  result = !((trec -> state) == TREC_CONDEMNED);
  if (result) {
    FOR_EACH_ENTRY(trec, e, {
      StgTVar *s;
      s = e -> tvar;
      if (acquire_all || entry_is_update(e)) {
        TRACE("%p : trying to acquire %p", trec, s);
        if (!cond_lock_tvar(trec, s, e -> expected_value)) {
          TRACE("%p : failed to acquire %p", trec, s);
          result = false;
          BREAK_FOR_EACH;
        }
      } else {
        ASSERT(config_use_read_phase);
        IF_STM_FG_LOCKS({
          TRACE("%p : will need to check %p", trec, s);
          if (s -> current_value != e -> expected_value) {
            TRACE("%p : doesn't match", trec);
            result = false;
            BREAK_FOR_EACH;
          }
          e -> num_updates = s -> num_updates;
          // Re-check the value now that the version number has been
          // recorded: a racing commit may have changed both between the
          // two reads.
          if (s -> current_value != e -> expected_value) {
            TRACE("%p : doesn't match (race)", trec);
            result = false;
            BREAK_FOR_EACH;
          } else {
            TRACE("%p : need to check version %ld", trec, e -> num_updates);
          }
        });
      }
    });
  }

  if ((!result) || (!retain_ownership)) {
    revert_ownership(cap, trec, acquire_all);
  }

  return result;
}

// check_read_only : check that we've seen an atomic snapshot of the
// non-updated TVars accessed by a trec.  This checks that the last TRec to
// commit an update to the TVar is unchanged since the value was stashed in
// validate_and_acquire_ownership.  If no update is seen to any TVar then
// all of them contained their expected values at the start of the call to
// check_read_only.
//
// Both the paper "Concurrent programming without locks" (under submission)
// and Keir Fraser's PhD dissertation "Practical lock-free programming"
// discuss this kind of algorithm.

static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
  StgBool result = true;

  ASSERT(config_use_read_phase);
  IF_STM_FG_LOCKS({
    FOR_EACH_ENTRY(trec, e, {
      StgTVar *s;
      s = e -> tvar;
      if (entry_is_read_only(e)) {
        TRACE("%p : check_read_only for TVar %p, saw %ld", trec, s, e -> num_updates);

        // Note we need both checks and in this order as the TVar could be
        // locked by another transaction that is committing but has not yet
        // incremented `num_updates` (See #7815).
        if (s -> current_value != e -> expected_value ||
            s -> num_updates != e -> num_updates) {
          TRACE("%p : mismatch", trec);
          result = false;
          BREAK_FOR_EACH;
        }
      }
    });
  });

  return result;
}
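
// An illustrative interleaving for the note above: while a competing
// transaction is committing it first locks the TVar (current_value then
// points at its TRec header, so it can never equal expected_value) and only
// later bumps num_updates.  The value test therefore catches an in-progress
// commit that the version test alone would miss, while the version test
// catches a commit that completed entirely since validate_and_acquire_ownership
// recorded the version.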


/************************************************************************/

void stmPreGCHook (Capability *cap) {
  lock_stm(NO_TREC);
  TRACE("stmPreGCHook");
  cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
  cap->free_trec_chunks = END_STM_CHUNK_LIST;
  cap->free_trec_headers = NO_TREC;
  unlock_stm(NO_TREC);
}

/************************************************************************/

// check_read_only relies on version numbers held in TVars' "num_updates"
// fields not wrapping around while a transaction is committed.  The version
// number is incremented each time an update is committed to the TVar.
// This is unlikely to wrap around when 32-bit integers are used for the
// counts, but to ensure correctness we maintain a shared count of the
// maximum number of commit operations that may occur and check that this
// has not increased by more than 2^32 during a commit.

#define TOKEN_BATCH_SIZE 1024

static volatile StgInt64 max_commits = 0;

#if defined(THREADED_RTS)
static volatile StgWord token_locked = false;

static void getTokenBatch(Capability *cap) {
  while (cas((void *)&token_locked, false, true) == true) { /* nothing */ }
  max_commits += TOKEN_BATCH_SIZE;
  TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, max_commits);
  cap -> transaction_tokens = TOKEN_BATCH_SIZE;
  token_locked = false;
}

static void getToken(Capability *cap) {
  if (cap -> transaction_tokens == 0) {
    getTokenBatch(cap);
  }
  cap -> transaction_tokens --;
}
#else
static void getToken(Capability *cap STG_UNUSED) {
  // Nothing
}
#endif
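
// Worked numbers for the scheme above (illustrative): each transaction takes
// one token, and tokens are drawn from max_commits in batches of
// TOKEN_BATCH_SIZE (1024).  A capability may also hold up to a full batch of
// unused tokens, so the number of commits racing with a given commit is
// over-approximated by (max_commits_at_end - max_commits_at_start)
// + n_capabilities * TOKEN_BATCH_SIZE.  stmCommitTransaction treats the
// commit as invalid (forcing a retry) whenever that bound reaches 2^32,
// since num_updates could then have wrapped all the way around.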

/*......................................................................*/

StgTRecHeader *stmStartTransaction(Capability *cap,
                                   StgTRecHeader *outer) {
  StgTRecHeader *t;
  TRACE("%p : stmStartTransaction with %d tokens",
        outer,
        cap -> transaction_tokens);

  getToken(cap);

  t = alloc_stg_trec_header(cap, outer);
  TRACE("%p : stmStartTransaction()=%p", outer, t);
  return t;
}

/*......................................................................*/

void stmAbortTransaction(Capability *cap,
                         StgTRecHeader *trec) {
  StgTRecHeader *et;
  TRACE("%p : stmAbortTransaction", trec);
  ASSERT(trec != NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);

  et = trec -> enclosing_trec;
  if (et == NO_TREC) {
    // We're a top-level transaction: remove any watch queue entries that
    // we may have.
    TRACE("%p : aborting top-level transaction", trec);

    if (trec -> state == TREC_WAITING) {
      ASSERT(trec -> enclosing_trec == NO_TREC);
      TRACE("%p : stmAbortTransaction aborting waiting transaction", trec);
      remove_watch_queue_entries_for_trec(cap, trec);
    }

  } else {
    // We're a nested transaction: merge our read set into our parent's
    TRACE("%p : merging read set into parent %p", trec, et);

    FOR_EACH_ENTRY(trec, e, {
      StgTVar *s = e -> tvar;
      merge_read_into(cap, et, s, e -> expected_value);
    });
  }

  trec -> state = TREC_ABORTED;
  unlock_stm(trec);

  TRACE("%p : stmAbortTransaction done", trec);
}

/*......................................................................*/

void stmFreeAbortedTRec(Capability *cap,
                        StgTRecHeader *trec) {
  TRACE("%p : stmFreeAbortedTRec", trec);
  ASSERT(trec != NO_TREC);
  ASSERT((trec -> state == TREC_CONDEMNED) ||
         (trec -> state == TREC_ABORTED));

  free_stg_trec_header(cap, trec);

  TRACE("%p : stmFreeAbortedTRec done", trec);
}

/*......................................................................*/

void stmCondemnTransaction(Capability *cap,
                           StgTRecHeader *trec) {
  TRACE("%p : stmCondemnTransaction", trec);
  ASSERT(trec != NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);
  if (trec -> state == TREC_WAITING) {
    ASSERT(trec -> enclosing_trec == NO_TREC);
    TRACE("%p : stmCondemnTransaction condemning waiting transaction", trec);
    remove_watch_queue_entries_for_trec(cap, trec);
  }
  trec -> state = TREC_CONDEMNED;
  unlock_stm(trec);

  TRACE("%p : stmCondemnTransaction done", trec);
}

/*......................................................................*/

StgBool stmValidateNestOfTransactions(Capability *cap, StgTRecHeader *trec) {
  StgTRecHeader *t;

  TRACE("%p : stmValidateNestOfTransactions", trec);
  ASSERT(trec != NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);

  t = trec;
  StgBool result = true;
  while (t != NO_TREC) {
    result &= validate_and_acquire_ownership(cap, t, true, false);
    t = t -> enclosing_trec;
  }

  if (!result && trec -> state != TREC_WAITING) {
    trec -> state = TREC_CONDEMNED;
  }

  unlock_stm(trec);

  TRACE("%p : stmValidateNestOfTransactions()=%d", trec, result);
  return result;
}

/*......................................................................*/

static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) {
  TRecEntry *result = NULL;

  TRACE("%p : get_entry_for TVar %p", trec, tvar);
  ASSERT(trec != NO_TREC);

  do {
    FOR_EACH_ENTRY(trec, e, {
      if (e -> tvar == tvar) {
        result = e;
        if (in != NULL) {
          *in = trec;
        }
        BREAK_FOR_EACH;
      }
    });
    trec = trec -> enclosing_trec;
  } while (result == NULL && trec != NO_TREC);

  return result;
}

/*......................................................................*/

StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
  StgInt64 max_commits_at_start = max_commits;

  TRACE("%p : stmCommitTransaction()", trec);
  ASSERT(trec != NO_TREC);

  lock_stm(trec);

  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_CONDEMNED));

  // Use a read-phase (i.e. don't lock TVars we've read but not updated) if
  // the configuration lets us use a read phase.

  bool result = validate_and_acquire_ownership(cap, trec, (!config_use_read_phase), true);
  if (result) {
    // We now know that all the updated locations hold their expected values.
    ASSERT(trec -> state == TREC_ACTIVE);

    if (config_use_read_phase) {
      StgInt64 max_commits_at_end;
      StgInt64 max_concurrent_commits;
      TRACE("%p : doing read check", trec);
      result = check_read_only(trec);
      TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed");

      max_commits_at_end = max_commits;
      max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
                                (n_capabilities * TOKEN_BATCH_SIZE));
      if (((max_concurrent_commits >> 32) > 0) || shake()) {
        result = false;
      }
    }

    if (result) {
      // We now know that all of the read-only locations held their expected values
      // at the end of the call to validate_and_acquire_ownership.  This forms the
      // linearization point of the commit.

      // Make the updates required by the transaction.
      FOR_EACH_ENTRY(trec, e, {
        StgTVar *s;
        s = e -> tvar;
        if ((!config_use_read_phase) || (e -> new_value != e -> expected_value)) {
          // Either the entry is an update or we're not using a read phase:
          // write the value back to the TVar, unlocking it if necessary.

          ACQ_ASSERT(tvar_is_locked(s, trec));
          TRACE("%p : writing %p to %p, waking waiters", trec, e -> new_value, s);
          unpark_waiters_on(cap,s);
          IF_STM_FG_LOCKS({
            s -> num_updates ++;
          });
          unlock_tvar(cap, trec, s, e -> new_value, true);
        }
        ACQ_ASSERT(!tvar_is_locked(s, trec));
      });
    } else {
      revert_ownership(cap, trec, false);
    }
  }

  unlock_stm(trec);

  free_stg_trec_header(cap, trec);

  TRACE("%p : stmCommitTransaction()=%d", trec, result);

  return result;
}

/*......................................................................*/

StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
  StgTRecHeader *et;
  ASSERT(trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
  TRACE("%p : stmCommitNestedTransaction() into %p", trec, trec -> enclosing_trec);
  ASSERT((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);

  et = trec -> enclosing_trec;
  bool result = validate_and_acquire_ownership(cap, trec, (!config_use_read_phase), true);
  if (result) {
    // We now know that all the updated locations hold their expected values.

    if (config_use_read_phase) {
      TRACE("%p : doing read check", trec);
      result = check_read_only(trec);
    }
    if (result) {
      // We now know that all of the read-only locations held their expected values
      // at the end of the call to validate_and_acquire_ownership.  This forms the
      // linearization point of the commit.

      TRACE("%p : read-check succeeded", trec);
      FOR_EACH_ENTRY(trec, e, {
        // Merge each entry into the enclosing transaction record, release all
        // locks.

        StgTVar *s;
        s = e -> tvar;
        if (entry_is_update(e)) {
          unlock_tvar(cap, trec, s, e -> expected_value, false);
        }
        merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
        ACQ_ASSERT(s -> current_value != (StgClosure *)trec);
      });
    } else {
      revert_ownership(cap, trec, false);
    }
  }

  unlock_stm(trec);

  free_stg_trec_header(cap, trec);

  TRACE("%p : stmCommitNestedTransaction()=%d", trec, result);

  return result;
}

/*......................................................................*/

StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
  TRACE("%p : stmWait(%p)", trec, tso);
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);
  bool result = validate_and_acquire_ownership(cap, trec, true, true);
  if (result) {
    // The transaction is valid so far so we can actually start waiting.
    // (Otherwise the transaction was not valid and the thread will have to
    // retry it).

    // Put ourselves to sleep.  We retain locks on all the TVars involved
    // until we are sound asleep: (a) on the wait queues, (b) BlockedOnSTM
    // in the TSO, (c) TREC_WAITING in the Trec.
    build_watch_queue_entries_for_trec(cap, tso, trec);
    park_tso(tso);
    trec -> state = TREC_WAITING;

    // We haven't released ownership of the transaction yet.  The TSO
    // has been put on the wait queue for the TVars it is waiting for,
    // but we haven't yet tidied up the TSO's stack and made it safe
    // to wake up the TSO.  Therefore, we must wait until the TSO is
    // safe to wake up before we release ownership -- when all is well,
    // the runtime will call stmWaitUnlock() below, with the same
    // TRec.

  } else {
    unlock_stm(trec);
    free_stg_trec_header(cap, trec);
  }

  TRACE("%p : stmWait(%p)=%d", trec, tso, result);
  return result;
}


void
stmWaitUnlock(Capability *cap, StgTRecHeader *trec) {
  revert_ownership(cap, trec, true);
  unlock_stm(trec);
}
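
// Illustrative caller sequence (hedged; in the RTS the actual caller is the
// retry# primop code in PrimOps.cmm): a thread executing `retry` first calls
// stmWait; if that returns true it tidies its stack while still holding the
// TVar locks, and only then calls stmWaitUnlock to release them:
//
//   if (stmWait(cap, tso, trec)) {
//     ... save the thread's stack state so it is safe to wake ...
//     stmWaitUnlock(cap, trec);    // now other threads may unpark us
//   } else {
//     ... validation failed: restart the transaction ...
//   }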

/*......................................................................*/

StgBool stmReWait(Capability *cap, StgTSO *tso) {
  StgTRecHeader *trec = tso->trec;

  TRACE("%p : stmReWait", trec);
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT((trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);
  bool result = validate_and_acquire_ownership(cap, trec, true, true);
  TRACE("%p : validation %s", trec, result ? "succeeded" : "failed");
  if (result) {
    // The transaction remains valid -- do nothing because it is already on
    // the wait queues
    ASSERT(trec -> state == TREC_WAITING);
    park_tso(tso);
    revert_ownership(cap, trec, true);
  } else {
    // The transaction has become invalid.  We can now remove it from the wait
    // queues.
    if (trec -> state != TREC_CONDEMNED) {
      remove_watch_queue_entries_for_trec (cap, trec);
    }
    free_stg_trec_header(cap, trec);
  }
  unlock_stm(trec);

  TRACE("%p : stmReWait()=%d", trec, result);
  return result;
}

/*......................................................................*/

static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
  StgClosure *result;
  result = tvar -> current_value;

#if defined(STM_FG_LOCKS)
  while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info) {
    TRACE("%p : read_current_value(%p) saw %p", trec, tvar, result);
    result = tvar -> current_value;
  }
#endif

  TRACE("%p : read_current_value(%p)=%p", trec, tvar, result);
  return result;
}
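
// Note (descriptive): under STM_FG_LOCKS the loop above spins while the TVar
// is locked by a committing transaction (its current_value then points at a
// TREC_HEADER), so readers only ever observe proper values, never lock words.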

/*......................................................................*/

StgClosure *stmReadTVar(Capability *cap,
                        StgTRecHeader *trec,
                        StgTVar *tvar) {
  StgTRecHeader *entry_in = NULL;
  StgClosure *result = NULL;
  TRecEntry *entry = NULL;
  TRACE("%p : stmReadTVar(%p)", trec, tvar);
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> state == TREC_ACTIVE ||
         trec -> state == TREC_CONDEMNED);

  entry = get_entry_for(trec, tvar, &entry_in);

  if (entry != NULL) {
    if (entry_in == trec) {
      // Entry found in our trec
      result = entry -> new_value;
    } else {
      // Entry found in another trec
      TRecEntry *new_entry = get_new_entry(cap, trec);
      new_entry -> tvar = tvar;
      new_entry -> expected_value = entry -> expected_value;
      new_entry -> new_value = entry -> new_value;
      result = new_entry -> new_value;
    }
  } else {
    // No entry found
    StgClosure *current_value = read_current_value(trec, tvar);
    TRecEntry *new_entry = get_new_entry(cap, trec);
    new_entry -> tvar = tvar;
    new_entry -> expected_value = current_value;
    new_entry -> new_value = current_value;
    result = current_value;
  }

  TRACE("%p : stmReadTVar(%p)=%p", trec, tvar, result);
  return result;
}

/*......................................................................*/

void stmWriteTVar(Capability *cap,
                  StgTRecHeader *trec,
                  StgTVar *tvar,
                  StgClosure *new_value) {

  StgTRecHeader *entry_in = NULL;
  TRecEntry *entry = NULL;
  TRACE("%p : stmWriteTVar(%p, %p)", trec, tvar, new_value);
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> state == TREC_ACTIVE ||
         trec -> state == TREC_CONDEMNED);

  entry = get_entry_for(trec, tvar, &entry_in);

  if (entry != NULL) {
    if (entry_in == trec) {
      // Entry found in our trec
      entry -> new_value = new_value;
    } else {
      // Entry found in another trec
      TRecEntry *new_entry = get_new_entry(cap, trec);
      new_entry -> tvar = tvar;
      new_entry -> expected_value = entry -> expected_value;
      new_entry -> new_value = new_value;
    }
  } else {
    // No entry found
    StgClosure *current_value = read_current_value(trec, tvar);
    TRecEntry *new_entry = get_new_entry(cap, trec);
    new_entry -> tvar = tvar;
    new_entry -> expected_value = current_value;
    new_entry -> new_value = new_value;
  }

  TRACE("%p : stmWriteTVar done", trec);
}

/*......................................................................*/