rts: delint/detab/dewhitespace win32/OSThreads.c
[ghc.git] / rts / win32 / IOManager.c
1 /* IOManager.c
2 *
3 * Non-blocking / asynchronous I/O for Win32.
4 *
5 * (c) sof, 2002-2003.
6 */
7
8 #if !defined(THREADED_RTS)
9
10 #include "Rts.h"
11 #include "IOManager.h"
12 #include "WorkQueue.h"
13 #include "ConsoleHandler.h"
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <io.h>
17 #include <winsock.h>
18 #include <process.h>
19 #include <errno.h>
20
21 /*
22 * Internal state maintained by the IO manager.
23 */
24 typedef struct IOManagerState {
25 CritSection manLock;
26 WorkQueue* workQueue;
27 int queueSize;
28 int numWorkers;
29 int workersIdle;
30 HANDLE hExitEvent;
31 unsigned int requestID;
32 /* fields for keeping track of active WorkItems */
33 CritSection active_work_lock;
34 WorkItem* active_work_items;
35 UINT sleepResolution;
36 } IOManagerState;
37
38 /* ToDo: wrap up this state via a IOManager handle instead? */
39 static IOManagerState* ioMan;
40
41 static void RegisterWorkItem ( IOManagerState* iom, WorkItem* wi);
42 static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi);
43
44 /*
45 * The routine executed by each worker thread.
46 */
47 static
48 unsigned
49 WINAPI
50 IOWorkerProc(PVOID param)
51 {
52 HANDLE hWaits[2];
53 DWORD rc;
54 IOManagerState* iom = (IOManagerState*)param;
55 WorkQueue* pq = iom->workQueue;
56 WorkItem* work;
57 int len = 0, fd = 0;
58 DWORD errCode = 0;
59 void* complData;
60
61 hWaits[0] = (HANDLE)iom->hExitEvent;
62 hWaits[1] = GetWorkQueueHandle(pq);
63
64 while (1) {
65 // The error code is communicated back on completion of request; reset.
66 errCode = 0;
67
68 EnterCriticalSection(&iom->manLock);
69 /* Signal that the worker is idle.
70 *
71 * 'workersIdle' is used when determining whether or not to
72 * increase the worker thread pool when adding a new request.
73 * (see addIORequest().)
74 */
75 iom->workersIdle++;
76 LeaveCriticalSection(&iom->manLock);
77
78 /*
79 * A possible future refinement is to make long-term idle threads
80 * wake up and decide to shut down should the number of idle threads
81 * be above some threshold.
82 *
83 */
84 rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
85
86 if (rc == WAIT_OBJECT_0) {
87 // we received the exit event
88 EnterCriticalSection(&iom->manLock);
89 ioMan->numWorkers--;
90 LeaveCriticalSection(&iom->manLock);
91 return 0;
92 }
93
94 EnterCriticalSection(&iom->manLock);
95 /* Signal that the thread is 'non-idle' and about to consume
96 * a work item.
97 */
98 iom->workersIdle--;
99 iom->queueSize--;
100 LeaveCriticalSection(&iom->manLock);
101
102 if ( rc == (WAIT_OBJECT_0 + 1) ) {
103 /* work item available, fetch it. */
104 if (FetchWork(pq,(void**)&work)) {
105 work->abandonOp = 0;
106 RegisterWorkItem(iom,work);
107 if ( work->workKind & WORKER_READ ) {
108 if ( work->workKind & WORKER_FOR_SOCKET ) {
109 len = recv(work->workData.ioData.fd,
110 work->workData.ioData.buf,
111 work->workData.ioData.len,
112 0);
113 if (len == SOCKET_ERROR) {
114 errCode = WSAGetLastError();
115 }
116 } else {
117 while (1) {
118 // Do the read(), with extra-special handling for Ctrl+C
119 len = read(work->workData.ioData.fd,
120 work->workData.ioData.buf,
121 work->workData.ioData.len);
122 if ( len == 0 && work->workData.ioData.len != 0 ) {
123 /* Given the following scenario:
124 * - a console handler has been registered
125 * that handles Ctrl+C events.
126 * - we've not tweaked the 'console mode'
127 * settings to turn on
128 * ENABLE_PROCESSED_INPUT.
129 * - we're blocked waiting on input from
130 standard input.
131 * - the user hits Ctrl+C.
132 *
133 * The OS will invoke the console handler
134 * (in a separate OS thread), and the
135 * above read() (i.e., under the hood, a
136 * ReadFile() op) returns 0, with the
137 * error set to
138 * ERROR_OPERATION_ABORTED. We don't want
139 * to percolate this error condition back
140 * to the Haskell user. Do this by
141 * waiting for the completion of the
142 * Haskell console handler. If upon
143 * completion of the console handler
144 * routine, the Haskell thread that issued
145 * the request is found to have been
146 * thrown an exception, the worker
147 * abandons the request (since that's what
148 * the Haskell thread has done.) If the
149 * Haskell thread hasn't been interrupted,
150 * the worker retries the read request as
151 * if nothing happened.
152 */
153 if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) {
154 /* For now, only abort when dealing
155 * with the standard input handle.
156 * i.e., for all others, an error is
157 * raised.
158 */
159 HANDLE h =
160 (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
161 int iofd = work->workData.ioData.fd;
162 if ( _get_osfhandle(iofd) == (intptr_t)h ) {
163 if (rts_waitConsoleHandlerCompletion()) {
164 /* If the Scheduler has set
165 * work->abandonOp, the
166 * Haskell thread has been
167 * thrown an exception (=> the
168 * worker must abandon this
169 * request.) We test for this
170 * below before invoking the
171 * on-completion routine.
172 */
173 if (work->abandonOp) {
174 break;
175 } else {
176 continue;
177 }
178 }
179 } else {
180 break; /* Treat it like an error */
181 }
182 } else {
183 break;
184 }
185 } else {
186 break;
187 }
188 }
189 if (len == -1) { errCode = errno; }
190 }
191 complData = work->workData.ioData.buf;
192 fd = work->workData.ioData.fd;
193 } else if ( work->workKind & WORKER_WRITE ) {
194 if ( work->workKind & WORKER_FOR_SOCKET ) {
195 len = send(work->workData.ioData.fd,
196 work->workData.ioData.buf,
197 work->workData.ioData.len,
198 0);
199 if (len == SOCKET_ERROR) {
200 errCode = WSAGetLastError();
201 }
202 } else {
203 len = write(work->workData.ioData.fd,
204 work->workData.ioData.buf,
205 work->workData.ioData.len);
206 if (len == -1) {
207 errCode = errno;
208 // write() gets errno wrong for
209 // ERROR_NO_DATA, we have to fix it here:
210 if (errCode == EINVAL &&
211 GetLastError() == ERROR_NO_DATA) {
212 errCode = EPIPE;
213 }
214 }
215 }
216 complData = work->workData.ioData.buf;
217 fd = work->workData.ioData.fd;
218 } else if ( work->workKind & WORKER_DELAY ) {
219 /* Approximate implementation of threadDelay;
220 *
221 * Note: Sleep() is in milliseconds, not micros.
222 *
223 * MSDN says of Sleep:
224 * If dwMilliseconds is greater than one tick
225 * but less than two, the wait can be anywhere
226 * between one and two ticks, and so on.
227 *
228 * so we need to add (milliseconds-per-tick - 1)
229 * to the amount of time we sleep for.
230 *
231 * test ThreadDelay001 fails if we get this wrong.
232 */
233 Sleep(((work->workData.delayData.usecs + 999) / 1000)
234 + iom->sleepResolution - 1);
235 len = work->workData.delayData.usecs;
236 complData = NULL;
237 fd = 0;
238 errCode = 0;
239 } else if ( work->workKind & WORKER_DO_PROC ) {
240 // perform operation/proc on behalf of Haskell thread.
241 if (work->workData.procData.proc) {
242 // The procedure is assumed to encode result +
243 // success/failure via its param.
244 void* param = work->workData.procData.param;
245 errCode=work->workData.procData.proc(param);
246 } else {
247 errCode=1;
248 }
249 complData = work->workData.procData.param;
250 } else {
251 fprintf(stderr, "unknown work request type (%d), "
252 "ignoring.\n", work->workKind);
253 fflush(stderr);
254 continue;
255 }
256 if (!work->abandonOp) {
257 work->onCompletion(work->requestID,
258 fd,
259 len,
260 complData,
261 errCode);
262 }
263 // Free the WorkItem
264 DeregisterWorkItem(iom,work);
265 free(work);
266 } else {
267 fprintf(stderr, "unable to fetch work; fatal.\n");
268 fflush(stderr);
269 EnterCriticalSection(&iom->manLock);
270 ioMan->numWorkers--;
271 LeaveCriticalSection(&iom->manLock);
272 return 1;
273 }
274 } else {
275 fprintf(stderr, "waiting failed (%lu); fatal.\n", rc);
276 fflush(stderr);
277 EnterCriticalSection(&iom->manLock);
278 ioMan->numWorkers--;
279 LeaveCriticalSection(&iom->manLock);
280 return 1;
281 }
282 }
283 return 0;
284 }
285
286 static
287 BOOL
288 NewIOWorkerThread(IOManagerState* iom)
289 {
290 unsigned threadId;
291 return ( 0 != _beginthreadex(NULL,
292 0,
293 IOWorkerProc,
294 (LPVOID)iom,
295 0,
296 &threadId) );
297 }
298
299 BOOL
300 StartIOManager(void)
301 {
302 HANDLE hExit;
303 WorkQueue* wq;
304 UINT sleepResolution;
305 TIMECAPS timecaps;
306 MMRESULT mmresult;
307
308 mmresult = timeGetDevCaps(&timecaps, sizeof(timecaps));
309 if (mmresult != MMSYSERR_NOERROR) {
310 return FALSE;
311 }
312 sleepResolution = timecaps.wPeriodMin;
313 mmresult = timeBeginPeriod(sleepResolution);
314 if (mmresult != MMSYSERR_NOERROR) {
315 return FALSE;
316 }
317
318 wq = NewWorkQueue();
319 if ( !wq ) return FALSE;
320
321 ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
322
323 if (!ioMan) {
324 FreeWorkQueue(wq);
325 return FALSE;
326 }
327
328 /* A manual-reset event */
329 hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
330 if ( !hExit ) {
331 FreeWorkQueue(wq);
332 free(ioMan);
333 return FALSE;
334 }
335
336 ioMan->hExitEvent = hExit;
337 InitializeCriticalSection(&ioMan->manLock);
338 ioMan->workQueue = wq;
339 ioMan->numWorkers = 0;
340 ioMan->workersIdle = 0;
341 ioMan->queueSize = 0;
342 ioMan->requestID = 1;
343 InitializeCriticalSection(&ioMan->active_work_lock);
344 ioMan->active_work_items = NULL;
345 ioMan->sleepResolution = sleepResolution;
346
347 return TRUE;
348 }
349
350 /*
351 * Function: depositWorkItem()
352 *
353 * Local function which deposits a WorkItem onto a work queue,
354 * deciding in the process whether or not the thread pool needs
355 * to be augmented with another thread to handle the new request.
356 *
357 */
358 static
359 int
360 depositWorkItem( unsigned int reqID,
361 WorkItem* wItem )
362 {
363 EnterCriticalSection(&ioMan->manLock);
364
365 #if 0
366 fprintf(stderr, "depositWorkItem: %d/%d\n",
367 ioMan->workersIdle, ioMan->numWorkers);
368 fflush(stderr);
369 #endif
370 /* A new worker thread is created when there are fewer idle threads
371 * than non-consumed queue requests. This ensures that requests will
372 * be dealt with in a timely manner.
373 *
374 * [Long explanation of why the previous thread pool policy lead to
375 * trouble]
376 *
377 * Previously, the thread pool was augmented iff no idle worker threads
378 * were available. That strategy runs the risk of repeatedly adding to
379 * the request queue without expanding the thread pool to handle this
380 * sudden spike in queued requests.
381 * [How? Assume workersIdle is 1, and addIORequest() is called. No new
382 * thread is created and the request is simply queued. If addIORequest()
383 * is called again _before the OS schedules a worker thread to pull the
384 * request off the queue_, workersIdle is still 1 and another request is
385 * simply added to the queue. Once the worker thread is run, only one
386 * request is de-queued, leaving the 2nd request in the queue]
387 *
388 * Assuming none of the queued requests take an inordinate amount
389 * of to complete, the request queue would eventually be
390 * drained. But if that's not the case, the later requests will
391 * end up languishing in the queue indefinitely. The non-timely
392 * handling of requests may cause CH applications to misbehave /
393 * hang; bad.
394 *
395 */
396 ioMan->queueSize++;
397 if ( (ioMan->workersIdle < ioMan->queueSize) ) {
398 /* see if giving up our quantum ferrets out some idle threads.
399 */
400 LeaveCriticalSection(&ioMan->manLock);
401 Sleep(0);
402 EnterCriticalSection(&ioMan->manLock);
403 if ( (ioMan->workersIdle < ioMan->queueSize) ) {
404 /* No, go ahead and create another. */
405 ioMan->numWorkers++;
406 if (!NewIOWorkerThread(ioMan)) {
407 ioMan->numWorkers--;
408 }
409 }
410 }
411 LeaveCriticalSection(&ioMan->manLock);
412
413 if (SubmitWork(ioMan->workQueue,wItem)) {
414 /* Note: the work item has potentially been consumed by a worker thread
415 * (and freed) at this point, so we cannot use wItem's requestID.
416 */
417 return reqID;
418 } else {
419 return 0;
420 }
421 }
422
423 /*
424 * Function: AddIORequest()
425 *
426 * Conduit to underlying WorkQueue's SubmitWork(); adds IO
427 * request to work queue, deciding whether or not to augment
428 * the thread pool in the process.
429 */
430 int
431 AddIORequest ( int fd,
432 BOOL forWriting,
433 BOOL isSocket,
434 int len,
435 char* buffer,
436 CompletionProc onCompletion)
437 {
438 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
439 unsigned int reqID = ioMan->requestID++;
440 if (!ioMan || !wItem) return 0;
441
442 /* Fill in the blanks */
443 wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
444 ( forWriting ? WORKER_WRITE : WORKER_READ );
445 wItem->workData.ioData.fd = fd;
446 wItem->workData.ioData.len = len;
447 wItem->workData.ioData.buf = buffer;
448 wItem->link = NULL;
449
450 wItem->onCompletion = onCompletion;
451 wItem->requestID = reqID;
452
453 return depositWorkItem(reqID, wItem);
454 }
455
456 /*
457 * Function: AddDelayRequest()
458 *
459 * Like AddIORequest(), but this time adding a delay request to
460 * the request queue.
461 */
462 BOOL
463 AddDelayRequest ( unsigned int usecs,
464 CompletionProc onCompletion)
465 {
466 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
467 unsigned int reqID = ioMan->requestID++;
468 if (!ioMan || !wItem) return FALSE;
469
470 /* Fill in the blanks */
471 wItem->workKind = WORKER_DELAY;
472 wItem->workData.delayData.usecs = usecs;
473 wItem->onCompletion = onCompletion;
474 wItem->requestID = reqID;
475 wItem->link = NULL;
476
477 return depositWorkItem(reqID, wItem);
478 }
479
480 /*
481 * Function: AddProcRequest()
482 *
483 * Add an asynchronous procedure request.
484 */
485 BOOL
486 AddProcRequest ( void* proc,
487 void* param,
488 CompletionProc onCompletion)
489 {
490 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
491 unsigned int reqID = ioMan->requestID++;
492 if (!ioMan || !wItem) return FALSE;
493
494 /* Fill in the blanks */
495 wItem->workKind = WORKER_DO_PROC;
496 wItem->workData.procData.proc = proc;
497 wItem->workData.procData.param = param;
498 wItem->onCompletion = onCompletion;
499 wItem->requestID = reqID;
500 wItem->abandonOp = 0;
501 wItem->link = NULL;
502
503 return depositWorkItem(reqID, wItem);
504 }
505
506 void ShutdownIOManager ( rtsBool wait_threads )
507 {
508 int num;
509 MMRESULT mmresult;
510
511 SetEvent(ioMan->hExitEvent);
512
513 if (wait_threads) {
514 /* Wait for all worker threads to die. */
515 for (;;) {
516 EnterCriticalSection(&ioMan->manLock);
517 num = ioMan->numWorkers;
518 LeaveCriticalSection(&ioMan->manLock);
519 if (num == 0)
520 break;
521 Sleep(10);
522 }
523 FreeWorkQueue(ioMan->workQueue);
524 CloseHandle(ioMan->hExitEvent);
525 DeleteCriticalSection(&ioMan->active_work_lock);
526 DeleteCriticalSection(&ioMan->manLock);
527
528 mmresult = timeEndPeriod(ioMan->sleepResolution);
529 if (mmresult != MMSYSERR_NOERROR) {
530 barf("timeEndPeriod failed");
531 }
532
533 free(ioMan);
534 ioMan = NULL;
535 }
536 }
537
538 /* Keep track of WorkItems currently being serviced. */
539 static
540 void
541 RegisterWorkItem(IOManagerState* ioMan,
542 WorkItem* wi)
543 {
544 EnterCriticalSection(&ioMan->active_work_lock);
545 wi->link = ioMan->active_work_items;
546 ioMan->active_work_items = wi;
547 LeaveCriticalSection(&ioMan->active_work_lock);
548 }
549
550 static
551 void
552 DeregisterWorkItem(IOManagerState* ioMan,
553 WorkItem* wi)
554 {
555 WorkItem *ptr, *prev;
556
557 EnterCriticalSection(&ioMan->active_work_lock);
558 for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) {
559 if (wi->requestID == ptr->requestID) {
560 if (prev==NULL) {
561 ioMan->active_work_items = ptr->link;
562 } else {
563 prev->link = ptr->link;
564 }
565 LeaveCriticalSection(&ioMan->active_work_lock);
566 return;
567 }
568 }
569 fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n",
570 wi->requestID);
571 LeaveCriticalSection(&ioMan->active_work_lock);
572 }
573
574
575 /*
576 * Function: abandonWorkRequest()
577 *
578 * Signal that a work request isn't of interest. Called by the Scheduler
579 * if a blocked Haskell thread has an exception thrown to it.
580 *
581 * Note: we're not aborting the system call that a worker might be blocked on
582 * here, just disabling the propagation of its result once its finished. We
583 * may have to go the whole hog here and switch to overlapped I/O so that we
584 * can abort blocked system calls.
585 */
586 void
587 abandonWorkRequest ( int reqID )
588 {
589 WorkItem *ptr;
590 EnterCriticalSection(&ioMan->active_work_lock);
591 for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) {
592 if (ptr->requestID == (unsigned int)reqID ) {
593 ptr->abandonOp = 1;
594 LeaveCriticalSection(&ioMan->active_work_lock);
595 return;
596 }
597 }
598 /* Note: if the request ID isn't present, the worker will have
599 * finished sometime since awaitRequests() last drained the completed
600 * request table; i.e., not an error.
601 */
602 LeaveCriticalSection(&ioMan->active_work_lock);
603 }
604
605 #endif