Implement atomicReadMVar, fixing #4001.
[ghc.git] / rts / RaiseAsync.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * Asynchronous exceptions
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "sm/Storage.h"
13 #include "Threads.h"
14 #include "Trace.h"
15 #include "RaiseAsync.h"
16 #include "Schedule.h"
17 #include "Updates.h"
18 #include "STM.h"
19 #include "sm/Sanity.h"
20 #include "Profiling.h"
21 #include "Messages.h"
22 #if defined(mingw32_HOST_OS)
23 #include "win32/IOManager.h"
24 #endif
25
26 static StgTSO* raiseAsync (Capability *cap,
27 StgTSO *tso,
28 StgClosure *exception,
29 rtsBool stop_at_atomically,
30 StgUpdateFrame *stop_here);
31
32 static void removeFromQueues(Capability *cap, StgTSO *tso);
33
34 static void removeFromMVarBlockedQueue (StgTSO *tso);
35
36 static void blockedThrowTo (Capability *cap,
37 StgTSO *target, MessageThrowTo *msg);
38
39 static void throwToSendMsg (Capability *cap USED_IF_THREADS,
40 Capability *target_cap USED_IF_THREADS,
41 MessageThrowTo *msg USED_IF_THREADS);
42
43 /* -----------------------------------------------------------------------------
44 throwToSingleThreaded
45
46 This version of throwTo is safe to use if and only if one of the
47 following holds:
48
49 - !THREADED_RTS
50
51 - all the other threads in the system are stopped (eg. during GC).
52
53 - we surely own the target TSO (eg. we just took it from the
54 run queue of the current capability, or we are running it).
55
56 It doesn't cater for blocking the source thread until the exception
57 has been raised.
58 -------------------------------------------------------------------------- */
59
60 static void
61 throwToSingleThreaded__ (Capability *cap, StgTSO *tso, StgClosure *exception,
62 rtsBool stop_at_atomically, StgUpdateFrame *stop_here)
63 {
64 // Thread already dead?
65 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
66 return;
67 }
68
69 // Remove it from any blocking queues
70 removeFromQueues(cap,tso);
71
72 raiseAsync(cap, tso, exception, stop_at_atomically, stop_here);
73 }
74
75 void
76 throwToSingleThreaded (Capability *cap, StgTSO *tso, StgClosure *exception)
77 {
78 throwToSingleThreaded__(cap, tso, exception, rtsFalse, NULL);
79 }
80
81 void
82 throwToSingleThreaded_ (Capability *cap, StgTSO *tso, StgClosure *exception,
83 rtsBool stop_at_atomically)
84 {
85 throwToSingleThreaded__ (cap, tso, exception, stop_at_atomically, NULL);
86 }
87
88 void // cannot return a different TSO
89 suspendComputation (Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
90 {
91 throwToSingleThreaded__ (cap, tso, NULL, rtsFalse, stop_here);
92 }
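
/* As an illustration of the ownership precondition above: a caller that
 * has just pulled a TSO off its own run queue (or holds all Capabilities,
 * as during shutdown) may kill it directly.  This is a rough sketch of how
 * a deleteThread()-style helper in Schedule.c uses this entry point; the
 * helper's name and the CCall check are assumptions here, not part of
 * this file:
 *
 *   static void deleteThread (Capability *cap, StgTSO *tso)
 *   {
 *       // a NULL exception means "just kill the thread"
 *       if (tso->why_blocked != BlockedOnCCall &&
 *           tso->why_blocked != BlockedOnCCall_Interruptible) {
 *           throwToSingleThreaded(cap, tso, NULL);
 *       }
 *   }
 */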
93
94 /* -----------------------------------------------------------------------------
95 throwTo
96
97 This function may be used to throw an exception from one thread to
98 another, during the course of normal execution. This is a tricky
99 task: the target thread might be running on another CPU, or it
100 may be blocked and could be woken up at any point by another CPU.
101 We have some delicate synchronisation to do.
102
103 The underlying scheme when multiple Capabilities are in use is
104 message passing: when the target of a throwTo is on another
105 Capability, we send a message (a MessageThrowTo closure) to that
106 Capability.
107
108 If the throwTo needs to block because the target TSO is masking
109 exceptions (the TSO_BLOCKEX flag), then the message is placed on
110 the blocked_exceptions queue attached to the target TSO. When the
111 target TSO enters the unmasked state again, it must check the
112 queue. The blocked_exceptions queue is not locked; only the
113 Capability owning the TSO may modify it.
114
115 To make things simpler for throwTo, we always create the message
116 first before deciding what to do. The message may get sent, or it
117 may get attached to a TSO's blocked_exceptions queue, or the
118 exception may get thrown immediately and the message dropped,
119 depending on the current state of the target.
120
121 Currently we send a message if the target belongs to another
122 Capability, and it is
123
124 - NotBlocked, BlockedOnMsgThrowTo,
125 BlockedOnCCall_Interruptible
126
127 - or it is masking exceptions (TSO_BLOCKEX)
128
129 Currently, if the target is BlockedOnMVar, BlockedOnSTM, or
130 BlockedOnBlackHole then we acquire ownership of the TSO by locking
131 its parent container (e.g. the MVar) and then raise the exception.
132 We might change these cases to be more message-passing-like in the
133 future.
134
135 Returns:
136
137 NULL exception was raised, ok to continue
138
139 MessageThrowTo * exception was not raised; the source TSO
140 should now put itself in the state
141 BlockedOnMsgThrowTo, and when it is ready
142 it should unlock the message using
143 unlockClosure(msg, &stg_MSG_THROWTO_info);
144 If it decides not to raise the exception after
145 all, it can revoke it safely with
146 unlockClosure(msg, &stg_MSG_NULL_info);
147
148 -------------------------------------------------------------------------- */
149
150 MessageThrowTo *
151 throwTo (Capability *cap, // the Capability we hold
152 StgTSO *source, // the TSO sending the exception (or NULL)
153 StgTSO *target, // the TSO receiving the exception
154 StgClosure *exception) // the exception closure
155 {
156 MessageThrowTo *msg;
157
158 msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
159 // the message starts locked; see below
160 SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
161 msg->source = source;
162 msg->target = target;
163 msg->exception = exception;
164
165 switch (throwToMsg(cap, msg))
166 {
167 case THROWTO_SUCCESS:
168 // unlock the message now, otherwise we leave a WHITEHOLE in
169 // the heap (#6103)
170 SET_HDR(msg, &stg_MSG_THROWTO_info, CCS_SYSTEM);
171 return NULL;
172
173 case THROWTO_BLOCKED:
174 default:
175 // the caller will unlock the message when it is ready. We
176 // cannot unlock it yet, because the calling thread will need
177 // to tidy up its state first.
178 return msg;
179 }
180 }
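
/* A sketch of the calling protocol described above, roughly as the
 * killThread# primop behaves (the real caller lives in Cmm in
 * Exception.cmm; this C rendering is illustrative only):
 *
 *   MessageThrowTo *msg = throwTo(cap, source_tso, target_tso, exn);
 *   if (msg == NULL) {
 *       // exception already delivered; source_tso keeps running
 *   } else {
 *       // the target is masking exceptions, or lives on another
 *       // Capability: block the source thread until the message is
 *       // serviced.
 *       source_tso->why_blocked = BlockedOnMsgThrowTo;
 *       source_tso->block_info.throwto = msg;
 *       // ...return to the scheduler; when resumed, unlock the message
 *       // with unlockClosure(msg, &stg_MSG_THROWTO_info), or revoke it
 *       // with unlockClosure(msg, &stg_MSG_NULL_info).
 *   }
 */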
181
182
183 nat
184 throwToMsg (Capability *cap, MessageThrowTo *msg)
185 {
186 StgWord status;
187 StgTSO *target = msg->target;
188 Capability *target_cap;
189
190 goto check_target;
191
192 retry:
193 write_barrier();
194 debugTrace(DEBUG_sched, "throwTo: retrying...");
195
196 check_target:
197 ASSERT(target != END_TSO_QUEUE);
198
199 // Thread already dead?
200 if (target->what_next == ThreadComplete
201 || target->what_next == ThreadKilled) {
202 return THROWTO_SUCCESS;
203 }
204
205 debugTraceCap(DEBUG_sched, cap,
206 "throwTo: from thread %lu to thread %lu",
207 (unsigned long)msg->source->id,
208 (unsigned long)msg->target->id);
209
210 #ifdef DEBUG
211 traceThreadStatus(DEBUG_sched, target);
212 #endif
213
214 target_cap = target->cap;
215 if (target->cap != cap) {
216 throwToSendMsg(cap, target_cap, msg);
217 return THROWTO_BLOCKED;
218 }
219
220 status = target->why_blocked;
221
222 switch (status) {
223 case NotBlocked:
224 {
225 if ((target->flags & TSO_BLOCKEX) == 0) {
226 // It's on our run queue and not blocking exceptions
227 raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
228 return THROWTO_SUCCESS;
229 } else {
230 blockedThrowTo(cap,target,msg);
231 return THROWTO_BLOCKED;
232 }
233 }
234
235 case BlockedOnMsgThrowTo:
236 {
237 const StgInfoTable *i;
238 MessageThrowTo *m;
239
240 m = target->block_info.throwto;
241
242 // target is local to this cap, but has sent a throwto
243 // message to another cap.
244 //
245 // The source message is locked. We need to revoke the
246 // target's message so that we can raise the exception, so
247 // we attempt to lock it.
248
249 // There's a possibility of a deadlock if two threads are both
250 // trying to throwTo each other (or more generally, a cycle of
251 // threads). To break the symmetry we compare the addresses
252 // of the MessageThrowTo objects, and the one for which m <
253 // msg gets to spin, while the other can only try to lock
254 // once, but must then back off and unlock both before trying
255 // again.
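// In the abstract this is just lock ordering by address; a minimal
// sketch of the pattern (hypothetical names, not RTS code):
//
//   if (wanted < held) {
//       lock(wanted);               // at most one side may spin here
//   } else if (!try_lock(wanted)) {
//       give_up_and_retry_later();  // break the potential cycle
//   }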
256 if (m < msg) {
257 i = lockClosure((StgClosure *)m);
258 } else {
259 i = tryLockClosure((StgClosure *)m);
260 if (i == NULL) {
261 // debugBelch("collision\n");
262 throwToSendMsg(cap, target->cap, msg);
263 return THROWTO_BLOCKED;
264 }
265 }
266
267 if (i == &stg_MSG_NULL_info) {
268 // we know there's a MSG_TRY_WAKEUP on the way, so we
269 // might as well just do it now. The message will
270 // be a no-op when it arrives.
271 unlockClosure((StgClosure*)m, i);
272 tryWakeupThread(cap, target);
273 goto retry;
274 }
275
276 if (i != &stg_MSG_THROWTO_info) {
277 // if it's a MSG_NULL, this TSO has been woken up by another Cap
278 unlockClosure((StgClosure*)m, i);
279 goto retry;
280 }
281
282 if ((target->flags & TSO_BLOCKEX) &&
283 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
284 unlockClosure((StgClosure*)m, i);
285 blockedThrowTo(cap,target,msg);
286 return THROWTO_BLOCKED;
287 }
288
289 // nobody else can wake up this TSO after we claim the message
290 doneWithMsgThrowTo(m);
291
292 raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
293 return THROWTO_SUCCESS;
294 }
295
296 case BlockedOnMVar:
297 case BlockedOnMVarRead:
298 {
299 /*
300 To establish ownership of this TSO, we need to acquire a
301 lock on the MVar that it is blocked on.
302 */
303 StgMVar *mvar;
304 StgInfoTable *info USED_IF_THREADS;
305
306 mvar = (StgMVar *)target->block_info.closure;
307
308 // ASSUMPTION: tso->block_info must always point to a
309 // closure. In the threaded RTS it does.
310 switch (get_itbl((StgClosure *)mvar)->type) {
311 case MVAR_CLEAN:
312 case MVAR_DIRTY:
313 break;
314 default:
315 goto retry;
316 }
317
318 info = lockClosure((StgClosure *)mvar);
319
320 // we have the MVar, let's check whether the thread
321 // is still blocked on the same MVar.
322 if ((target->why_blocked != BlockedOnMVar && target->why_blocked != BlockedOnMVarRead)
323 || (StgMVar *)target->block_info.closure != mvar) {
324 unlockClosure((StgClosure *)mvar, info);
325 goto retry;
326 }
327
328 if (target->_link == END_TSO_QUEUE) {
329 // the MVar operation has already completed. There is a
330 // MSG_TRY_WAKEUP on the way, but we can just wake up the
331 // thread now anyway and ignore the message when it
332 // arrives.
333 unlockClosure((StgClosure *)mvar, info);
334 tryWakeupThread(cap, target);
335 goto retry;
336 }
337
338 if ((target->flags & TSO_BLOCKEX) &&
339 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
340 blockedThrowTo(cap,target,msg);
341 unlockClosure((StgClosure *)mvar, info);
342 return THROWTO_BLOCKED;
343 } else {
344 // revoke the MVar operation
345 removeFromMVarBlockedQueue(target);
346 raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
347 unlockClosure((StgClosure *)mvar, info);
348 return THROWTO_SUCCESS;
349 }
350 }
351
352 case BlockedOnBlackHole:
353 {
354 if (target->flags & TSO_BLOCKEX) {
355 // BlockedOnBlackHole is not interruptible.
356 blockedThrowTo(cap,target,msg);
357 return THROWTO_BLOCKED;
358 } else {
359 // Revoke the message by replacing it with IND. We're not
360 // locking anything here, so we might still get a TRY_WAKEUP
361 // message from the owner of the blackhole some time in the
362 // future, but that doesn't matter.
363 ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
364 OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
365 raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
366 return THROWTO_SUCCESS;
367 }
368 }
369
370 case BlockedOnSTM:
371 lockTSO(target);
372 // Unblocking BlockedOnSTM threads requires the TSO to be
373 // locked; see STM.c:unpark_tso().
374 if (target->why_blocked != BlockedOnSTM) {
375 unlockTSO(target);
376 goto retry;
377 }
378 if ((target->flags & TSO_BLOCKEX) &&
379 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
380 blockedThrowTo(cap,target,msg);
381 unlockTSO(target);
382 return THROWTO_BLOCKED;
383 } else {
384 raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
385 unlockTSO(target);
386 return THROWTO_SUCCESS;
387 }
388
389 case BlockedOnCCall_Interruptible:
390 #ifdef THREADED_RTS
391 {
392 Task *task = NULL;
393 // walk suspended_ccalls to find the correct worker thread
394 InCall *incall;
395 for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) {
396 if (incall->suspended_tso == target) {
397 task = incall->task;
398 break;
399 }
400 }
401 if (task != NULL) {
402 blockedThrowTo(cap, target, msg);
403 if (!((target->flags & TSO_BLOCKEX) &&
404 ((target->flags & TSO_INTERRUPTIBLE) == 0))) {
405 interruptWorkerTask(task);
406 }
407 return THROWTO_BLOCKED;
408 } else {
409 debugTraceCap(DEBUG_sched, cap, "throwTo: could not find worker thread to kill");
410 }
411 // fall through to the BlockedOnCCall case below
412 }
413 #endif
414 case BlockedOnCCall:
415 blockedThrowTo(cap,target,msg);
416 return THROWTO_BLOCKED;
417
418 #if !defined(THREADED_RTS)
419 case BlockedOnRead:
420 case BlockedOnWrite:
421 case BlockedOnDelay:
422 #if defined(mingw32_HOST_OS)
423 case BlockedOnDoProc:
424 #endif
425 if ((target->flags & TSO_BLOCKEX) &&
426 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
427 blockedThrowTo(cap,target,msg);
428 return THROWTO_BLOCKED;
429 } else {
430 removeFromQueues(cap,target);
431 raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
432 return THROWTO_SUCCESS;
433 }
434 #endif
435
436 case ThreadMigrating:
437 // if it is ThreadMigrating and tso->cap is ours, then it
438 // *must* be migrating *to* this capability. If it were
439 // migrating away from the capability, then tso->cap would
440 // point to the destination.
441 //
442 // There is a MSG_TRY_WAKEUP in the message queue for this thread,
443 // but we can just do it preemptively:
444 tryWakeupThread(cap, target);
445 // and now retry, the thread should be runnable.
446 goto retry;
447
448 default:
449 barf("throwTo: unrecognised why_blocked (%d)", target->why_blocked);
450 }
451 barf("throwTo");
452 }
453
454 static void
455 throwToSendMsg (Capability *cap STG_UNUSED,
456 Capability *target_cap USED_IF_THREADS,
457 MessageThrowTo *msg USED_IF_THREADS)
458
459 {
460 #ifdef THREADED_RTS
461 debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
462
463 sendMessage(cap, target_cap, (Message*)msg);
464 #endif
465 }
466
467 // Block a throwTo message on the target TSO's blocked_exceptions
468 // queue. The current Capability must own the target TSO in order to
469 // modify the blocked_exceptions queue.
470 static void
471 blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
472 {
473 debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu",
474 (unsigned long)target->id);
475
476 ASSERT(target->cap == cap);
477
478 msg->link = target->blocked_exceptions;
479 target->blocked_exceptions = msg;
480 dirty_TSO(cap,target); // we modified the blocked_exceptions queue
481 }
482
483 /* -----------------------------------------------------------------------------
484 Waking up threads blocked in throwTo
485
486 There are two ways to do this: maybePerformBlockedException() will
487 perform the throwTo() for the thread at the head of the queue
488 immediately, and leave the other threads on the queue.
489 maybePerformBlockedException() also checks the TSO_BLOCKEX flag
490 before raising an exception.
491
492 awakenBlockedExceptionQueue() will wake up all the threads in the
493 queue, but not perform any throwTo() immediately. This might be
494 more appropriate when the target thread is the one actually running
495 (see Exception.cmm).
496
497 Returns: non-zero if an exception was raised, zero otherwise.
498 -------------------------------------------------------------------------- */
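
/* For example, the unmask path (stg_unmaskAsyncExceptionszh in
 * Exception.cmm) behaves roughly like the following C sketch --
 * illustrative only, the real code is written in Cmm:
 *
 *   tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
 *   if (maybePerformBlockedException(cap, tso)) {
 *       // an exception was raised on tso's stack; return to the
 *       // scheduler so the handler (or raise#) runs next.
 *   }
 */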
499
500 int
501 maybePerformBlockedException (Capability *cap, StgTSO *tso)
502 {
503 MessageThrowTo *msg;
504 const StgInfoTable *i;
505 StgTSO *source;
506
507 if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
508 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
509 awakenBlockedExceptionQueue(cap,tso);
510 return 1;
511 } else {
512 return 0;
513 }
514 }
515
516 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE &&
517 (tso->flags & TSO_BLOCKEX) != 0) {
518 debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
519 }
520
521 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
522 && ((tso->flags & TSO_BLOCKEX) == 0
523 || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
524
525 // We unblock just the first thread on the queue, and perform
526 // its throw immediately.
527 loop:
528 msg = tso->blocked_exceptions;
529 if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
530 i = lockClosure((StgClosure*)msg);
531 tso->blocked_exceptions = (MessageThrowTo*)msg->link;
532 if (i == &stg_MSG_NULL_info) {
533 unlockClosure((StgClosure*)msg,i);
534 goto loop;
535 }
536
537 throwToSingleThreaded(cap, msg->target, msg->exception);
538 source = msg->source;
539 doneWithMsgThrowTo(msg);
540 tryWakeupThread(cap, source);
541 return 1;
542 }
543 return 0;
544 }
545
546 // awakenBlockedExceptionQueue(): Just wake up the whole queue of
547 // blocked exceptions.
548
549 void
550 awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
551 {
552 MessageThrowTo *msg;
553 const StgInfoTable *i;
554 StgTSO *source;
555
556 for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
557 msg = (MessageThrowTo*)msg->link) {
558 i = lockClosure((StgClosure *)msg);
559 if (i != &stg_MSG_NULL_info) {
560 source = msg->source;
561 doneWithMsgThrowTo(msg);
562 tryWakeupThread(cap, source);
563 } else {
564 unlockClosure((StgClosure *)msg,i);
565 }
566 }
567 tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
568 }
569
570 /* -----------------------------------------------------------------------------
571 Remove a thread from blocking queues.
572
573 This is for use when we raise an exception in another thread, which
574 may be blocked.
575
576 Precondition: we have exclusive access to the TSO, via the same set
577 of conditions as throwToSingleThreaded() (c.f.).
578 -------------------------------------------------------------------------- */
579
580 static void
581 removeFromMVarBlockedQueue (StgTSO *tso)
582 {
583 StgMVar *mvar = (StgMVar*)tso->block_info.closure;
584 StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link;
585
586 if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) {
587 // already removed from this MVar
588 return;
589 }
590
591 // Assume the MVar is locked. (not assertable; sometimes it isn't
592 // actually WHITEHOLE'd).
593
594 // We want to remove the MVAR_TSO_QUEUE object from the queue. It
595 // isn't doubly-linked so we can't actually remove it; instead we
596 // just overwrite it with an IND if possible and let the GC short
597 // it out. However, we have to be careful to maintain the deque
598 // structure:
599
600 if (mvar->head == q) {
601 mvar->head = q->link;
602 OVERWRITE_INFO(q, &stg_IND_info);
603 if (mvar->tail == q) {
604 mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE;
605 }
606 }
607 else if (mvar->tail == q) {
608 // we can't replace it with an IND in this case, because then
609 // we lose the tail pointer when the GC shorts out the IND.
610 // So we use MSG_NULL as a kind of non-dupable indirection;
611 // these are ignored by takeMVar/putMVar.
612 OVERWRITE_INFO(q, &stg_MSG_NULL_info);
613 }
614 else {
615 OVERWRITE_INFO(q, &stg_IND_info);
616 }
617
618 // revoke the MVar operation
619 tso->_link = END_TSO_QUEUE;
620 }
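
/* A small worked example of the above (sketch only): suppose three
 * threads are blocked on the MVar, so the queue looks like
 *
 *    mvar->head -> [Q1|t1] -> [Q2|t2] -> [Q3|t3] <- mvar->tail
 *
 * Removing t2 overwrites Q2 with stg_IND_info; since an MVAR_TSO_QUEUE's
 * first payload word is its link field, the IND points at Q3 and the GC
 * will short it out.  Removing t3 instead overwrites Q3 with
 * stg_MSG_NULL_info, so mvar->tail still points at a live object that
 * takeMVar/putMVar know to skip.
 */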
621
622 static void
623 removeFromQueues(Capability *cap, StgTSO *tso)
624 {
625 switch (tso->why_blocked) {
626
627 case NotBlocked:
628 case ThreadMigrating:
629 return;
630
631 case BlockedOnSTM:
632 // Be careful: nothing to do here! We tell the scheduler that the
633 // thread is runnable and we leave it to the stack-walking code to
634 // abort the transaction while unwinding the stack. We should
635 // perhaps have a debugging test to make sure that this really
636 // happens and that the 'zombie' transaction does not get
637 // committed.
638 goto done;
639
640 case BlockedOnMVar:
641 case BlockedOnMVarRead:
642 removeFromMVarBlockedQueue(tso);
643 goto done;
644
645 case BlockedOnBlackHole:
646 // nothing to do
647 goto done;
648
649 case BlockedOnMsgThrowTo:
650 {
651 MessageThrowTo *m = tso->block_info.throwto;
652 // The message is locked by us, unless we got here via
653 // deleteAllThreads(), in which case we own all the
654 // capabilities.
655 // ASSERT(m->header.info == &stg_WHITEHOLE_info);
656
657 // unlock and revoke it at the same time
658 doneWithMsgThrowTo(m);
659 break;
660 }
661
662 #if !defined(THREADED_RTS)
663 case BlockedOnRead:
664 case BlockedOnWrite:
665 #if defined(mingw32_HOST_OS)
666 case BlockedOnDoProc:
667 #endif
668 removeThreadFromDeQueue(cap, &blocked_queue_hd, &blocked_queue_tl, tso);
669 #if defined(mingw32_HOST_OS)
670 /* (Cooperatively) signal that the worker thread should abort
671 * the request.
672 */
673 abandonWorkRequest(tso->block_info.async_result->reqID);
674 #endif
675 goto done;
676
677 case BlockedOnDelay:
678 removeThreadFromQueue(cap, &sleeping_queue, tso);
679 goto done;
680 #endif
681
682 default:
683 barf("removeFromQueues: %d", tso->why_blocked);
684 }
685
686 done:
687 tso->why_blocked = NotBlocked;
688 appendToRunQueue(cap, tso);
689 }
690
691 /* -----------------------------------------------------------------------------
692 * raiseAsync()
693 *
694 * The following function implements the magic for raising an
695 * asynchronous exception in an existing thread.
696 *
697 * We first remove the thread from any queue on which it might be
698 * blocked. The possible blockages are MVARs, BLOCKING_QUEUEs, and
699 * TSO blocked_exception queues.
700 *
701 * We strip the stack down to the innermost CATCH_FRAME, building
702 * thunks in the heap for all the active computations, so they can
703 * be restarted if necessary. When we reach a CATCH_FRAME, we build
704 * an application of the handler to the exception, and push it on
705 * the top of the stack.
706 *
707 * How exactly do we save all the active computations? We create an
708 * AP_STACK for every UpdateFrame on the stack. Entering one of these
709 * AP_STACKs pushes everything from the corresponding update frame
710 * upwards onto the stack. (Actually, it pushes everything up to the
711 * next update frame plus a pointer to the next AP_STACK object.
712 * Entering the next AP_STACK object pushes more onto the stack until we
713 * reach the last AP_STACK object - at which point the stack should look
714 * exactly as it did when we killed the TSO and we can continue
715 * execution by entering the closure on top of the stack.)
716 *
717 * We can also kill a thread entirely - this happens if either (a) the
718 * exception passed to raiseAsync is NULL, or (b) there's no
719 * CATCH_FRAME on the stack. In either case, we strip the entire
720 * stack and replace the thread with a zombie.
721 *
722 * ToDo: in THREADED_RTS mode, this function is only safe if either
723 * (a) we hold all the Capabilities (eg. in GC, or if there is only
724 * one Capability), or (b) we own the Capability that the TSO is
725 * currently blocked on or on the run queue of.
726 *
727 * -------------------------------------------------------------------------- */
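
/* Schematically, for a stack that looks like this when the exception
 * arrives (innermost frame at the top):
 *
 *      sp -> ... F1 ...            (stack words above u1)
 *            UPDATE_FRAME u1       (updatee t1)
 *            ... F2 ...
 *            UPDATE_FRAME u2       (updatee t2)
 *            CATCH_FRAME h
 *            ...
 *
 * raiseAsync(exn) leaves the heap and stack roughly as:
 *
 *      t1 := AP_STACK capturing F1
 *      t2 := AP_STACK capturing F2 (whose fun field is t1's AP_STACK)
 *
 *      sp -> stg_enter_info
 *            raise thunk (stg_raise_info, payload = exn)
 *            CATCH_FRAME h
 *            ...
 *
 * so resuming the thread enters the raise thunk, which delivers exn to
 * the handler in CATCH_FRAME h, while the suspended work stays alive via
 * the AP_STACKs.  (Sketch only; see the code below for the precise
 * layout.)
 */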
728
729 static StgTSO *
730 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
731 rtsBool stop_at_atomically, StgUpdateFrame *stop_here)
732 {
733 StgRetInfoTable *info;
734 StgPtr sp, frame;
735 StgClosure *updatee;
736 nat i;
737 StgStack *stack;
738
739 debugTraceCap(DEBUG_sched, cap,
740 "raising exception in thread %ld.", (long)tso->id);
741
742 #if defined(PROFILING)
743 /*
744 * Debugging tool: on raising an exception, show where we are.
745 * See also Exception.cmm:stg_raisezh.
746 * This wasn't done for asynchronous exceptions originally; see #1450
747 */
748 if (RtsFlags.ProfFlags.showCCSOnException && exception != NULL)
749 {
750 fprintCCS_stderr(tso->prof.cccs,exception,tso);
751 }
752 #endif
753 // ASSUMES: the thread is not already complete or dead
754 // Upper layers should deal with that.
755 ASSERT(tso->what_next != ThreadComplete &&
756 tso->what_next != ThreadKilled);
757
758 // only if we own this TSO (except that deleteThread() calls this while holding all the Capabilities, e.g. during shutdown)
759 ASSERT(tso->cap == cap);
760
761 stack = tso->stackobj;
762
763 // mark it dirty; we're about to change its stack.
764 dirty_TSO(cap, tso);
765 dirty_STACK(cap, stack);
766
767 sp = stack->sp;
768
769 if (stop_here != NULL) {
770 updatee = stop_here->updatee;
771 } else {
772 updatee = NULL;
773 }
774
775 // The stack freezing code assumes there's a closure pointer on
776 // the top of the stack, so we have to arrange that this is the case...
777 //
778 if (sp[0] == (W_)&stg_enter_info) {
779 sp++;
780 } else {
781 sp--;
782 sp[0] = (W_)&stg_dummy_ret_closure;
783 }
784
785 frame = sp + 1;
786 while (stop_here == NULL || frame < (StgPtr)stop_here) {
787
788 // 1. Let the top of the stack be the "current closure"
789 //
790 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
791 // CATCH_FRAME.
792 //
793 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
794 // current closure applied to the chunk of stack up to (but not
795 // including) the update frame. This closure becomes the "current
796 // closure". Go back to step 2.
797 //
798 // 4. If it's a CATCH_FRAME, then leave the exception handler on
799 // top of the stack applied to the exception.
800 //
801 // 5. If it's a STOP_FRAME, then kill the thread.
802 //
803 // 6. If it's an UNDERFLOW_FRAME, then continue with the next
804 // stack chunk.
805 //
806 // NB: if we pass an ATOMICALLY_FRAME then abort the associated
807 // transaction
808
809 info = get_ret_itbl((StgClosure *)frame);
810
811 switch (info->i.type) {
812
813 case UPDATE_FRAME:
814 {
815 StgAP_STACK * ap;
816 nat words;
817
818 // First build an AP_STACK consisting of the stack chunk above the
819 // current update frame, with the top word on the stack as the
820 // fun field.
821 //
822 words = frame - sp - 1;
823 ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));
824
825 ap->size = words;
826 ap->fun = (StgClosure *)sp[0];
827 sp++;
828 for(i=0; i < (nat)words; ++i) {
829 ap->payload[i] = (StgClosure *)*sp++;
830 }
831
832 SET_HDR(ap,&stg_AP_STACK_info,
833 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
834 TICK_ALLOC_UP_THK(WDS(words+1),0);
835
836 //IF_DEBUG(scheduler,
837 // debugBelch("sched: Updating ");
838 // printPtr((P_)((StgUpdateFrame *)frame)->updatee);
839 // debugBelch(" with ");
840 // printObj((StgClosure *)ap);
841 // );
842
843 if (((StgUpdateFrame *)frame)->updatee == updatee) {
844 // If this update frame points to the same closure as
845 // the update frame further down the stack
846 // (stop_here), then don't perform the update. We
847 // want to keep the blackhole in this case, so we can
848 // detect and report the loop (#2783).
849 ap = (StgAP_STACK*)updatee;
850 } else {
851 // Perform the update
852 // TODO: this may waste some work, if the thunk has
853 // already been updated by another thread.
854 updateThunk(cap, tso,
855 ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
856 }
857
858 sp += sizeofW(StgUpdateFrame) - 1;
859 sp[0] = (W_)ap; // push onto stack
860 frame = sp + 1;
861 continue; //no need to bump frame
862 }
863
864 case UNDERFLOW_FRAME:
865 {
866 StgAP_STACK * ap;
867 nat words;
868
869 // First build an AP_STACK consisting of the stack chunk above the
870 // current update frame, with the top word on the stack as the
871 // fun field.
872 //
873 words = frame - sp - 1;
874 ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));
875
876 ap->size = words;
877 ap->fun = (StgClosure *)sp[0];
878 sp++;
879 for(i=0; i < (nat)words; ++i) {
880 ap->payload[i] = (StgClosure *)*sp++;
881 }
882
883 SET_HDR(ap,&stg_AP_STACK_NOUPD_info,
884 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
885 TICK_ALLOC_SE_THK(WDS(words+1),0);
886
887 stack->sp = sp;
888 threadStackUnderflow(cap,tso);
889 stack = tso->stackobj;
890 sp = stack->sp;
891
892 sp--;
893 sp[0] = (W_)ap;
894 frame = sp + 1;
895 continue;
896 }
897
898 case STOP_FRAME:
899 {
900 // We've stripped the entire stack, the thread is now dead.
901 tso->what_next = ThreadKilled;
902 stack->sp = frame + sizeofW(StgStopFrame);
903 goto done;
904 }
905
906 case CATCH_FRAME:
907 // If we find a CATCH_FRAME, and we've got an exception to raise,
908 // then build the THUNK raise(exception), and leave it on
909 // top of the CATCH_FRAME ready to enter.
910 //
911 {
912 StgCatchFrame *cf = (StgCatchFrame *)frame;
913 StgThunk *raise;
914
915 if (exception == NULL) break;
916
917 // we've got an exception to raise, so let's pass it to the
918 // handler in this frame.
919 //
920 raise = (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
921 TICK_ALLOC_SE_THK(WDS(1),0);
922 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
923 raise->payload[0] = exception;
924
925 // throw away the stack from Sp up to the CATCH_FRAME.
926 //
927 sp = frame - 1;
928
929 /* Ensure that async exceptions are blocked now, so we don't get
930 * a surprise exception before we get around to executing the
931 * handler.
932 */
933 tso->flags |= TSO_BLOCKEX;
934 if ((cf->exceptions_blocked & TSO_INTERRUPTIBLE) == 0) {
935 tso->flags &= ~TSO_INTERRUPTIBLE;
936 } else {
937 tso->flags |= TSO_INTERRUPTIBLE;
938 }
939
940 /* Put the newly-built THUNK on top of the stack, ready to execute
941 * when the thread restarts.
942 */
943 sp[0] = (W_)raise;
944 sp[-1] = (W_)&stg_enter_info;
945 stack->sp = sp-1;
946 tso->what_next = ThreadRunGHC;
947 goto done;
948 }
949
950 case ATOMICALLY_FRAME:
951 if (stop_at_atomically) {
952 ASSERT(tso->trec->enclosing_trec == NO_TREC);
953 stmCondemnTransaction(cap, tso -> trec);
954 stack->sp = frame - 2;
955 // The ATOMICALLY_FRAME expects to be returned a
956 // result from the transaction, which it stores in the
957 // stack frame. Hence we arrange to return a dummy
958 // result, so that the GC doesn't get upset (#3578).
959 // Perhaps a better way would be to have a different
960 // ATOMICALLY_FRAME instance for condemned
961 // transactions, but I don't fully understand the
962 // interaction with STM invariants.
963 stack->sp[1] = (W_)&stg_NO_TREC_closure;
964 stack->sp[0] = (W_)&stg_ret_p_info;
965 tso->what_next = ThreadRunGHC;
966 goto done;
967 }
968 else
969 {
970 // Freezing an STM transaction. Just aborting the
971 // transaction would be wrong; this is what we used to
972 // do, and it goes wrong if the ATOMICALLY_FRAME ever
973 // gets back onto the stack again, which it will do if
974 // the transaction is inside unsafePerformIO or
975 // unsafeInterleaveIO and hence inside an UPDATE_FRAME.
976 //
977 // So we want to make it so that if the enclosing
978 // computation is resumed, we will re-execute the
979 // transaction. We therefore:
980 //
981 // 1. abort the current transaction
982 // 2. replace the stack up to and including the
983 // atomically frame with a closure representing
984 // a call to "atomically x", where x is the code
985 // of the transaction.
986 // 3. continue stripping the stack
987 //
988 StgTRecHeader *trec = tso->trec;
989 StgTRecHeader *outer = trec->enclosing_trec;
990
991 StgThunk *atomically;
992 StgAtomicallyFrame *af = (StgAtomicallyFrame*)frame;
993
994 debugTraceCap(DEBUG_stm, cap,
995 "raiseAsync: freezing atomically frame")
996 stmAbortTransaction(cap, trec);
997 stmFreeAbortedTRec(cap, trec);
998 tso->trec = outer;
999
1000 atomically = (StgThunk*)allocate(cap,sizeofW(StgThunk)+1);
1001 TICK_ALLOC_SE_THK(WDS(1),0);
1002 SET_HDR(atomically,&stg_atomically_info,af->header.prof.ccs);
1003 atomically->payload[0] = af->code;
1004
1005 // discard stack up to and including the ATOMICALLY_FRAME
1006 frame += sizeofW(StgAtomicallyFrame);
1007 sp = frame - 1;
1008
1009 // replace the ATOMICALLY_FRAME with call to atomically#
1010 sp[0] = (W_)atomically;
1011 continue;
1012 }
1013
1014 case CATCH_STM_FRAME:
1015 case CATCH_RETRY_FRAME:
1016 // CATCH frames within an atomically block: abort the
1017 // inner transaction and continue. Eventually we will
1018 // hit the outer transaction that will get frozen (see
1019 // above).
1020 //
1021 // In this case (unlike ordinary exceptions) we do not care
1022 // whether the transaction is valid or not because its
1023 // possible validity cannot have caused the exception
1024 // and will not be visible after the abort.
1025 {
1026 StgTRecHeader *trec = tso -> trec;
1027 StgTRecHeader *outer = trec -> enclosing_trec;
1028 debugTraceCap(DEBUG_stm, cap,
1029 "found atomically block delivering async exception");
1030 stmAbortTransaction(cap, trec);
1031 stmFreeAbortedTRec(cap, trec);
1032 tso -> trec = outer;
1033 break;
1034 };
1035
1036 default:
1037 break;
1038 }
1039
1040 // move on to the next stack frame
1041 frame += stack_frame_sizeW((StgClosure *)frame);
1042 }
1043
1044 done:
1045 IF_DEBUG(sanity, checkTSO(tso));
1046
1047 // wake it up
1048 if (tso->why_blocked != NotBlocked) {
1049 tso->why_blocked = NotBlocked;
1050 appendToRunQueue(cap,tso);
1051 }
1052
1053 return tso;
1054 }
1055
1056