[ghc.git] / rts / RaiseAsync.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * Asynchronous exceptions
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "sm/Storage.h"
13 #include "Threads.h"
14 #include "Trace.h"
15 #include "RaiseAsync.h"
16 #include "Schedule.h"
17 #include "Updates.h"
18 #include "STM.h"
19 #include "sm/Sanity.h"
20 #include "Profiling.h"
21 #include "Messages.h"
22 #if defined(mingw32_HOST_OS)
23 #include "win32/IOManager.h"
24 #endif
25
26 static StgTSO* raiseAsync (Capability *cap,
27 StgTSO *tso,
28 StgClosure *exception,
29 rtsBool stop_at_atomically,
30 StgUpdateFrame *stop_here);
31
32 static void removeFromQueues(Capability *cap, StgTSO *tso);
33
34 static void removeFromMVarBlockedQueue (StgTSO *tso);
35
36 static void throwToSendMsg (Capability *cap USED_IF_THREADS,
37 Capability *target_cap USED_IF_THREADS,
38 MessageThrowTo *msg USED_IF_THREADS);
39
40 /* -----------------------------------------------------------------------------
41 throwToSingleThreaded
42
43 This version of throwTo is safe to use if and only if one of the
44 following holds:
45
46 - !THREADED_RTS
47
48 - all the other threads in the system are stopped (eg. during GC).
49
50 - we surely own the target TSO (eg. we just took it from the
51 run queue of the current capability, or we are running it).
52
53 It doesn't cater for blocking the source thread until the exception
54 has been raised.
55 -------------------------------------------------------------------------- */
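/* -----------------------------------------------------------------------------
   Editorial sketch (not RTS code): a call site satisfying the third
   condition above, because the target has just been taken from this
   capability's run queue and so is surely owned by us; the details are
   illustrative only.

       StgTSO *t = popRunQueue(cap);              // we surely own t now
       throwToSingleThreaded(cap, t, exception);
       // t now either has the exception raised on its stack (and can be
       // rescheduled), or has been killed outright if it had no handler.

   -------------------------------------------------------------------------- */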
56
57 static void
58 throwToSingleThreaded__ (Capability *cap, StgTSO *tso, StgClosure *exception,
59 rtsBool stop_at_atomically, StgUpdateFrame *stop_here)
60 {
61 // Thread already dead?
62 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
63 return;
64 }
65
66 // Remove it from any blocking queues
67 removeFromQueues(cap,tso);
68
69 raiseAsync(cap, tso, exception, stop_at_atomically, stop_here);
70 }
71
72 void
73 throwToSingleThreaded (Capability *cap, StgTSO *tso, StgClosure *exception)
74 {
75 throwToSingleThreaded__(cap, tso, exception, rtsFalse, NULL);
76 }
77
78 void
79 throwToSingleThreaded_ (Capability *cap, StgTSO *tso, StgClosure *exception,
80 rtsBool stop_at_atomically)
81 {
82 throwToSingleThreaded__ (cap, tso, exception, stop_at_atomically, NULL);
83 }
84
85 void // cannot return a different TSO
86 suspendComputation (Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
87 {
88 throwToSingleThreaded__ (cap, tso, NULL, rtsFalse, stop_here);
89 }
90
91 /* -----------------------------------------------------------------------------
92 throwTo
93
94 This function may be used to throw an exception from one thread to
95 another, during the course of normal execution. This is a tricky
96 task: the target thread might be running on another CPU, or it
97 may be blocked and could be woken up at any point by another CPU.
98 We have some delicate synchronisation to do.
99
100 The underlying scheme when multiple Capabilities are in use is
101 message passing: when the target of a throwTo is on another
102 Capability, we send a message (a MessageThrowTo closure) to that
103 Capability.
104
105 If the throwTo needs to block because the target TSO is masking
106 exceptions (the TSO_BLOCKEX flag), then the message is placed on
107 the blocked_exceptions queue attached to the target TSO. When the
108 target TSO enters the unmasked state again, it must check the
109 queue. The blocked_exceptions queue is not locked; only the
110 Capability owning the TSO may modify it.
111
112 To make things simpler for throwTo, we always create the message
113 first before deciding what to do. The message may get sent, or it
114 may get attached to a TSO's blocked_exceptions queue, or the
115 exception may get thrown immediately and the message dropped,
116 depending on the current state of the target.
117
118 Currently we send a message if the target belongs to another
119 Capability, and it is
120
121 - NotBlocked, BlockedOnMsgThrowTo,
122 BlockedOnCCall_Interruptible
123
124 - or it is masking exceptions (TSO_BLOCKEX)
125
126 Currently, if the target is BlockedOnMVar, BlockedOnSTM, or
127 BlockedOnBlackHole then we acquire ownership of the TSO by locking
128 its parent container (e.g. the MVar) and then raise the exception.
129 We might change these cases to be more message-passing-like in the
130 future.
131
132 Returns:
133
134 NULL exception was raised, ok to continue
135
136 MessageThrowTo * exception was not raised; the source TSO
137 should now put itself in the state
138 BlockedOnMsgThrowTo, and when it is ready
139                        it should unlock the message using
140 unlockClosure(msg, &stg_MSG_THROWTO_info);
141 If it decides not to raise the exception after
142 all, it can revoke it safely with
143 unlockClosure(msg, &stg_MSG_NULL_info);
144
145 -------------------------------------------------------------------------- */
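/* -----------------------------------------------------------------------------
   Editorial sketch (not RTS code): the caller-side protocol implied by the
   Returns section above, roughly what the primop code in Exception.cmm does.
   The field assignments on the source TSO are illustrative.

       MessageThrowTo *msg = throwTo(cap, source, target, exception);
       if (msg == NULL) {
           // the exception has been raised in target; just carry on
       } else {
           // we must block until the exception can be delivered
           source->why_blocked = BlockedOnMsgThrowTo;
           source->block_info.throwto = msg;
           // ... tidy up our own state, then publish the (still locked) message:
           unlockClosure((StgClosure *)msg, &stg_MSG_THROWTO_info);
           // or, to give up on throwing after all, revoke it instead:
           //     unlockClosure((StgClosure *)msg, &stg_MSG_NULL_info);
       }
   -------------------------------------------------------------------------- */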
146
147 MessageThrowTo *
148 throwTo (Capability *cap, // the Capability we hold
149 StgTSO *source, // the TSO sending the exception (or NULL)
150 StgTSO *target, // the TSO receiving the exception
151 StgClosure *exception) // the exception closure
152 {
153 MessageThrowTo *msg;
154
155 msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
156 // the message starts locked; see below
157 SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
158 msg->source = source;
159 msg->target = target;
160 msg->exception = exception;
161
162 switch (throwToMsg(cap, msg))
163 {
164 case THROWTO_SUCCESS:
165 // unlock the message now, otherwise we leave a WHITEHOLE in
166 // the heap (#6103)
167 SET_HDR(msg, &stg_MSG_THROWTO_info, CCS_SYSTEM);
168 return NULL;
169
170 case THROWTO_BLOCKED:
171 default:
172 // the caller will unlock the message when it is ready. We
173 // cannot unlock it yet, because the calling thread will need
174 // to tidy up its state first.
175 return msg;
176 }
177 }
178
179
180 nat
181 throwToMsg (Capability *cap, MessageThrowTo *msg)
182 {
183 StgWord status;
184 StgTSO *target = msg->target;
185 Capability *target_cap;
186
187 goto check_target;
188
189 retry:
190 write_barrier();
191 debugTrace(DEBUG_sched, "throwTo: retrying...");
192
193 check_target:
194 ASSERT(target != END_TSO_QUEUE);
195
196 // Thread already dead?
197 if (target->what_next == ThreadComplete
198 || target->what_next == ThreadKilled) {
199 return THROWTO_SUCCESS;
200 }
201
202 debugTraceCap(DEBUG_sched, cap,
203 "throwTo: from thread %lu to thread %lu",
204 (unsigned long)msg->source->id,
205 (unsigned long)msg->target->id);
206
207 #ifdef DEBUG
208 traceThreadStatus(DEBUG_sched, target);
209 #endif
210
211 target_cap = target->cap;
212 if (target->cap != cap) {
213 throwToSendMsg(cap, target_cap, msg);
214 return THROWTO_BLOCKED;
215 }
216
217 status = target->why_blocked;
218
219 switch (status) {
220 case NotBlocked:
221 {
222 if ((target->flags & TSO_BLOCKEX) == 0) {
223 // It's on our run queue and not blocking exceptions
224 raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
225 return THROWTO_SUCCESS;
226 } else {
227 blockedThrowTo(cap,target,msg);
228 return THROWTO_BLOCKED;
229 }
230 }
231
232 case BlockedOnMsgThrowTo:
233 {
234 const StgInfoTable *i;
235 MessageThrowTo *m;
236
237 m = target->block_info.throwto;
238
239 // target is local to this cap, but has sent a throwto
240 // message to another cap.
241 //
242 // The source message is locked. We need to revoke the
243 // target's message so that we can raise the exception, so
244 // we attempt to lock it.
245
246 // There's a possibility of a deadlock if two threads are both
247 // trying to throwTo each other (or more generally, a cycle of
248 // threads). To break the symmetry we compare the addresses
249 // of the MessageThrowTo objects, and the one for which m <
250 // msg gets to spin, while the other can only try to lock
251 // once, but must then back off and unlock both before trying
252 // again.
253 if (m < msg) {
254 i = lockClosure((StgClosure *)m);
255 } else {
256 i = tryLockClosure((StgClosure *)m);
257 if (i == NULL) {
258 // debugBelch("collision\n");
259 throwToSendMsg(cap, target->cap, msg);
260 return THROWTO_BLOCKED;
261 }
262 }
263
264 if (i == &stg_MSG_NULL_info) {
265 // we know there's a MSG_TRY_WAKEUP on the way, so we
266 // might as well just do it now. The message will
267 // be a no-op when it arrives.
268 unlockClosure((StgClosure*)m, i);
269 tryWakeupThread(cap, target);
270 goto retry;
271 }
272
273 if (i != &stg_MSG_THROWTO_info) {
274             // the message is no longer a MSG_THROWTO (e.g. it has
275             // been revoked under us), so back off and retry
275 unlockClosure((StgClosure*)m, i);
276 goto retry;
277 }
278
279 if ((target->flags & TSO_BLOCKEX) &&
280 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
281 unlockClosure((StgClosure*)m, i);
282 blockedThrowTo(cap,target,msg);
283 return THROWTO_BLOCKED;
284 }
285
286 // nobody else can wake up this TSO after we claim the message
287 doneWithMsgThrowTo(m);
288
289 raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
290 return THROWTO_SUCCESS;
291 }
292
293 case BlockedOnMVar:
294 case BlockedOnMVarRead:
295 {
296 /*
297 To establish ownership of this TSO, we need to acquire a
298 lock on the MVar that it is blocked on.
299 */
300 StgMVar *mvar;
301 StgInfoTable *info USED_IF_THREADS;
302
303 mvar = (StgMVar *)target->block_info.closure;
304
305 // ASSUMPTION: tso->block_info must always point to a
306 // closure. In the threaded RTS it does.
307 switch (get_itbl((StgClosure *)mvar)->type) {
308 case MVAR_CLEAN:
309 case MVAR_DIRTY:
310 break;
311 default:
312 goto retry;
313 }
314
315 info = lockClosure((StgClosure *)mvar);
316
317 // we have the MVar, let's check whether the thread
318 // is still blocked on the same MVar.
319 if ((target->why_blocked != BlockedOnMVar && target->why_blocked != BlockedOnMVarRead)
320 || (StgMVar *)target->block_info.closure != mvar) {
321 unlockClosure((StgClosure *)mvar, info);
322 goto retry;
323 }
324
325 if (target->_link == END_TSO_QUEUE) {
326 // the MVar operation has already completed. There is a
327 // MSG_TRY_WAKEUP on the way, but we can just wake up the
328 // thread now anyway and ignore the message when it
329 // arrives.
330 unlockClosure((StgClosure *)mvar, info);
331 tryWakeupThread(cap, target);
332 goto retry;
333 }
334
335 if ((target->flags & TSO_BLOCKEX) &&
336 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
337 blockedThrowTo(cap,target,msg);
338 unlockClosure((StgClosure *)mvar, info);
339 return THROWTO_BLOCKED;
340 } else {
341 // revoke the MVar operation
342 removeFromMVarBlockedQueue(target);
343 raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
344 unlockClosure((StgClosure *)mvar, info);
345 return THROWTO_SUCCESS;
346 }
347 }
348
349 case BlockedOnBlackHole:
350 {
351 if (target->flags & TSO_BLOCKEX) {
352 // BlockedOnBlackHole is not interruptible.
353 blockedThrowTo(cap,target,msg);
354 return THROWTO_BLOCKED;
355 } else {
356 // Revoke the message by replacing it with IND. We're not
357 // locking anything here, so we might still get a TRY_WAKEUP
358 // message from the owner of the blackhole some time in the
359 // future, but that doesn't matter.
360 ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
361 OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
362 raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
363 return THROWTO_SUCCESS;
364 }
365 }
366
367 case BlockedOnSTM:
368 lockTSO(target);
369 // Unblocking BlockedOnSTM threads requires the TSO to be
370 // locked; see STM.c:unpark_tso().
371 if (target->why_blocked != BlockedOnSTM) {
372 unlockTSO(target);
373 goto retry;
374 }
375 if ((target->flags & TSO_BLOCKEX) &&
376 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
377 blockedThrowTo(cap,target,msg);
378 unlockTSO(target);
379 return THROWTO_BLOCKED;
380 } else {
381 raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
382 unlockTSO(target);
383 return THROWTO_SUCCESS;
384 }
385
386 case BlockedOnCCall_Interruptible:
387 #ifdef THREADED_RTS
388 {
389 Task *task = NULL;
390 // walk suspended_ccalls to find the correct worker thread
391 InCall *incall;
392 for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) {
393 if (incall->suspended_tso == target) {
394 task = incall->task;
395 break;
396 }
397 }
398 if (task != NULL) {
399 blockedThrowTo(cap, target, msg);
400 if (!((target->flags & TSO_BLOCKEX) &&
401 ((target->flags & TSO_INTERRUPTIBLE) == 0))) {
402 interruptWorkerTask(task);
403 }
404 return THROWTO_BLOCKED;
405 } else {
406 debugTraceCap(DEBUG_sched, cap, "throwTo: could not find worker thread to kill");
407 }
408             // fall through to the next case
409 }
410 #endif
411 case BlockedOnCCall:
412 blockedThrowTo(cap,target,msg);
413 return THROWTO_BLOCKED;
414
415 #ifndef THREADED_RTS
416 case BlockedOnRead:
417 case BlockedOnWrite:
418 case BlockedOnDelay:
419 #if defined(mingw32_HOST_OS)
420 case BlockedOnDoProc:
421 #endif
422 if ((target->flags & TSO_BLOCKEX) &&
423 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
424 blockedThrowTo(cap,target,msg);
425 return THROWTO_BLOCKED;
426 } else {
427 removeFromQueues(cap,target);
428 raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
429 return THROWTO_SUCCESS;
430 }
431 #endif
432
433 case ThreadMigrating:
434     // if it is ThreadMigrating and tso->cap is ours, then it
435 // *must* be migrating *to* this capability. If it were
436 // migrating away from the capability, then tso->cap would
437 // point to the destination.
438 //
439     // There is a MSG_TRY_WAKEUP in the message queue for this thread,
440 // but we can just do it preemptively:
441 tryWakeupThread(cap, target);
442 // and now retry, the thread should be runnable.
443 goto retry;
444
445 default:
446 barf("throwTo: unrecognised why_blocked (%d)", target->why_blocked);
447 }
448 barf("throwTo");
449 }
450
451 static void
452 throwToSendMsg (Capability *cap STG_UNUSED,
453 Capability *target_cap USED_IF_THREADS,
454 MessageThrowTo *msg USED_IF_THREADS)
455
456 {
457 #ifdef THREADED_RTS
458 debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
459
460 sendMessage(cap, target_cap, (Message*)msg);
461 #endif
462 }
463
464 // Block a throwTo message on the target TSO's blocked_exceptions
465 // queue. The current Capability must own the target TSO in order to
466 // modify the blocked_exceptions queue.
467 void
468 blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
469 {
470 debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu",
471 (unsigned long)target->id);
472
473 ASSERT(target->cap == cap);
474
475 msg->link = target->blocked_exceptions;
476 target->blocked_exceptions = msg;
477 dirty_TSO(cap,target); // we modified the blocked_exceptions queue
478 }
479
480 /* -----------------------------------------------------------------------------
481 Waking up threads blocked in throwTo
482
483 There are two ways to do this: maybePerformBlockedException() will
484 perform the throwTo() for the thread at the head of the queue
485 immediately, and leave the other threads on the queue.
486 maybePerformBlockedException() also checks the TSO_BLOCKEX flag
487 before raising an exception.
488
489 awakenBlockedExceptionQueue() will wake up all the threads in the
490 queue, but not perform any throwTo() immediately. This might be
491 more appropriate when the target thread is the one actually running
492 (see Exception.cmm).
493
494 Returns: non-zero if an exception was raised, zero otherwise.
495 -------------------------------------------------------------------------- */
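/* -----------------------------------------------------------------------------
   Editorial sketch (not RTS code): a Capability that owns tso and has just
   seen it leave a masked region could deliver one pending exception with

       if (maybePerformBlockedException(cap, tso)) {
           // the message at the head of tso->blocked_exceptions was raised
           // in tso; any remaining messages stay on the queue
       }

   whereas when tso has finished, awakenBlockedExceptionQueue(cap, tso) just
   wakes every blocked thrower so that each of them can retry (or give up).
   -------------------------------------------------------------------------- */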
496
497 int
498 maybePerformBlockedException (Capability *cap, StgTSO *tso)
499 {
500 MessageThrowTo *msg;
501 const StgInfoTable *i;
502 StgTSO *source;
503
504 if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
505 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
506 awakenBlockedExceptionQueue(cap,tso);
507 return 1;
508 } else {
509 return 0;
510 }
511 }
512
513 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE &&
514 (tso->flags & TSO_BLOCKEX) != 0) {
515 debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
516 }
517
518 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
519 && ((tso->flags & TSO_BLOCKEX) == 0
520 || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
521
522 // We unblock just the first thread on the queue, and perform
523 // its throw immediately.
524 loop:
525 msg = tso->blocked_exceptions;
526 if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
527 i = lockClosure((StgClosure*)msg);
528 tso->blocked_exceptions = (MessageThrowTo*)msg->link;
529 if (i == &stg_MSG_NULL_info) {
530 unlockClosure((StgClosure*)msg,i);
531 goto loop;
532 }
533
534 throwToSingleThreaded(cap, msg->target, msg->exception);
535 source = msg->source;
536 doneWithMsgThrowTo(msg);
537 tryWakeupThread(cap, source);
538 return 1;
539 }
540 return 0;
541 }
542
543 // awakenBlockedExceptionQueue(): Just wake up the whole queue of
544 // blocked exceptions.
545
546 void
547 awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
548 {
549 MessageThrowTo *msg;
550 const StgInfoTable *i;
551 StgTSO *source;
552
553 for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
554 msg = (MessageThrowTo*)msg->link) {
555 i = lockClosure((StgClosure *)msg);
556 if (i != &stg_MSG_NULL_info) {
557 source = msg->source;
558 doneWithMsgThrowTo(msg);
559 tryWakeupThread(cap, source);
560 } else {
561 unlockClosure((StgClosure *)msg,i);
562 }
563 }
564 tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
565 }
566
567 /* -----------------------------------------------------------------------------
568 Remove a thread from blocking queues.
569
570 This is for use when we raise an exception in another thread, which
571 may be blocked.
572
573 Precondition: we have exclusive access to the TSO, via the same set
574 of conditions as throwToSingleThreaded() (c.f.).
575 -------------------------------------------------------------------------- */
576
577 static void
578 removeFromMVarBlockedQueue (StgTSO *tso)
579 {
580 StgMVar *mvar = (StgMVar*)tso->block_info.closure;
581 StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link;
582
583 if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) {
584 // already removed from this MVar
585 return;
586 }
587
588 // Assume the MVar is locked. (not assertable; sometimes it isn't
589 // actually WHITEHOLE'd).
590
591 // We want to remove the MVAR_TSO_QUEUE object from the queue. It
592 // isn't doubly-linked so we can't actually remove it; instead we
593 // just overwrite it with an IND if possible and let the GC short
594 // it out. However, we have to be careful to maintain the deque
595 // structure:
596
597 if (mvar->head == q) {
598 mvar->head = q->link;
599 OVERWRITE_INFO(q, &stg_IND_info);
600 if (mvar->tail == q) {
601 mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE;
602 }
603 }
604 else if (mvar->tail == q) {
605 // we can't replace it with an IND in this case, because then
606 // we lose the tail pointer when the GC shorts out the IND.
607 // So we use MSG_NULL as a kind of non-dupable indirection;
608 // these are ignored by takeMVar/putMVar.
609 OVERWRITE_INFO(q, &stg_MSG_NULL_info);
610 }
611 else {
612 OVERWRITE_INFO(q, &stg_IND_info);
613 }
614
615 // revoke the MVar operation
616 tso->_link = END_TSO_QUEUE;
617 }
618
619 static void
620 removeFromQueues(Capability *cap, StgTSO *tso)
621 {
622 switch (tso->why_blocked) {
623
624 case NotBlocked:
625 case ThreadMigrating:
626 return;
627
628 case BlockedOnSTM:
629 // Be careful: nothing to do here! We tell the scheduler that the
630 // thread is runnable and we leave it to the stack-walking code to
631 // abort the transaction while unwinding the stack. We should
632 // perhaps have a debugging test to make sure that this really
633 // happens and that the 'zombie' transaction does not get
634 // committed.
635 goto done;
636
637 case BlockedOnMVar:
638 case BlockedOnMVarRead:
639 removeFromMVarBlockedQueue(tso);
640 goto done;
641
642 case BlockedOnBlackHole:
643 // nothing to do
644 goto done;
645
646 case BlockedOnMsgThrowTo:
647 {
648 MessageThrowTo *m = tso->block_info.throwto;
649 // The message is locked by us, unless we got here via
650 // deleteAllThreads(), in which case we own all the
651 // capabilities.
652 // ASSERT(m->header.info == &stg_WHITEHOLE_info);
653
654 // unlock and revoke it at the same time
655 doneWithMsgThrowTo(m);
656 break;
657 }
658
659 #if !defined(THREADED_RTS)
660 case BlockedOnRead:
661 case BlockedOnWrite:
662 #if defined(mingw32_HOST_OS)
663 case BlockedOnDoProc:
664 #endif
665 removeThreadFromDeQueue(cap, &blocked_queue_hd, &blocked_queue_tl, tso);
666 #if defined(mingw32_HOST_OS)
667 /* (Cooperatively) signal that the worker thread should abort
668 * the request.
669 */
670 abandonWorkRequest(tso->block_info.async_result->reqID);
671 #endif
672 goto done;
673
674 case BlockedOnDelay:
675 removeThreadFromQueue(cap, &sleeping_queue, tso);
676 goto done;
677 #endif
678
679 default:
680 barf("removeFromQueues: %d", tso->why_blocked);
681 }
682
683 done:
684 tso->why_blocked = NotBlocked;
685 appendToRunQueue(cap, tso);
686 }
687
688 /* -----------------------------------------------------------------------------
689 * raiseAsync()
690 *
691 * The following function implements the magic for raising an
692 * asynchronous exception in an existing thread.
693 *
694 * We first remove the thread from any queue on which it might be
695 * blocked. The possible blockages are MVARs, BLOCKING_QUEUESs, and
696 * TSO blocked_exception queues.
697 *
698 * We strip the stack down to the innermost CATCH_FRAME, building
699 * thunks in the heap for all the active computations, so they can
700 * be restarted if necessary. When we reach a CATCH_FRAME, we build
701 * an application of the handler to the exception, and push it on
702 * the top of the stack.
703 *
704 * How exactly do we save all the active computations? We create an
705 * AP_STACK for every UpdateFrame on the stack. Entering one of these
706 * AP_STACKs pushes everything from the corresponding update frame
707 * upwards onto the stack. (Actually, it pushes everything up to the
708  * next update frame plus a pointer to the next AP_STACK object.)
709 * Entering the next AP_STACK object pushes more onto the stack until we
710 * reach the last AP_STACK object - at which point the stack should look
711 * exactly as it did when we killed the TSO and we can continue
712 * execution by entering the closure on top of the stack.
713 *
714 * We can also kill a thread entirely - this happens if either (a) the
715 * exception passed to raiseAsync is NULL, or (b) there's no
716 * CATCH_FRAME on the stack. In either case, we strip the entire
717 * stack and replace the thread with a zombie.
718 *
719 * ToDo: in THREADED_RTS mode, this function is only safe if either
720 * (a) we hold all the Capabilities (eg. in GC, or if there is only
721 * one Capability), or (b) we own the Capability that the TSO is
722 * currently blocked on or on the run queue of.
723 *
724 * -------------------------------------------------------------------------- */
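/* -----------------------------------------------------------------------------
 * Editorial sketch: roughly what the stack looks like once raiseAsync()
 * reaches a CATCH_FRAME with a non-NULL exception.  Computations under any
 * UPDATE_FRAMEs passed on the way down have been frozen into AP_STACK
 * thunks, the stack between Sp and the CATCH_FRAME has been discarded, and
 * a "raise" thunk holding the exception sits on top, ready to be entered:
 *
 *      Sp -> stg_enter_info
 *            raise thunk (stg_raise_info, payload = exception)
 *            CATCH_FRAME (handler, exceptions_blocked)
 *            ... rest of the stack ...
 *
 * Entering the raise thunk delivers the exception to the handler in the
 * CATCH_FRAME, with further asynchronous exceptions blocked as set up below.
 * -------------------------------------------------------------------------- */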
725
726 static StgTSO *
727 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
728 rtsBool stop_at_atomically, StgUpdateFrame *stop_here)
729 {
730 StgRetInfoTable *info;
731 StgPtr sp, frame;
732 StgClosure *updatee;
733 nat i;
734 StgStack *stack;
735
736 debugTraceCap(DEBUG_sched, cap,
737 "raising exception in thread %ld.", (long)tso->id);
738
739 #if defined(PROFILING)
740 /*
741 * Debugging tool: on raising an exception, show where we are.
742 * See also Exception.cmm:stg_raisezh.
743 * This wasn't done for asynchronous exceptions originally; see #1450
744 */
745 if (RtsFlags.ProfFlags.showCCSOnException && exception != NULL)
746 {
747 fprintCCS_stderr(tso->prof.cccs,exception,tso);
748 }
749 #endif
750 // ASSUMES: the thread is not already complete or dead
751 // Upper layers should deal with that.
752 ASSERT(tso->what_next != ThreadComplete &&
753 tso->what_next != ThreadKilled);
754
755     // only if we own this TSO (except that deleteThread() calls this)
756 ASSERT(tso->cap == cap);
757
758 stack = tso->stackobj;
759
760 // mark it dirty; we're about to change its stack.
761 dirty_TSO(cap, tso);
762 dirty_STACK(cap, stack);
763
764 sp = stack->sp;
765
766 if (stop_here != NULL) {
767 updatee = stop_here->updatee;
768 } else {
769 updatee = NULL;
770 }
771
772 // The stack freezing code assumes there's a closure pointer on
773 // the top of the stack, so we have to arrange that this is the case...
774 //
775 if (sp[0] == (W_)&stg_enter_info) {
776 sp++;
777 } else {
778 sp--;
779 sp[0] = (W_)&stg_dummy_ret_closure;
780 }
781
782 frame = sp + 1;
783 while (stop_here == NULL || frame < (StgPtr)stop_here) {
784
785 // 1. Let the top of the stack be the "current closure"
786 //
787 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
788 // CATCH_FRAME.
789 //
790 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
791 // current closure applied to the chunk of stack up to (but not
792 // including) the update frame. This closure becomes the "current
793 // closure". Go back to step 2.
794 //
795 // 4. If it's a CATCH_FRAME, then leave the exception handler on
796 // top of the stack applied to the exception.
797 //
798 // 5. If it's a STOP_FRAME, then kill the thread.
799 //
800 // 6. If it's an UNDERFLOW_FRAME, then continue with the next
801 // stack chunk.
802 //
803 // NB: if we pass an ATOMICALLY_FRAME then abort the associated
804 // transaction
805
806 info = get_ret_itbl((StgClosure *)frame);
807
808 switch (info->i.type) {
809
810 case UPDATE_FRAME:
811 {
812 StgAP_STACK * ap;
813 nat words;
814
815 // First build an AP_STACK consisting of the stack chunk above the
816 // current update frame, with the top word on the stack as the
817 // fun field.
818 //
819 words = frame - sp - 1;
820 ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));
821
822 ap->size = words;
823 ap->fun = (StgClosure *)sp[0];
824 sp++;
825 for(i=0; i < (nat)words; ++i) {
826 ap->payload[i] = (StgClosure *)*sp++;
827 }
828
829 SET_HDR(ap,&stg_AP_STACK_info,
830 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
831 TICK_ALLOC_UP_THK(WDS(words+1),0);
832
833 //IF_DEBUG(scheduler,
834 // debugBelch("sched: Updating ");
835 // printPtr((P_)((StgUpdateFrame *)frame)->updatee);
836 // debugBelch(" with ");
837 // printObj((StgClosure *)ap);
838 // );
839
840 if (((StgUpdateFrame *)frame)->updatee == updatee) {
841 // If this update frame points to the same closure as
842 // the update frame further down the stack
843 // (stop_here), then don't perform the update. We
844 // want to keep the blackhole in this case, so we can
845 // detect and report the loop (#2783).
846 ap = (StgAP_STACK*)updatee;
847 } else {
848 // Perform the update
849 // TODO: this may waste some work, if the thunk has
850 // already been updated by another thread.
851 updateThunk(cap, tso,
852 ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
853 }
854
855 sp += sizeofW(StgUpdateFrame) - 1;
856 sp[0] = (W_)ap; // push onto stack
857 frame = sp + 1;
858 continue; //no need to bump frame
859 }
860
861 case UNDERFLOW_FRAME:
862 {
863 StgAP_STACK * ap;
864 nat words;
865
866 // First build an AP_STACK consisting of the stack chunk above the
867 // current update frame, with the top word on the stack as the
868 // fun field.
869 //
870 words = frame - sp - 1;
871 ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));
872
873 ap->size = words;
874 ap->fun = (StgClosure *)sp[0];
875 sp++;
876 for(i=0; i < (nat)words; ++i) {
877 ap->payload[i] = (StgClosure *)*sp++;
878 }
879
880 SET_HDR(ap,&stg_AP_STACK_NOUPD_info,
881 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
882 TICK_ALLOC_SE_THK(WDS(words+1),0);
883
884 stack->sp = sp;
885 threadStackUnderflow(cap,tso);
886 stack = tso->stackobj;
887 sp = stack->sp;
888
889 sp--;
890 sp[0] = (W_)ap;
891 frame = sp + 1;
892 continue;
893 }
894
895 case STOP_FRAME:
896 {
897 // We've stripped the entire stack, the thread is now dead.
898 tso->what_next = ThreadKilled;
899 stack->sp = frame + sizeofW(StgStopFrame);
900 goto done;
901 }
902
903 case CATCH_FRAME:
904 // If we find a CATCH_FRAME, and we've got an exception to raise,
905 // then build the THUNK raise(exception), and leave it on
906 // top of the CATCH_FRAME ready to enter.
907 //
908 {
909 StgCatchFrame *cf = (StgCatchFrame *)frame;
910 StgThunk *raise;
911
912 if (exception == NULL) break;
913
914 // we've got an exception to raise, so let's pass it to the
915 // handler in this frame.
916 //
917 raise = (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
918 TICK_ALLOC_SE_THK(WDS(1),0);
919 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
920 raise->payload[0] = exception;
921
922 // throw away the stack from Sp up to the CATCH_FRAME.
923 //
924 sp = frame - 1;
925
926 /* Ensure that async exceptions are blocked now, so we don't get
927 * a surprise exception before we get around to executing the
928 * handler.
929 */
930 tso->flags |= TSO_BLOCKEX;
931 if ((cf->exceptions_blocked & TSO_INTERRUPTIBLE) == 0) {
932 tso->flags &= ~TSO_INTERRUPTIBLE;
933 } else {
934 tso->flags |= TSO_INTERRUPTIBLE;
935 }
936
937 /* Put the newly-built THUNK on top of the stack, ready to execute
938 * when the thread restarts.
939 */
940 sp[0] = (W_)raise;
941 sp[-1] = (W_)&stg_enter_info;
942 stack->sp = sp-1;
943 tso->what_next = ThreadRunGHC;
944 goto done;
945 }
946
947 case ATOMICALLY_FRAME:
948 if (stop_at_atomically) {
949 ASSERT(tso->trec->enclosing_trec == NO_TREC);
950 stmCondemnTransaction(cap, tso -> trec);
951 stack->sp = frame - 2;
952 // The ATOMICALLY_FRAME expects to be returned a
953 // result from the transaction, which it stores in the
954 // stack frame. Hence we arrange to return a dummy
955 // result, so that the GC doesn't get upset (#3578).
956 // Perhaps a better way would be to have a different
957 // ATOMICALLY_FRAME instance for condemned
958 // transactions, but I don't fully understand the
959 // interaction with STM invariants.
960 stack->sp[1] = (W_)&stg_NO_TREC_closure;
961 stack->sp[0] = (W_)&stg_ret_p_info;
962 tso->what_next = ThreadRunGHC;
963 goto done;
964 }
965 else
966 {
967 // Freezing an STM transaction. Just aborting the
968 // transaction would be wrong; this is what we used to
969 // do, and it goes wrong if the ATOMICALLY_FRAME ever
970 // gets back onto the stack again, which it will do if
971 // the transaction is inside unsafePerformIO or
972 // unsafeInterleaveIO and hence inside an UPDATE_FRAME.
973 //
974 // So we want to make it so that if the enclosing
975 // computation is resumed, we will re-execute the
976 // transaction. We therefore:
977 //
978 // 1. abort the current transaction
979         //       2. replace the stack up to and including the
980 // atomically frame with a closure representing
981 // a call to "atomically x", where x is the code
982 // of the transaction.
983         //       3. continue stripping the stack
984 //
985 StgTRecHeader *trec = tso->trec;
986 StgTRecHeader *outer = trec->enclosing_trec;
987
988 StgThunk *atomically;
989 StgAtomicallyFrame *af = (StgAtomicallyFrame*)frame;
990
991 debugTraceCap(DEBUG_stm, cap,
992                           "raiseAsync: freezing atomically frame");
993 stmAbortTransaction(cap, trec);
994 stmFreeAbortedTRec(cap, trec);
995 tso->trec = outer;
996
997 atomically = (StgThunk*)allocate(cap,sizeofW(StgThunk)+1);
998 TICK_ALLOC_SE_THK(1,0);
999 SET_HDR(atomically,&stg_atomically_info,af->header.prof.ccs);
1000 atomically->payload[0] = af->code;
1001
1002 // discard stack up to and including the ATOMICALLY_FRAME
1003 frame += sizeofW(StgAtomicallyFrame);
1004 sp = frame - 1;
1005
1006 // replace the ATOMICALLY_FRAME with call to atomically#
1007 sp[0] = (W_)atomically;
1008 continue;
1009 }
1010
1011 case CATCH_STM_FRAME:
1012 case CATCH_RETRY_FRAME:
1013 // CATCH frames within an atomically block: abort the
1014 // inner transaction and continue. Eventually we will
1015 // hit the outer transaction that will get frozen (see
1016 // above).
1017 //
1018 // In this case (unlike ordinary exceptions) we do not care
1019 // whether the transaction is valid or not because its
1020 // possible validity cannot have caused the exception
1021 // and will not be visible after the abort.
1022 {
1023 StgTRecHeader *trec = tso -> trec;
1024 StgTRecHeader *outer = trec -> enclosing_trec;
1025 debugTraceCap(DEBUG_stm, cap,
1026 "found atomically block delivering async exception");
1027 stmAbortTransaction(cap, trec);
1028 stmFreeAbortedTRec(cap, trec);
1029 tso -> trec = outer;
1030 break;
1031 };
1032
1033 default:
1034 break;
1035 }
1036
1037 // move on to the next stack frame
1038 frame += stack_frame_sizeW((StgClosure *)frame);
1039 }
1040
1041 done:
1042 IF_DEBUG(sanity, checkTSO(tso));
1043
1044 // wake it up
1045 if (tso->why_blocked != NotBlocked) {
1046 tso->why_blocked = NotBlocked;
1047 appendToRunQueue(cap,tso);
1048 }
1049
1050 return tso;
1051 }
1052
1053
1054
1055 // Local Variables:
1056 // mode: C
1057 // fill-column: 80
1058 // indent-tabs-mode: nil
1059 // c-basic-offset: 4
1060 // buffer-file-coding-system: utf-8-unix
1061 // End: