2123b8c3a49067562ba5ffc0ce51f12d08cbf308
[ghc.git] / rts / win32 / IOManager.c
/* IOManager.c
 *
 * Non-blocking / asynchronous I/O for Win32.
 *
 * (c) sof, 2002-2003.
 */
7 #include "Rts.h"
8 #include "IOManager.h"
9 #include "WorkQueue.h"
10 #include "ConsoleHandler.h"
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <io.h>
14 #include <winsock.h>
15 #include <process.h>
16
17 /*
18 * Internal state maintained by the IO manager.
19 */
20 typedef struct IOManagerState {
21 CritSection manLock;
22 WorkQueue* workQueue;
23 int queueSize;
24 int numWorkers;
25 int workersIdle;
26 HANDLE hExitEvent;
27 unsigned int requestID;
28 /* fields for keeping track of active WorkItems */
29 CritSection active_work_lock;
30 WorkItem* active_work_items;
31 } IOManagerState;
32
33 /* ToDo: wrap up this state via a IOManager handle instead? */
34 static IOManagerState* ioMan;
35
36 static void RegisterWorkItem ( IOManagerState* iom, WorkItem* wi);
37 static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi);
38
39 /*
40 * The routine executed by each worker thread.
41 */
42 static
43 unsigned
44 WINAPI
45 IOWorkerProc(PVOID param)
46 {
47 HANDLE hWaits[2];
48 DWORD rc;
49 IOManagerState* iom = (IOManagerState*)param;
50 WorkQueue* pq = iom->workQueue;
51 WorkItem* work;
52 int len = 0, fd = 0;
53 DWORD errCode = 0;
54 void* complData;
55
56 hWaits[0] = (HANDLE)iom->hExitEvent;
57 hWaits[1] = GetWorkQueueHandle(pq);
58
59 while (1) {
60 /* The error code is communicated back on completion of request; reset. */
61 errCode = 0;
62
63 EnterCriticalSection(&iom->manLock);
64 /* Signal that the worker is idle.
65 *
66 * 'workersIdle' is used when determining whether or not to
67 * increase the worker thread pool when adding a new request.
68 * (see addIORequest().)
69 */
70 iom->workersIdle++;
71 LeaveCriticalSection(&iom->manLock);
72
73 /*
74 * A possible future refinement is to make long-term idle threads
75 * wake up and decide to shut down should the number of idle threads
76 * be above some threshold.
77 *
78 */
79 rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
80
81 if (rc == WAIT_OBJECT_0) {
82 // we received the exit event
83 EnterCriticalSection(&iom->manLock);
84 ioMan->numWorkers--;
85 LeaveCriticalSection(&iom->manLock);
86 return 0;
87 }
88
89 EnterCriticalSection(&iom->manLock);
90 /* Signal that the thread is 'non-idle' and about to consume
91 * a work item.
92 */
93 iom->workersIdle--;
94 iom->queueSize--;
95 LeaveCriticalSection(&iom->manLock);
96
97 if ( rc == (WAIT_OBJECT_0 + 1) ) {
98 /* work item available, fetch it. */
99 if (FetchWork(pq,(void**)&work)) {
100 work->abandonOp = 0;
101 RegisterWorkItem(iom,work);
102 if ( work->workKind & WORKER_READ ) {
103 if ( work->workKind & WORKER_FOR_SOCKET ) {
104 len = recv(work->workData.ioData.fd,
105 work->workData.ioData.buf,
106 work->workData.ioData.len,
107 0);
108 if (len == SOCKET_ERROR) {
109 errCode = WSAGetLastError();
110 }
111 } else {
112 while (1) {
113 /* Do the read(), with extra-special handling for Ctrl+C */
114 len = read(work->workData.ioData.fd,
115 work->workData.ioData.buf,
116 work->workData.ioData.len);
117 if ( len == 0 && work->workData.ioData.len != 0 ) {
118 /* Given the following scenario:
119 * - a console handler has been registered that handles Ctrl+C
120 * events.
121 * - we've not tweaked the 'console mode' settings to turn on
122 * ENABLE_PROCESSED_INPUT.
123 * - we're blocked waiting on input from standard input.
124 * - the user hits Ctrl+C.
125 *
126 * The OS will invoke the console handler (in a separate OS thread),
127 * and the above read() (i.e., under the hood, a ReadFile() op) returns
128 * 0, with the error set to ERROR_OPERATION_ABORTED. We don't
129 * want to percolate this error condition back to the Haskell user.
130 * Do this by waiting for the completion of the Haskell console handler.
131 * If upon completion of the console handler routine, the Haskell thread
132 * that issued the request is found to have been thrown an exception,
133 * the worker abandons the request (since that's what the Haskell thread
134 * has done.) If the Haskell thread hasn't been interrupted, the worker
135 * retries the read request as if nothing happened.
136 */
137 if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) {
138 /* For now, only abort when dealing with the standard input handle.
139 * i.e., for all others, an error is raised.
140 */
141 HANDLE h = (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
142 if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) {
143 if (rts_waitConsoleHandlerCompletion()) {
144 /* If the Scheduler has set work->abandonOp, the Haskell thread has
145 * been thrown an exception (=> the worker must abandon this request.)
146 * We test for this below before invoking the on-completion routine.
147 */
148 if (work->abandonOp) {
149 break;
150 } else {
151 continue;
152 }
153 }
154 } else {
155 break; /* Treat it like an error */
156 }
157 } else {
158 break;
159 }
160 } else {
161 break;
162 }
163 }
164 if (len == -1) { errCode = errno; }
165 }
166 complData = work->workData.ioData.buf;
167 fd = work->workData.ioData.fd;
168 } else if ( work->workKind & WORKER_WRITE ) {
169 if ( work->workKind & WORKER_FOR_SOCKET ) {
170 len = send(work->workData.ioData.fd,
171 work->workData.ioData.buf,
172 work->workData.ioData.len,
173 0);
174 if (len == SOCKET_ERROR) {
175 errCode = WSAGetLastError();
176 }
177 } else {
178 len = write(work->workData.ioData.fd,
179 work->workData.ioData.buf,
180 work->workData.ioData.len);
181 if (len == -1) { errCode = errno; }
182 }
183 complData = work->workData.ioData.buf;
184 fd = work->workData.ioData.fd;
185 } else if ( work->workKind & WORKER_DELAY ) {
186 /* Approximate implementation of threadDelay;
187 *
188 * Note: Sleep() is in milliseconds, not micros.
189 */
190 Sleep(work->workData.delayData.msecs / 1000);
191 len = work->workData.delayData.msecs;
192 complData = NULL;
193 fd = 0;
194 errCode = 0;
195 } else if ( work->workKind & WORKER_DO_PROC ) {
196 /* perform operation/proc on behalf of Haskell thread. */
197 if (work->workData.procData.proc) {
198 /* The procedure is assumed to encode result + success/failure
199 * via its param.
200 */
201 errCode=work->workData.procData.proc(work->workData.procData.param);
202 } else {
203 errCode=1;
204 }
205 complData = work->workData.procData.param;
206 } else {
207 fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
208 fflush(stderr);
209 continue;
210 }
211 if (!work->abandonOp) {
212 work->onCompletion(work->requestID,
213 fd,
214 len,
215 complData,
216 errCode);
217 }
218 /* Free the WorkItem */
219 DeregisterWorkItem(iom,work);
220 free(work);
221 } else {
222 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
223 EnterCriticalSection(&iom->manLock);
224 ioMan->numWorkers--;
225 LeaveCriticalSection(&iom->manLock);
226 return 1;
227 }
228 } else {
229 fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); fflush(stderr);
230 EnterCriticalSection(&iom->manLock);
231 ioMan->numWorkers--;
232 LeaveCriticalSection(&iom->manLock);
233 return 1;
234 }
235 }
236 return 0;
237 }
238
239 static
240 BOOL
241 NewIOWorkerThread(IOManagerState* iom)
242 {
243 unsigned threadId;
244 return ( 0 != _beginthreadex(NULL,
245 0,
246 IOWorkerProc,
247 (LPVOID)iom,
248 0,
249 &threadId) );
250 }
251
252 BOOL
253 StartIOManager(void)
254 {
255 HANDLE hExit;
256 WorkQueue* wq;
257
258 wq = NewWorkQueue();
259 if ( !wq ) return FALSE;
260
261 ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
262
263 if (!ioMan) {
264 FreeWorkQueue(wq);
265 return FALSE;
266 }
267
268 /* A manual-reset event */
269 hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
270 if ( !hExit ) {
271 FreeWorkQueue(wq);
272 free(ioMan);
273 return FALSE;
274 }
275
276 ioMan->hExitEvent = hExit;
277 InitializeCriticalSection(&ioMan->manLock);
278 ioMan->workQueue = wq;
279 ioMan->numWorkers = 0;
280 ioMan->workersIdle = 0;
281 ioMan->queueSize = 0;
282 ioMan->requestID = 1;
283 InitializeCriticalSection(&ioMan->active_work_lock);
284 ioMan->active_work_items = NULL;
285
286 return TRUE;
287 }
288
289 /*
290 * Function: depositWorkItem()
291 *
292 * Local function which deposits a WorkItem onto a work queue,
293 * deciding in the process whether or not the thread pool needs
294 * to be augmented with another thread to handle the new request.
295 *
296 */
297 static
298 int
299 depositWorkItem( unsigned int reqID,
300 WorkItem* wItem )
301 {
302 EnterCriticalSection(&ioMan->manLock);
303
304 #if 0
305 fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers);
306 fflush(stderr);
307 #endif
308 /* A new worker thread is created when there are fewer idle threads
309 * than non-consumed queue requests. This ensures that requests will
310 * be dealt with in a timely manner.
311 *
312 * [Long explanation of why the previous thread pool policy lead to
313 * trouble]
314 *
315 * Previously, the thread pool was augmented iff no idle worker threads
316 * were available. That strategy runs the risk of repeatedly adding to
317 * the request queue without expanding the thread pool to handle this
318 * sudden spike in queued requests.
319 * [How? Assume workersIdle is 1, and addIORequest() is called. No new
320 * thread is created and the request is simply queued. If addIORequest()
321 * is called again _before the OS schedules a worker thread to pull the
322 * request off the queue_, workersIdle is still 1 and another request is
323 * simply added to the queue. Once the worker thread is run, only one
324 * request is de-queued, leaving the 2nd request in the queue]
325 *
326 * Assuming none of the queued requests take an inordinate amount of to
327 * complete, the request queue would eventually be drained. But if that's
328 * not the case, the later requests will end up languishing in the queue
329 * indefinitely. The non-timely handling of requests may cause CH applications
330 * to misbehave / hang; bad.
331 *
332 */
333 ioMan->queueSize++;
334 if ( (ioMan->workersIdle < ioMan->queueSize) ) {
335 /* see if giving up our quantum ferrets out some idle threads.
336 */
337 LeaveCriticalSection(&ioMan->manLock);
338 Sleep(0);
339 EnterCriticalSection(&ioMan->manLock);
340 if ( (ioMan->workersIdle < ioMan->queueSize) ) {
341 /* No, go ahead and create another. */
342 ioMan->numWorkers++;
343 if (!NewIOWorkerThread(ioMan)) {
344 ioMan->numWorkers--;
345 }
346 }
347 }
348 LeaveCriticalSection(&ioMan->manLock);
349
350 if (SubmitWork(ioMan->workQueue,wItem)) {
351 /* Note: the work item has potentially been consumed by a worker thread
352 * (and freed) at this point, so we cannot use wItem's requestID.
353 */
354 return reqID;
355 } else {
356 return 0;
357 }
358 }
359
360 /*
361 * Function: AddIORequest()
362 *
363 * Conduit to underlying WorkQueue's SubmitWork(); adds IO
364 * request to work queue, deciding whether or not to augment
365 * the thread pool in the process.
366 */
367 int
368 AddIORequest ( int fd,
369 BOOL forWriting,
370 BOOL isSocket,
371 int len,
372 char* buffer,
373 CompletionProc onCompletion)
374 {
375 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
376 unsigned int reqID = ioMan->requestID++;
377 if (!ioMan || !wItem) return 0;
378
379 /* Fill in the blanks */
380 wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
381 ( forWriting ? WORKER_WRITE : WORKER_READ );
382 wItem->workData.ioData.fd = fd;
383 wItem->workData.ioData.len = len;
384 wItem->workData.ioData.buf = buffer;
385 wItem->link = NULL;
386
387 wItem->onCompletion = onCompletion;
388 wItem->requestID = reqID;
389
390 return depositWorkItem(reqID, wItem);
391 }
392
393 /*
394 * Function: AddDelayRequest()
395 *
396 * Like AddIORequest(), but this time adding a delay request to
397 * the request queue.
398 */
399 BOOL
400 AddDelayRequest ( unsigned int msecs,
401 CompletionProc onCompletion)
402 {
403 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
404 unsigned int reqID = ioMan->requestID++;
405 if (!ioMan || !wItem) return FALSE;
406
407 /* Fill in the blanks */
408 wItem->workKind = WORKER_DELAY;
409 wItem->workData.delayData.msecs = msecs;
410 wItem->onCompletion = onCompletion;
411 wItem->requestID = reqID;
412 wItem->link = NULL;
413
414 return depositWorkItem(reqID, wItem);
415 }
416
417 /*
418 * Function: AddProcRequest()
419 *
420 * Add an asynchronous procedure request.
421 */
422 BOOL
423 AddProcRequest ( void* proc,
424 void* param,
425 CompletionProc onCompletion)
426 {
427 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
428 unsigned int reqID = ioMan->requestID++;
429 if (!ioMan || !wItem) return FALSE;
430
431 /* Fill in the blanks */
432 wItem->workKind = WORKER_DO_PROC;
433 wItem->workData.procData.proc = proc;
434 wItem->workData.procData.param = param;
435 wItem->onCompletion = onCompletion;
436 wItem->requestID = reqID;
437 wItem->abandonOp = 0;
438 wItem->link = NULL;
439
440 return depositWorkItem(reqID, wItem);
441 }
442
443 void ShutdownIOManager ( void )
444 {
445 int num;
446
447 SetEvent(ioMan->hExitEvent);
448
449 /* Wait for all worker threads to die. */
450 for (;;) {
451 EnterCriticalSection(&ioMan->manLock);
452 num = ioMan->numWorkers;
453 LeaveCriticalSection(&ioMan->manLock);
454 if (num == 0)
455 break;
456 Sleep(10);
457 }
458 FreeWorkQueue(ioMan->workQueue);
459 CloseHandle(ioMan->hExitEvent);
460 free(ioMan);
461 ioMan = NULL;
462 }
463
464 /* Keep track of WorkItems currently being serviced. */
465 static
466 void
467 RegisterWorkItem(IOManagerState* ioMan,
468 WorkItem* wi)
469 {
470 EnterCriticalSection(&ioMan->active_work_lock);
471 wi->link = ioMan->active_work_items;
472 ioMan->active_work_items = wi;
473 LeaveCriticalSection(&ioMan->active_work_lock);
474 }
475
476 static
477 void
478 DeregisterWorkItem(IOManagerState* ioMan,
479 WorkItem* wi)
480 {
481 WorkItem *ptr, *prev;
482
483 EnterCriticalSection(&ioMan->active_work_lock);
484 for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) {
485 if (wi->requestID == ptr->requestID) {
486 if (prev==NULL) {
487 ioMan->active_work_items = ptr->link;
488 } else {
489 prev->link = ptr->link;
490 }
491 LeaveCriticalSection(&ioMan->active_work_lock);
492 return;
493 }
494 }
495 fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", wi->requestID);
496 LeaveCriticalSection(&ioMan->active_work_lock);
497 }
498
499
500 /*
501 * Function: abandonWorkRequest()
502 *
503 * Signal that a work request isn't of interest. Called by the Scheduler
504 * if a blocked Haskell thread has an exception thrown to it.
505 *
506 * Note: we're not aborting the system call that a worker might be blocked on
507 * here, just disabling the propagation of its result once its finished. We
508 * may have to go the whole hog here and switch to overlapped I/O so that we
509 * can abort blocked system calls.
510 */
511 void
512 abandonWorkRequest ( int reqID )
513 {
514 WorkItem *ptr;
515 EnterCriticalSection(&ioMan->active_work_lock);
516 for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) {
517 if (ptr->requestID == (unsigned int)reqID ) {
518 ptr->abandonOp = 1;
519 LeaveCriticalSection(&ioMan->active_work_lock);
520 return;
521 }
522 }
523 /* Note: if the request ID isn't present, the worker will have
524 * finished sometime since awaitRequests() last drained the completed
525 * request table; i.e., not an error.
526 */
527 LeaveCriticalSection(&ioMan->active_work_lock);
528 }