Change the representation of the MVar blocked queue
[ghc.git] / rts / Messages.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 2010
4 *
5 * Inter-Capability message passing
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "Rts.h"
10 #include "Messages.h"
11 #include "Trace.h"
12 #include "Capability.h"
13 #include "Schedule.h"
14 #include "Threads.h"
15 #include "RaiseAsync.h"
16 #include "sm/Storage.h"
17
18 /* ----------------------------------------------------------------------------
19 Send a message to another Capability
20 ------------------------------------------------------------------------- */
21
22 #ifdef THREADED_RTS
23
24 void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
25 {
26 ACQUIRE_LOCK(&to_cap->lock);
27
28 #ifdef DEBUG
29 {
30 const StgInfoTable *i = msg->header.info;
31 if (i != &stg_MSG_THROWTO_info &&
32 i != &stg_MSG_BLACKHOLE_info &&
33 i != &stg_MSG_TRY_WAKEUP_info &&
34 i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked
35 i != &stg_WHITEHOLE_info) {
36 barf("sendMessage: %p", i);
37 }
38 }
39 #endif
40
41 msg->link = to_cap->inbox;
42 to_cap->inbox = msg;
43
44 recordClosureMutated(from_cap,(StgClosure*)msg);
45
46 if (to_cap->running_task == NULL) {
47 to_cap->running_task = myTask();
48 // precond for releaseCapability_()
49 releaseCapability_(to_cap,rtsFalse);
50 } else {
51 contextSwitchCapability(to_cap);
52 }
53
54 RELEASE_LOCK(&to_cap->lock);
55 }
56
57 #endif /* THREADED_RTS */
58
59 /* ----------------------------------------------------------------------------
60 Handle a message
61 ------------------------------------------------------------------------- */
62
63 #ifdef THREADED_RTS
64
65 void
66 executeMessage (Capability *cap, Message *m)
67 {
68 const StgInfoTable *i;
69
70 loop:
71 write_barrier(); // allow m->header to be modified by another thread
72 i = m->header.info;
73 if (i == &stg_MSG_TRY_WAKEUP_info)
74 {
75 StgTSO *tso = ((MessageWakeup *)m)->tso;
76 debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld",
77 (lnat)tso->id);
78 tryWakeupThread(cap, tso);
79 }
80 else if (i == &stg_MSG_THROWTO_info)
81 {
82 MessageThrowTo *t = (MessageThrowTo *)m;
83 nat r;
84 const StgInfoTable *i;
85
86 i = lockClosure((StgClosure*)m);
87 if (i != &stg_MSG_THROWTO_info) {
88 unlockClosure((StgClosure*)m, i);
89 goto loop;
90 }
91
92 debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld",
93 (lnat)t->source->id, (lnat)t->target->id);
94
95 ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
96 ASSERT(t->source->block_info.closure == (StgClosure *)m);
97
98 r = throwToMsg(cap, t);
99
100 switch (r) {
101 case THROWTO_SUCCESS:
102 // this message is done
103 unlockClosure((StgClosure*)m, &stg_MSG_NULL_info);
104 tryWakeupThread(cap, t->source);
105 break;
106 case THROWTO_BLOCKED:
107 // unlock the message
108 unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
109 break;
110 }
111 }
112 else if (i == &stg_MSG_BLACKHOLE_info)
113 {
114 nat r;
115 MessageBlackHole *b = (MessageBlackHole*)m;
116
117 r = messageBlackHole(cap, b);
118 if (r == 0) {
119 tryWakeupThread(cap, b->tso);
120 }
121 return;
122 }
123 else if (i == &stg_IND_info || i == &stg_MSG_NULL_info)
124 {
125 // message was revoked
126 return;
127 }
128 else if (i == &stg_WHITEHOLE_info)
129 {
130 goto loop;
131 }
132 else
133 {
134 barf("executeMessage: %p", i);
135 }
136 }
137
138 #endif
139
140 /* ----------------------------------------------------------------------------
141 Handle a MSG_BLACKHOLE message
142
143 This is called from two places: either we just entered a BLACKHOLE
144 (stg_BLACKHOLE_info), or we received a MSG_BLACKHOLE in our
145 cap->inbox.
146
147 We need to establish whether the BLACKHOLE belongs to
148 this Capability, and
149 - if so, arrange to block the current thread on it
150 - otherwise, forward the message to the right place
151
152 Returns:
153 - 0 if the blocked thread can be woken up by the caller
154 - 1 if the thread is still blocked, and we promise to send a MSG_TRY_WAKEUP
155 at some point in the future.
156
157 ------------------------------------------------------------------------- */
158
159 nat messageBlackHole(Capability *cap, MessageBlackHole *msg)
160 {
161 const StgInfoTable *info;
162 StgClosure *p;
163 StgBlockingQueue *bq;
164 StgClosure *bh = msg->bh;
165 StgTSO *owner;
166
167 debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on blackhole %p",
168 (lnat)msg->tso->id, msg->bh);
169
170 info = bh->header.info;
171
172 // If we got this message in our inbox, it might be that the
173 // BLACKHOLE has already been updated, and GC has shorted out the
174 // indirection, so the pointer no longer points to a BLACKHOLE at
175 // all.
176 if (info != &stg_BLACKHOLE_info &&
177 info != &stg_CAF_BLACKHOLE_info &&
178 info != &stg_WHITEHOLE_info) {
179 // if it is a WHITEHOLE, then a thread is in the process of
180 // trying to BLACKHOLE it. But we know that it was once a
181 // BLACKHOLE, so there is at least a valid pointer in the
182 // payload, so we can carry on.
183 return 0;
184 }
185
186 // we know at this point that the closure
187 loop:
188 p = ((StgInd*)bh)->indirectee;
189 info = p->header.info;
190
191 if (info == &stg_IND_info)
192 {
193 // This could happen, if e.g. we got a BLOCKING_QUEUE that has
194 // just been replaced with an IND by another thread in
195 // updateThunk(). In which case, if we read the indirectee
196 // again we should get the value.
197 goto loop;
198 }
199
200 else if (info == &stg_TSO_info)
201 {
202 owner = deRefTSO((StgTSO *)p);
203
204 #ifdef THREADED_RTS
205 if (owner->cap != cap) {
206 sendMessage(cap, owner->cap, (Message*)msg);
207 debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
208 return 1;
209 }
210 #endif
211 // owner is the owner of the BLACKHOLE, and resides on this
212 // Capability. msg->tso is the first thread to block on this
213 // BLACKHOLE, so we first create a BLOCKING_QUEUE object.
214
215 bq = (StgBlockingQueue*)allocate(cap, sizeofW(StgBlockingQueue));
216
217 // initialise the BLOCKING_QUEUE object
218 SET_HDR(bq, &stg_BLOCKING_QUEUE_DIRTY_info, CCS_SYSTEM);
219 bq->bh = bh;
220 bq->queue = msg;
221 bq->owner = owner;
222
223 msg->link = (MessageBlackHole*)END_TSO_QUEUE;
224
225 // All BLOCKING_QUEUES are linked in a list on owner->bq, so
226 // that we can search through them in the event that there is
227 // a collision to update a BLACKHOLE and a BLOCKING_QUEUE
228 // becomes orphaned (see updateThunk()).
229 bq->link = owner->bq;
230 owner->bq = bq;
231 dirty_TSO(cap, owner); // we modified owner->bq
232
233 // If the owner of the blackhole is currently runnable, then
234 // bump it to the front of the run queue. This gives the
235 // blocked-on thread a little boost which should help unblock
236 // this thread, and may avoid a pile-up of other threads
237 // becoming blocked on the same BLACKHOLE (#3838).
238 //
239 // NB. we check to make sure that the owner is not the same as
240 // the current thread, since in that case it will not be on
241 // the run queue.
242 if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
243 removeFromRunQueue(cap, owner);
244 pushOnRunQueue(cap,owner);
245 }
246
247 // point to the BLOCKING_QUEUE from the BLACKHOLE
248 write_barrier(); // make the BQ visible
249 ((StgInd*)bh)->indirectee = (StgClosure *)bq;
250 recordClosureMutated(cap,bh); // bh was mutated
251
252 debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
253 (lnat)msg->tso->id, (lnat)owner->id);
254
255 return 1; // blocked
256 }
257 else if (info == &stg_BLOCKING_QUEUE_CLEAN_info ||
258 info == &stg_BLOCKING_QUEUE_DIRTY_info)
259 {
260 StgBlockingQueue *bq = (StgBlockingQueue *)p;
261
262 ASSERT(bq->bh == bh);
263
264 owner = deRefTSO(bq->owner);
265
266 ASSERT(owner != END_TSO_QUEUE);
267
268 #ifdef THREADED_RTS
269 if (owner->cap != cap) {
270 sendMessage(cap, owner->cap, (Message*)msg);
271 debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
272 return 1;
273 }
274 #endif
275
276 msg->link = bq->queue;
277 bq->queue = msg;
278 recordClosureMutated(cap,(StgClosure*)msg);
279
280 if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
281 bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
282 recordClosureMutated(cap,(StgClosure*)bq);
283 }
284
285 debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
286 (lnat)msg->tso->id, (lnat)owner->id);
287
288 // See above, #3838
289 if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
290 removeFromRunQueue(cap, owner);
291 pushOnRunQueue(cap,owner);
292 }
293
294 return 1; // blocked
295 }
296
297 return 0; // not blocked
298 }
299