1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * Asynchronous exceptions
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "sm/Storage.h"
13 #include "Threads.h"
14 #include "Trace.h"
15 #include "RaiseAsync.h"
16 #include "Schedule.h"
17 #include "Updates.h"
18 #include "STM.h"
19 #include "sm/Sanity.h"
20 #include "Profiling.h"
21 #include "Messages.h"
22 #if defined(mingw32_HOST_OS)
23 #include "win32/IOManager.h"
24 #endif
25
26 static void blockedThrowTo (Capability *cap,
27 StgTSO *target, MessageThrowTo *msg);
28
29 static void removeFromQueues(Capability *cap, StgTSO *tso);
30
31 static void removeFromMVarBlockedQueue (StgTSO *tso);
32
33 static void throwToSendMsg (Capability *cap USED_IF_THREADS,
34 Capability *target_cap USED_IF_THREADS,
35 MessageThrowTo *msg USED_IF_THREADS);
36
37 /* -----------------------------------------------------------------------------
38 throwToSingleThreaded
39
40 This version of throwTo is safe to use if and only if one of the
41 following holds:
42
43 - !THREADED_RTS
44
45 - all the other threads in the system are stopped (eg. during GC).
46
47 - we surely own the target TSO (eg. we just took it from the
48 run queue of the current capability, or we are running it).
49
50 It doesn't cater for blocking the source thread until the exception
51 has been raised.
52 -------------------------------------------------------------------------- */
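
/* For illustration only: a minimal sketch of a caller that satisfies the
   conditions above.  Code in the spirit of deleteThread() in Schedule.c,
   which runs when all the other threads have been stopped (eg. during
   shutdown), can pass a NULL exception to kill the target outright.  The
   function name here is made up; it is not part of this file:

       static void
       killStoppedThread (Capability *cap, StgTSO *tso)
       {
           // Safe: every other thread is stopped, so we effectively own tso.
           // Threads in a foreign call are left alone; they die when the
           // call returns.
           if (tso->why_blocked != BlockedOnCCall &&
               tso->why_blocked != BlockedOnCCall_Interruptible) {
               throwToSingleThreaded(cap, tso, NULL);
           }
       }
*/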
53
54 static void
55 throwToSingleThreaded__ (Capability *cap, StgTSO *tso, StgClosure *exception,
56 bool stop_at_atomically, StgUpdateFrame *stop_here)
57 {
58 // Thread already dead?
59 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
60 return;
61 }
62
63 // Remove it from any blocking queues
64 removeFromQueues(cap,tso);
65
66 raiseAsync(cap, tso, exception, stop_at_atomically, stop_here);
67 }
68
69 void
70 throwToSingleThreaded (Capability *cap, StgTSO *tso, StgClosure *exception)
71 {
72 throwToSingleThreaded__(cap, tso, exception, false, NULL);
73 }
74
75 void
76 throwToSingleThreaded_ (Capability *cap, StgTSO *tso, StgClosure *exception,
77 bool stop_at_atomically)
78 {
79 throwToSingleThreaded__ (cap, tso, exception, stop_at_atomically, NULL);
80 }
81
82 void // cannot return a different TSO
83 suspendComputation (Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
84 {
85 throwToSingleThreaded__ (cap, tso, NULL, false, stop_here);
86 }
87
88 /* -----------------------------------------------------------------------------
89 throwToSelf
90
91 Useful for throwing an async exception in a thread from the
92 runtime. It handles unlocking the throwto message returned by
93 throwTo().
94
95 Note [Throw to self when masked]
96
97 When a StackOverflow occurs when the thread is masked, we want to
98 defer the exception to when the thread becomes unmasked/hits an
99 interruptible point. We already have a mechanism for doing this,
100 the blocked_exceptions list, but the use here is a bit unusual,
101 because an exception is normally only added to this list upon
102 an asynchronous 'throwTo' call (with all of the relevant
103 multithreaded nonsense). Morally, a stack overflow should be an
104 asynchronous exception sent by a thread to itself, and it should
105 have the same semantics. But there are a few key differences:
106
107 - If you actually tried to send an asynchronous exception to
108      yourself using throwTo, the exception would be delivered
109      immediately. This is because throwTo itself is considered an
110      interruptible point, so the exception is always deliverable. Thus,
111      ordinarily, we never end up with a message to oneself in the
112 blocked_exceptions queue.
113
114 - In the case of a StackOverflow, we don't actually care about the
115 wakeup semantics; when an exception is delivered, the thread that
116 originally threw the exception should be woken up, since throwTo
117 blocks until the exception is successfully thrown. Fortunately,
118      it is harmless to wake up a thread that doesn't actually need waking
119 up, e.g. ourselves.
120
121 - No synchronization is necessary, because we own the TSO and the
122 capability. You can observe this by tracing through the execution
123 of throwTo. We skip synchronizing the message and inter-capability
124 communication.
125
126 We think this doesn't break any invariants, but do be careful!
127 -------------------------------------------------------------------------- */
128
129 void
130 throwToSelf (Capability *cap, StgTSO *tso, StgClosure *exception)
131 {
132 MessageThrowTo *m;
133
134 m = throwTo(cap, tso, tso, exception);
135
136 if (m != NULL) {
137 // throwTo leaves it locked
138 unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
139 }
140 }
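
/* For illustration only: the Note above concerns delivering a StackOverflow
   exception to the thread that is currently running.  A caller in the RTS
   (threadStackOverflow() in Threads.c does something along these lines)
   simply builds or looks up the exception closure and calls:

       // Deliver the exception to ourselves.  If we are currently masked,
       // throwTo() parks the message on our own blocked_exceptions queue
       // and throwToSelf() unlocks it again; either way no further
       // synchronisation is needed, because cap owns tso.
       throwToSelf(cap, tso, (StgClosure *)stackOverflow_closure);

   stackOverflow_closure here stands for whatever exception closure the
   caller wants delivered.
*/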
141
142 /* -----------------------------------------------------------------------------
143 throwTo
144
145 This function may be used to throw an exception from one thread to
146 another, during the course of normal execution. This is a tricky
147 task: the target thread might be running on another CPU, or it
148 may be blocked and could be woken up at any point by another CPU.
149 We have some delicate synchronisation to do.
150
151 The underlying scheme when multiple Capabilities are in use is
152 message passing: when the target of a throwTo is on another
153 Capability, we send a message (a MessageThrowTo closure) to that
154 Capability.
155
156 If the throwTo needs to block because the target TSO is masking
157 exceptions (the TSO_BLOCKEX flag), then the message is placed on
158 the blocked_exceptions queue attached to the target TSO. When the
159 target TSO enters the unmasked state again, it must check the
160 queue. The blocked_exceptions queue is not locked; only the
161 Capability owning the TSO may modify it.
162
163 To make things simpler for throwTo, we always create the message
164 first before deciding what to do. The message may get sent, or it
165 may get attached to a TSO's blocked_exceptions queue, or the
166 exception may get thrown immediately and the message dropped,
167 depending on the current state of the target.
168
169 Currently we send a message if the target belongs to another
170 Capability, and it is
171
172 - NotBlocked, BlockedOnMsgThrowTo,
173 BlockedOnCCall_Interruptible
174
175 - or it is masking exceptions (TSO_BLOCKEX)
176
177 Currently, if the target is BlockedOnMVar, BlockedOnSTM, or
178 BlockedOnBlackHole then we acquire ownership of the TSO by locking
179 its parent container (e.g. the MVar) and then raise the exception.
180 We might change these cases to be more message-passing-like in the
181 future.
182
183 Returns:
184
185 NULL exception was raised, ok to continue
186
187 MessageThrowTo * exception was not raised; the source TSO
188 should now put itself in the state
189 BlockedOnMsgThrowTo, and when it is ready
190                        it should unlock the message using
191 unlockClosure(msg, &stg_MSG_THROWTO_info);
192 If it decides not to raise the exception after
193 all, it can revoke it safely with
194 unlockClosure(msg, &stg_MSG_NULL_info);
195
196 -------------------------------------------------------------------------- */
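
/* For illustration only: a simplified C rendering of the caller-side
   protocol described above.  (The user-level caller is killThread#,
   implemented in Exception.cmm; the real code there is Cmm and also saves
   the thread's state before blocking.)  parkSourceThread() is a made-up
   name standing in for that tidy-up step:

       MessageThrowTo *msg;

       msg = throwTo(cap, source, target, exception);
       if (msg == NULL) {
           // the exception has been raised in target; source carries on
       } else {
           // we must block until the exception has been delivered
           source->why_blocked = BlockedOnMsgThrowTo;
           source->block_info.throwto = msg;
           parkSourceThread(cap, source);   // tidy up source's state first
           // only now is it safe to let anyone act on the message
           unlockClosure((StgClosure *)msg, &stg_MSG_THROWTO_info);
       }
*/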
197
198 MessageThrowTo *
199 throwTo (Capability *cap, // the Capability we hold
200 StgTSO *source, // the TSO sending the exception (or NULL)
201 StgTSO *target, // the TSO receiving the exception
202 StgClosure *exception) // the exception closure
203 {
204 MessageThrowTo *msg;
205
206 msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
207 // the message starts locked; see below
208 SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
209 msg->source = source;
210 msg->target = target;
211 msg->exception = exception;
212
213 switch (throwToMsg(cap, msg))
214 {
215 case THROWTO_SUCCESS:
216 // unlock the message now, otherwise we leave a WHITEHOLE in
217 // the heap (#6103)
218 SET_HDR(msg, &stg_MSG_THROWTO_info, CCS_SYSTEM);
219 return NULL;
220
221 case THROWTO_BLOCKED:
222 default:
223 // the caller will unlock the message when it is ready. We
224 // cannot unlock it yet, because the calling thread will need
225 // to tidy up its state first.
226 return msg;
227 }
228 }
229
230
231 uint32_t
232 throwToMsg (Capability *cap, MessageThrowTo *msg)
233 {
234 StgWord status;
235 StgTSO *target = msg->target;
236 Capability *target_cap;
237
238 goto check_target;
239
240 retry:
241 write_barrier();
242 debugTrace(DEBUG_sched, "throwTo: retrying...");
243
244 check_target:
245 ASSERT(target != END_TSO_QUEUE);
246
247 // Thread already dead?
248 if (target->what_next == ThreadComplete
249 || target->what_next == ThreadKilled) {
250 return THROWTO_SUCCESS;
251 }
252
253 debugTraceCap(DEBUG_sched, cap,
254 "throwTo: from thread %lu to thread %lu",
255 (unsigned long)msg->source->id,
256 (unsigned long)msg->target->id);
257
258 #ifdef DEBUG
259 traceThreadStatus(DEBUG_sched, target);
260 #endif
261
262 target_cap = target->cap;
263 if (target->cap != cap) {
264 throwToSendMsg(cap, target_cap, msg);
265 return THROWTO_BLOCKED;
266 }
267
268 status = target->why_blocked;
269
270 switch (status) {
271 case NotBlocked:
272 {
273 if ((target->flags & TSO_BLOCKEX) == 0) {
274 // It's on our run queue and not blocking exceptions
275 raiseAsync(cap, target, msg->exception, false, NULL);
276 return THROWTO_SUCCESS;
277 } else {
278 blockedThrowTo(cap,target,msg);
279 return THROWTO_BLOCKED;
280 }
281 }
282
283 case BlockedOnMsgThrowTo:
284 {
285 const StgInfoTable *i;
286 MessageThrowTo *m;
287
288 m = target->block_info.throwto;
289
290 // target is local to this cap, but has sent a throwto
291 // message to another cap.
292 //
293 // The source message is locked. We need to revoke the
294 // target's message so that we can raise the exception, so
295 // we attempt to lock it.
296
297 // There's a possibility of a deadlock if two threads are both
298 // trying to throwTo each other (or more generally, a cycle of
299 // threads). To break the symmetry we compare the addresses
300 // of the MessageThrowTo objects, and the one for which m <
301 // msg gets to spin, while the other can only try to lock
302 // once, but must then back off and unlock both before trying
303 // again.
304 if (m < msg) {
305 i = lockClosure((StgClosure *)m);
306 } else {
307 i = tryLockClosure((StgClosure *)m);
308 if (i == NULL) {
309 // debugBelch("collision\n");
310 throwToSendMsg(cap, target->cap, msg);
311 return THROWTO_BLOCKED;
312 }
313 }
314
315 if (i == &stg_MSG_NULL_info) {
316 // we know there's a MSG_TRY_WAKEUP on the way, so we
317 // might as well just do it now. The message will
318 // be a no-op when it arrives.
319 unlockClosure((StgClosure*)m, i);
320 tryWakeupThread(cap, target);
321 goto retry;
322 }
323
324 if (i != &stg_MSG_THROWTO_info) {
325         // not a MSG_THROWTO (MSG_NULL was handled above); the message has changed, so retry
326 unlockClosure((StgClosure*)m, i);
327 goto retry;
328 }
329
330 if ((target->flags & TSO_BLOCKEX) &&
331 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
332 unlockClosure((StgClosure*)m, i);
333 blockedThrowTo(cap,target,msg);
334 return THROWTO_BLOCKED;
335 }
336
337 // nobody else can wake up this TSO after we claim the message
338 doneWithMsgThrowTo(m);
339
340 raiseAsync(cap, target, msg->exception, false, NULL);
341 return THROWTO_SUCCESS;
342 }
343
344 case BlockedOnMVar:
345 case BlockedOnMVarRead:
346 {
347 /*
348 To establish ownership of this TSO, we need to acquire a
349 lock on the MVar that it is blocked on.
350 */
351 StgMVar *mvar;
352 StgInfoTable *info USED_IF_THREADS;
353
354 mvar = (StgMVar *)target->block_info.closure;
355
356 // ASSUMPTION: tso->block_info must always point to a
357 // closure. In the threaded RTS it does.
358 switch (get_itbl((StgClosure *)mvar)->type) {
359 case MVAR_CLEAN:
360 case MVAR_DIRTY:
361 break;
362 default:
363 goto retry;
364 }
365
366 info = lockClosure((StgClosure *)mvar);
367
368 // we have the MVar, let's check whether the thread
369 // is still blocked on the same MVar.
370 if ((target->why_blocked != BlockedOnMVar && target->why_blocked != BlockedOnMVarRead)
371 || (StgMVar *)target->block_info.closure != mvar) {
372 unlockClosure((StgClosure *)mvar, info);
373 goto retry;
374 }
375
376 if (target->_link == END_TSO_QUEUE) {
377 // the MVar operation has already completed. There is a
378 // MSG_TRY_WAKEUP on the way, but we can just wake up the
379 // thread now anyway and ignore the message when it
380 // arrives.
381 unlockClosure((StgClosure *)mvar, info);
382 tryWakeupThread(cap, target);
383 goto retry;
384 }
385
386 if ((target->flags & TSO_BLOCKEX) &&
387 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
388 blockedThrowTo(cap,target,msg);
389 unlockClosure((StgClosure *)mvar, info);
390 return THROWTO_BLOCKED;
391 } else {
392 // revoke the MVar operation
393 removeFromMVarBlockedQueue(target);
394 raiseAsync(cap, target, msg->exception, false, NULL);
395 unlockClosure((StgClosure *)mvar, info);
396 return THROWTO_SUCCESS;
397 }
398 }
399
400 case BlockedOnBlackHole:
401 {
402 if (target->flags & TSO_BLOCKEX) {
403 // BlockedOnBlackHole is not interruptible.
404 blockedThrowTo(cap,target,msg);
405 return THROWTO_BLOCKED;
406 } else {
407 // Revoke the message by replacing it with IND. We're not
408 // locking anything here, so we might still get a TRY_WAKEUP
409 // message from the owner of the blackhole some time in the
410 // future, but that doesn't matter.
411 ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
412 OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
413 raiseAsync(cap, target, msg->exception, false, NULL);
414 return THROWTO_SUCCESS;
415 }
416 }
417
418 case BlockedOnSTM:
419 lockTSO(target);
420 // Unblocking BlockedOnSTM threads requires the TSO to be
421 // locked; see STM.c:unpark_tso().
422 if (target->why_blocked != BlockedOnSTM) {
423 unlockTSO(target);
424 goto retry;
425 }
426 if ((target->flags & TSO_BLOCKEX) &&
427 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
428 blockedThrowTo(cap,target,msg);
429 unlockTSO(target);
430 return THROWTO_BLOCKED;
431 } else {
432 raiseAsync(cap, target, msg->exception, false, NULL);
433 unlockTSO(target);
434 return THROWTO_SUCCESS;
435 }
436
437 case BlockedOnCCall_Interruptible:
438 #ifdef THREADED_RTS
439 {
440 Task *task = NULL;
441 // walk suspended_ccalls to find the correct worker thread
442 InCall *incall;
443 for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) {
444 if (incall->suspended_tso == target) {
445 task = incall->task;
446 break;
447 }
448 }
449 if (task != NULL) {
450 blockedThrowTo(cap, target, msg);
451 if (!((target->flags & TSO_BLOCKEX) &&
452 ((target->flags & TSO_INTERRUPTIBLE) == 0))) {
453 interruptWorkerTask(task);
454 }
455 return THROWTO_BLOCKED;
456 } else {
457 debugTraceCap(DEBUG_sched, cap, "throwTo: could not find worker thread to kill");
458 }
459         // fall through to the BlockedOnCCall case
460 }
461 #endif
462 case BlockedOnCCall:
463 blockedThrowTo(cap,target,msg);
464 return THROWTO_BLOCKED;
465
466 #if !defined(THREADED_RTS)
467 case BlockedOnRead:
468 case BlockedOnWrite:
469 case BlockedOnDelay:
470 #if defined(mingw32_HOST_OS)
471 case BlockedOnDoProc:
472 #endif
473 if ((target->flags & TSO_BLOCKEX) &&
474 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
475 blockedThrowTo(cap,target,msg);
476 return THROWTO_BLOCKED;
477 } else {
478 removeFromQueues(cap,target);
479 raiseAsync(cap, target, msg->exception, false, NULL);
480 return THROWTO_SUCCESS;
481 }
482 #endif
483
484 case ThreadMigrating:
485 // if it is ThreadMigrating and tso->cap is ours, then it
486 // *must* be migrating *to* this capability. If it were
487 // migrating away from the capability, then tso->cap would
488 // point to the destination.
489 //
490 // There is a MSG_WAKEUP in the message queue for this thread,
491 // but we can just do it preemptively:
492 tryWakeupThread(cap, target);
493 // and now retry, the thread should be runnable.
494 goto retry;
495
496 default:
497 barf("throwTo: unrecognised why_blocked (%d)", target->why_blocked);
498 }
499 barf("throwTo");
500 }
501
502 static void
503 throwToSendMsg (Capability *cap STG_UNUSED,
504 Capability *target_cap USED_IF_THREADS,
505 MessageThrowTo *msg USED_IF_THREADS)
506
507 {
508 #ifdef THREADED_RTS
509 debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
510
511 sendMessage(cap, target_cap, (Message*)msg);
512 #endif
513 }
514
515 // Block a throwTo message on the target TSO's blocked_exceptions
516 // queue. The current Capability must own the target TSO in order to
517 // modify the blocked_exceptions queue.
518 void
519 blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
520 {
521 debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu",
522 (unsigned long)target->id);
523
524 ASSERT(target->cap == cap);
525
526 msg->link = target->blocked_exceptions;
527 target->blocked_exceptions = msg;
528 dirty_TSO(cap,target); // we modified the blocked_exceptions queue
529 }
530
531 /* -----------------------------------------------------------------------------
532 Waking up threads blocked in throwTo
533
534 There are two ways to do this: maybePerformBlockedException() will
535 perform the throwTo() for the thread at the head of the queue
536 immediately, and leave the other threads on the queue.
537 maybePerformBlockedException() also checks the TSO_BLOCKEX flag
538 before raising an exception.
539
540 awakenBlockedExceptionQueue() will wake up all the threads in the
541 queue, but not perform any throwTo() immediately. This might be
542 more appropriate when the target thread is the one actually running
543 (see Exception.cmm).
544
545 Returns: non-zero if an exception was raised, zero otherwise.
546 -------------------------------------------------------------------------- */
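
/* For illustration only: a rough C-style sketch of how the unmasking code
   (stg_unmaskAsyncExceptionszh and friends in Exception.cmm) uses
   maybePerformBlockedException(); the real code is Cmm and manipulates the
   stack directly:

       if (maybePerformBlockedException(cap, tso)) {
           // An exception was raised on tso's stack: either the thread was
           // killed, or a call to its handler now sits on top of the stack,
           // so re-enter the stack instead of returning to the interrupted
           // code.
       }

   awakenBlockedExceptionQueue(), by contrast, only wakes the senders up; it
   is used when tso can no longer have the exceptions thrown at it, for
   example because it is the thread currently running or is about to die.
*/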
547
548 int
549 maybePerformBlockedException (Capability *cap, StgTSO *tso)
550 {
551 MessageThrowTo *msg;
552 const StgInfoTable *i;
553 StgTSO *source;
554
555     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
556 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
557 awakenBlockedExceptionQueue(cap,tso);
558 return 1;
559 } else {
560 return 0;
561 }
562 }
563
564 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE &&
565 (tso->flags & TSO_BLOCKEX) != 0) {
566 debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
567 }
568
569 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
570 && ((tso->flags & TSO_BLOCKEX) == 0
571 || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
572
573 // We unblock just the first thread on the queue, and perform
574 // its throw immediately.
575 loop:
576 msg = tso->blocked_exceptions;
577 if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
578 i = lockClosure((StgClosure*)msg);
579 tso->blocked_exceptions = (MessageThrowTo*)msg->link;
580 if (i == &stg_MSG_NULL_info) {
581 unlockClosure((StgClosure*)msg,i);
582 goto loop;
583 }
584
585 throwToSingleThreaded(cap, msg->target, msg->exception);
586 source = msg->source;
587 doneWithMsgThrowTo(msg);
588 tryWakeupThread(cap, source);
589 return 1;
590 }
591 return 0;
592 }
593
594 // awakenBlockedExceptionQueue(): Just wake up the whole queue of
595 // blocked exceptions.
596
597 void
598 awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
599 {
600 MessageThrowTo *msg;
601 const StgInfoTable *i;
602 StgTSO *source;
603
604 for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
605 msg = (MessageThrowTo*)msg->link) {
606 i = lockClosure((StgClosure *)msg);
607 if (i != &stg_MSG_NULL_info) {
608 source = msg->source;
609 doneWithMsgThrowTo(msg);
610 tryWakeupThread(cap, source);
611 } else {
612 unlockClosure((StgClosure *)msg,i);
613 }
614 }
615 tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
616 }
617
618 /* -----------------------------------------------------------------------------
619 Remove a thread from blocking queues.
620
621 This is for use when we raise an exception in another thread, which
622 may be blocked.
623
624 Precondition: we have exclusive access to the TSO, via the same set
625 of conditions as throwToSingleThreaded() (c.f.).
626 -------------------------------------------------------------------------- */
627
628 static void
629 removeFromMVarBlockedQueue (StgTSO *tso)
630 {
631 StgMVar *mvar = (StgMVar*)tso->block_info.closure;
632 StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link;
633
634 if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) {
635 // already removed from this MVar
636 return;
637 }
638
639 // Assume the MVar is locked. (not assertable; sometimes it isn't
640 // actually WHITEHOLE'd).
641
642 // We want to remove the MVAR_TSO_QUEUE object from the queue. It
643 // isn't doubly-linked so we can't actually remove it; instead we
644 // just overwrite it with an IND if possible and let the GC short
645 // it out. However, we have to be careful to maintain the deque
646 // structure:
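    // For example (an illustrative sketch, not a further step of the
    // algorithm): given a queue
    //
    //     mvar->head -> q1 -> q2 -> q3    (mvar->tail -> q3)
    //
    // removing the middle cell q2 just overwrites it with an IND, which the
    // GC later shorts out, leaving q1 -> q3.  Removing the tail cell q3 must
    // leave a MSG_NULL behind instead, because mvar->tail still points at it
    // (takeMVar/putMVar skip MSG_NULL cells).  Removing the head cell q1
    // advances mvar->head first and can then use an IND as in the middle case.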
647
648 if (mvar->head == q) {
649 mvar->head = q->link;
650 OVERWRITE_INFO(q, &stg_IND_info);
651 if (mvar->tail == q) {
652 mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE;
653 }
654 }
655 else if (mvar->tail == q) {
656 // we can't replace it with an IND in this case, because then
657 // we lose the tail pointer when the GC shorts out the IND.
658 // So we use MSG_NULL as a kind of non-dupable indirection;
659 // these are ignored by takeMVar/putMVar.
660 OVERWRITE_INFO(q, &stg_MSG_NULL_info);
661 }
662 else {
663 OVERWRITE_INFO(q, &stg_IND_info);
664 }
665
666 // revoke the MVar operation
667 tso->_link = END_TSO_QUEUE;
668 }
669
670 static void
671 removeFromQueues(Capability *cap, StgTSO *tso)
672 {
673 switch (tso->why_blocked) {
674
675 case NotBlocked:
676 case ThreadMigrating:
677 return;
678
679 case BlockedOnSTM:
680 // Be careful: nothing to do here! We tell the scheduler that the
681 // thread is runnable and we leave it to the stack-walking code to
682 // abort the transaction while unwinding the stack. We should
683 // perhaps have a debugging test to make sure that this really
684 // happens and that the 'zombie' transaction does not get
685 // committed.
686 goto done;
687
688 case BlockedOnMVar:
689 case BlockedOnMVarRead:
690 removeFromMVarBlockedQueue(tso);
691 goto done;
692
693 case BlockedOnBlackHole:
694 // nothing to do
695 goto done;
696
697 case BlockedOnMsgThrowTo:
698 {
699 MessageThrowTo *m = tso->block_info.throwto;
700 // The message is locked by us, unless we got here via
701 // deleteAllThreads(), in which case we own all the
702 // capabilities.
703 // ASSERT(m->header.info == &stg_WHITEHOLE_info);
704
705 // unlock and revoke it at the same time
706 doneWithMsgThrowTo(m);
707 break;
708 }
709
710 #if !defined(THREADED_RTS)
711 case BlockedOnRead:
712 case BlockedOnWrite:
713 #if defined(mingw32_HOST_OS)
714 case BlockedOnDoProc:
715 #endif
716 removeThreadFromDeQueue(cap, &blocked_queue_hd, &blocked_queue_tl, tso);
717 #if defined(mingw32_HOST_OS)
718 /* (Cooperatively) signal that the worker thread should abort
719 * the request.
720 */
721 abandonWorkRequest(tso->block_info.async_result->reqID);
722 #endif
723 goto done;
724
725 case BlockedOnDelay:
726 removeThreadFromQueue(cap, &sleeping_queue, tso);
727 goto done;
728 #endif
729
730 default:
731 barf("removeFromQueues: %d", tso->why_blocked);
732 }
733
734 done:
735 tso->why_blocked = NotBlocked;
736 appendToRunQueue(cap, tso);
737 }
738
739 /* -----------------------------------------------------------------------------
740 * raiseAsync()
741 *
742 * The following function implements the magic for raising an
743 * asynchronous exception in an existing thread.
744 *
745 * We first remove the thread from any queue on which it might be
746  * blocked. The possible blockages are MVars, BLOCKING_QUEUEs, and
747 * TSO blocked_exception queues.
748 *
749 * We strip the stack down to the innermost CATCH_FRAME, building
750 * thunks in the heap for all the active computations, so they can
751 * be restarted if necessary. When we reach a CATCH_FRAME, we build
752 * an application of the handler to the exception, and push it on
753 * the top of the stack.
754 *
755 * How exactly do we save all the active computations? We create an
756 * AP_STACK for every UpdateFrame on the stack. Entering one of these
757 * AP_STACKs pushes everything from the corresponding update frame
758 * upwards onto the stack. (Actually, it pushes everything up to the
759  * next update frame plus a pointer to the next AP_STACK object.)
760 * Entering the next AP_STACK object pushes more onto the stack until we
761 * reach the last AP_STACK object - at which point the stack should look
762 * exactly as it did when we killed the TSO and we can continue
763 * execution by entering the closure on top of the stack.
764 *
765 * We can also kill a thread entirely - this happens if either (a) the
766 * exception passed to raiseAsync is NULL, or (b) there's no
767 * CATCH_FRAME on the stack. In either case, we strip the entire
768 * stack and replace the thread with a zombie.
769 *
770 * ToDo: in THREADED_RTS mode, this function is only safe if either
771 * (a) we hold all the Capabilities (eg. in GC, or if there is only
772 * one Capability), or (b) we own the Capability that the TSO is
773 * currently blocked on or on the run queue of.
774 *
775 * -------------------------------------------------------------------------- */
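
/* Schematically, for the common case of a single stack chunk and no STM
 * frames (a sketch only; the code below also handles UNDERFLOW_FRAMEs and
 * the ATOMICALLY_FRAME cases):
 *
 *    before:   Sp -> frames_1 | UPDATE_FRAME (updates thunk t) |
 *                    frames_0 | CATCH_FRAME handler | ...
 *
 *    after raiseAsync(cap, tso, exception, ...):
 *
 *      - t has been updated with an AP_STACK capturing frames_1, so any
 *        later demand on t resumes that computation;
 *      - frames_0 has been discarded;
 *      - Sp -> raise exception | CATCH_FRAME handler | ...
 *
 *    and tso->what_next is ThreadRunGHC, so the next thing the thread does
 *    is run the handler.
 */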
776
777 StgTSO *
778 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
779 bool stop_at_atomically, StgUpdateFrame *stop_here)
780 {
781 const StgRetInfoTable *info;
782 StgPtr sp, frame;
783 StgClosure *updatee;
784 uint32_t i;
785 StgStack *stack;
786
787 debugTraceCap(DEBUG_sched, cap,
788 "raising exception in thread %ld.", (long)tso->id);
789
790 #if defined(PROFILING)
791 /*
792 * Debugging tool: on raising an exception, show where we are.
793 * See also Exception.cmm:stg_raisezh.
794 * This wasn't done for asynchronous exceptions originally; see #1450
795 */
796 if (RtsFlags.ProfFlags.showCCSOnException && exception != NULL)
797 {
798 fprintCCS_stderr(tso->prof.cccs,exception,tso);
799 }
800 #endif
801 // ASSUMES: the thread is not already complete or dead
802 // Upper layers should deal with that.
803 ASSERT(tso->what_next != ThreadComplete &&
804 tso->what_next != ThreadKilled);
805
806     // only if we own this TSO (except that deleteThread() calls this when we hold all the capabilities)
807 ASSERT(tso->cap == cap);
808
809 stack = tso->stackobj;
810
811 // mark it dirty; we're about to change its stack.
812 dirty_TSO(cap, tso);
813 dirty_STACK(cap, stack);
814
815 sp = stack->sp;
816
817 if (stop_here != NULL) {
818 updatee = stop_here->updatee;
819 } else {
820 updatee = NULL;
821 }
822
823 // The stack freezing code assumes there's a closure pointer on
824 // the top of the stack, so we have to arrange that this is the case...
825 //
826 if (sp[0] == (W_)&stg_enter_info) {
827 sp++;
828 } else {
829 sp--;
830 sp[0] = (W_)&stg_dummy_ret_closure;
831 }
832
833 frame = sp + 1;
834 while (stop_here == NULL || frame < (StgPtr)stop_here) {
835
836 // 1. Let the top of the stack be the "current closure"
837 //
838 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
839 // CATCH_FRAME.
840 //
841 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
842 // current closure applied to the chunk of stack up to (but not
843 // including) the update frame. This closure becomes the "current
844 // closure". Go back to step 2.
845 //
846 // 4. If it's a CATCH_FRAME, then leave the exception handler on
847 // top of the stack applied to the exception.
848 //
849 // 5. If it's a STOP_FRAME, then kill the thread.
850 //
851 // 6. If it's an UNDERFLOW_FRAME, then continue with the next
852 // stack chunk.
853 //
854 // NB: if we pass an ATOMICALLY_FRAME then abort the associated
855 // transaction
856
857 info = get_ret_itbl((StgClosure *)frame);
858
859 switch (info->i.type) {
860
861 case UPDATE_FRAME:
862 {
863 StgAP_STACK * ap;
864 uint32_t words;
865
866 // First build an AP_STACK consisting of the stack chunk above the
867 // current update frame, with the top word on the stack as the
868 // fun field.
869 //
870 words = frame - sp - 1;
871 ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));
872
873 ap->size = words;
874 ap->fun = (StgClosure *)sp[0];
875 sp++;
876 for(i=0; i < words; ++i) {
877 ap->payload[i] = (StgClosure *)*sp++;
878 }
879
880 SET_HDR(ap,&stg_AP_STACK_info,
881 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
882 TICK_ALLOC_UP_THK(WDS(words+1),0);
883
884 //IF_DEBUG(scheduler,
885 // debugBelch("sched: Updating ");
886 // printPtr((P_)((StgUpdateFrame *)frame)->updatee);
887 // debugBelch(" with ");
888 // printObj((StgClosure *)ap);
889 // );
890
891 if (((StgUpdateFrame *)frame)->updatee == updatee) {
892 // If this update frame points to the same closure as
893 // the update frame further down the stack
894 // (stop_here), then don't perform the update. We
895 // want to keep the blackhole in this case, so we can
896 // detect and report the loop (#2783).
897 ap = (StgAP_STACK*)updatee;
898 } else {
899 // Perform the update
900 // TODO: this may waste some work, if the thunk has
901 // already been updated by another thread.
902 updateThunk(cap, tso,
903 ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
904 }
905
906 sp += sizeofW(StgUpdateFrame) - 1;
907 sp[0] = (W_)ap; // push onto stack
908 frame = sp + 1;
909 continue; //no need to bump frame
910 }
911
912 case UNDERFLOW_FRAME:
913 {
914 StgAP_STACK * ap;
915 uint32_t words;
916
917 // First build an AP_STACK consisting of the stack chunk above the
918             // current underflow frame, with the top word on the stack as the
919 // fun field.
920 //
921 words = frame - sp - 1;
922 ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));
923
924 ap->size = words;
925 ap->fun = (StgClosure *)sp[0];
926 sp++;
927 for(i=0; i < words; ++i) {
928 ap->payload[i] = (StgClosure *)*sp++;
929 }
930
931 SET_HDR(ap,&stg_AP_STACK_NOUPD_info,
932 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
933 TICK_ALLOC_SE_THK(WDS(words+1),0);
934
935 stack->sp = sp;
936 threadStackUnderflow(cap,tso);
937 stack = tso->stackobj;
938 sp = stack->sp;
939
940 sp--;
941 sp[0] = (W_)ap;
942 frame = sp + 1;
943 continue;
944 }
945
946 case STOP_FRAME:
947 {
948 // We've stripped the entire stack, the thread is now dead.
949 tso->what_next = ThreadKilled;
950 stack->sp = frame + sizeofW(StgStopFrame);
951 goto done;
952 }
953
954 case CATCH_FRAME:
955 // If we find a CATCH_FRAME, and we've got an exception to raise,
956 // then build the THUNK raise(exception), and leave it on
957 // top of the CATCH_FRAME ready to enter.
958 //
959 {
960 StgCatchFrame *cf = (StgCatchFrame *)frame;
961 StgThunk *raise;
962
963 if (exception == NULL) break;
964
965 // we've got an exception to raise, so let's pass it to the
966 // handler in this frame.
967 //
968 raise = (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
969 TICK_ALLOC_SE_THK(WDS(1),0);
970 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
971 raise->payload[0] = exception;
972
973 // throw away the stack from Sp up to the CATCH_FRAME.
974 //
975 sp = frame - 1;
976
977 /* Ensure that async exceptions are blocked now, so we don't get
978 * a surprise exception before we get around to executing the
979 * handler.
980 */
981 tso->flags |= TSO_BLOCKEX;
982 if ((cf->exceptions_blocked & TSO_INTERRUPTIBLE) == 0) {
983 tso->flags &= ~TSO_INTERRUPTIBLE;
984 } else {
985 tso->flags |= TSO_INTERRUPTIBLE;
986 }
987
988 /* Put the newly-built THUNK on top of the stack, ready to execute
989 * when the thread restarts.
990 */
991 sp[0] = (W_)raise;
992 sp[-1] = (W_)&stg_enter_info;
993 stack->sp = sp-1;
994 tso->what_next = ThreadRunGHC;
995 goto done;
996 }
997
998 case ATOMICALLY_FRAME:
999 if (stop_at_atomically) {
1000 ASSERT(tso->trec->enclosing_trec == NO_TREC);
1001 stmCondemnTransaction(cap, tso -> trec);
1002 stack->sp = frame - 2;
1003 // The ATOMICALLY_FRAME expects to be returned a
1004 // result from the transaction, which it stores in the
1005 // stack frame. Hence we arrange to return a dummy
1006 // result, so that the GC doesn't get upset (#3578).
1007 // Perhaps a better way would be to have a different
1008 // ATOMICALLY_FRAME instance for condemned
1009 // transactions, but I don't fully understand the
1010 // interaction with STM invariants.
1011 stack->sp[1] = (W_)&stg_NO_TREC_closure;
1012 stack->sp[0] = (W_)&stg_ret_p_info;
1013 tso->what_next = ThreadRunGHC;
1014 goto done;
1015 }
1016 else
1017 {
1018 // Freezing an STM transaction. Just aborting the
1019 // transaction would be wrong; this is what we used to
1020 // do, and it goes wrong if the ATOMICALLY_FRAME ever
1021 // gets back onto the stack again, which it will do if
1022 // the transaction is inside unsafePerformIO or
1023 // unsafeInterleaveIO and hence inside an UPDATE_FRAME.
1024 //
1025 // So we want to make it so that if the enclosing
1026 // computation is resumed, we will re-execute the
1027 // transaction. We therefore:
1028 //
1029             //  1. abort the current transaction
1030             //  2. replace the stack up to and including the
1031             //     atomically frame with a closure representing
1032             //     a call to "atomically x", where x is the code
1033             //     of the transaction.
1034             //  3. continue stripping the stack
1035 //
1036 StgTRecHeader *trec = tso->trec;
1037 StgTRecHeader *outer = trec->enclosing_trec;
1038
1039 StgThunk *atomically;
1040 StgAtomicallyFrame *af = (StgAtomicallyFrame*)frame;
1041
1042 debugTraceCap(DEBUG_stm, cap,
1043 "raiseAsync: freezing atomically frame")
1044 stmAbortTransaction(cap, trec);
1045 stmFreeAbortedTRec(cap, trec);
1046 tso->trec = outer;
1047
1048 atomically = (StgThunk*)allocate(cap,sizeofW(StgThunk)+1);
1049 TICK_ALLOC_SE_THK(1,0);
1050 SET_HDR(atomically,&stg_atomically_info,af->header.prof.ccs);
1051 atomically->payload[0] = af->code;
1052
1053 // discard stack up to and including the ATOMICALLY_FRAME
1054 frame += sizeofW(StgAtomicallyFrame);
1055 sp = frame - 1;
1056
1057 // replace the ATOMICALLY_FRAME with call to atomically#
1058 sp[0] = (W_)atomically;
1059 continue;
1060 }
1061
1062 case CATCH_STM_FRAME:
1063 case CATCH_RETRY_FRAME:
1064 // CATCH frames within an atomically block: abort the
1065 // inner transaction and continue. Eventually we will
1066 // hit the outer transaction that will get frozen (see
1067 // above).
1068 //
1069 // In this case (unlike ordinary exceptions) we do not care
1070 // whether the transaction is valid or not because its
1071 // possible validity cannot have caused the exception
1072 // and will not be visible after the abort.
1073 {
1074 StgTRecHeader *trec = tso -> trec;
1075 StgTRecHeader *outer = trec -> enclosing_trec;
1076 debugTraceCap(DEBUG_stm, cap,
1077 "found atomically block delivering async exception");
1078 stmAbortTransaction(cap, trec);
1079 stmFreeAbortedTRec(cap, trec);
1080 tso -> trec = outer;
1081 break;
1082         }
1083
1084 default:
1085 break;
1086 }
1087
1088 // move on to the next stack frame
1089 frame += stack_frame_sizeW((StgClosure *)frame);
1090 }
1091
1092 done:
1093 IF_DEBUG(sanity, checkTSO(tso));
1094
1095 // wake it up
1096 if (tso->why_blocked != NotBlocked) {
1097 tso->why_blocked = NotBlocked;
1098 appendToRunQueue(cap,tso);
1099 }
1100
1101 return tso;
1102 }