[ghc.git] / rts / RaiseAsync.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * Asynchronous exceptions
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "sm/Storage.h"
13 #include "Threads.h"
14 #include "Trace.h"
15 #include "RaiseAsync.h"
16 #include "Schedule.h"
17 #include "Updates.h"
18 #include "STM.h"
19 #include "sm/Sanity.h"
20 #include "Profiling.h"
21 #include "Messages.h"
22 #if defined(mingw32_HOST_OS)
23 #include "win32/IOManager.h"
24 #endif
25
26 static StgTSO* raiseAsync (Capability *cap,
27 StgTSO *tso,
28 StgClosure *exception,
29 rtsBool stop_at_atomically,
30 StgUpdateFrame *stop_here);
31
32 static void removeFromQueues(Capability *cap, StgTSO *tso);
33
34 static void removeFromMVarBlockedQueue (StgTSO *tso);
35
36 static void blockedThrowTo (Capability *cap,
37 StgTSO *target, MessageThrowTo *msg);
38
39 static void throwToSendMsg (Capability *cap USED_IF_THREADS,
40 Capability *target_cap USED_IF_THREADS,
41 MessageThrowTo *msg USED_IF_THREADS);
42
43 /* -----------------------------------------------------------------------------
44 throwToSingleThreaded
45
46 This version of throwTo is safe to use if and only if one of the
47 following holds:
48
49 - !THREADED_RTS
50
51 - all the other threads in the system are stopped (eg. during GC).
52
53 - we surely own the target TSO (eg. we just took it from the
54 run queue of the current capability, or we are running it).
55
56 It doesn't cater for blocking the source thread until the exception
57 has been raised.
58 -------------------------------------------------------------------------- */
59
60 static void
61 throwToSingleThreaded__ (Capability *cap, StgTSO *tso, StgClosure *exception,
62 rtsBool stop_at_atomically, StgUpdateFrame *stop_here)
63 {
64 // Thread already dead?
65 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
66 return;
67 }
68
69 // Remove it from any blocking queues
70 removeFromQueues(cap,tso);
71
72 raiseAsync(cap, tso, exception, stop_at_atomically, stop_here);
73 }
74
75 void
76 throwToSingleThreaded (Capability *cap, StgTSO *tso, StgClosure *exception)
77 {
78 throwToSingleThreaded__(cap, tso, exception, rtsFalse, NULL);
79 }
80
81 void
82 throwToSingleThreaded_ (Capability *cap, StgTSO *tso, StgClosure *exception,
83 rtsBool stop_at_atomically)
84 {
85 throwToSingleThreaded__ (cap, tso, exception, stop_at_atomically, NULL);
86 }
87
88 void // cannot return a different TSO
89 suspendComputation (Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
90 {
91 throwToSingleThreaded__ (cap, tso, NULL, rtsFalse, stop_here);
92 }
93
94 /* -----------------------------------------------------------------------------
95 throwTo
96
97 This function may be used to throw an exception from one thread to
98 another, during the course of normal execution. This is a tricky
99 task: the target thread might be running on another CPU, or it
100 may be blocked and could be woken up at any point by another CPU.
101 We have some delicate synchronisation to do.
102
103 The underlying scheme when multiple Capabilities are in use is
104 message passing: when the target of a throwTo is on another
105 Capability, we send a message (a MessageThrowTo closure) to that
106 Capability.
107
108 If the throwTo needs to block because the target TSO is masking
109 exceptions (the TSO_BLOCKEX flag), then the message is placed on
110 the blocked_exceptions queue attached to the target TSO. When the
111 target TSO enters the unmasked state again, it must check the
112 queue. The blocked_exceptions queue is not locked; only the
113 Capability owning the TSO may modify it.
114
115 To make things simpler for throwTo, we always create the message
116 first before deciding what to do. The message may get sent, or it
117 may get attached to a TSO's blocked_exceptions queue, or the
118 exception may get thrown immediately and the message dropped,
119 depending on the current state of the target.
120
121 Currently we send a message if the target belongs to another
122 Capability, and it is
123
124 - NotBlocked, BlockedOnMsgThrowTo,
125 BlockedOnCCall_Interruptible
126
127 - or it is masking exceptions (TSO_BLOCKEX)
128
129 Currently, if the target is BlockedOnMVar, BlockedOnSTM, or
130 BlockedOnBlackHole then we acquire ownership of the TSO by locking
131 its parent container (e.g. the MVar) and then raise the exception.
132 We might change these cases to be more message-passing-like in the
133 future.
134
135 Returns:
136
137 NULL exception was raised, ok to continue
138
139 MessageThrowTo * exception was not raised; the source TSO
140 should now put itself in the state
141 BlockedOnMsgThrowTo, and when it is ready
142 it should unlock the message using
143 unlockClosure(msg, &stg_MSG_THROWTO_info);
144 If it decides not to raise the exception after
145 all, it can revoke it safely with
146 unlockClosure(msg, &stg_MSG_NULL_info);
147
148 -------------------------------------------------------------------------- */
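/* Illustrative sketch only (not code from the RTS): the caller protocol
   described under "Returns:" above looks roughly like this; the variable
   names here are made up.

       MessageThrowTo *m = throwTo(cap, source_tso, target_tso, exn);
       if (m != NULL) {
           // exception not raised yet: put source_tso into the state
           // BlockedOnMsgThrowTo, tidy up its state, and then either
           // let the throwTo proceed:
           unlockClosure((StgClosure *)m, &stg_MSG_THROWTO_info);
           // or give up on it instead:
           //     unlockClosure((StgClosure *)m, &stg_MSG_NULL_info);
       }
*/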
149
150 MessageThrowTo *
151 throwTo (Capability *cap, // the Capability we hold
152 StgTSO *source, // the TSO sending the exception (or NULL)
153 StgTSO *target, // the TSO receiving the exception
154 StgClosure *exception) // the exception closure
155 {
156 MessageThrowTo *msg;
157
158 msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
159 // the message starts locked; see below
160 SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
161 msg->source = source;
162 msg->target = target;
163 msg->exception = exception;
164
165 switch (throwToMsg(cap, msg))
166 {
167 case THROWTO_SUCCESS:
168 // unlock the message now, otherwise we leave a WHITEHOLE in
169 // the heap (#6103)
170 SET_HDR(msg, &stg_MSG_THROWTO_info, CCS_SYSTEM);
171 return NULL;
172
173 case THROWTO_BLOCKED:
174 default:
175 // the caller will unlock the message when it is ready. We
176 // cannot unlock it yet, because the calling thread will need
177 // to tidy up its state first.
178 return msg;
179 }
180 }
181
182
183 nat
184 throwToMsg (Capability *cap, MessageThrowTo *msg)
185 {
186 StgWord status;
187 StgTSO *target = msg->target;
188 Capability *target_cap;
189
190 goto check_target;
191
192 retry:
193 write_barrier();
194 debugTrace(DEBUG_sched, "throwTo: retrying...");
195
196 check_target:
197 ASSERT(target != END_TSO_QUEUE);
198
199 // Thread already dead?
200 if (target->what_next == ThreadComplete
201 || target->what_next == ThreadKilled) {
202 return THROWTO_SUCCESS;
203 }
204
205 debugTraceCap(DEBUG_sched, cap,
206 "throwTo: from thread %lu to thread %lu",
207 (unsigned long)msg->source->id,
208 (unsigned long)msg->target->id);
209
210 #ifdef DEBUG
211 traceThreadStatus(DEBUG_sched, target);
212 #endif
213
214 target_cap = target->cap;
215 if (target->cap != cap) {
216 throwToSendMsg(cap, target_cap, msg);
217 return THROWTO_BLOCKED;
218 }
219
220 status = target->why_blocked;
221
222 switch (status) {
223 case NotBlocked:
224 {
225 if ((target->flags & TSO_BLOCKEX) == 0) {
226 // It's on our run queue and not blocking exceptions
227 raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
228 return THROWTO_SUCCESS;
229 } else {
230 blockedThrowTo(cap,target,msg);
231 return THROWTO_BLOCKED;
232 }
233 }
234
235 case BlockedOnMsgThrowTo:
236 {
237 const StgInfoTable *i;
238 MessageThrowTo *m;
239
240 m = target->block_info.throwto;
241
242 // target is local to this cap, but has sent a throwto
243 // message to another cap.
244 //
245 // The source message is locked. We need to revoke the
246 // target's message so that we can raise the exception, so
247 // we attempt to lock it.
248
249 // There's a possibility of a deadlock if two threads are both
250 // trying to throwTo each other (or more generally, a cycle of
251 // threads). To break the symmetry we compare the addresses
252 // of the MessageThrowTo objects, and the one for which m <
253 // msg gets to spin, while the other can only try to lock
254 // once, but must then back off and unlock both before trying
255 // again.
256 if (m < msg) {
257 i = lockClosure((StgClosure *)m);
258 } else {
259 i = tryLockClosure((StgClosure *)m);
260 if (i == NULL) {
261 // debugBelch("collision\n");
262 throwToSendMsg(cap, target->cap, msg);
263 return THROWTO_BLOCKED;
264 }
265 }
266
267 if (i == &stg_MSG_NULL_info) {
268 // we know there's a MSG_TRY_WAKEUP on the way, so we
269 // might as well just do it now. The message will
270 // be a no-op when it arrives.
271 unlockClosure((StgClosure*)m, i);
272 tryWakeupThread(cap, target);
273 goto retry;
274 }
275
276 if (i != &stg_MSG_THROWTO_info) {
277 // not a MSG_THROWTO: the message has changed under us, so retry
278 unlockClosure((StgClosure*)m, i);
279 goto retry;
280 }
281
282 if ((target->flags & TSO_BLOCKEX) &&
283 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
284 unlockClosure((StgClosure*)m, i);
285 blockedThrowTo(cap,target,msg);
286 return THROWTO_BLOCKED;
287 }
288
289 // nobody else can wake up this TSO after we claim the message
290 doneWithMsgThrowTo(m);
291
292 raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
293 return THROWTO_SUCCESS;
294 }
295
296 case BlockedOnMVar:
297 {
298 /*
299 To establish ownership of this TSO, we need to acquire a
300 lock on the MVar that it is blocked on.
301 */
302 StgMVar *mvar;
303 StgInfoTable *info USED_IF_THREADS;
304
305 mvar = (StgMVar *)target->block_info.closure;
306
307 // ASSUMPTION: tso->block_info must always point to a
308 // closure. In the threaded RTS it does.
309 switch (get_itbl((StgClosure *)mvar)->type) {
310 case MVAR_CLEAN:
311 case MVAR_DIRTY:
312 break;
313 default:
314 goto retry;
315 }
316
317 info = lockClosure((StgClosure *)mvar);
318
319 // we have the MVar, let's check whether the thread
320 // is still blocked on the same MVar.
321 if (target->why_blocked != BlockedOnMVar
322 || (StgMVar *)target->block_info.closure != mvar) {
323 unlockClosure((StgClosure *)mvar, info);
324 goto retry;
325 }
326
327 if (target->_link == END_TSO_QUEUE) {
328 // the MVar operation has already completed. There is a
329 // MSG_TRY_WAKEUP on the way, but we can just wake up the
330 // thread now anyway and ignore the message when it
331 // arrives.
332 unlockClosure((StgClosure *)mvar, info);
333 tryWakeupThread(cap, target);
334 goto retry;
335 }
336
337 if ((target->flags & TSO_BLOCKEX) &&
338 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
339 blockedThrowTo(cap,target,msg);
340 unlockClosure((StgClosure *)mvar, info);
341 return THROWTO_BLOCKED;
342 } else {
343 // revoke the MVar operation
344 removeFromMVarBlockedQueue(target);
345 raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
346 unlockClosure((StgClosure *)mvar, info);
347 return THROWTO_SUCCESS;
348 }
349 }
350
351 case BlockedOnBlackHole:
352 {
353 if (target->flags & TSO_BLOCKEX) {
354 // BlockedOnBlackHole is not interruptible.
355 blockedThrowTo(cap,target,msg);
356 return THROWTO_BLOCKED;
357 } else {
358 // Revoke the message by replacing it with IND. We're not
359 // locking anything here, so we might still get a TRY_WAKEUP
360 // message from the owner of the blackhole some time in the
361 // future, but that doesn't matter.
362 ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
363 OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
364 raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
365 return THROWTO_SUCCESS;
366 }
367 }
368
369 case BlockedOnSTM:
370 lockTSO(target);
371 // Unblocking BlockedOnSTM threads requires the TSO to be
372 // locked; see STM.c:unpark_tso().
373 if (target->why_blocked != BlockedOnSTM) {
374 unlockTSO(target);
375 goto retry;
376 }
377 if ((target->flags & TSO_BLOCKEX) &&
378 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
379 blockedThrowTo(cap,target,msg);
380 unlockTSO(target);
381 return THROWTO_BLOCKED;
382 } else {
383 raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
384 unlockTSO(target);
385 return THROWTO_SUCCESS;
386 }
387
388 case BlockedOnCCall_Interruptible:
389 #ifdef THREADED_RTS
390 {
391 Task *task = NULL;
392 // walk suspended_ccalls to find the correct worker thread
393 InCall *incall;
394 for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) {
395 if (incall->suspended_tso == target) {
396 task = incall->task;
397 break;
398 }
399 }
400 if (task != NULL) {
401 blockedThrowTo(cap, target, msg);
402 if (!((target->flags & TSO_BLOCKEX) &&
403 ((target->flags & TSO_INTERRUPTIBLE) == 0))) {
404 interruptWorkerTask(task);
405 }
406 return THROWTO_BLOCKED;
407 } else {
408 debugTraceCap(DEBUG_sched, cap, "throwTo: could not find worker thread to kill");
409 }
410 // fall through to the BlockedOnCCall case below
411 }
412 #endif
413 case BlockedOnCCall:
414 blockedThrowTo(cap,target,msg);
415 return THROWTO_BLOCKED;
416
417 #ifndef THREADED_RTS
418 case BlockedOnRead:
419 case BlockedOnWrite:
420 case BlockedOnDelay:
421 #if defined(mingw32_HOST_OS)
422 case BlockedOnDoProc:
423 #endif
424 if ((target->flags & TSO_BLOCKEX) &&
425 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
426 blockedThrowTo(cap,target,msg);
427 return THROWTO_BLOCKED;
428 } else {
429 removeFromQueues(cap,target);
430 raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
431 return THROWTO_SUCCESS;
432 }
433 #endif
434
435 case ThreadMigrating:
436 // if it is ThreadMigrating and tso->cap is ours, then it
437 // *must* be migrating *to* this capability. If it were
438 // migrating away from the capability, then tso->cap would
439 // point to the destination.
440 //
441 // There is a MSG_WAKEUP in the message queue for this thread,
442 // but we can just do it preemptively:
443 tryWakeupThread(cap, target);
444 // and now retry, the thread should be runnable.
445 goto retry;
446
447 default:
448 barf("throwTo: unrecognised why_blocked (%d)", target->why_blocked);
449 }
450 barf("throwTo");
451 }
452
453 static void
454 throwToSendMsg (Capability *cap STG_UNUSED,
455 Capability *target_cap USED_IF_THREADS,
456 MessageThrowTo *msg USED_IF_THREADS)
457
458 {
459 #ifdef THREADED_RTS
460 debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
461
462 sendMessage(cap, target_cap, (Message*)msg);
463 #endif
464 }
465
466 // Block a throwTo message on the target TSO's blocked_exceptions
467 // queue. The current Capability must own the target TSO in order to
468 // modify the blocked_exceptions queue.
469 static void
470 blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
471 {
472 debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu",
473 (unsigned long)target->id);
474
475 ASSERT(target->cap == cap);
476
477 msg->link = target->blocked_exceptions;
478 target->blocked_exceptions = msg;
479 dirty_TSO(cap,target); // we modified the blocked_exceptions queue
480 }
481
482 /* -----------------------------------------------------------------------------
483 Waking up threads blocked in throwTo
484
485 There are two ways to do this: maybePerformBlockedException() will
486 perform the throwTo() for the thread at the head of the queue
487 immediately, and leave the other threads on the queue.
488 maybePerformBlockedException() also checks the TSO_BLOCKEX flag
489 before raising an exception.
490
491 awakenBlockedExceptionQueue() will wake up all the threads in the
492 queue, but not perform any throwTo() immediately. This might be
493 more appropriate when the target thread is the one actually running
494 (see Exception.cmm).
495
496 Returns: non-zero if an exception was raised, zero otherwise.
497 -------------------------------------------------------------------------- */
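/* Illustrative sketch only (the real call sites are elsewhere in the RTS,
   e.g. the scheduler and Exception.cmm as noted above); it just restates
   the contract documented in this block:

       // the thread is leaving the masked state: deliver at most one
       // pending exception now
       if (maybePerformBlockedException(cap, tso)) {
           // an exception was raised in tso; any further messages remain
           // queued on tso->blocked_exceptions
       }
*/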
498
499 int
500 maybePerformBlockedException (Capability *cap, StgTSO *tso)
501 {
502 MessageThrowTo *msg;
503 const StgInfoTable *i;
504 StgTSO *source;
505
506 if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
507 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
508 awakenBlockedExceptionQueue(cap,tso);
509 return 1;
510 } else {
511 return 0;
512 }
513 }
514
515 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE &&
516 (tso->flags & TSO_BLOCKEX) != 0) {
517 debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
518 }
519
520 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
521 && ((tso->flags & TSO_BLOCKEX) == 0
522 || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
523
524 // We unblock just the first thread on the queue, and perform
525 // its throw immediately.
526 loop:
527 msg = tso->blocked_exceptions;
528 if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
529 i = lockClosure((StgClosure*)msg);
530 tso->blocked_exceptions = (MessageThrowTo*)msg->link;
531 if (i == &stg_MSG_NULL_info) {
532 unlockClosure((StgClosure*)msg,i);
533 goto loop;
534 }
535
536 throwToSingleThreaded(cap, msg->target, msg->exception);
537 source = msg->source;
538 doneWithMsgThrowTo(msg);
539 tryWakeupThread(cap, source);
540 return 1;
541 }
542 return 0;
543 }
544
545 // awakenBlockedExceptionQueue(): Just wake up the whole queue of
546 // blocked exceptions.
547
548 void
549 awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
550 {
551 MessageThrowTo *msg;
552 const StgInfoTable *i;
553 StgTSO *source;
554
555 for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
556 msg = (MessageThrowTo*)msg->link) {
557 i = lockClosure((StgClosure *)msg);
558 if (i != &stg_MSG_NULL_info) {
559 source = msg->source;
560 doneWithMsgThrowTo(msg);
561 tryWakeupThread(cap, source);
562 } else {
563 unlockClosure((StgClosure *)msg,i);
564 }
565 }
566 tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
567 }
568
569 /* -----------------------------------------------------------------------------
570 Remove a thread from blocking queues.
571
572 This is for use when we raise an exception in another thread, which
573 may be blocked.
574
575 Precondition: we have exclusive access to the TSO, via the same set
576 of conditions as throwToSingleThreaded() (c.f.).
577 -------------------------------------------------------------------------- */
578
579 static void
580 removeFromMVarBlockedQueue (StgTSO *tso)
581 {
582 StgMVar *mvar = (StgMVar*)tso->block_info.closure;
583 StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link;
584
585 if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) {
586 // already removed from this MVar
587 return;
588 }
589
590 // Assume the MVar is locked. (not assertable; sometimes it isn't
591 // actually WHITEHOLE'd).
592
593 // We want to remove the MVAR_TSO_QUEUE object from the queue. It
594 // isn't doubly-linked so we can't actually remove it; instead we
595 // just overwrite it with an IND if possible and let the GC short
596 // it out. However, we have to be careful to maintain the deque
597 // structure:
598
599 if (mvar->head == q) {
600 mvar->head = q->link;
601 OVERWRITE_INFO(q, &stg_IND_info);
602 if (mvar->tail == q) {
603 mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE;
604 }
605 }
606 else if (mvar->tail == q) {
607 // we can't replace it with an IND in this case, because then
608 // we lose the tail pointer when the GC shorts out the IND.
609 // So we use MSG_NULL as a kind of non-dupable indirection;
610 // these are ignored by takeMVar/putMVar.
611 OVERWRITE_INFO(q, &stg_MSG_NULL_info);
612 }
613 else {
614 OVERWRITE_INFO(q, &stg_IND_info);
615 }
616
617 // revoke the MVar operation
618 tso->_link = END_TSO_QUEUE;
619 }
620
621 static void
622 removeFromQueues(Capability *cap, StgTSO *tso)
623 {
624 switch (tso->why_blocked) {
625
626 case NotBlocked:
627 case ThreadMigrating:
628 return;
629
630 case BlockedOnSTM:
631 // Be careful: nothing to do here! We tell the scheduler that the
632 // thread is runnable and we leave it to the stack-walking code to
633 // abort the transaction while unwinding the stack. We should
634 // perhaps have a debugging test to make sure that this really
635 // happens and that the 'zombie' transaction does not get
636 // committed.
637 goto done;
638
639 case BlockedOnMVar:
640 removeFromMVarBlockedQueue(tso);
641 goto done;
642
643 case BlockedOnBlackHole:
644 // nothing to do
645 goto done;
646
647 case BlockedOnMsgThrowTo:
648 {
649 MessageThrowTo *m = tso->block_info.throwto;
650 // The message is locked by us, unless we got here via
651 // deleteAllThreads(), in which case we own all the
652 // capabilities.
653 // ASSERT(m->header.info == &stg_WHITEHOLE_info);
654
655 // unlock and revoke it at the same time
656 doneWithMsgThrowTo(m);
657 break;
658 }
659
660 #if !defined(THREADED_RTS)
661 case BlockedOnRead:
662 case BlockedOnWrite:
663 #if defined(mingw32_HOST_OS)
664 case BlockedOnDoProc:
665 #endif
666 removeThreadFromDeQueue(cap, &blocked_queue_hd, &blocked_queue_tl, tso);
667 #if defined(mingw32_HOST_OS)
668 /* (Cooperatively) signal that the worker thread should abort
669 * the request.
670 */
671 abandonWorkRequest(tso->block_info.async_result->reqID);
672 #endif
673 goto done;
674
675 case BlockedOnDelay:
676 removeThreadFromQueue(cap, &sleeping_queue, tso);
677 goto done;
678 #endif
679
680 default:
681 barf("removeFromQueues: %d", tso->why_blocked);
682 }
683
684 done:
685 tso->why_blocked = NotBlocked;
686 appendToRunQueue(cap, tso);
687 }
688
689 /* -----------------------------------------------------------------------------
690 * raiseAsync()
691 *
692 * The following function implements the magic for raising an
693 * asynchronous exception in an existing thread.
694 *
695 * We first remove the thread from any queue on which it might be
696 * blocked. The possible blockages are MVARs, BLOCKING_QUEUEs, and
697 * TSO blocked_exceptions queues.
698 *
699 * We strip the stack down to the innermost CATCH_FRAME, building
700 * thunks in the heap for all the active computations, so they can
701 * be restarted if necessary. When we reach a CATCH_FRAME, we build
702 * a raise(exception) thunk and push it on the stack just above the
703 * CATCH_FRAME, ready to be entered when the thread resumes.
704 *
705 * How exactly do we save all the active computations? We create an
706 * AP_STACK for every UpdateFrame on the stack. Entering one of these
707 * AP_STACKs pushes everything from the corresponding update frame
708 * upwards onto the stack. (Actually, it pushes everything up to the
709 * next update frame plus a pointer to the next AP_STACK object.
710 * Entering the next AP_STACK object pushes more onto the stack until we
711 * reach the last AP_STACK object - at which point the stack should look
712 * exactly as it did when we killed the TSO and we can continue
713 * execution by entering the closure on top of the stack.)
714 *
715 * We can also kill a thread entirely - this happens if either (a) the
716 * exception passed to raiseAsync is NULL, or (b) there's no
717 * CATCH_FRAME on the stack. In either case, we strip the entire
718 * stack and replace the thread with a zombie.
719 *
720 * ToDo: in THREADED_RTS mode, this function is only safe if either
721 * (a) we hold all the Capabilities (eg. in GC, or if there is only
722 * one Capability), or (b) we own the Capability that the TSO is
723 * currently blocked on or on the run queue of.
724 *
725 * -------------------------------------------------------------------------- */
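/* A small worked example (illustrative only): suppose the target thread's
 * stack is, from the top,
 *
 *     <chunk A>  UPDATE_FRAME(t1)  <chunk B>  UPDATE_FRAME(t2)  CATCH_FRAME(h) ...
 *
 * raiseAsync builds an AP_STACK ap1 capturing chunk A and updates t1 with
 * it, then an AP_STACK ap2 whose fun field is ap1 and whose payload is
 * chunk B, and updates t2 with it.  Finally it leaves a raise(exception)
 * thunk on top of the stack, just above CATCH_FRAME(h), ready to be
 * entered when the thread resumes.  If t2 is later demanded, entering ap2
 * pushes chunk B and enters ap1, which pushes chunk A, reconstructing the
 * frozen computation.
 */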
726
727 static StgTSO *
728 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
729 rtsBool stop_at_atomically, StgUpdateFrame *stop_here)
730 {
731 StgRetInfoTable *info;
732 StgPtr sp, frame;
733 StgClosure *updatee;
734 nat i;
735 StgStack *stack;
736
737 debugTraceCap(DEBUG_sched, cap,
738 "raising exception in thread %ld.", (long)tso->id);
739
740 #if defined(PROFILING)
741 /*
742 * Debugging tool: on raising an exception, show where we are.
743 * See also Exception.cmm:stg_raisezh.
744 * This wasn't done for asynchronous exceptions originally; see #1450
745 */
746 if (RtsFlags.ProfFlags.showCCSOnException && exception != NULL)
747 {
748 fprintCCS_stderr(tso->prof.cccs,exception,tso);
749 }
750 #endif
751 // ASSUMES: the thread is not already complete or dead
752 // Upper layers should deal with that.
753 ASSERT(tso->what_next != ThreadComplete &&
754 tso->what_next != ThreadKilled);
755
756 // only if we own this TSO (except that deleteThread() calls this)
757 ASSERT(tso->cap == cap);
758
759 stack = tso->stackobj;
760
761 // mark it dirty; we're about to change its stack.
762 dirty_TSO(cap, tso);
763 dirty_STACK(cap, stack);
764
765 sp = stack->sp;
766
767 if (stop_here != NULL) {
768 updatee = stop_here->updatee;
769 } else {
770 updatee = NULL;
771 }
772
773 // The stack freezing code assumes there's a closure pointer on
774 // the top of the stack, so we have to arrange that this is the case...
775 //
776 if (sp[0] == (W_)&stg_enter_info) {
777 sp++;
778 } else {
779 sp--;
780 sp[0] = (W_)&stg_dummy_ret_closure;
781 }
782
783 frame = sp + 1;
784 while (stop_here == NULL || frame < (StgPtr)stop_here) {
785
786 // 1. Let the top of the stack be the "current closure"
787 //
788 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
789 // CATCH_FRAME.
790 //
791 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
792 // current closure applied to the chunk of stack up to (but not
793 // including) the update frame. This closure becomes the "current
794 // closure". Go back to step 2.
795 //
796 // 4. If it's a CATCH_FRAME, then leave the exception handler on
797 // top of the stack applied to the exception.
798 //
799 // 5. If it's a STOP_FRAME, then kill the thread.
800 //
801 // 6. If it's an UNDERFLOW_FRAME, then continue with the next
802 // stack chunk.
803 //
804 // NB: if we pass an ATOMICALLY_FRAME then abort the associated
805 // transaction
806
807 info = get_ret_itbl((StgClosure *)frame);
808
809 switch (info->i.type) {
810
811 case UPDATE_FRAME:
812 {
813 StgAP_STACK * ap;
814 nat words;
815
816 // First build an AP_STACK consisting of the stack chunk above the
817 // current update frame, with the top word on the stack as the
818 // fun field.
819 //
820 words = frame - sp - 1;
821 ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));
822
823 ap->size = words;
824 ap->fun = (StgClosure *)sp[0];
825 sp++;
826 for(i=0; i < (nat)words; ++i) {
827 ap->payload[i] = (StgClosure *)*sp++;
828 }
829
830 SET_HDR(ap,&stg_AP_STACK_info,
831 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
832 TICK_ALLOC_UP_THK(words+1,0);
833
834 //IF_DEBUG(scheduler,
835 // debugBelch("sched: Updating ");
836 // printPtr((P_)((StgUpdateFrame *)frame)->updatee);
837 // debugBelch(" with ");
838 // printObj((StgClosure *)ap);
839 // );
840
841 if (((StgUpdateFrame *)frame)->updatee == updatee) {
842 // If this update frame points to the same closure as
843 // the update frame further down the stack
844 // (stop_here), then don't perform the update. We
845 // want to keep the blackhole in this case, so we can
846 // detect and report the loop (#2783).
847 ap = (StgAP_STACK*)updatee;
848 } else {
849 // Perform the update
850 // TODO: this may waste some work, if the thunk has
851 // already been updated by another thread.
852 updateThunk(cap, tso,
853 ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
854 }
855
856 sp += sizeofW(StgUpdateFrame) - 1;
857 sp[0] = (W_)ap; // push onto stack
858 frame = sp + 1;
859 continue; //no need to bump frame
860 }
861
862 case UNDERFLOW_FRAME:
863 {
864 StgAP_STACK * ap;
865 nat words;
866
867 // First build an AP_STACK consisting of the stack chunk above the
868 // current underflow frame, with the top word on the stack as the
869 // fun field.
870 //
871 words = frame - sp - 1;
872 ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));
873
874 ap->size = words;
875 ap->fun = (StgClosure *)sp[0];
876 sp++;
877 for(i=0; i < (nat)words; ++i) {
878 ap->payload[i] = (StgClosure *)*sp++;
879 }
880
881 SET_HDR(ap,&stg_AP_STACK_NOUPD_info,
882 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
883 TICK_ALLOC_SE_THK(words+1,0);
884
885 stack->sp = sp;
886 threadStackUnderflow(cap,tso);
887 stack = tso->stackobj;
888 sp = stack->sp;
889
890 sp--;
891 sp[0] = (W_)ap;
892 frame = sp + 1;
893 continue;
894 }
895
896 case STOP_FRAME:
897 {
898 // We've stripped the entire stack, the thread is now dead.
899 tso->what_next = ThreadKilled;
900 stack->sp = frame + sizeofW(StgStopFrame);
901 goto done;
902 }
903
904 case CATCH_FRAME:
905 // If we find a CATCH_FRAME, and we've got an exception to raise,
906 // then build the THUNK raise(exception), and leave it on
907 // top of the CATCH_FRAME ready to enter.
908 //
909 {
910 StgCatchFrame *cf = (StgCatchFrame *)frame;
911 StgThunk *raise;
912
913 if (exception == NULL) break;
914
915 // we've got an exception to raise, so let's pass it to the
916 // handler in this frame.
917 //
918 raise = (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
919 TICK_ALLOC_SE_THK(1,0);
920 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
921 raise->payload[0] = exception;
922
923 // throw away the stack from Sp up to the CATCH_FRAME.
924 //
925 sp = frame - 1;
926
927 /* Ensure that async exceptions are blocked now, so we don't get
928 * a surprise exception before we get around to executing the
929 * handler.
930 */
931 tso->flags |= TSO_BLOCKEX;
932 if ((cf->exceptions_blocked & TSO_INTERRUPTIBLE) == 0) {
933 tso->flags &= ~TSO_INTERRUPTIBLE;
934 } else {
935 tso->flags |= TSO_INTERRUPTIBLE;
936 }
937
938 /* Put the newly-built THUNK on top of the stack, ready to execute
939 * when the thread restarts.
940 */
941 sp[0] = (W_)raise;
942 sp[-1] = (W_)&stg_enter_info;
943 stack->sp = sp-1;
944 tso->what_next = ThreadRunGHC;
945 goto done;
946 }
947
948 case ATOMICALLY_FRAME:
949 if (stop_at_atomically) {
950 ASSERT(tso->trec->enclosing_trec == NO_TREC);
951 stmCondemnTransaction(cap, tso -> trec);
952 stack->sp = frame - 2;
953 // The ATOMICALLY_FRAME expects to be returned a
954 // result from the transaction, which it stores in the
955 // stack frame. Hence we arrange to return a dummy
956 // result, so that the GC doesn't get upset (#3578).
957 // Perhaps a better way would be to have a different
958 // ATOMICALLY_FRAME instance for condemned
959 // transactions, but I don't fully understand the
960 // interaction with STM invariants.
961 stack->sp[1] = (W_)&stg_NO_TREC_closure;
962 stack->sp[0] = (W_)&stg_ret_p_info;
963 tso->what_next = ThreadRunGHC;
964 goto done;
965 }
966 else
967 {
968 // Freezing an STM transaction. Just aborting the
969 // transaction would be wrong; this is what we used to
970 // do, and it goes wrong if the ATOMICALLY_FRAME ever
971 // gets back onto the stack again, which it will do if
972 // the transaction is inside unsafePerformIO or
973 // unsafeInterleaveIO and hence inside an UPDATE_FRAME.
974 //
975 // So we want to make it so that if the enclosing
976 // computation is resumed, we will re-execute the
977 // transaction. We therefore:
978 //
979 // 1. abort the current transaction
980 // 2. replace the stack up to and including the
981 // atomically frame with a closure representing
982 // a call to "atomically x", where x is the code
983 // of the transaction.
984 // 3. continue stripping the stack
985 //
986 StgTRecHeader *trec = tso->trec;
987 StgTRecHeader *outer = trec->enclosing_trec;
988
989 StgThunk *atomically;
990 StgAtomicallyFrame *af = (StgAtomicallyFrame*)frame;
991
992 debugTraceCap(DEBUG_stm, cap,
993 "raiseAsync: freezing atomically frame")
994 stmAbortTransaction(cap, trec);
995 stmFreeAbortedTRec(cap, trec);
996 tso->trec = outer;
997
998 atomically = (StgThunk*)allocate(cap,sizeofW(StgThunk)+1);
999 TICK_ALLOC_SE_THK(1,0);
1000 SET_HDR(atomically,&stg_atomically_info,af->header.prof.ccs);
1001 atomically->payload[0] = af->code;
1002
1003 // discard stack up to and including the ATOMICALLY_FRAME
1004 frame += sizeofW(StgAtomicallyFrame);
1005 sp = frame - 1;
1006
1007 // replace the ATOMICALLY_FRAME with call to atomically#
1008 sp[0] = (W_)atomically;
1009 continue;
1010 }
1011
1012 case CATCH_STM_FRAME:
1013 case CATCH_RETRY_FRAME:
1014 // CATCH frames within an atomically block: abort the
1015 // inner transaction and continue. Eventually we will
1016 // hit the outer transaction that will get frozen (see
1017 // above).
1018 //
1019 // In this case (unlike ordinary exceptions) we do not care
1020 // whether the transaction is valid or not because its
1021 // possible validity cannot have caused the exception
1022 // and will not be visible after the abort.
1023 {
1024 StgTRecHeader *trec = tso -> trec;
1025 StgTRecHeader *outer = trec -> enclosing_trec;
1026 debugTraceCap(DEBUG_stm, cap,
1027 "found atomically block delivering async exception");
1028 stmAbortTransaction(cap, trec);
1029 stmFreeAbortedTRec(cap, trec);
1030 tso -> trec = outer;
1031 break;
1032 }
1033
1034 default:
1035 break;
1036 }
1037
1038 // move on to the next stack frame
1039 frame += stack_frame_sizeW((StgClosure *)frame);
1040 }
1041
1042 done:
1043 IF_DEBUG(sanity, checkTSO(tso));
1044
1045 // wake it up
1046 if (tso->why_blocked != NotBlocked) {
1047 tso->why_blocked = NotBlocked;
1048 appendToRunQueue(cap,tso);
1049 }
1050
1051 return tso;
1052 }
1053
1054