/* -----------------------------------------------------------------------------
 * (c) The GHC Team 1998-2005
 *
 * STM implementation.
 *
 * Overview
 * --------
 *
 * See the PPoPP 2005 paper "Composable memory transactions". In summary, each
 * transaction has a TRec (transaction record) holding entries for each of the
 * TVars (transactional variables) that it has accessed. Each entry records (a)
 * the TVar, (b) the expected value seen in the TVar, (c) the new value that the
 * transaction wants to write to the TVar, (d) during commit, the identity of
 * the TRec that wrote the expected value.
 *
 * Separate TRecs are used for each level in a nest of transactions. This
 * allows a nested transaction to be aborted without condemning its enclosing
 * transactions. This is needed in the implementation of catchRetry. Note that
 * the "expected value" in a nested transaction's TRec is the value expected to
 * be *held in memory* if the transaction commits -- not the "new value" stored
 * in one of the enclosing transactions. This means that validation can be done
 * without searching through a nest of TRecs.
 *
 * Concurrency control
 * -------------------
 *
 * Three different concurrency control schemes can be built according to the
 * settings in STM.h:
 *
 * STM_UNIPROC assumes that the caller serialises invocations on the STM
 * interface. In the Haskell RTS this means it is suitable only for
 * non-THREADED_RTS builds.
 *
 * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired
 * during an invocation on the STM interface. Note that this does not mean that
 * transactions are simply serialized -- the lock is only held *within* the
 * implementation of stmCommitTransaction, stmWait etc.
 *
 * STM_FG_LOCKS uses fine-grained locking -- locking is done on a per-TVar basis
 * and, when committing a transaction, no locks are acquired for TVars that have
 * been read but not updated.
 *
 * Concurrency control is implemented in the functions:
 *
 *    lock_stm
 *    unlock_stm
 *    lock_tvar / cond_lock_tvar
 *    unlock_tvar
 *
 * The choice between STM_UNIPROC / STM_CG_LOCK / STM_FG_LOCKS affects the
 * implementation of these functions.
 *
 * lock_stm & unlock_stm are straightforward: under STM_CG_LOCK they acquire a
 * simple spin-lock, and otherwise they are no-ops.
 *
 * lock_tvar / cond_lock_tvar and unlock_tvar are more complex because they have
 * other effects (present in STM_UNIPROC and STM_CG_LOCK builds) as well as the
 * actual business of manipulating a lock (present only in STM_FG_LOCKS builds).
 * This is because locking a TVar is implemented by writing the lock holder's
 * TRec into the TVar's current_value field:
 *
 *   lock_tvar - lock a specified TVar (STM_FG_LOCKS only), returning the value
 *               it contained.
 *
 *   cond_lock_tvar - lock a specified TVar (STM_FG_LOCKS only) if it
 *               contains a specified value. Return true if this succeeds,
 *               false otherwise.
 *
 *   unlock_tvar - release the lock on a specified TVar (STM_FG_LOCKS only),
 *               storing a specified value in place of the lock entry.
 *
 * Using these operations, the typical pattern of a commit/validate/wait
 * operation is to (a) lock the STM, (b) lock all the TVars being updated, (c)
 * check that the TVars that were only read from still contain their expected
 * values, (d) release the locks on the TVars, writing updates to them in the
 * case of a commit, (e) unlock the STM.
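 *
 * As a hedged illustration (a sketch built from the functions defined below,
 * not additional code in this file), a top-level commit therefore looks
 * roughly like:
 *
 *   lock_stm(trec);
 *   if (validate_and_acquire_ownership(cap, trec, !config_use_read_phase, true)) {
 *     if (!config_use_read_phase || check_read_only(trec)) {
 *       FOR_EACH_ENTRY(trec, e, {
 *         unlock_tvar(cap, trec, e -> tvar, e -> new_value, true);
 *       });
 *     } else {
 *       revert_ownership(cap, trec, false);
 *     }
 *   }
 *   unlock_stm(trec);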
 *
 * Queues of waiting threads hang off the first_watch_queue_entry field of each
 * TVar. This may only be manipulated when holding that TVar's lock. In
 * particular, when a thread is putting itself to sleep, it mustn't release the
 * TVar's lock until it has added itself to the wait queue and marked its TSO as
 * BlockedOnSTM -- this makes sure that other threads will know to wake it.
 *
 * ---------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "RtsUtils.h"
#include "Schedule.h"
#include "STM.h"
#include "Trace.h"
#include "Threads.h"
#include "sm/Storage.h"
#include "SMPClosureOps.h"

#include <stdio.h>

// ACQ_ASSERT is used for assertions which are only required for
// THREADED_RTS builds with fine-grained locking.

#if defined(STM_FG_LOCKS)
#define ACQ_ASSERT(_X) ASSERT(_X)
#define NACQ_ASSERT(_X) /*Nothing*/
#else
#define ACQ_ASSERT(_X) /*Nothing*/
#define NACQ_ASSERT(_X) ASSERT(_X)
#endif
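
// For example (a sketch of the intended use, mirroring assertions that appear
// later in this file): a caller that must hold a TVar's lock in fine-grained
// builds can write
//
//   ACQ_ASSERT(tvar_is_locked(s, trec));
//
// which expands to a real ASSERT under STM_FG_LOCKS and to nothing otherwise.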

/*......................................................................*/

#define TRACE(_x...) debugTrace(DEBUG_stm, "STM: " _x)

// If SHAKE is defined then validation will sometimes spuriously fail. This
// helps to exercise unusual code paths when genuine contention is rare.
#if defined(SHAKE)
static int shake_ctr = 0;
static int shake_lim = 1;

static int shake(void) {
  if (((shake_ctr++) % shake_lim) == 0) {
    shake_ctr = 1;
    shake_lim ++;
    return true;
  }
  return false;
}
#else
static int shake(void) {
  return false;
}
#endif

/*......................................................................*/

// Helper macros for iterating over entries within a transaction
// record

#define FOR_EACH_ENTRY(_t,_x,CODE) do {                                        \
  StgTRecHeader *__t = (_t);                                                   \
  StgTRecChunk *__c = __t -> current_chunk;                                    \
  StgWord __limit = __c -> next_entry_idx;                                     \
  TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%ld", __t, __c, __limit); \
  while (__c != END_STM_CHUNK_LIST) {                                          \
    StgWord __i;                                                               \
    for (__i = 0; __i < __limit; __i ++) {                                     \
      TRecEntry *_x = &(__c -> entries[__i]);                                  \
      do { CODE } while (0);                                                   \
    }                                                                          \
    __c = __c -> prev_chunk;                                                   \
    __limit = TREC_CHUNK_NUM_ENTRIES;                                          \
  }                                                                            \
 exit_for_each:                                                                \
  if (false) goto exit_for_each;                                               \
} while (0)

#define BREAK_FOR_EACH goto exit_for_each
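
// Example use (a hedged sketch, not code the RTS itself contains): count the
// update entries in a TRec, stopping early if the TRec was condemned.
//
//   StgWord n_updates = 0;
//   FOR_EACH_ENTRY(trec, e, {
//     if (trec -> state == TREC_CONDEMNED) BREAK_FOR_EACH;
//     if (e -> expected_value != e -> new_value) n_updates ++;
//   });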

/*......................................................................*/

// if REUSE_MEMORY is defined then attempt to re-use descriptors, log chunks,
// and wait queue entries without GC

#define REUSE_MEMORY

/*......................................................................*/

#define IF_STM_UNIPROC(__X)  do { } while (0)
#define IF_STM_CG_LOCK(__X)  do { } while (0)
#define IF_STM_FG_LOCKS(__X) do { } while (0)

#if defined(STM_UNIPROC)
#undef IF_STM_UNIPROC
#define IF_STM_UNIPROC(__X) do { __X } while (0)
static const StgBool config_use_read_phase = false;

static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : lock_stm()", trec);
}

static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : unlock_stm()", trec);
}

static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED,
                             StgTVar *s STG_UNUSED) {
  StgClosure *result;
  TRACE("%p : lock_tvar(%p)", trec, s);
  result = s -> current_value;
  return result;
}

static void unlock_tvar(Capability *cap,
                        StgTRecHeader *trec STG_UNUSED,
                        StgTVar *s,
                        StgClosure *c,
                        StgBool force_update) {
  TRACE("%p : unlock_tvar(%p)", trec, s);
  if (force_update) {
    s -> current_value = c;
    dirty_TVAR(cap,s);
  }
}

static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED,
                              StgTVar *s STG_UNUSED,
                              StgClosure *expected) {
  StgClosure *result;
  TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
  result = s -> current_value;
  TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
  return (result == expected);
}
#endif

#if defined(STM_CG_LOCK) /*........................................*/

#undef IF_STM_CG_LOCK
#define IF_STM_CG_LOCK(__X) do { __X } while (0)
static const StgBool config_use_read_phase = false;
static volatile StgTRecHeader *smp_locked = NULL;

static void lock_stm(StgTRecHeader *trec) {
  while (cas(&smp_locked, NULL, trec) != NULL) { }
  TRACE("%p : lock_stm()", trec);
}

static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : unlock_stm()", trec);
  ASSERT(smp_locked == trec);
  smp_locked = 0;
}

static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED,
                             StgTVar *s STG_UNUSED) {
  StgClosure *result;
  TRACE("%p : lock_tvar(%p)", trec, s);
  ASSERT(smp_locked == trec);
  result = s -> current_value;
  return result;
}

static void unlock_tvar(Capability *cap,
                        StgTRecHeader *trec STG_UNUSED,
                        StgTVar *s,
                        StgClosure *c,
                        StgBool force_update) {
  TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
  ASSERT(smp_locked == trec);
  if (force_update) {
    s -> current_value = c;
    dirty_TVAR(cap,s);
  }
}

static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED,
                              StgTVar *s STG_UNUSED,
                              StgClosure *expected) {
  StgClosure *result;
  TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
  ASSERT(smp_locked == trec);
  result = s -> current_value;
  TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
  return (result == expected);
}
#endif

#if defined(STM_FG_LOCKS) /*...................................*/

#undef IF_STM_FG_LOCKS
#define IF_STM_FG_LOCKS(__X) do { __X } while (0)
static const StgBool config_use_read_phase = true;

static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : lock_stm()", trec);
}

static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : unlock_stm()", trec);
}

static StgClosure *lock_tvar(StgTRecHeader *trec,
                             StgTVar *s STG_UNUSED) {
  StgClosure *result;
  TRACE("%p : lock_tvar(%p)", trec, s);
  do {
    do {
      result = s -> current_value;
    } while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info);
  } while (cas((void *)&(s -> current_value),
               (StgWord)result, (StgWord)trec) != (StgWord)result);
  return result;
}

static void unlock_tvar(Capability *cap,
                        StgTRecHeader *trec STG_UNUSED,
                        StgTVar *s,
                        StgClosure *c,
                        StgBool force_update STG_UNUSED) {
  TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
  ASSERT(s -> current_value == (StgClosure *)trec);
  s -> current_value = c;
  dirty_TVAR(cap,s);
}

static StgBool cond_lock_tvar(StgTRecHeader *trec,
                              StgTVar *s,
                              StgClosure *expected) {
  StgClosure *result;
  StgWord w;
  TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
  w = cas((void *)&(s -> current_value), (StgWord)expected, (StgWord)trec);
  result = (StgClosure *)w;
  TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
  return (result == expected);
}
#endif

/*......................................................................*/

// Helper functions for thread blocking and unblocking

static void park_tso(StgTSO *tso) {
  ASSERT(tso -> why_blocked == NotBlocked);
  tso -> why_blocked = BlockedOnSTM;
  tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
  TRACE("park_tso on tso=%p", tso);
}

static void unpark_tso(Capability *cap, StgTSO *tso) {
  // We will continue unparking threads while they remain on one of the wait
  // queues: it's up to the thread itself to remove it from the wait queues
  // if it decides to do so when it is scheduled.

  // Only the capability that owns this TSO may unblock it. We can
  // call tryWakeupThread() which will either unblock it directly if
  // it belongs to this cap, or send a message to the owning cap
  // otherwise.

  // But we don't really want to send multiple messages if we write
  // to the same TVar multiple times, and the owning cap hasn't yet
  // woken up the thread and removed it from the TVar's watch list.
  // So, we use the tso->block_info as a flag to indicate whether
  // we've already done tryWakeupThread() for this thread.

  // Safety Note: we hold the TVar lock at this point, so we know
  // that this thread is definitely still blocked, since the first
  // thing a thread will do when it runs is remove itself from the
  // TVar watch queues, and to do that it would need to lock the
  // TVar.

  if (tso->block_info.closure != &stg_STM_AWOKEN_closure) {
      // safe to do a non-atomic test-and-set here, because it's
      // fine if we do multiple tryWakeupThread()s.
      tso->block_info.closure = &stg_STM_AWOKEN_closure;
      tryWakeupThread(cap,tso);
  }
}

static void unpark_waiters_on(Capability *cap, StgTVar *s) {
  StgTVarWatchQueue *q;
  StgTVarWatchQueue *trail;
  TRACE("unpark_waiters_on tvar=%p", s);
  // unblock TSOs in reverse order, to be a bit fairer (#2319)
  for (q = s -> first_watch_queue_entry, trail = q;
       q != END_STM_WATCH_QUEUE;
       q = q -> next_queue_entry) {
    trail = q;
  }
  q = trail;
  for (;
       q != END_STM_WATCH_QUEUE;
       q = q -> prev_queue_entry) {
    unpark_tso(cap, (StgTSO *)(q -> closure));
  }
}

/*......................................................................*/

// Helper functions for downstream allocation and initialization

static StgTVarWatchQueue *new_stg_tvar_watch_queue(Capability *cap,
                                                   StgClosure *closure) {
  StgTVarWatchQueue *result;
  result = (StgTVarWatchQueue *)allocate(cap, sizeofW(StgTVarWatchQueue));
  SET_HDR (result, &stg_TVAR_WATCH_QUEUE_info, CCS_SYSTEM);
  result -> closure = closure;
  return result;
}

static StgTRecChunk *new_stg_trec_chunk(Capability *cap) {
  StgTRecChunk *result;
  result = (StgTRecChunk *)allocate(cap, sizeofW(StgTRecChunk));
  SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
  result -> prev_chunk = END_STM_CHUNK_LIST;
  result -> next_entry_idx = 0;
  return result;
}

static StgTRecHeader *new_stg_trec_header(Capability *cap,
                                          StgTRecHeader *enclosing_trec) {
  StgTRecHeader *result;
  result = (StgTRecHeader *) allocate(cap, sizeofW(StgTRecHeader));
  SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);

  result -> enclosing_trec = enclosing_trec;
  result -> current_chunk = new_stg_trec_chunk(cap);

  if (enclosing_trec == NO_TREC) {
    result -> state = TREC_ACTIVE;
  } else {
    ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
           enclosing_trec -> state == TREC_CONDEMNED);
    result -> state = enclosing_trec -> state;
  }

  return result;
}

/*......................................................................*/

// Allocation / deallocation functions that retain per-capability lists
// of closures that can be re-used

static StgTVarWatchQueue *alloc_stg_tvar_watch_queue(Capability *cap,
                                                     StgClosure *closure) {
  StgTVarWatchQueue *result = NULL;
  if (cap -> free_tvar_watch_queues == END_STM_WATCH_QUEUE) {
    result = new_stg_tvar_watch_queue(cap, closure);
  } else {
    result = cap -> free_tvar_watch_queues;
    result -> closure = closure;
    cap -> free_tvar_watch_queues = result -> next_queue_entry;
  }
  return result;
}

static void free_stg_tvar_watch_queue(Capability *cap,
                                      StgTVarWatchQueue *wq) {
#if defined(REUSE_MEMORY)
  wq -> next_queue_entry = cap -> free_tvar_watch_queues;
  cap -> free_tvar_watch_queues = wq;
#endif
}

static StgTRecChunk *alloc_stg_trec_chunk(Capability *cap) {
  StgTRecChunk *result = NULL;
  if (cap -> free_trec_chunks == END_STM_CHUNK_LIST) {
    result = new_stg_trec_chunk(cap);
  } else {
    result = cap -> free_trec_chunks;
    cap -> free_trec_chunks = result -> prev_chunk;
    result -> prev_chunk = END_STM_CHUNK_LIST;
    result -> next_entry_idx = 0;
  }
  return result;
}

static void free_stg_trec_chunk(Capability *cap,
                                StgTRecChunk *c) {
#if defined(REUSE_MEMORY)
  c -> prev_chunk = cap -> free_trec_chunks;
  cap -> free_trec_chunks = c;
#endif
}

static StgTRecHeader *alloc_stg_trec_header(Capability *cap,
                                            StgTRecHeader *enclosing_trec) {
  StgTRecHeader *result = NULL;
  if (cap -> free_trec_headers == NO_TREC) {
    result = new_stg_trec_header(cap, enclosing_trec);
  } else {
    result = cap -> free_trec_headers;
    cap -> free_trec_headers = result -> enclosing_trec;
    result -> enclosing_trec = enclosing_trec;
    result -> current_chunk -> next_entry_idx = 0;
    if (enclosing_trec == NO_TREC) {
      result -> state = TREC_ACTIVE;
    } else {
      ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
             enclosing_trec -> state == TREC_CONDEMNED);
      result -> state = enclosing_trec -> state;
    }
  }
  return result;
}

static void free_stg_trec_header(Capability *cap,
                                 StgTRecHeader *trec) {
#if defined(REUSE_MEMORY)
  StgTRecChunk *chunk = trec -> current_chunk -> prev_chunk;
  while (chunk != END_STM_CHUNK_LIST) {
    StgTRecChunk *prev_chunk = chunk -> prev_chunk;
    free_stg_trec_chunk(cap, chunk);
    chunk = prev_chunk;
  }
  trec -> current_chunk -> prev_chunk = END_STM_CHUNK_LIST;
  trec -> enclosing_trec = cap -> free_trec_headers;
  cap -> free_trec_headers = trec;
#endif
}

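// A hedged sketch of the intended reuse lifecycle (assuming REUSE_MEMORY is
// defined, as above): allocation may pop a recycled closure from the
// capability's free list, and freeing pushes it back.
//
//   StgTRecHeader *t = alloc_stg_trec_header(cap, NO_TREC);
//   /* ... run the transaction ... */
//   free_stg_trec_header(cap, t);
//
// stmPreGCHook() below resets these per-capability lists so that a GC never
// traverses recycled closures.
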
/*......................................................................*/

// Helper functions for managing waiting lists

static void build_watch_queue_entries_for_trec(Capability *cap,
                                               StgTSO *tso,
                                               StgTRecHeader *trec) {
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT(trec -> state == TREC_ACTIVE);

  TRACE("%p : build_watch_queue_entries_for_trec()", trec);

  FOR_EACH_ENTRY(trec, e, {
    StgTVar *s;
    StgTVarWatchQueue *q;
    StgTVarWatchQueue *fq;
    s = e -> tvar;
    TRACE("%p : adding tso=%p to watch queue for tvar=%p", trec, tso, s);
    ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
    NACQ_ASSERT(s -> current_value == e -> expected_value);
    fq = s -> first_watch_queue_entry;
    q = alloc_stg_tvar_watch_queue(cap, (StgClosure*) tso);
    q -> next_queue_entry = fq;
    q -> prev_queue_entry = END_STM_WATCH_QUEUE;
    if (fq != END_STM_WATCH_QUEUE) {
      fq -> prev_queue_entry = q;
    }
    s -> first_watch_queue_entry = q;
    e -> new_value = (StgClosure *) q;
    dirty_TVAR(cap,s); // we modified first_watch_queue_entry
  });
}

static void remove_watch_queue_entries_for_trec(Capability *cap,
                                                StgTRecHeader *trec) {
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT(trec -> state == TREC_WAITING ||
         trec -> state == TREC_CONDEMNED);

  TRACE("%p : remove_watch_queue_entries_for_trec()", trec);

  FOR_EACH_ENTRY(trec, e, {
    StgTVar *s;
    StgTVarWatchQueue *pq;
    StgTVarWatchQueue *nq;
    StgTVarWatchQueue *q;
    StgClosure *saw;
    s = e -> tvar;
    saw = lock_tvar(trec, s);
    q = (StgTVarWatchQueue *) (e -> new_value);
    TRACE("%p : removing tso=%p from watch queue for tvar=%p",
          trec,
          q -> closure,
          s);
    ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
    nq = q -> next_queue_entry;
    pq = q -> prev_queue_entry;
    if (nq != END_STM_WATCH_QUEUE) {
      nq -> prev_queue_entry = pq;
    }
    if (pq != END_STM_WATCH_QUEUE) {
      pq -> next_queue_entry = nq;
    } else {
      ASSERT(s -> first_watch_queue_entry == q);
      s -> first_watch_queue_entry = nq;
      dirty_TVAR(cap,s); // we modified first_watch_queue_entry
    }
    free_stg_tvar_watch_queue(cap, q);
    unlock_tvar(cap, trec, s, saw, false);
  });
}

/*......................................................................*/

static TRecEntry *get_new_entry(Capability *cap,
                                StgTRecHeader *t) {
  TRecEntry *result;
  StgTRecChunk *c;
  int i;

  c = t -> current_chunk;
  i = c -> next_entry_idx;
  ASSERT(c != END_STM_CHUNK_LIST);

  if (i < TREC_CHUNK_NUM_ENTRIES) {
    // Continue to use current chunk
    result = &(c -> entries[i]);
    c -> next_entry_idx ++;
  } else {
    // Current chunk is full: allocate a fresh one
    StgTRecChunk *nc;
    nc = alloc_stg_trec_chunk(cap);
    nc -> prev_chunk = c;
    nc -> next_entry_idx = 1;
    t -> current_chunk = nc;
    result = &(nc -> entries[0]);
  }

  return result;
}

/*......................................................................*/

static void merge_update_into(Capability *cap,
                              StgTRecHeader *t,
                              StgTVar *tvar,
                              StgClosure *expected_value,
                              StgClosure *new_value)
{
  // Look for an entry in this trec
  bool found = false;
  FOR_EACH_ENTRY(t, e, {
    StgTVar *s;
    s = e -> tvar;
    if (s == tvar) {
      found = true;
      if (e -> expected_value != expected_value) {
        // Must abort if the two entries start from different values
        TRACE("%p : update entries inconsistent at %p (%p vs %p)",
              t, tvar, e -> expected_value, expected_value);
        t -> state = TREC_CONDEMNED;
      }
      e -> new_value = new_value;
      BREAK_FOR_EACH;
    }
  });

  if (!found) {
    // No entry so far in this trec
    TRecEntry *ne;
    ne = get_new_entry(cap, t);
    ne -> tvar = tvar;
    ne -> expected_value = expected_value;
    ne -> new_value = new_value;
  }
}

/*......................................................................*/

static void merge_read_into(Capability *cap,
                            StgTRecHeader *trec,
                            StgTVar *tvar,
                            StgClosure *expected_value)
{
  StgTRecHeader *t;
  bool found = false;

  //
  // See #7493
  //
  // We need to look for an existing entry *anywhere* in the stack of
  // nested transactions. Otherwise, in stmCommitNestedTransaction()
  // we can't tell the difference between
  //
  //   (1) a read-only entry
  //   (2) an entry that writes back the original value
  //
  // Since in both cases e->new_value == e->expected_value. But in (1)
  // we want to do nothing, and in (2) we want to update e->new_value
  // in the outer transaction.
  //
  // Here we deal with the first possibility: we never create a
  // read-only entry in an inner transaction if there is an existing
  // outer entry; so we never have an inner read and an outer update.
  // So then in stmCommitNestedTransaction() we know we can always
  // write e->new_value over the outer entry, because the inner entry
  // is the most up to date.
  //
  for (t = trec; !found && t != NO_TREC; t = t -> enclosing_trec)
  {
    FOR_EACH_ENTRY(t, e, {
      if (e -> tvar == tvar) {
        found = true;
        if (e -> expected_value != expected_value) {
          // Must abort if the two entries start from different values
          TRACE("%p : read entries inconsistent at %p (%p vs %p)",
                t, tvar, e -> expected_value, expected_value);
          t -> state = TREC_CONDEMNED;
        }
        BREAK_FOR_EACH;
      }
    });
  }

  if (!found) {
    // No entry found
    TRecEntry *ne;
    ne = get_new_entry(cap, trec);
    ne -> tvar = tvar;
    ne -> expected_value = expected_value;
    ne -> new_value = expected_value;
  }
}

/*......................................................................*/

static StgBool entry_is_update(TRecEntry *e) {
  StgBool result;
  result = (e -> expected_value != e -> new_value);
  return result;
}

#if defined(STM_FG_LOCKS)
static StgBool entry_is_read_only(TRecEntry *e) {
  StgBool result;
  result = (e -> expected_value == e -> new_value);
  return result;
}

static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
  StgClosure *c;
  StgBool result;
  c = s -> current_value;
  result = (c == (StgClosure *) h);
  return result;
}
#endif

// revert_ownership : release a lock on a TVar, storing back
// the value that it held when the lock was acquired. "revert_all"
// is set in stmWait and stmReWait when we acquired locks on all of
// the TVars involved. "revert_all" is not set in commit operations
// where we don't lock TVars that have been read from but not updated.

static void revert_ownership(Capability *cap STG_UNUSED,
                             StgTRecHeader *trec STG_UNUSED,
                             StgBool revert_all STG_UNUSED) {
#if defined(STM_FG_LOCKS)
  FOR_EACH_ENTRY(trec, e, {
    if (revert_all || entry_is_update(e)) {
      StgTVar *s;
      s = e -> tvar;
      if (tvar_is_locked(s, trec)) {
        unlock_tvar(cap, trec, s, e -> expected_value, true);
      }
    }
  });
#endif
}

/*......................................................................*/

// validate_and_acquire_ownership : this performs the twin functions
// of checking that the TVars referred to by entries in trec hold the
// expected values and:
//
//   - locking the TVar (on updated TVars during commit, or all TVars
//     during wait)
//
//   - recording the identity of the TRec that wrote the value seen in the
//     TVar (on non-updated TVars during commit). These values are
//     stashed in the TRec entries and are then checked in check_read_only
//     to ensure that an atomic snapshot of all of these locations has been
//     seen.

static StgBool validate_and_acquire_ownership (Capability *cap,
                                               StgTRecHeader *trec,
                                               int acquire_all,
                                               int retain_ownership) {
  StgBool result;

  if (shake()) {
    TRACE("%p : shake, pretending trec is invalid when it may not be", trec);
    return false;
  }

  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));
  result = !((trec -> state) == TREC_CONDEMNED);
  if (result) {
    FOR_EACH_ENTRY(trec, e, {
      StgTVar *s;
      s = e -> tvar;
      if (acquire_all || entry_is_update(e)) {
        TRACE("%p : trying to acquire %p", trec, s);
        if (!cond_lock_tvar(trec, s, e -> expected_value)) {
          TRACE("%p : failed to acquire %p", trec, s);
          result = false;
          BREAK_FOR_EACH;
        }
      } else {
        ASSERT(config_use_read_phase);
        IF_STM_FG_LOCKS({
          TRACE("%p : will need to check %p", trec, s);
          if (s -> current_value != e -> expected_value) {
            TRACE("%p : doesn't match", trec);
            result = false;
            BREAK_FOR_EACH;
          }
          e -> num_updates = s -> num_updates;
          if (s -> current_value != e -> expected_value) {
            TRACE("%p : doesn't match (race)", trec);
            result = false;
            BREAK_FOR_EACH;
          } else {
            TRACE("%p : need to check version %ld", trec, e -> num_updates);
          }
        });
      }
    });
  }

  if ((!result) || (!retain_ownership)) {
    revert_ownership(cap, trec, acquire_all);
  }

  return result;
}

// check_read_only : check that we've seen an atomic snapshot of the
// non-updated TVars accessed by a trec. This checks that the last TRec to
// commit an update to the TVar is unchanged since the value was stashed in
// validate_and_acquire_ownership. If no update is seen to any TVar then
// all of them contained their expected values at the start of the call to
// check_read_only.
//
// The paper "Concurrent programming without locks" (Fraser and Harris, ACM
// TOCS 2007) and Keir Fraser's PhD dissertation "Practical lock-free
// programming" discuss this kind of algorithm.
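//
// A concrete (hedged, illustrative) scenario: suppose we read TVar s and
// recorded num_updates = 5 in validate_and_acquire_ownership. If another
// transaction then commits a write of the old value back to s, the
// current_value == expected_value check alone would pass, but num_updates
// will have advanced to 6, so check_read_only correctly rejects the
// snapshot.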

static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
  StgBool result = true;

  ASSERT(config_use_read_phase);
  IF_STM_FG_LOCKS({
    FOR_EACH_ENTRY(trec, e, {
      StgTVar *s;
      s = e -> tvar;
      if (entry_is_read_only(e)) {
        TRACE("%p : check_read_only for TVar %p, saw %ld", trec, s, e -> num_updates);

        // Note we need both checks and in this order as the TVar could be
        // locked by another transaction that is committing but has not yet
        // incremented `num_updates` (See #7815).
        if (s -> current_value != e -> expected_value ||
            s -> num_updates != e -> num_updates) {
          TRACE("%p : mismatch", trec);
          result = false;
          BREAK_FOR_EACH;
        }
      }
    });
  });

  return result;
}


/************************************************************************/

void stmPreGCHook (Capability *cap) {
  lock_stm(NO_TREC);
  TRACE("stmPreGCHook");
  cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
  cap->free_trec_chunks = END_STM_CHUNK_LIST;
  cap->free_trec_headers = NO_TREC;
  unlock_stm(NO_TREC);
}

/************************************************************************/

// check_read_only relies on version numbers held in TVars' "num_updates"
// fields not wrapping around while a transaction is committed. The version
// number is incremented each time an update is committed to the TVar.
// This is unlikely to wrap around when 32-bit integers are used for the
// counts, but to ensure correctness we maintain a shared count bounding the
// number of commit operations that may have occurred, and check that it has
// not increased by more than 2^32 during a commit.

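// Worked example (hedged, using the constants below): each capability draws
// tokens in batches of TOKEN_BATCH_SIZE = 1024, bumping max_commits by 1024
// per batch. During a commit, stmCommitTransaction computes
//
//   max_concurrent_commits = (max_commits_at_end - max_commits_at_start)
//                            + (n_capabilities * TOKEN_BATCH_SIZE);
//
// and fails the commit (forcing a re-run) if this bound reaches 2^32, i.e.
// if (max_concurrent_commits >> 32) > 0, since only then could a TVar's
// num_updates field have wrapped all the way around unnoticed.
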
#define TOKEN_BATCH_SIZE 1024

static volatile StgInt64 max_commits = 0;

#if defined(THREADED_RTS)
static volatile StgWord token_locked = false;

static void getTokenBatch(Capability *cap) {
  while (cas((void *)&token_locked, false, true) == true) { /* nothing */ }
  max_commits += TOKEN_BATCH_SIZE;
  TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, max_commits);
  cap -> transaction_tokens = TOKEN_BATCH_SIZE;
  token_locked = false;
}

static void getToken(Capability *cap) {
  if (cap -> transaction_tokens == 0) {
    getTokenBatch(cap);
  }
  cap -> transaction_tokens --;
}
#else
static void getToken(Capability *cap STG_UNUSED) {
  // Nothing
}
#endif

/*......................................................................*/

StgTRecHeader *stmStartTransaction(Capability *cap,
                                   StgTRecHeader *outer) {
  StgTRecHeader *t;
  TRACE("%p : stmStartTransaction with %d tokens",
        outer,
        cap -> transaction_tokens);

  getToken(cap);

  t = alloc_stg_trec_header(cap, outer);
  TRACE("%p : stmStartTransaction()=%p", outer, t);
  return t;
}

/*......................................................................*/

void stmAbortTransaction(Capability *cap,
                         StgTRecHeader *trec) {
  StgTRecHeader *et;
  TRACE("%p : stmAbortTransaction", trec);
  ASSERT(trec != NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);

  et = trec -> enclosing_trec;
  if (et == NO_TREC) {
    // We're a top-level transaction: remove any watch queue entries that
    // we may have.
    TRACE("%p : aborting top-level transaction", trec);

    if (trec -> state == TREC_WAITING) {
      ASSERT(trec -> enclosing_trec == NO_TREC);
      TRACE("%p : stmAbortTransaction aborting waiting transaction", trec);
      remove_watch_queue_entries_for_trec(cap, trec);
    }

  } else {
    // We're a nested transaction: merge our read set into our parent's
    TRACE("%p : retaining read-set into parent %p", trec, et);

    FOR_EACH_ENTRY(trec, e, {
      StgTVar *s = e -> tvar;
      merge_read_into(cap, et, s, e -> expected_value);
    });
  }

  trec -> state = TREC_ABORTED;
  unlock_stm(trec);

  TRACE("%p : stmAbortTransaction done", trec);
}

/*......................................................................*/

void stmFreeAbortedTRec(Capability *cap,
                        StgTRecHeader *trec) {
  TRACE("%p : stmFreeAbortedTRec", trec);
  ASSERT(trec != NO_TREC);
  ASSERT((trec -> state == TREC_CONDEMNED) ||
         (trec -> state == TREC_ABORTED));

  free_stg_trec_header(cap, trec);

  TRACE("%p : stmFreeAbortedTRec done", trec);
}

/*......................................................................*/

void stmCondemnTransaction(Capability *cap,
                           StgTRecHeader *trec) {
  TRACE("%p : stmCondemnTransaction", trec);
  ASSERT(trec != NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);
  if (trec -> state == TREC_WAITING) {
    ASSERT(trec -> enclosing_trec == NO_TREC);
    TRACE("%p : stmCondemnTransaction condemning waiting transaction", trec);
    remove_watch_queue_entries_for_trec(cap, trec);
  }
  trec -> state = TREC_CONDEMNED;
  unlock_stm(trec);

  TRACE("%p : stmCondemnTransaction done", trec);
}

/*......................................................................*/

StgBool stmValidateNestOfTransactions(Capability *cap, StgTRecHeader *trec) {
  StgTRecHeader *t;

  TRACE("%p : stmValidateNestOfTransactions", trec);
  ASSERT(trec != NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);

  t = trec;
  StgBool result = true;
  while (t != NO_TREC) {
    result &= validate_and_acquire_ownership(cap, t, true, false);
    t = t -> enclosing_trec;
  }

  if (!result && trec -> state != TREC_WAITING) {
    trec -> state = TREC_CONDEMNED;
  }

  unlock_stm(trec);

  TRACE("%p : stmValidateNestOfTransactions()=%d", trec, result);
  return result;
}

/*......................................................................*/

static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) {
  TRecEntry *result = NULL;

  TRACE("%p : get_entry_for TVar %p", trec, tvar);
  ASSERT(trec != NO_TREC);

  do {
    FOR_EACH_ENTRY(trec, e, {
      if (e -> tvar == tvar) {
        result = e;
        if (in != NULL) {
          *in = trec;
        }
        BREAK_FOR_EACH;
      }
    });
    trec = trec -> enclosing_trec;
  } while (result == NULL && trec != NO_TREC);

  return result;
}

/*......................................................................*/

StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
  StgInt64 max_commits_at_start = max_commits;

  TRACE("%p : stmCommitTransaction()", trec);
  ASSERT(trec != NO_TREC);

  lock_stm(trec);

  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_CONDEMNED));

  // Use a read-phase (i.e. don't lock TVars we've read but not updated) if
  // the configuration lets us use a read phase.

  bool result = validate_and_acquire_ownership(cap, trec, (!config_use_read_phase), true);
  if (result) {
    // We now know that all the updated locations hold their expected values.
    ASSERT(trec -> state == TREC_ACTIVE);

    if (config_use_read_phase) {
      StgInt64 max_commits_at_end;
      StgInt64 max_concurrent_commits;
      TRACE("%p : doing read check", trec);
      result = check_read_only(trec);
      TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed");

      max_commits_at_end = max_commits;
      max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
                                (n_capabilities * TOKEN_BATCH_SIZE));
      if (((max_concurrent_commits >> 32) > 0) || shake()) {
        result = false;
      }
    }

    if (result) {
      // We now know that all of the read-only locations held their expected values
      // at the end of the call to validate_and_acquire_ownership. This forms the
      // linearization point of the commit.

      // Make the updates required by the transaction.
      FOR_EACH_ENTRY(trec, e, {
        StgTVar *s;
        s = e -> tvar;
        if ((!config_use_read_phase) || (e -> new_value != e -> expected_value)) {
          // Either the entry is an update or we're not using a read phase:
          // write the value back to the TVar, unlocking it if necessary.

          ACQ_ASSERT(tvar_is_locked(s, trec));
          TRACE("%p : writing %p to %p, waking waiters", trec, e -> new_value, s);
          unpark_waiters_on(cap,s);
          IF_STM_FG_LOCKS({
            s -> num_updates ++;
          });
          unlock_tvar(cap, trec, s, e -> new_value, true);
        }
        ACQ_ASSERT(!tvar_is_locked(s, trec));
      });
    } else {
      revert_ownership(cap, trec, false);
    }
  }

  unlock_stm(trec);

  free_stg_trec_header(cap, trec);

  TRACE("%p : stmCommitTransaction()=%d", trec, result);

  return result;
}

/*......................................................................*/

StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
  StgTRecHeader *et;
  ASSERT(trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
  TRACE("%p : stmCommitNestedTransaction() into %p", trec, trec -> enclosing_trec);
  ASSERT((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);

  et = trec -> enclosing_trec;
  bool result = validate_and_acquire_ownership(cap, trec, (!config_use_read_phase), true);
  if (result) {
    // We now know that all the updated locations hold their expected values.

    if (config_use_read_phase) {
      TRACE("%p : doing read check", trec);
      result = check_read_only(trec);
    }
    if (result) {
      // We now know that all of the read-only locations held their expected values
      // at the end of the call to validate_and_acquire_ownership. This forms the
      // linearization point of the commit.

      TRACE("%p : read-check succeeded", trec);
      FOR_EACH_ENTRY(trec, e, {
        // Merge each entry into the enclosing transaction record, release all
        // locks.

        StgTVar *s;
        s = e -> tvar;
        if (entry_is_update(e)) {
          unlock_tvar(cap, trec, s, e -> expected_value, false);
        }
        merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
        ACQ_ASSERT(s -> current_value != (StgClosure *)trec);
      });
    } else {
      revert_ownership(cap, trec, false);
    }
  }

  unlock_stm(trec);

  free_stg_trec_header(cap, trec);

  TRACE("%p : stmCommitNestedTransaction()=%d", trec, result);

  return result;
}

/*......................................................................*/

StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
  TRACE("%p : stmWait(%p)", trec, tso);
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);
  bool result = validate_and_acquire_ownership(cap, trec, true, true);
  if (result) {
    // The transaction is valid so far so we can actually start waiting.
    // (Otherwise the transaction was not valid and the thread will have to
    // retry it).

    // Put ourselves to sleep. We retain locks on all the TVars involved
    // until we are sound asleep: (a) on the wait queues, (b) BlockedOnSTM
    // in the TSO, (c) TREC_WAITING in the Trec.
    build_watch_queue_entries_for_trec(cap, tso, trec);
    park_tso(tso);
    trec -> state = TREC_WAITING;

    // We haven't released ownership of the transaction yet. The TSO
    // has been put on the wait queue for the TVars it is waiting for,
    // but we haven't yet tidied up the TSO's stack and made it safe
    // to wake up the TSO. Therefore, we must wait until the TSO is
    // safe to wake up before we release ownership - when all is well,
    // the runtime will call stmWaitUnlock() below, with the same
    // TRec.

  } else {
    unlock_stm(trec);
    free_stg_trec_header(cap, trec);
  }

  TRACE("%p : stmWait(%p)=%d", trec, tso, result);
  return result;
}


void
stmWaitUnlock(Capability *cap, StgTRecHeader *trec) {
  revert_ownership(cap, trec, true);
  unlock_stm(trec);
}

/*......................................................................*/

StgBool stmReWait(Capability *cap, StgTSO *tso) {
  StgTRecHeader *trec = tso->trec;

  TRACE("%p : stmReWait", trec);
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT((trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);
  bool result = validate_and_acquire_ownership(cap, trec, true, true);
  TRACE("%p : validation %s", trec, result ? "succeeded" : "failed");
  if (result) {
    // The transaction remains valid -- do nothing because it is already on
    // the wait queues
    ASSERT(trec -> state == TREC_WAITING);
    park_tso(tso);
    revert_ownership(cap, trec, true);
  } else {
    // The transaction has become invalid. We can now remove it from the wait
    // queues.
    if (trec -> state != TREC_CONDEMNED) {
      remove_watch_queue_entries_for_trec (cap, trec);
    }
    free_stg_trec_header(cap, trec);
  }
  unlock_stm(trec);

  TRACE("%p : stmReWait()=%d", trec, result);
  return result;
}

/*......................................................................*/

static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
  StgClosure *result;
  result = tvar -> current_value;

#if defined(STM_FG_LOCKS)
  while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info) {
    TRACE("%p : read_current_value(%p) saw %p", trec, tvar, result);
    result = tvar -> current_value;
  }
#endif

  TRACE("%p : read_current_value(%p)=%p", trec, tvar, result);
  return result;
}

/*......................................................................*/

StgClosure *stmReadTVar(Capability *cap,
                        StgTRecHeader *trec,
                        StgTVar *tvar) {
  StgTRecHeader *entry_in = NULL;
  StgClosure *result = NULL;
  TRecEntry *entry = NULL;
  TRACE("%p : stmReadTVar(%p)", trec, tvar);
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> state == TREC_ACTIVE ||
         trec -> state == TREC_CONDEMNED);

  entry = get_entry_for(trec, tvar, &entry_in);

  if (entry != NULL) {
    if (entry_in == trec) {
      // Entry found in our trec
      result = entry -> new_value;
    } else {
      // Entry found in another trec
      TRecEntry *new_entry = get_new_entry(cap, trec);
      new_entry -> tvar = tvar;
      new_entry -> expected_value = entry -> expected_value;
      new_entry -> new_value = entry -> new_value;
      result = new_entry -> new_value;
    }
  } else {
    // No entry found
    StgClosure *current_value = read_current_value(trec, tvar);
    TRecEntry *new_entry = get_new_entry(cap, trec);
    new_entry -> tvar = tvar;
    new_entry -> expected_value = current_value;
    new_entry -> new_value = current_value;
    result = current_value;
  }

  TRACE("%p : stmReadTVar(%p)=%p", trec, tvar, result);
  return result;
}

/*......................................................................*/

void stmWriteTVar(Capability *cap,
                  StgTRecHeader *trec,
                  StgTVar *tvar,
                  StgClosure *new_value) {

  StgTRecHeader *entry_in = NULL;
  TRecEntry *entry = NULL;
  TRACE("%p : stmWriteTVar(%p, %p)", trec, tvar, new_value);
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> state == TREC_ACTIVE ||
         trec -> state == TREC_CONDEMNED);

  entry = get_entry_for(trec, tvar, &entry_in);

  if (entry != NULL) {
    if (entry_in == trec) {
      // Entry found in our trec
      entry -> new_value = new_value;
    } else {
      // Entry found in another trec
      TRecEntry *new_entry = get_new_entry(cap, trec);
      new_entry -> tvar = tvar;
      new_entry -> expected_value = entry -> expected_value;
      new_entry -> new_value = new_value;
    }
  } else {
    // No entry found
    StgClosure *current_value = read_current_value(trec, tvar);
    TRecEntry *new_entry = get_new_entry(cap, trec);
    new_entry -> tvar = tvar;
    new_entry -> expected_value = current_value;
    new_entry -> new_value = new_value;
  }

  TRACE("%p : stmWriteTVar done", trec);
}

/*......................................................................*/
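
// End-to-end sketch (hedged; the real driver lives in the scheduler and in
// PrimOps.cmm, not in this file): a caller outside this file would use the
// interface above roughly as follows.
//
//   StgTRecHeader *t = stmStartTransaction(cap, NO_TREC);
//   StgClosure *v = stmReadTVar(cap, t, tv);
//   stmWriteTVar(cap, t, tv, new_v);
//   if (!stmCommitTransaction(cap, t)) {
//     // validation failed: the scheduler re-runs the transaction
//   }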