Remove the Windows Async IO Manager completely in THREADED_RTS mode
[ghc.git] / rts / win32 / AsyncIO.c
1 /* AsyncIO.c
2 *
3 * Integrating Win32 asynchronous I/O with the GHC RTS.
4 *
5 * (c) sof, 2002-2003.
6 */
7
8 #if !defined(THREADED_RTS)
9
10 #include "Rts.h"
11 #include "RtsUtils.h"
12 #include <windows.h>
13 #include <stdio.h>
14 #include "Schedule.h"
15 #include "RtsFlags.h"
16 #include "Capability.h"
17 #include "win32/AsyncIO.h"
18 #include "win32/IOManager.h"
19
20 /*
21 * Overview:
22 *
23 * Haskell code issue asynchronous I/O requests via the
24 * async{Read,Write,DoOp}# primops. These cause addIORequest()
25 * to be invoked, which forwards the request to the underlying
26 * asynchronous I/O subsystem. Each request is tagged with a unique
27 * ID.
28 *
29 * addIORequest() returns this ID, so that when the blocked CH
30 * thread is added onto blocked_queue, its TSO is annotated with
31 * it. Upon completion of an I/O request, the async I/O handling
32 * code makes a back-call to signal its completion; the local
33 * onIOComplete() routine. It adds the IO request ID (along with
34 * its result data) to a queue of completed requests before returning.
35 *
36 * The queue of completed IO request is read by the thread operating
37 * the RTS scheduler. It de-queues the CH threads corresponding
38 * to the request IDs, making them runnable again.
39 *
40 */
41
42 typedef struct CompletedReq {
43 unsigned int reqID;
44 int len;
45 int errCode;
46 } CompletedReq;
47
48 #define MAX_REQUESTS 200
49
50 static CRITICAL_SECTION queue_lock;
51 static HANDLE completed_req_event = INVALID_HANDLE_VALUE;
52 static HANDLE abandon_req_wait = INVALID_HANDLE_VALUE;
53 static HANDLE wait_handles[2];
54 static CompletedReq completedTable[MAX_REQUESTS];
55 static int completed_hw;
56 static HANDLE completed_table_sema;
57 static int issued_reqs;
58
59 static void
60 onIOComplete(unsigned int reqID,
61 int fd STG_UNUSED,
62 int len,
63 void* buf STG_UNUSED,
64 int errCode)
65 {
66 DWORD dwRes;
67 /* Deposit result of request in queue/table..when there's room. */
68 dwRes = WaitForSingleObject(completed_table_sema, INFINITE);
69 switch (dwRes) {
70 case WAIT_OBJECT_0:
71 break;
72 default:
73 /* Not likely */
74 fprintf(stderr, "onIOComplete: failed to grab table semaphore, dropping request 0x%x\n", reqID);
75 fflush(stderr);
76 return;
77 }
78 EnterCriticalSection(&queue_lock);
79 if (completed_hw == MAX_REQUESTS) {
80 /* Shouldn't happen */
81 fprintf(stderr, "onIOComplete: ERROR -- Request table overflow (%d); dropping.\n", reqID);
82 fflush(stderr);
83 } else {
84 #if 0
85 fprintf(stderr, "onCompl: %d %d %d %d %d\n",
86 reqID, len, errCode, issued_reqs, completed_hw);
87 fflush(stderr);
88 #endif
89 completedTable[completed_hw].reqID = reqID;
90 completedTable[completed_hw].len = len;
91 completedTable[completed_hw].errCode = errCode;
92 completed_hw++;
93 issued_reqs--;
94 if (completed_hw == 1) {
95 /* The event is used to wake up the scheduler thread should it
96 * be blocked waiting for requests to complete. The event resets once
97 * that thread has cleared out the request queue/table.
98 */
99 SetEvent(completed_req_event);
100 }
101 }
102 LeaveCriticalSection(&queue_lock);
103 }
104
105 unsigned int
106 addIORequest(int fd,
107 int forWriting,
108 int isSock,
109 int len,
110 char* buf)
111 {
112 EnterCriticalSection(&queue_lock);
113 issued_reqs++;
114 LeaveCriticalSection(&queue_lock);
115 #if 0
116 fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
117 #endif
118 return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
119 }
120
121 unsigned int
122 addDelayRequest(int msecs)
123 {
124 EnterCriticalSection(&queue_lock);
125 issued_reqs++;
126 LeaveCriticalSection(&queue_lock);
127 #if 0
128 fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
129 #endif
130 return AddDelayRequest(msecs,onIOComplete);
131 }
132
133 unsigned int
134 addDoProcRequest(void* proc, void* param)
135 {
136 EnterCriticalSection(&queue_lock);
137 issued_reqs++;
138 LeaveCriticalSection(&queue_lock);
139 #if 0
140 fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
141 #endif
142 return AddProcRequest(proc,param,onIOComplete);
143 }
144
145
146 int
147 startupAsyncIO()
148 {
149 if (!StartIOManager()) {
150 return 0;
151 }
152 InitializeCriticalSection(&queue_lock);
153 /* Create a pair of events:
154 *
155 * - completed_req_event -- signals the deposit of request result; manual reset.
156 * - abandon_req_wait -- external OS thread tells current RTS/Scheduler
157 * thread to abandon wait for IO request completion.
158 * Auto reset.
159 */
160 completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL);
161 abandon_req_wait = CreateEvent (NULL, FALSE, FALSE, NULL);
162 wait_handles[0] = completed_req_event;
163 wait_handles[1] = abandon_req_wait;
164 completed_hw = 0;
165 if ( !(completed_table_sema = CreateSemaphore (NULL, MAX_REQUESTS, MAX_REQUESTS, NULL)) ) {
166 DWORD rc = GetLastError();
167 fprintf(stderr, "startupAsyncIO: CreateSemaphore failed 0x%x\n", (int)rc);
168 fflush(stderr);
169 }
170
171 return ( completed_req_event != INVALID_HANDLE_VALUE &&
172 abandon_req_wait != INVALID_HANDLE_VALUE &&
173 completed_table_sema != NULL );
174 }
175
176 void
177 shutdownAsyncIO()
178 {
179 ShutdownIOManager();
180 if (completed_req_event != INVALID_HANDLE_VALUE) {
181 CloseHandle(completed_req_event);
182 completed_req_event = INVALID_HANDLE_VALUE;
183 }
184 if (abandon_req_wait != INVALID_HANDLE_VALUE) {
185 CloseHandle(abandon_req_wait);
186 abandon_req_wait = INVALID_HANDLE_VALUE;
187 }
188 if (completed_table_sema != NULL) {
189 CloseHandle(completed_table_sema);
190 completed_table_sema = NULL;
191 }
192 }
193
194 /*
195 * Function: awaitRequests(wait)
196 *
197 * Check for the completion of external IO work requests. Worker
198 * threads signal completion of IO requests by depositing them
199 * in a table (completedTable). awaitRequests() matches up
200 * requests in that table with threads on the blocked_queue,
201 * making the threads whose IO requests have completed runnable
202 * again.
203 *
204 * awaitRequests() is called by the scheduler periodically _or_ if
205 * it is out of work, and need to wait for the completion of IO
206 * requests to make further progress. In the latter scenario,
207 * awaitRequests() will simply block waiting for worker threads
208 * to complete if the 'completedTable' is empty.
209 */
210 int
211 awaitRequests(rtsBool wait)
212 {
213 #ifndef THREADED_RTS
214 // none of this is actually used in the threaded RTS
215
216 start:
217 #if 0
218 fprintf(stderr, "awaitRequests(): %d %d %d\n", issued_reqs, completed_hw, wait);
219 fflush(stderr);
220 #endif
221 EnterCriticalSection(&queue_lock);
222 /* Nothing immediately available & we won't wait */
223 if ((!wait && completed_hw == 0)
224 #if 0
225 // If we just return when wait==rtsFalse, we'll go into a busy
226 // wait loop, so I disabled this condition --SDM 18/12/2003
227 (issued_reqs == 0 && completed_hw == 0)
228 #endif
229 ) {
230 LeaveCriticalSection(&queue_lock);
231 return 0;
232 }
233 if (completed_hw == 0) {
234 /* empty table, drop lock and wait */
235 LeaveCriticalSection(&queue_lock);
236 if ( wait && sched_state == SCHED_RUNNING ) {
237 DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
238 switch (dwRes) {
239 case WAIT_OBJECT_0:
240 /* a request was completed */
241 break;
242 case WAIT_OBJECT_0 + 1:
243 case WAIT_TIMEOUT:
244 /* timeout (unlikely) or told to abandon waiting */
245 return 0;
246 case WAIT_FAILED: {
247 DWORD dw = GetLastError();
248 fprintf(stderr, "awaitRequests: wait failed -- error code: %lu\n", dw); fflush(stderr);
249 return 0;
250 }
251 default:
252 fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
253 return 0;
254 }
255 } else {
256 return 0;
257 }
258 goto start;
259 } else {
260 int i;
261 StgTSO *tso, *prev;
262
263 for (i=0; i < completed_hw; i++) {
264 /* For each of the completed requests, match up their Ids
265 * with those of the threads on the blocked_queue. If the
266 * thread that made the IO request has been subsequently
267 * killed (and removed from blocked_queue), no match will
268 * be found for that request Id.
269 *
270 * i.e., killing a Haskell thread doesn't attempt to cancel
271 * the IO request it is blocked on.
272 *
273 */
274 unsigned int rID = completedTable[i].reqID;
275
276 prev = NULL;
277 for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; prev = tso, tso = tso->link) {
278
279 switch(tso->why_blocked) {
280 case BlockedOnRead:
281 case BlockedOnWrite:
282 case BlockedOnDoProc:
283 if (tso->block_info.async_result->reqID == rID) {
284 /* Found the thread blocked waiting on request; stodgily fill
285 * in its result block.
286 */
287 tso->block_info.async_result->len = completedTable[i].len;
288 tso->block_info.async_result->errCode = completedTable[i].errCode;
289
290 /* Drop the matched TSO from blocked_queue */
291 if (prev) {
292 prev->link = tso->link;
293 } else {
294 blocked_queue_hd = tso->link;
295 }
296 if (blocked_queue_tl == tso) {
297 blocked_queue_tl = prev ? prev : END_TSO_QUEUE;
298 }
299
300 /* Terminates the run queue + this inner for-loop. */
301 tso->link = END_TSO_QUEUE;
302 tso->why_blocked = NotBlocked;
303 pushOnRunQueue(&MainCapability, tso);
304 break;
305 }
306 break;
307 default:
308 if (tso->why_blocked != NotBlocked) {
309 barf("awaitRequests: odd thread state");
310 }
311 break;
312 }
313 }
314 /* Signal that there's completed table slots available */
315 if ( !ReleaseSemaphore(completed_table_sema, 1, NULL) ) {
316 DWORD dw = GetLastError();
317 fprintf(stderr, "awaitRequests: failed to signal semaphore (error code=0x%x)\n", (int)dw);
318 fflush(stderr);
319 }
320 }
321 completed_hw = 0;
322 ResetEvent(completed_req_event);
323 LeaveCriticalSection(&queue_lock);
324 return 1;
325 }
326 #endif /* !THREADED_RTS */
327 }
328
329 /*
330 * Function: abandonRequestWait()
331 *
332 * Wake up a thread that's blocked waiting for new IO requests
333 * to complete (via awaitRequests().)
334 */
335 void
336 abandonRequestWait( void )
337 {
338 /* the event is auto-reset, but in case there's no thread
339 * already waiting on the event, we want to return it to
340 * a non-signalled state.
341 *
342 * Careful! There is no synchronisation between
343 * abandonRequestWait and awaitRequest, which means that
344 * abandonRequestWait might be called just before a thread
345 * goes into a wait, and we miss the abandon signal. So we
346 * must SetEvent() here rather than PulseEvent() to ensure
347 * that the event isn't lost. We can re-optimise by resetting
348 * the event somewhere safe if we know the event has been
349 * properly serviced (see resetAbandon() below). --SDM 18/12/2003
350 */
351 SetEvent(abandon_req_wait);
352 }
353
354 void
355 resetAbandonRequestWait( void )
356 {
357 ResetEvent(abandon_req_wait);
358 }
359
360 #endif /* !defined(THREADED_RTS) */