/* $Id$ */
/** @file
* IPRT - Anonymous Pipes, OS/2 Implementation.
*/
/*
* Copyright (C) 2010-2013 Oracle Corporation
*
* This file is part of VirtualBox Open Source Edition (OSE), as
* available from http://www.virtualbox.org. This file is free software;
* you can redistribute it and/or modify it under the terms of the GNU
* General Public License (GPL) as published by the Free Software
* Foundation, in version 2 as it comes in the "COPYING" file of the
* VirtualBox OSE distribution. VirtualBox OSE is distributed in the
* hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
*
* The contents of this file may alternatively be used under the terms
* of the Common Development and Distribution License Version 1.0
* (CDDL) only, as it comes in the "COPYING.CDDL" file of the
* VirtualBox OSE distribution, in which case the provisions of the
* CDDL are applicable instead of those of the GPL.
*
* You may elect to license modified versions of this file under the
* terms and conditions of either the GPL or the CDDL or both.
*/
/*******************************************************************************
* Header Files *
*******************************************************************************/
#define INCL_ERRORS
#define INCL_DOSSEMAPHORES
#include <os2.h>
#include <iprt/pipe.h>
#include "internal/iprt.h"
#include <iprt/asm.h>
#include <iprt/assert.h>
#include <iprt/critsect.h>
#include <iprt/err.h>
#include <iprt/mem.h>
#include <iprt/string.h>
#include <iprt/poll.h>
#include <iprt/process.h>
#include <iprt/thread.h>
#include <iprt/time.h>
#include "internal/pipe.h"
#include "internal/magics.h"
/*******************************************************************************
* Defined Constants And Macros *
*******************************************************************************/
/** The pipe buffer size we prefer. */
#define RTPIPE_OS2_SIZE _32K
/*******************************************************************************
* Structures and Typedefs *
*******************************************************************************/
typedef struct RTPIPEINTERNAL
{
/** Magic value (RTPIPE_MAGIC). */
uint32_t u32Magic;
/** The pipe handle. */
HPIPE hPipe;
/** Set if this is the read end, clear if it's the write end. */
bool fRead;
/** Whether the pipe is in blocking or non-blocking mode. */
bool fBlocking;
/** Set if the pipe is broken. */
bool fBrokenPipe;
/** Usage counter. */
uint32_t cUsers;
/** The event semaphore associated with the pipe. */
HEV hev;
/** The handle of the poll set currently polling on this pipe.
* We can only have one poller at the time (lazy bird). */
RTPOLLSET hPollSet;
/** Critical section protecting the above members.
* (Taking the lazy/simple approach.) */
RTCRITSECT CritSect;
} RTPIPEINTERNAL;
/**
* Ensures that the pipe has a semaphore associated with it.
*
* @returns VBox status code.
* @param pThis The pipe.
*/
static int rtPipeOs2EnsureSem(RTPIPEINTERNAL *pThis)
{
if (pThis->hev != NULLHANDLE)
return VINF_SUCCESS;
HEV hev;
APIRET orc = DosCreateEventSem(NULL, &hev, DC_SEM_SHARED, FALSE);
if (orc == NO_ERROR)
{
orc = DosSetNPipeSem(pThis->hPipe, (HSEM)hev, 1);
if (orc == NO_ERROR)
{
pThis->hev = hev;
return VINF_SUCCESS;
}
DosCloseEventSem(hev);
}
return RTErrConvertFromOS2(orc);
}
RTDECL(int) RTPipeCreate(PRTPIPE phPipeRead, PRTPIPE phPipeWrite, uint32_t fFlags)
{
AssertPtrReturn(phPipeRead, VERR_INVALID_POINTER);
AssertPtrReturn(phPipeWrite, VERR_INVALID_POINTER);
AssertReturn(!(fFlags & ~RTPIPE_C_VALID_MASK), VERR_INVALID_PARAMETER);
/*
* Try create and connect a pipe pair.
*/
APIRET orc;
HPIPE hPipeR;
HFILE hPipeW;
int rc;
for (;;)
{
static volatile uint32_t g_iNextPipe = 0;
char szName[128];
RTStrPrintf(szName, sizeof(szName), "\\pipe\\iprt-pipe-%u-%u", RTProcSelf(), ASMAtomicIncU32(&g_iNextPipe));
/*
* Create the read end of the pipe.
*/
ULONG fPipeMode = 1 /*instance*/ | NP_TYPE_BYTE | NP_READMODE_BYTE | NP_NOWAIT;
ULONG fOpenMode = NP_ACCESS_DUPLEX | NP_WRITEBEHIND;
if (fFlags & RTPIPE_C_INHERIT_READ)
fOpenMode |= NP_INHERIT;
else
fOpenMode |= NP_NOINHERIT;
orc = DosCreateNPipe((PSZ)szName, &hPipeR, fOpenMode, fPipeMode, RTPIPE_OS2_SIZE, RTPIPE_OS2_SIZE, NP_DEFAULT_WAIT);
if (orc == NO_ERROR)
{
orc = DosConnectNPipe(hPipeR);
if (orc == ERROR_PIPE_NOT_CONNECTED || orc == NO_ERROR)
{
/*
* Connect to the pipe (the write end), attach sem below.
*/
ULONG ulAction = 0;
ULONG fOpenW = OPEN_ACTION_FAIL_IF_NEW | OPEN_ACTION_OPEN_IF_EXISTS;
ULONG fModeW = OPEN_ACCESS_WRITEONLY | OPEN_SHARE_DENYNONE | OPEN_FLAGS_FAIL_ON_ERROR;
if (!(fFlags & RTPIPE_C_INHERIT_WRITE))
fModeW |= OPEN_FLAGS_NOINHERIT;
orc = DosOpen((PSZ)szName, &hPipeW, &ulAction, 0 /*cbFile*/, FILE_NORMAL,
fOpenW, fModeW, NULL /*peaop2*/);
if (orc == NO_ERROR)
break;
}
DosClose(hPipeR);
}
if ( orc != ERROR_PIPE_BUSY /* already exist - compatible */
&& orc != ERROR_ACCESS_DENIED /* already exist - incompatible (?) */)
return RTErrConvertFromOS2(orc);
/* else: try again with a new name */
}
/*
* Create the two handles.
*/
RTPIPEINTERNAL *pThisR = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL));
if (pThisR)
{
RTPIPEINTERNAL *pThisW = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL));
if (pThisW)
{
/* Crit sects. */
rc = RTCritSectInit(&pThisR->CritSect);
if (RT_SUCCESS(rc))
{
rc = RTCritSectInit(&pThisW->CritSect);
if (RT_SUCCESS(rc))
{
/* Initialize the structures. */
pThisR->u32Magic = RTPIPE_MAGIC;
pThisW->u32Magic = RTPIPE_MAGIC;
pThisR->hPipe = hPipeR;
pThisW->hPipe = hPipeW;
pThisR->hev = NULLHANDLE;
pThisW->hev = NULLHANDLE;
pThisR->fRead = true;
pThisW->fRead = false;
pThisR->fBlocking = false;
pThisW->fBlocking = true;
//pThisR->fBrokenPipe = false;
//pThisW->fBrokenPipe = false;
//pThisR->cUsers = 0;
//pThisW->cUsers = 0;
pThisR->hPollSet = NIL_RTPOLLSET;
pThisW->hPollSet = NIL_RTPOLLSET;
*phPipeRead = pThisR;
*phPipeWrite = pThisW;
return VINF_SUCCESS;
}
RTCritSectDelete(&pThisR->CritSect);
}
RTMemFree(pThisW);
}
else
rc = VERR_NO_MEMORY;
RTMemFree(pThisR);
}
else
rc = VERR_NO_MEMORY;
/* Don't call DosDisConnectNPipe! */
DosClose(hPipeW);
DosClose(hPipeR);
return rc;
}
RTDECL(int) RTPipeClose(RTPIPE hPipe)
{
RTPIPEINTERNAL *pThis = hPipe;
if (pThis == NIL_RTPIPE)
return VINF_SUCCESS;
AssertPtrReturn(pThis, VERR_INVALID_PARAMETER);
AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
/*
* Do the cleanup.
*/
AssertReturn(ASMAtomicCmpXchgU32(&pThis->u32Magic, ~RTPIPE_MAGIC, RTPIPE_MAGIC), VERR_INVALID_HANDLE);
RTCritSectEnter(&pThis->CritSect);
Assert(pThis->cUsers == 0);
/* Don't call DosDisConnectNPipe! */
DosClose(pThis->hPipe);
pThis->hPipe = (HPIPE)-1;
if (pThis->hev != NULLHANDLE)
{
DosCloseEventSem(pThis->hev);
pThis->hev = NULLHANDLE;
}
RTCritSectLeave(&pThis->CritSect);
RTCritSectDelete(&pThis->CritSect);
RTMemFree(pThis);
return VINF_SUCCESS;
}
RTDECL(int) RTPipeFromNative(PRTPIPE phPipe, RTHCINTPTR hNativePipe, uint32_t fFlags)
{
AssertPtrReturn(phPipe, VERR_INVALID_POINTER);
AssertReturn(!(fFlags & ~RTPIPE_N_VALID_MASK), VERR_INVALID_PARAMETER);
AssertReturn(!!(fFlags & RTPIPE_N_READ) != !!(fFlags & RTPIPE_N_WRITE), VERR_INVALID_PARAMETER);
/*
* Get and validate the pipe handle info.
*/
HPIPE hNative = (HPIPE)hNativePipe;
ULONG ulType = 0;
ULONG ulAttr = 0;
APIRET orc = DosQueryHType(hNative, &ulType, &ulAttr);
AssertMsgReturn(orc == NO_ERROR, ("%d\n", orc), RTErrConvertFromOS2(orc));
AssertReturn((ulType & 0x7) == HANDTYPE_PIPE, VERR_INVALID_HANDLE);
#if 0
union
{
PIPEINFO PipeInfo;
uint8_t abPadding[sizeof(PIPEINFO) + 127];
} Buf;
orc = DosQueryNPipeInfo(hNative, 1, &Buf, sizeof(Buf));
if (orc != NO_ERROR)
{
/* Sorry, anonymous pips are not supported. */
AssertMsgFailed(("%d\n", orc));
return VERR_INVALID_HANDLE;
}
AssertReturn(Buf.PipeInfo.cbMaxInst == 1, VERR_INVALID_HANDLE);
#endif
ULONG fPipeState = 0;
orc = DosQueryNPHState(hNative, &fPipeState);
if (orc != NO_ERROR)
{
/* Sorry, anonymous pips are not supported. */
AssertMsgFailed(("%d\n", orc));
return VERR_INVALID_HANDLE;
}
AssertReturn(!(fPipeState & NP_TYPE_MESSAGE), VERR_INVALID_HANDLE);
AssertReturn(!(fPipeState & NP_READMODE_MESSAGE), VERR_INVALID_HANDLE);
AssertReturn((fPipeState & 0xff) == 1, VERR_INVALID_HANDLE);
ULONG fFileState = 0;
orc = DosQueryFHState(hNative, &fFileState);
AssertMsgReturn(orc == NO_ERROR, ("%d\n", orc), VERR_INVALID_HANDLE);
AssertMsgReturn( (fFileState & 0x3) == (fFlags & RTPIPE_N_READ ? OPEN_ACCESS_READONLY : OPEN_ACCESS_WRITEONLY)
|| (fFileState & 0x3) == OPEN_ACCESS_READWRITE
, ("%#x\n", fFileState), VERR_INVALID_HANDLE);
/*
* Looks kind of OK. Fix the inherit flag.
*/
orc = DosSetFHState(hNative, (fFileState & (OPEN_FLAGS_WRITE_THROUGH | OPEN_FLAGS_FAIL_ON_ERROR | OPEN_FLAGS_NO_CACHE))
| (fFlags & RTPIPE_N_INHERIT ? 0 : OPEN_FLAGS_NOINHERIT));
AssertMsgReturn(orc == NO_ERROR, ("%d\n", orc), RTErrConvertFromOS2(orc));
/*
* Create a handle so we can try rtPipeQueryInfo on it
* and see if we need to duplicate it to make that call work.
*/
RTPIPEINTERNAL *pThis = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL));
if (!pThis)
return VERR_NO_MEMORY;
int rc = RTCritSectInit(&pThis->CritSect);
if (RT_SUCCESS(rc))
{
pThis->u32Magic = RTPIPE_MAGIC;
pThis->hPipe = hNative;
pThis->hev = NULLHANDLE;
pThis->fRead = !!(fFlags & RTPIPE_N_READ);
pThis->fBlocking = !(fPipeState & NP_NOWAIT);
//pThis->fBrokenPipe = false;
//pThis->cUsers = 0;
pThis->hPollSet = NIL_RTPOLLSET;
*phPipe = pThis;
return VINF_SUCCESS;
//RTCritSectDelete(&pThis->CritSect);
}
RTMemFree(pThis);
return rc;
}
RTDECL(RTHCINTPTR) RTPipeToNative(RTPIPE hPipe)
{
RTPIPEINTERNAL *pThis = hPipe;
AssertPtrReturn(pThis, -1);
AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, -1);
return (RTHCINTPTR)pThis->hPipe;
}
/**
* Prepare blocking mode.
*
* @returns IPRT status code.
* @retval VERR_WRONG_ORDER if simultaneous non-blocking and blocking access is
* attempted.
*
* @param pThis The pipe handle.
*
* @remarks Caller owns the critical section.
*/
static int rtPipeTryBlocking(RTPIPEINTERNAL *pThis)
{
if (!pThis->fBlocking)
{
if (pThis->cUsers != 0)
return VERR_WRONG_ORDER;
APIRET orc = DosSetNPHState(pThis->hPipe, NP_WAIT | NP_READMODE_BYTE);
if (orc != NO_ERROR)
{
if (orc != ERROR_BROKEN_PIPE && orc != ERROR_PIPE_NOT_CONNECTED)
return RTErrConvertFromOS2(orc);
pThis->fBrokenPipe = true;
}
pThis->fBlocking = true;
}
pThis->cUsers++;
return VINF_SUCCESS;
}
/**
* Prepare non-blocking mode.
*
* @returns IPRT status code.
* @retval VERR_WRONG_ORDER if simultaneous non-blocking and blocking access is
* attempted.
*
* @param pThis The pipe handle.
*/
static int rtPipeTryNonBlocking(RTPIPEINTERNAL *pThis)
{
if (pThis->fBlocking)
{
if (pThis->cUsers != 0)
return VERR_WRONG_ORDER;
APIRET orc = DosSetNPHState(pThis->hPipe, NP_NOWAIT | NP_READMODE_BYTE);
if (orc != NO_ERROR)
{
if (orc != ERROR_BROKEN_PIPE && orc != ERROR_PIPE_NOT_CONNECTED)
return RTErrConvertFromOS2(orc);
pThis->fBrokenPipe = true;
}
pThis->fBlocking = false;
}
pThis->cUsers++;
return VINF_SUCCESS;
}
/**
* Checks if the read pipe has been broken.
*
* @returns true if broken, false if no.
* @param pThis The pipe handle (read).
*/
static bool rtPipeOs2IsBroken(RTPIPEINTERNAL *pThis)
{
Assert(pThis->fRead);
#if 0
/*
* Query it via the semaphore. Not sure how fast this is...
*/
PIPESEMSTATE aStates[3]; RT_ZERO(aStates);
APIRET orc = DosQueryNPipeSemState(pThis->hev, &aStates[0], sizeof(aStates));
if (orc == NO_ERROR)
{
if (aStates[0].fStatus == NPSS_CLOSE)
return true;
if (aStates[0].fStatus == NPSS_RDATA)
return false;
}
AssertMsgFailed(("%d / %d\n", orc, aStates[0].fStatus));
/*
* Fall back / alternative method.
*/
#endif
ULONG cbActual = 0;
ULONG ulState = 0;
AVAILDATA Avail = { 0, 0 };
APIRET orc = DosPeekNPipe(pThis->hPipe, NULL, 0, &cbActual, &Avail, &ulState);
if (orc != NO_ERROR)
{
if (orc != ERROR_PIPE_BUSY)
AssertMsgFailed(("%d\n", orc));
return false;
}
return ulState != NP_STATE_CONNECTED;
}
RTDECL(int) RTPipeRead(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead)
{
RTPIPEINTERNAL *pThis = hPipe;
AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
AssertReturn(pThis->fRead, VERR_ACCESS_DENIED);
AssertPtr(pcbRead);
AssertPtr(pvBuf);
int rc = RTCritSectEnter(&pThis->CritSect);
if (RT_SUCCESS(rc))
{
rc = rtPipeTryNonBlocking(pThis);
if (RT_SUCCESS(rc))
{
RTCritSectLeave(&pThis->CritSect);
ULONG cbActual = 0;
APIRET orc = DosRead(pThis->hPipe, pvBuf, cbToRead, &cbActual);
if (orc == NO_ERROR)
{
if (cbActual || !cbToRead || !rtPipeOs2IsBroken(pThis))
*pcbRead = cbActual;
else
rc = VERR_BROKEN_PIPE;
}
else if (orc == ERROR_NO_DATA)
{
*pcbRead = 0;
rc = VINF_TRY_AGAIN;
}
else
rc = RTErrConvertFromOS2(orc);
RTCritSectEnter(&pThis->CritSect);
if (rc == VERR_BROKEN_PIPE)
pThis->fBrokenPipe = true;
pThis->cUsers--;
}
else
rc = VERR_WRONG_ORDER;
RTCritSectLeave(&pThis->CritSect);
}
return rc;
}
RTDECL(int) RTPipeReadBlocking(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead)
{
RTPIPEINTERNAL *pThis = hPipe;
AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
AssertReturn(pThis->fRead, VERR_ACCESS_DENIED);
AssertPtr(pvBuf);
int rc = RTCritSectEnter(&pThis->CritSect);
if (RT_SUCCESS(rc))
{
rc = rtPipeTryBlocking(pThis);
if (RT_SUCCESS(rc))
{
RTCritSectLeave(&pThis->CritSect);
size_t cbTotalRead = 0;
while (cbToRead > 0)
{
ULONG cbActual = 0;
APIRET orc = DosRead(pThis->hPipe, pvBuf, cbToRead, &cbActual);
if (orc != NO_ERROR)
{
rc = RTErrConvertFromOS2(orc);
break;
}
if (!cbActual && rtPipeOs2IsBroken(pThis))
{
rc = VERR_BROKEN_PIPE;
break;
}
/* advance */
pvBuf = (char *)pvBuf + cbActual;
cbTotalRead += cbActual;
cbToRead -= cbActual;
}
if (pcbRead)
{
*pcbRead = cbTotalRead;
if ( RT_FAILURE(rc)
&& cbTotalRead)
rc = VINF_SUCCESS;
}
RTCritSectEnter(&pThis->CritSect);
if (rc == VERR_BROKEN_PIPE)
pThis->fBrokenPipe = true;
pThis->cUsers--;
}
else
rc = VERR_WRONG_ORDER;
RTCritSectLeave(&pThis->CritSect);
}
return rc;
}
/**
* Gets the available write buffer size of the pipe.
*
* @returns Number of bytes, 1 on failure.
* @param pThis The pipe handle.
*/
static ULONG rtPipeOs2GetSpace(RTPIPEINTERNAL *pThis)
{
Assert(!pThis->fRead);
#if 0 /* Not sure which is more efficient, neither are really optimal, I fear. */
/*
* Query via semaphore state.
* This will walk the list of active named pipes...
*/
/** @todo Check how hev and hpipe are associated, if complicated, use the
* alternative method below. */
PIPESEMSTATE aStates[3]; RT_ZERO(aStates);
APIRET orc = DosQueryNPipeSemState((HSEM)pThis->hev, &aStates[0], sizeof(aStates));
if (orc == NO_ERROR)
{
if (aStates[0].fStatus == NPSS_WSPACE)
return aStates[0].usAvail;
if (aStates[1].fStatus == NPSS_WSPACE)
return aStates[1].usAvail;
return 0;
}
AssertMsgFailed(("%d / %d\n", orc, aStates[0].fStatus));
#else
/*
* Query via the pipe info.
* This will have to lookup and store the pipe name.
*/
union
{
PIPEINFO PipeInfo;
uint8_t abPadding[sizeof(PIPEINFO) + 127];
} Buf;
APIRET orc = DosQueryNPipeInfo(pThis->hPipe, 1, &Buf, sizeof(Buf));
if (orc == NO_ERROR)
return Buf.PipeInfo.cbOut;
AssertMsgFailed(("%d\n", orc));
#endif
return 1;
}
RTDECL(int) RTPipeWrite(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten)
{
RTPIPEINTERNAL *pThis = hPipe;
AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
AssertPtr(pcbWritten);
AssertPtr(pvBuf);
int rc = RTCritSectEnter(&pThis->CritSect);
if (RT_SUCCESS(rc))
{
rc = rtPipeTryNonBlocking(pThis);
if (RT_SUCCESS(rc))
{
if (cbToWrite > 0)
{
ULONG cbActual = 0;
APIRET orc = DosWrite(pThis->hPipe, pvBuf, cbToWrite, &cbActual);
if (orc == NO_ERROR && cbActual == 0)
{
/* Retry with the request adjusted to the available buffer space. */
ULONG cbAvail = rtPipeOs2GetSpace(pThis);
orc = DosWrite(pThis->hPipe, pvBuf, RT_MIN(cbAvail, cbToWrite), &cbActual);
}
if (orc == NO_ERROR)
{
*pcbWritten = cbActual;
if (cbActual == 0)
rc = VINF_TRY_AGAIN;
}
else
{
rc = RTErrConvertFromOS2(orc);
if (rc == VERR_PIPE_NOT_CONNECTED)
rc = VERR_BROKEN_PIPE;
}
}
else
*pcbWritten = 0;
if (rc == VERR_BROKEN_PIPE)
pThis->fBrokenPipe = true;
pThis->cUsers--;
}
else
rc = VERR_WRONG_ORDER;
RTCritSectLeave(&pThis->CritSect);
}
return rc;
}
RTDECL(int) RTPipeWriteBlocking(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten)
{
RTPIPEINTERNAL *pThis = hPipe;
AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
AssertPtr(pvBuf);
AssertPtrNull(pcbWritten);
int rc = RTCritSectEnter(&pThis->CritSect);
if (RT_SUCCESS(rc))
{
rc = rtPipeTryBlocking(pThis);
if (RT_SUCCESS(rc))
{
RTCritSectLeave(&pThis->CritSect);
size_t cbTotalWritten = 0;
while (cbToWrite > 0)
{
ULONG cbActual = 0;
APIRET orc = DosWrite(pThis->hPipe, pvBuf, cbToWrite, &cbActual);
if (orc != NO_ERROR)
{
rc = RTErrConvertFromOS2(orc);
if (rc == VERR_PIPE_NOT_CONNECTED)
rc = VERR_BROKEN_PIPE;
break;
}
pvBuf = (char const *)pvBuf + cbActual;
cbToWrite -= cbActual;
cbTotalWritten += cbActual;
}
if (pcbWritten)
{
*pcbWritten = cbTotalWritten;
if ( RT_FAILURE(rc)
&& cbTotalWritten)
rc = VINF_SUCCESS;
}
RTCritSectEnter(&pThis->CritSect);
if (rc == VERR_BROKEN_PIPE)
pThis->fBrokenPipe = true;
pThis->cUsers--;
}
else
rc = VERR_WRONG_ORDER;
RTCritSectLeave(&pThis->CritSect);
}
return rc;
}
RTDECL(int) RTPipeFlush(RTPIPE hPipe)
{
RTPIPEINTERNAL *pThis = hPipe;
AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
APIRET orc = DosResetBuffer(pThis->hPipe);
if (orc != NO_ERROR)
{
int rc = RTErrConvertFromOS2(orc);
if (rc == VERR_BROKEN_PIPE)
{
RTCritSectEnter(&pThis->CritSect);
pThis->fBrokenPipe = true;
RTCritSectLeave(&pThis->CritSect);
}
return rc;
}
return VINF_SUCCESS;
}
RTDECL(int) RTPipeSelectOne(RTPIPE hPipe, RTMSINTERVAL cMillies)
{
RTPIPEINTERNAL *pThis = hPipe;
AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
uint64_t const StartMsTS = RTTimeMilliTS();
int rc = RTCritSectEnter(&pThis->CritSect);
if (RT_FAILURE(rc))
return rc;
rc = rtPipeOs2EnsureSem(pThis);
if (RT_SUCCESS(rc) && cMillies > 0)
{
/* Stop polling attempts if we might block. */
if (pThis->hPollSet == NIL_RTPOLLSET)
pThis->hPollSet = (RTPOLLSET)(uintptr_t)0xbeef0042;
else
rc = VERR_WRONG_ORDER;
}
if (RT_SUCCESS(rc))
{
for (unsigned iLoop = 0;; iLoop++)
{
/*
* Check the handle state.
*/
APIRET orc;
if (cMillies > 0)
{
ULONG ulIgnore;
orc = DosResetEventSem(pThis->hev, &ulIgnore);
AssertMsg(orc == NO_ERROR || orc == ERROR_ALREADY_RESET, ("%d\n", orc));
}
PIPESEMSTATE aStates[4]; RT_ZERO(aStates);
orc = DosQueryNPipeSemState((HSEM)pThis->hev, &aStates[0], sizeof(aStates));
if (orc != NO_ERROR)
{
rc = RTErrConvertFromOS2(orc);
break;
}
int i = 0;
if (pThis->fRead)
while (aStates[i].fStatus == NPSS_WSPACE)
i++;
else
while (aStates[i].fStatus == NPSS_RDATA)
i++;
if (aStates[i].fStatus == NPSS_CLOSE)
break;
Assert(aStates[i].fStatus == NPSS_WSPACE || aStates[i].fStatus == NPSS_RDATA || aStates[i].fStatus == NPSS_EOI);
if ( aStates[i].fStatus != NPSS_EOI
&& aStates[i].usAvail > 0)
break;
/*
* Check for timeout.
*/
ULONG cMsMaxWait = SEM_INDEFINITE_WAIT;
if (cMillies != RT_INDEFINITE_WAIT)
{
uint64_t cElapsed = RTTimeMilliTS() - StartMsTS;
if (cElapsed >= cMillies)
{
rc = VERR_TIMEOUT;
break;
}
cMsMaxWait = cMillies - (uint32_t)cElapsed;
}
/*
* Wait.
*/
RTCritSectLeave(&pThis->CritSect);
orc = DosWaitEventSem(pThis->hev, cMsMaxWait);
RTCritSectEnter(&pThis->CritSect);
if (orc != NO_ERROR && orc != ERROR_TIMEOUT && orc != ERROR_SEM_TIMEOUT )
{
rc = RTErrConvertFromOS2(orc);
break;
}
}
if (rc == VERR_BROKEN_PIPE)
pThis->fBrokenPipe = true;
if (cMillies > 0)
pThis->hPollSet = NIL_RTPOLLSET;
}
RTCritSectLeave(&pThis->CritSect);
return rc;
}
RTDECL(int) RTPipeQueryReadable(RTPIPE hPipe, size_t *pcbReadable)
{
RTPIPEINTERNAL *pThis = hPipe;
AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
AssertReturn(pThis->fRead, VERR_PIPE_NOT_READ);
AssertPtrReturn(pcbReadable, VERR_INVALID_POINTER);
int rc = RTCritSectEnter(&pThis->CritSect);
if (RT_FAILURE(rc))
return rc;
ULONG cbActual = 0;
ULONG ulState = 0;
AVAILDATA Avail = { 0, 0 };
APIRET orc = DosPeekNPipe(pThis->hPipe, NULL, 0, &cbActual, &Avail, &ulState);
if (orc == NO_ERROR)
{
if (Avail.cbpipe > 0 || ulState == NP_STATE_CONNECTED)
*pcbReadable = Avail.cbpipe;
else
rc = VERR_PIPE_NOT_CONNECTED; /*??*/
}
else
rc = RTErrConvertFromOS2(orc);
RTCritSectLeave(&pThis->CritSect);
return rc;
}
int rtPipePollGetHandle(RTPIPE hPipe, uint32_t fEvents, PRTHCINTPTR phNative)
{
RTPIPEINTERNAL *pThis = hPipe;
AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
AssertReturn(!(fEvents & RTPOLL_EVT_READ) || pThis->fRead, VERR_INVALID_PARAMETER);
AssertReturn(!(fEvents & RTPOLL_EVT_WRITE) || !pThis->fRead, VERR_INVALID_PARAMETER);
int rc = RTCritSectEnter(&pThis->CritSect);
if (RT_SUCCESS(rc))
{
rc = rtPipeOs2EnsureSem(pThis);
if (RT_SUCCESS(rc))
*phNative = (RTHCINTPTR)pThis->hev;
RTCritSectLeave(&pThis->CritSect);
}
return rc;
}
/**
* Checks for pending events.
*
* @returns Event mask or 0.
* @param pThis The pipe handle.
* @param fEvents The desired events.
* @param fResetEvtSem Whether to reset the event semaphore.
*/
static uint32_t rtPipePollCheck(RTPIPEINTERNAL *pThis, uint32_t fEvents, bool fResetEvtSem)
{
/*
* Reset the event semaphore if we're gonna wait.
*/
APIRET orc;
ULONG ulIgnore;
if (fResetEvtSem)
{
orc = DosResetEventSem(pThis->hev, &ulIgnore);
AssertMsg(orc == NO_ERROR || orc == ERROR_ALREADY_RESET, ("%d\n", orc));
}
/*
* Check for events.
*/
uint32_t fRetEvents = 0;
if (pThis->fBrokenPipe)
fRetEvents |= RTPOLL_EVT_ERROR;
else if (pThis->fRead)
{
ULONG cbActual = 0;
ULONG ulState = 0;
AVAILDATA Avail = { 0, 0 };
orc = DosPeekNPipe(pThis->hPipe, NULL, 0, &cbActual, &Avail, &ulState);
if (orc != NO_ERROR)
{
fRetEvents |= RTPOLL_EVT_ERROR;
if (orc == ERROR_BROKEN_PIPE || orc == ERROR_PIPE_NOT_CONNECTED)
pThis->fBrokenPipe = true;
}
else if (Avail.cbpipe > 0)
fRetEvents |= RTPOLL_EVT_READ;
else if (ulState != NP_STATE_CONNECTED)
{
fRetEvents |= RTPOLL_EVT_ERROR;
pThis->fBrokenPipe = true;
}
}
else
{
PIPESEMSTATE aStates[4]; RT_ZERO(aStates);
orc = DosQueryNPipeSemState((HSEM)pThis->hev, &aStates[0], sizeof(aStates));
if (orc == NO_ERROR)
{
int i = 0;
while (aStates[i].fStatus == NPSS_RDATA)
i++;
if (aStates[i].fStatus == NPSS_CLOSE)
{
fRetEvents |= RTPOLL_EVT_ERROR;
pThis->fBrokenPipe = true;
}
else if ( aStates[i].fStatus == NPSS_WSPACE
&& aStates[i].usAvail > 0)
fRetEvents |= RTPOLL_EVT_WRITE;
}
else
{
fRetEvents |= RTPOLL_EVT_ERROR;
if (orc == ERROR_BROKEN_PIPE || orc == ERROR_PIPE_NOT_CONNECTED)
pThis->fBrokenPipe = true;
}
}
return fRetEvents & (fEvents | RTPOLL_EVT_ERROR);
}
uint32_t rtPipePollStart(RTPIPE hPipe, RTPOLLSET hPollSet, uint32_t fEvents, bool fFinalEntry, bool fNoWait)
{
RTPIPEINTERNAL *pThis = hPipe;
AssertPtrReturn(pThis, UINT32_MAX);
AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, UINT32_MAX);
int rc = RTCritSectEnter(&pThis->CritSect);
AssertRCReturn(rc, UINT32_MAX);
/* Check that this is the only current use of this pipe. */
uint32_t fRetEvents;
if ( pThis->cUsers == 0
|| pThis->hPollSet == NIL_RTPOLLSET)
{
fRetEvents = rtPipePollCheck(pThis, fEvents, fNoWait);
if (!fRetEvents && !fNoWait)
{
/* Mark the set busy while waiting. */
pThis->cUsers++;
pThis->hPollSet = hPollSet;
}
}
else
{
AssertFailed();
fRetEvents = UINT32_MAX;
}
RTCritSectLeave(&pThis->CritSect);
return fRetEvents;
}
uint32_t rtPipePollDone(RTPIPE hPipe, uint32_t fEvents, bool fFinalEntry, bool fHarvestEvents)
{
RTPIPEINTERNAL *pThis = hPipe;
AssertPtrReturn(pThis, 0);
AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, 0);
int rc = RTCritSectEnter(&pThis->CritSect);
AssertRCReturn(rc, 0);
Assert(pThis->cUsers > 0);
/* harvest events. */
uint32_t fRetEvents = rtPipePollCheck(pThis, fEvents, false);
/* update counters. */
pThis->cUsers--;
pThis->hPollSet = NIL_RTPOLLSET;
RTCritSectLeave(&pThis->CritSect);
return fRetEvents;
}