/* gitweb extraction header (kept for provenance):
 * Merge branch 'master' of darcs.haskell.org:/srv/darcs//ghc
 * [ghc.git] / rts / win32 / IOManager.c
 */
1 /* IOManager.c
2 *
3 * Non-blocking / asynchronous I/O for Win32.
4 *
5 * (c) sof, 2002-2003.
6 */
7
8 #if !defined(THREADED_RTS)
9
10 #include "Rts.h"
11 #include "IOManager.h"
12 #include "WorkQueue.h"
13 #include "ConsoleHandler.h"
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <io.h>
17 #include <winsock.h>
18 #include <process.h>
19 #include <errno.h>
20
21 /*
22 * Internal state maintained by the IO manager.
23 */
24 typedef struct IOManagerState {
25 CritSection manLock;
26 WorkQueue* workQueue;
27 int queueSize;
28 int numWorkers;
29 int workersIdle;
30 HANDLE hExitEvent;
31 unsigned int requestID;
32 /* fields for keeping track of active WorkItems */
33 CritSection active_work_lock;
34 WorkItem* active_work_items;
35 UINT sleepResolution;
36 } IOManagerState;
37
38 /* ToDo: wrap up this state via a IOManager handle instead? */
39 static IOManagerState* ioMan;
40
41 static void RegisterWorkItem ( IOManagerState* iom, WorkItem* wi);
42 static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi);
43
44 /*
45 * The routine executed by each worker thread.
46 */
47 static
48 unsigned
49 WINAPI
50 IOWorkerProc(PVOID param)
51 {
52 HANDLE hWaits[2];
53 DWORD rc;
54 IOManagerState* iom = (IOManagerState*)param;
55 WorkQueue* pq = iom->workQueue;
56 WorkItem* work;
57 int len = 0, fd = 0;
58 DWORD errCode = 0;
59 void* complData;
60
61 hWaits[0] = (HANDLE)iom->hExitEvent;
62 hWaits[1] = GetWorkQueueHandle(pq);
63
64 while (1) {
65 /* The error code is communicated back on completion of request; reset. */
66 errCode = 0;
67
68 EnterCriticalSection(&iom->manLock);
69 /* Signal that the worker is idle.
70 *
71 * 'workersIdle' is used when determining whether or not to
72 * increase the worker thread pool when adding a new request.
73 * (see addIORequest().)
74 */
75 iom->workersIdle++;
76 LeaveCriticalSection(&iom->manLock);
77
78 /*
79 * A possible future refinement is to make long-term idle threads
80 * wake up and decide to shut down should the number of idle threads
81 * be above some threshold.
82 *
83 */
84 rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
85
86 if (rc == WAIT_OBJECT_0) {
87 // we received the exit event
88 EnterCriticalSection(&iom->manLock);
89 ioMan->numWorkers--;
90 LeaveCriticalSection(&iom->manLock);
91 return 0;
92 }
93
94 EnterCriticalSection(&iom->manLock);
95 /* Signal that the thread is 'non-idle' and about to consume
96 * a work item.
97 */
98 iom->workersIdle--;
99 iom->queueSize--;
100 LeaveCriticalSection(&iom->manLock);
101
102 if ( rc == (WAIT_OBJECT_0 + 1) ) {
103 /* work item available, fetch it. */
104 if (FetchWork(pq,(void**)&work)) {
105 work->abandonOp = 0;
106 RegisterWorkItem(iom,work);
107 if ( work->workKind & WORKER_READ ) {
108 if ( work->workKind & WORKER_FOR_SOCKET ) {
109 len = recv(work->workData.ioData.fd,
110 work->workData.ioData.buf,
111 work->workData.ioData.len,
112 0);
113 if (len == SOCKET_ERROR) {
114 errCode = WSAGetLastError();
115 }
116 } else {
117 while (1) {
118 /* Do the read(), with extra-special handling for Ctrl+C */
119 len = read(work->workData.ioData.fd,
120 work->workData.ioData.buf,
121 work->workData.ioData.len);
122 if ( len == 0 && work->workData.ioData.len != 0 ) {
123 /* Given the following scenario:
124 * - a console handler has been registered that handles Ctrl+C
125 * events.
126 * - we've not tweaked the 'console mode' settings to turn on
127 * ENABLE_PROCESSED_INPUT.
128 * - we're blocked waiting on input from standard input.
129 * - the user hits Ctrl+C.
130 *
131 * The OS will invoke the console handler (in a separate OS thread),
132 * and the above read() (i.e., under the hood, a ReadFile() op) returns
133 * 0, with the error set to ERROR_OPERATION_ABORTED. We don't
134 * want to percolate this error condition back to the Haskell user.
135 * Do this by waiting for the completion of the Haskell console handler.
136 * If upon completion of the console handler routine, the Haskell thread
137 * that issued the request is found to have been thrown an exception,
138 * the worker abandons the request (since that's what the Haskell thread
139 * has done.) If the Haskell thread hasn't been interrupted, the worker
140 * retries the read request as if nothing happened.
141 */
142 if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) {
143 /* For now, only abort when dealing with the standard input handle.
144 * i.e., for all others, an error is raised.
145 */
146 HANDLE h = (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
147 if ( _get_osfhandle(work->workData.ioData.fd) == (intptr_t)h ) {
148 if (rts_waitConsoleHandlerCompletion()) {
149 /* If the Scheduler has set work->abandonOp, the Haskell thread has
150 * been thrown an exception (=> the worker must abandon this request.)
151 * We test for this below before invoking the on-completion routine.
152 */
153 if (work->abandonOp) {
154 break;
155 } else {
156 continue;
157 }
158 }
159 } else {
160 break; /* Treat it like an error */
161 }
162 } else {
163 break;
164 }
165 } else {
166 break;
167 }
168 }
169 if (len == -1) { errCode = errno; }
170 }
171 complData = work->workData.ioData.buf;
172 fd = work->workData.ioData.fd;
173 } else if ( work->workKind & WORKER_WRITE ) {
174 if ( work->workKind & WORKER_FOR_SOCKET ) {
175 len = send(work->workData.ioData.fd,
176 work->workData.ioData.buf,
177 work->workData.ioData.len,
178 0);
179 if (len == SOCKET_ERROR) {
180 errCode = WSAGetLastError();
181 }
182 } else {
183 len = write(work->workData.ioData.fd,
184 work->workData.ioData.buf,
185 work->workData.ioData.len);
186 if (len == -1) {
187 errCode = errno;
188 // write() gets errno wrong for
189 // ERROR_NO_DATA, we have to fix it here:
190 if (errCode == EINVAL &&
191 GetLastError() == ERROR_NO_DATA) {
192 errCode = EPIPE;
193 }
194 }
195 }
196 complData = work->workData.ioData.buf;
197 fd = work->workData.ioData.fd;
198 } else if ( work->workKind & WORKER_DELAY ) {
199 /* Approximate implementation of threadDelay;
200 *
201 * Note: Sleep() is in milliseconds, not micros.
202 */
203 Sleep(((work->workData.delayData.usecs + 999) / 1000) + iom->sleepResolution - 1);
204 len = work->workData.delayData.usecs;
205 complData = NULL;
206 fd = 0;
207 errCode = 0;
208 } else if ( work->workKind & WORKER_DO_PROC ) {
209 /* perform operation/proc on behalf of Haskell thread. */
210 if (work->workData.procData.proc) {
211 /* The procedure is assumed to encode result + success/failure
212 * via its param.
213 */
214 errCode=work->workData.procData.proc(work->workData.procData.param);
215 } else {
216 errCode=1;
217 }
218 complData = work->workData.procData.param;
219 } else {
220 fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
221 fflush(stderr);
222 continue;
223 }
224 if (!work->abandonOp) {
225 work->onCompletion(work->requestID,
226 fd,
227 len,
228 complData,
229 errCode);
230 }
231 /* Free the WorkItem */
232 DeregisterWorkItem(iom,work);
233 free(work);
234 } else {
235 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
236 EnterCriticalSection(&iom->manLock);
237 ioMan->numWorkers--;
238 LeaveCriticalSection(&iom->manLock);
239 return 1;
240 }
241 } else {
242 fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); fflush(stderr);
243 EnterCriticalSection(&iom->manLock);
244 ioMan->numWorkers--;
245 LeaveCriticalSection(&iom->manLock);
246 return 1;
247 }
248 }
249 return 0;
250 }
251
252 static
253 BOOL
254 NewIOWorkerThread(IOManagerState* iom)
255 {
256 unsigned threadId;
257 return ( 0 != _beginthreadex(NULL,
258 0,
259 IOWorkerProc,
260 (LPVOID)iom,
261 0,
262 &threadId) );
263 }
264
265 BOOL
266 StartIOManager(void)
267 {
268 HANDLE hExit;
269 WorkQueue* wq;
270 UINT sleepResolution;
271 TIMECAPS timecaps;
272 MMRESULT mmresult;
273
274 mmresult = timeGetDevCaps(&timecaps, sizeof(timecaps));
275 if (mmresult != MMSYSERR_NOERROR) {
276 return FALSE;
277 }
278 sleepResolution = timecaps.wPeriodMin;
279 mmresult = timeBeginPeriod(sleepResolution);
280 if (mmresult != MMSYSERR_NOERROR) {
281 return FALSE;
282 }
283
284 wq = NewWorkQueue();
285 if ( !wq ) return FALSE;
286
287 ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
288
289 if (!ioMan) {
290 FreeWorkQueue(wq);
291 return FALSE;
292 }
293
294 /* A manual-reset event */
295 hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
296 if ( !hExit ) {
297 FreeWorkQueue(wq);
298 free(ioMan);
299 return FALSE;
300 }
301
302 ioMan->hExitEvent = hExit;
303 InitializeCriticalSection(&ioMan->manLock);
304 ioMan->workQueue = wq;
305 ioMan->numWorkers = 0;
306 ioMan->workersIdle = 0;
307 ioMan->queueSize = 0;
308 ioMan->requestID = 1;
309 InitializeCriticalSection(&ioMan->active_work_lock);
310 ioMan->active_work_items = NULL;
311 ioMan->sleepResolution = sleepResolution;
312
313 return TRUE;
314 }
315
316 /*
317 * Function: depositWorkItem()
318 *
319 * Local function which deposits a WorkItem onto a work queue,
320 * deciding in the process whether or not the thread pool needs
321 * to be augmented with another thread to handle the new request.
322 *
323 */
324 static
325 int
326 depositWorkItem( unsigned int reqID,
327 WorkItem* wItem )
328 {
329 EnterCriticalSection(&ioMan->manLock);
330
331 #if 0
332 fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers);
333 fflush(stderr);
334 #endif
335 /* A new worker thread is created when there are fewer idle threads
336 * than non-consumed queue requests. This ensures that requests will
337 * be dealt with in a timely manner.
338 *
339 * [Long explanation of why the previous thread pool policy lead to
340 * trouble]
341 *
342 * Previously, the thread pool was augmented iff no idle worker threads
343 * were available. That strategy runs the risk of repeatedly adding to
344 * the request queue without expanding the thread pool to handle this
345 * sudden spike in queued requests.
346 * [How? Assume workersIdle is 1, and addIORequest() is called. No new
347 * thread is created and the request is simply queued. If addIORequest()
348 * is called again _before the OS schedules a worker thread to pull the
349 * request off the queue_, workersIdle is still 1 and another request is
350 * simply added to the queue. Once the worker thread is run, only one
351 * request is de-queued, leaving the 2nd request in the queue]
352 *
353 * Assuming none of the queued requests take an inordinate amount of to
354 * complete, the request queue would eventually be drained. But if that's
355 * not the case, the later requests will end up languishing in the queue
356 * indefinitely. The non-timely handling of requests may cause CH applications
357 * to misbehave / hang; bad.
358 *
359 */
360 ioMan->queueSize++;
361 if ( (ioMan->workersIdle < ioMan->queueSize) ) {
362 /* see if giving up our quantum ferrets out some idle threads.
363 */
364 LeaveCriticalSection(&ioMan->manLock);
365 Sleep(0);
366 EnterCriticalSection(&ioMan->manLock);
367 if ( (ioMan->workersIdle < ioMan->queueSize) ) {
368 /* No, go ahead and create another. */
369 ioMan->numWorkers++;
370 if (!NewIOWorkerThread(ioMan)) {
371 ioMan->numWorkers--;
372 }
373 }
374 }
375 LeaveCriticalSection(&ioMan->manLock);
376
377 if (SubmitWork(ioMan->workQueue,wItem)) {
378 /* Note: the work item has potentially been consumed by a worker thread
379 * (and freed) at this point, so we cannot use wItem's requestID.
380 */
381 return reqID;
382 } else {
383 return 0;
384 }
385 }
386
387 /*
388 * Function: AddIORequest()
389 *
390 * Conduit to underlying WorkQueue's SubmitWork(); adds IO
391 * request to work queue, deciding whether or not to augment
392 * the thread pool in the process.
393 */
394 int
395 AddIORequest ( int fd,
396 BOOL forWriting,
397 BOOL isSocket,
398 int len,
399 char* buffer,
400 CompletionProc onCompletion)
401 {
402 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
403 unsigned int reqID = ioMan->requestID++;
404 if (!ioMan || !wItem) return 0;
405
406 /* Fill in the blanks */
407 wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
408 ( forWriting ? WORKER_WRITE : WORKER_READ );
409 wItem->workData.ioData.fd = fd;
410 wItem->workData.ioData.len = len;
411 wItem->workData.ioData.buf = buffer;
412 wItem->link = NULL;
413
414 wItem->onCompletion = onCompletion;
415 wItem->requestID = reqID;
416
417 return depositWorkItem(reqID, wItem);
418 }
419
420 /*
421 * Function: AddDelayRequest()
422 *
423 * Like AddIORequest(), but this time adding a delay request to
424 * the request queue.
425 */
426 BOOL
427 AddDelayRequest ( unsigned int usecs,
428 CompletionProc onCompletion)
429 {
430 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
431 unsigned int reqID = ioMan->requestID++;
432 if (!ioMan || !wItem) return FALSE;
433
434 /* Fill in the blanks */
435 wItem->workKind = WORKER_DELAY;
436 wItem->workData.delayData.usecs = usecs;
437 wItem->onCompletion = onCompletion;
438 wItem->requestID = reqID;
439 wItem->link = NULL;
440
441 return depositWorkItem(reqID, wItem);
442 }
443
444 /*
445 * Function: AddProcRequest()
446 *
447 * Add an asynchronous procedure request.
448 */
449 BOOL
450 AddProcRequest ( void* proc,
451 void* param,
452 CompletionProc onCompletion)
453 {
454 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
455 unsigned int reqID = ioMan->requestID++;
456 if (!ioMan || !wItem) return FALSE;
457
458 /* Fill in the blanks */
459 wItem->workKind = WORKER_DO_PROC;
460 wItem->workData.procData.proc = proc;
461 wItem->workData.procData.param = param;
462 wItem->onCompletion = onCompletion;
463 wItem->requestID = reqID;
464 wItem->abandonOp = 0;
465 wItem->link = NULL;
466
467 return depositWorkItem(reqID, wItem);
468 }
469
470 void ShutdownIOManager ( rtsBool wait_threads )
471 {
472 int num;
473 MMRESULT mmresult;
474
475 SetEvent(ioMan->hExitEvent);
476
477 if (wait_threads) {
478 /* Wait for all worker threads to die. */
479 for (;;) {
480 EnterCriticalSection(&ioMan->manLock);
481 num = ioMan->numWorkers;
482 LeaveCriticalSection(&ioMan->manLock);
483 if (num == 0)
484 break;
485 Sleep(10);
486 }
487 FreeWorkQueue(ioMan->workQueue);
488 CloseHandle(ioMan->hExitEvent);
489 DeleteCriticalSection(&ioMan->active_work_lock);
490 DeleteCriticalSection(&ioMan->manLock);
491
492 mmresult = timeEndPeriod(ioMan->sleepResolution);
493 if (mmresult != MMSYSERR_NOERROR) {
494 barf("timeEndPeriod failed");
495 }
496
497 free(ioMan);
498 ioMan = NULL;
499 }
500 }
501
502 /* Keep track of WorkItems currently being serviced. */
503 static
504 void
505 RegisterWorkItem(IOManagerState* ioMan,
506 WorkItem* wi)
507 {
508 EnterCriticalSection(&ioMan->active_work_lock);
509 wi->link = ioMan->active_work_items;
510 ioMan->active_work_items = wi;
511 LeaveCriticalSection(&ioMan->active_work_lock);
512 }
513
514 static
515 void
516 DeregisterWorkItem(IOManagerState* ioMan,
517 WorkItem* wi)
518 {
519 WorkItem *ptr, *prev;
520
521 EnterCriticalSection(&ioMan->active_work_lock);
522 for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) {
523 if (wi->requestID == ptr->requestID) {
524 if (prev==NULL) {
525 ioMan->active_work_items = ptr->link;
526 } else {
527 prev->link = ptr->link;
528 }
529 LeaveCriticalSection(&ioMan->active_work_lock);
530 return;
531 }
532 }
533 fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", wi->requestID);
534 LeaveCriticalSection(&ioMan->active_work_lock);
535 }
536
537
538 /*
539 * Function: abandonWorkRequest()
540 *
541 * Signal that a work request isn't of interest. Called by the Scheduler
542 * if a blocked Haskell thread has an exception thrown to it.
543 *
544 * Note: we're not aborting the system call that a worker might be blocked on
545 * here, just disabling the propagation of its result once its finished. We
546 * may have to go the whole hog here and switch to overlapped I/O so that we
547 * can abort blocked system calls.
548 */
549 void
550 abandonWorkRequest ( int reqID )
551 {
552 WorkItem *ptr;
553 EnterCriticalSection(&ioMan->active_work_lock);
554 for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) {
555 if (ptr->requestID == (unsigned int)reqID ) {
556 ptr->abandonOp = 1;
557 LeaveCriticalSection(&ioMan->active_work_lock);
558 return;
559 }
560 }
561 /* Note: if the request ID isn't present, the worker will have
562 * finished sometime since awaitRequests() last drained the completed
563 * request table; i.e., not an error.
564 */
565 LeaveCriticalSection(&ioMan->active_work_lock);
566 }
567
568 #endif