2b40f76e938c466f109c3e6a26f760d392053038
[ghc.git] / rts / Messages.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 2010
4 *
5 * Inter-Capability message passing
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "Rts.h"
10 #include "Messages.h"
11 #include "Trace.h"
12 #include "Capability.h"
13 #include "Schedule.h"
14 #include "Threads.h"
15 #include "RaiseAsync.h"
16 #include "sm/Storage.h"
17
18 /* ----------------------------------------------------------------------------
19 Send a message to another Capability
20 ------------------------------------------------------------------------- */
21
22 #ifdef THREADED_RTS
23
24 void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
25 {
26 ACQUIRE_LOCK(&to_cap->lock);
27
28 #ifdef DEBUG
29 {
30 const StgInfoTable *i = msg->header.info;
31 if (i != &stg_MSG_WAKEUP_info &&
32 i != &stg_MSG_THROWTO_info &&
33 i != &stg_MSG_BLACKHOLE_info &&
34 i != &stg_MSG_TRY_WAKEUP_info &&
35 i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked
36 i != &stg_WHITEHOLE_info) {
37 barf("sendMessage: %p", i);
38 }
39 }
40 #endif
41
42 msg->link = to_cap->inbox;
43 to_cap->inbox = msg;
44
45 recordClosureMutated(from_cap,(StgClosure*)msg);
46
47 if (to_cap->running_task == NULL) {
48 to_cap->running_task = myTask();
49 // precond for releaseCapability_()
50 releaseCapability_(to_cap,rtsFalse);
51 } else {
52 contextSwitchCapability(to_cap);
53 }
54
55 RELEASE_LOCK(&to_cap->lock);
56 }
57
58 #endif /* THREADED_RTS */
59
60 /* ----------------------------------------------------------------------------
61 Handle a message
62 ------------------------------------------------------------------------- */
63
64 #ifdef THREADED_RTS
65
66 void
67 executeMessage (Capability *cap, Message *m)
68 {
69 const StgInfoTable *i;
70
71 loop:
72 write_barrier(); // allow m->header to be modified by another thread
73 i = m->header.info;
74 if (i == &stg_MSG_WAKEUP_info)
75 {
76 // the plan is to eventually get rid of these and use
77 // TRY_WAKEUP instead.
78 MessageWakeup *w = (MessageWakeup *)m;
79 StgTSO *tso = w->tso;
80 debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld",
81 (lnat)tso->id);
82 ASSERT(tso->cap == cap);
83 ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
84 ASSERT(tso->block_info.closure == (StgClosure *)m);
85 tso->why_blocked = NotBlocked;
86 appendToRunQueue(cap, tso);
87 }
88 else if (i == &stg_MSG_TRY_WAKEUP_info)
89 {
90 StgTSO *tso = ((MessageWakeup *)m)->tso;
91 debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld",
92 (lnat)tso->id);
93 tryWakeupThread(cap, tso);
94 }
95 else if (i == &stg_MSG_THROWTO_info)
96 {
97 MessageThrowTo *t = (MessageThrowTo *)m;
98 nat r;
99 const StgInfoTable *i;
100
101 i = lockClosure((StgClosure*)m);
102 if (i != &stg_MSG_THROWTO_info) {
103 unlockClosure((StgClosure*)m, i);
104 goto loop;
105 }
106
107 debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld",
108 (lnat)t->source->id, (lnat)t->target->id);
109
110 ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
111 ASSERT(t->source->block_info.closure == (StgClosure *)m);
112
113 r = throwToMsg(cap, t);
114
115 switch (r) {
116 case THROWTO_SUCCESS:
117 ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info);
118 t->source->sp += 3;
119 unblockOne(cap, t->source);
120 // this message is done
121 unlockClosure((StgClosure*)m, &stg_IND_info);
122 break;
123 case THROWTO_BLOCKED:
124 // unlock the message
125 unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
126 break;
127 }
128 }
129 else if (i == &stg_MSG_BLACKHOLE_info)
130 {
131 nat r;
132 MessageBlackHole *b = (MessageBlackHole*)m;
133
134 r = messageBlackHole(cap, b);
135 if (r == 0) {
136 tryWakeupThread(cap, b->tso);
137 }
138 return;
139 }
140 else if (i == &stg_IND_info)
141 {
142 // message was revoked
143 return;
144 }
145 else if (i == &stg_WHITEHOLE_info)
146 {
147 goto loop;
148 }
149 else
150 {
151 barf("executeMessage: %p", i);
152 }
153 }
154
155 #endif
156
157 /* ----------------------------------------------------------------------------
158 Handle a MSG_BLACKHOLE message
159
160 This is called from two places: either we just entered a BLACKHOLE
161 (stg_BLACKHOLE_info), or we received a MSG_BLACKHOLE in our
162 cap->inbox.
163
164 We need to establish whether the BLACKHOLE belongs to
165 this Capability, and
166 - if so, arrange to block the current thread on it
167 - otherwise, forward the message to the right place
168
169 Returns:
170 - 0 if the blocked thread can be woken up by the caller
171 - 1 if the thread is still blocked, and we promise to send a MSG_TRY_WAKEUP
172 at some point in the future.
173
174 ------------------------------------------------------------------------- */
175
176 nat messageBlackHole(Capability *cap, MessageBlackHole *msg)
177 {
178 const StgInfoTable *info;
179 StgClosure *p;
180 StgBlockingQueue *bq;
181 StgClosure *bh = msg->bh;
182 StgTSO *owner;
183
184 debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on blackhole %p",
185 (lnat)msg->tso->id, msg->bh);
186
187 info = bh->header.info;
188
189 // If we got this message in our inbox, it might be that the
190 // BLACKHOLE has already been updated, and GC has shorted out the
191 // indirection, so the pointer no longer points to a BLACKHOLE at
192 // all.
193 if (info != &stg_BLACKHOLE_info &&
194 info != &stg_CAF_BLACKHOLE_info &&
195 info != &stg_WHITEHOLE_info) {
196 // if it is a WHITEHOLE, then a thread is in the process of
197 // trying to BLACKHOLE it. But we know that it was once a
198 // BLACKHOLE, so there is at least a valid pointer in the
199 // payload, so we can carry on.
200 return 0;
201 }
202
203 // we know at this point that the closure
204 loop:
205 p = ((StgInd*)bh)->indirectee;
206 info = p->header.info;
207
208 if (info == &stg_IND_info)
209 {
210 // This could happen, if e.g. we got a BLOCKING_QUEUE that has
211 // just been replaced with an IND by another thread in
212 // updateThunk(). In which case, if we read the indirectee
213 // again we should get the value.
214 goto loop;
215 }
216
217 else if (info == &stg_TSO_info)
218 {
219 owner = deRefTSO((StgTSO *)p);
220
221 #ifdef THREADED_RTS
222 if (owner->cap != cap) {
223 sendMessage(cap, owner->cap, (Message*)msg);
224 debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
225 return 1;
226 }
227 #endif
228 // owner is the owner of the BLACKHOLE, and resides on this
229 // Capability. msg->tso is the first thread to block on this
230 // BLACKHOLE, so we first create a BLOCKING_QUEUE object.
231
232 bq = (StgBlockingQueue*)allocate(cap, sizeofW(StgBlockingQueue));
233
234 // initialise the BLOCKING_QUEUE object
235 SET_HDR(bq, &stg_BLOCKING_QUEUE_DIRTY_info, CCS_SYSTEM);
236 bq->bh = bh;
237 bq->queue = msg;
238 bq->owner = owner;
239
240 msg->link = (MessageBlackHole*)END_TSO_QUEUE;
241
242 // All BLOCKING_QUEUES are linked in a list on owner->bq, so
243 // that we can search through them in the event that there is
244 // a collision to update a BLACKHOLE and a BLOCKING_QUEUE
245 // becomes orphaned (see updateThunk()).
246 bq->link = owner->bq;
247 owner->bq = bq;
248 dirty_TSO(cap, owner); // we modified owner->bq
249
250 // point to the BLOCKING_QUEUE from the BLACKHOLE
251 write_barrier(); // make the BQ visible
252 ((StgInd*)bh)->indirectee = (StgClosure *)bq;
253 recordClosureMutated(cap,bh); // bh was mutated
254
255 debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
256 (lnat)msg->tso->id, (lnat)owner->id);
257
258 return 1; // blocked
259 }
260 else if (info == &stg_BLOCKING_QUEUE_CLEAN_info ||
261 info == &stg_BLOCKING_QUEUE_DIRTY_info)
262 {
263 StgBlockingQueue *bq = (StgBlockingQueue *)p;
264
265 ASSERT(bq->bh == bh);
266
267 owner = deRefTSO(bq->owner);
268
269 ASSERT(owner != END_TSO_QUEUE);
270
271 #ifdef THREADED_RTS
272 if (owner->cap != cap) {
273 sendMessage(cap, owner->cap, (Message*)msg);
274 debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
275 return 1;
276 }
277 #endif
278
279 msg->link = bq->queue;
280 bq->queue = msg;
281 recordClosureMutated(cap,(StgClosure*)msg);
282
283 if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
284 bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
285 recordClosureMutated(cap,bq);
286 }
287
288 debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
289 (lnat)msg->tso->id, (lnat)owner->id);
290
291 return 1; // blocked
292 }
293
294 return 0; // not blocked
295 }
296