nsPipe3.cpp revision 677833bc953b6cb418c701facbdcf4aa18d6c44e
/* ***** BEGIN LICENSE BLOCK *****
* Version: MPL 1.1/GPL 2.0/LGPL 2.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is Mozilla.
*
* The Initial Developer of the Original Code is
* Netscape Communications Corporation.
* Portions created by the Initial Developer are Copyright (C) 2002
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
* Darin Fisher <darin@netscape.com>
*
* Alternatively, the contents of this file may be used under the terms of
* either the GNU General Public License Version 2 or later (the "GPL"), or
* the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
* in which case the provisions of the GPL or the LGPL are applicable instead
* of those above. If you wish to allow use of your version of this file only
* under the terms of either the GPL or the LGPL, and not to allow others to
* use your version of this file under the terms of the MPL, indicate your
* decision by deleting the provisions above and replace them with the notice
* and other provisions required by the GPL or the LGPL. If you do not delete
* the provisions above, a recipient may use your version of this file under
* the terms of any one of the MPL, the GPL or the LGPL.
*
* ***** END LICENSE BLOCK ***** */
#include "nsIPipe.h"
#include "nsIEventTarget.h"
#include "nsISeekableStream.h"
#include "nsSegmentedBuffer.h"
#include "nsStreamUtils.h"
#include "nsAutoLock.h"
#include "nsCOMPtr.h"
#include "nsCRT.h"
#include "prlog.h"
#include "nsInt64.h"
#if defined(PR_LOGGING)
//
// set NSPR_LOG_MODULES=nsPipe:5
//
#else
#endif
// default pipe geometry.
// NOTE(review): presumably these are the fallbacks applied when a caller
// passes 0 for the segment size / segment count (the init code below tests
// both for 0, but the assignments are not visible in this view) -- confirm.
#define DEFAULT_SEGMENT_SIZE 4096
#define DEFAULT_SEGMENT_COUNT 16
// forward declarations, so that the classes in this file can refer to one
// another before their definitions.
class nsPipe;
class nsPipeEvents;
class nsPipeInputStream;
class nsPipeOutputStream;
//-----------------------------------------------------------------------------
// nsPipeEvents
//
// this class is used to delay notifications until the end of a particular
// scope. it helps avoid the complexity of issuing callbacks while inside
// a critical section.
//
// NOTE(review): the member declarations and the heads of the inline methods
// are not visible in this view; the two bare brace pairs below are all that
// remains of them.
class nsPipeEvents
{
public:
// nothing to set up at construction time.
nsPipeEvents() { }
// declared here, defined out of line.
~nsPipeEvents();
{
}
{
}
private:
};
//-----------------------------------------------------------------------------
// nsPipeInputStream
//
// the input end of a pipe (allocated as a member of the pipe). it derives
// from nsIAsyncInputStream, nsISeekableStream and nsISearchableInputStream,
// as listed in the base-class list below.
//
// NOTE(review): the method/member declarations and the head of the
// constructor are not visible in this view; only their comments remain.
class nsPipeInputStream : public nsIAsyncInputStream
, public nsISeekableStream
, public nsISearchableInputStream
{
public:
// since this class will be allocated as a member of the pipe, we do not
// need our own ref count. instead, we share the lifetime (the ref count)
// of the entire pipe. this macro is just convenience since it does not
// declare a mRefCount variable; however, don't let the name fool you...
// we are not inheriting from nsPipe ;-)
// tail of the constructor's initializer list: the reader ref count, the
// logical offset, the available-byte count and the callback flags all
// start out as zero.
, mReaderRefCnt(0)
, mLogicalOffset(0)
, mAvailable(0)
, mCallbackFlags(0)
{ }
// synchronously wait for the pipe to become readable.
// these functions return true to indicate that the pipe's monitor should
// be notified, to wake up a blocked reader if any.
private:
// separate refcnt so that we know when to close the consumer
// these variables can only be accessed while inside the pipe's monitor
};
//-----------------------------------------------------------------------------
// nsPipeOutputStream
//
// the output end of a pipe (allocated as a member of the pipe). it derives
// from nsIAsyncOutputStream and nsISeekableStream, as listed in the
// base-class list below.
//
// NOTE(review): the method/member declarations and the head of the
// constructor are not visible in this view; only their comments remain.
class nsPipeOutputStream : public nsIAsyncOutputStream
, public nsISeekableStream
{
public:
// since this class will be allocated as a member of the pipe, we do not
// need our own ref count. instead, we share the lifetime (the ref count)
// of the entire pipe. this macro is just convenience since it does not
// declare a mRefCount variable; however, don't let the name fool you...
// we are not inheriting from nsPipe ;-)
// tail of the constructor's initializer list: the writer ref count, the
// logical offset and the callback flags all start out as zero.
, mWriterRefCnt(0)
, mLogicalOffset(0)
, mCallbackFlags(0)
{ }
// synchronously wait for the pipe to become writable.
// these functions return true to indicate that the pipe's monitor should
// be notified, to wake up a blocked writer if any.
private:
// separate refcnt so that we know when to close the producer
// these variables can only be accessed while inside the pipe's monitor
};
//-----------------------------------------------------------------------------
// body of the pipe class.
// NOTE(review): the "class ..." header line and most member declarations
// are not visible in this view.
{
public:
// both stream ends are friends, so they may use the pipe's non-public
// members.
friend class nsPipeInputStream;
friend class nsPipeOutputStream;
// nsPipe methods:
nsPipe();
private:
// the destructor is private.
// NOTE(review): presumably so that the pipe is only ever destroyed through
// its ref count -- confirm.
~nsPipe();
public:
//
// methods below may only be called while inside the pipe's monitor
//
//
// methods below may be called while outside the pipe's monitor
//
protected:
// We can't inherit from both nsIInputStream and nsIOutputStream
// because they collide on their Close method. Consequently we nest their
// implementations to avoid the extra object allocation.
// read/write positions inside the segmented buffer; see the "NOTES on
// buffer architecture" diagram for how cursor and limit relate to the
// segments.
char* mReadCursor;
char* mReadLimit;
char* mWriteCursor;
char* mWriteLimit;
};
//
// NOTES on buffer architecture:
//
// +-----------------+ - - mBuffer.GetSegment(0)
// | |
// + - - - - - - - - + - - mReadCursor
// |/////////////////|
// |/////////////////|
// |/////////////////|
// |/////////////////|
// +-----------------+ - - mReadLimit
// |
// +-----------------+
// |/////////////////|
// |/////////////////|
// |/////////////////|
// |/////////////////|
// |/////////////////|
// |/////////////////|
// +-----------------+
// |
// +-----------------+ - - mBuffer.GetSegment(mWriteSegment)
// |/////////////////|
// |/////////////////|
// |/////////////////|
// + - - - - - - - - + - - mWriteCursor
// | |
// | |
// +-----------------+ - - mWriteLimit
//
// (shaded region contains data)
//
// NOTE: on some systems (notably OS/2), the heap allocator uses an arena for
// small allocations (e.g., 64 byte allocations). this means that buffers may
// be allocated back-to-back. in the diagram above, for example, mReadLimit
// would actually be pointing at the beginning of the next segment. when
// making changes to this file, please keep this fact in mind.
//
//-----------------------------------------------------------------------------
// nsPipe methods:
//-----------------------------------------------------------------------------
// constructor initializer list (the constructor's head and some of the
// initializers are not visible in this view).
// each stream end is handed a pointer to this pipe; the read limit starts
// out null and the write segment index starts at -1, which indicates an
// empty buffer.
: mInput(this)
, mOutput(this)
, mReadLimit(nsnull)
, mWriteSegment(-1)
{
}
// NOTE(review): appears to be the destructor body; its head and the
// statement guarded by the check below are not visible in this view.
// presumably the monitor is released here when one was created -- confirm.
{
if (mMonitor)
}
// pipe initialization.
// NOTE(review): the signature and several statements are not visible in
// this view; the body refers to segmentSize, segmentCount, maxCount and rv.
{
// create the pipe's monitor; report out-of-memory if that fails.
mMonitor = PR_NewMonitor();
if (!mMonitor)
return NS_ERROR_OUT_OF_MEMORY;
// NOTE(review): the statements controlled by the two zero checks are not
// visible; presumably they substitute DEFAULT_SEGMENT_SIZE and
// DEFAULT_SEGMENT_COUNT -- confirm.
if (segmentSize == 0)
if (segmentCount == 0)
// protect against overflow
if (segmentCount > maxCount)
return rv;
return NS_OK;
}
// NOTE(review): two method bodies whose heads and out-parameter assignments
// are not visible in this view; presumably the accessors for the input and
// output ends of the pipe -- confirm. as shown, each one returns NS_OK.
{
return NS_OK;
}
{
return NS_OK;
}
// looks up a buffer segment by index.
// NOTE(review): the function head and most statements are not visible in
// this view; the body refers to index, limit and numSegments.
void
{
if (index == 0) {
// for the first segment the limit is the current read limit.
limit = mReadLimit;
}
else {
// NOTE(review): the handling of an index at or past numSegments, and of
// the remaining cases, is not visible here.
if (index >= numSegments)
else {
else
}
}
}
// NOTE(review): the function head and the statement originally controlled
// by the comparison below are not visible in this view.
{
// read cursor at the read limit: nothing more can be read from the current
// segment.
if (mReadCursor == mReadLimit)
return NS_OK;
}
// moves the read cursor forward after bytesRead bytes have been consumed,
// and discards the first segment once it has been read completely.
// NOTE(review): the function head and a number of statements are not
// visible in this view.
void
{
{
mReadCursor += bytesRead;
if (mReadCursor == mReadLimit) {
// we've reached the limit of how much we can read from this segment.
// if at the end of this segment, then we must discard this segment.
// if still writing in this segment then bail because we're not done
// with the segment and have to wait for now...
return;
}
// shift write segment index (-1 indicates an empty buffer).
// done with this segment
LOG(("III deleting first segment\n"));
if (mWriteSegment == -1) {
// buffer is completely empty
mReadLimit = nsnull;
}
else {
// advance read cursor and limit to next buffer segment
if (mWriteSegment == 0)
else
}
// we've freed up a segment, so notify output stream that pipe has
// room for a new segment.
}
}
}
// hands out writable space in the pipe's buffer.
// NOTE(review): the function head, the condition guarding the first return
// and several statements are not visible in this view.
{
return mStatus;
// write cursor and limit may both be null indicating an empty buffer.
if (mWriteCursor == mWriteLimit) {
// pipe is full
return NS_BASE_STREAM_WOULD_BLOCK;
LOG(("OOO appended new segment\n"));
// writing starts at the beginning of seg.
mWriteCursor = seg;
}
// make sure read cursor is initialized
if (mReadCursor == nsnull) {
}
// check to see if we can roll-back our read and write cursors to the
// (the remainder of this comment and the code it describes are not
// visible in this view.)
}
return NS_OK;
}
// bookkeeping after data has been written into the pipe.
// NOTE(review): the function head and all statements except one comparison
// are not visible in this view; only the original comments remain.
void
{
{
// update read limit if reading in the same segment
// update the writable flag on the output stream
// write cursor at the write limit: the current write segment is full.
if (mWriteCursor == mWriteLimit) {
}
// notify input stream that pipe now contains additional data
}
}
// nsPipe::OnPipeException (name taken from the log message below).
// records an exception on the pipe, given a reason code and an outputOnly
// flag.
// NOTE(review): the function head, the condition guarding the early return
// and the notification calls are not visible in this view.
void
{
LOG(("PPP nsPipe::OnPipeException [reason=%x output-only=%d]\n",
reason, outputOnly));
{
// if we've already hit an exception, then ignore this one.
return;
// an output-only exception applies to the input end if the pipe has
// zero bytes available.
if (!outputOnly)
}
}
//-----------------------------------------------------------------------------
// nsPipeEvents methods:
//-----------------------------------------------------------------------------
// NOTE(review): appears to be the nsPipeEvents destructor body (it sits
// under the "nsPipeEvents methods" banner); the head and the actual
// notification calls are not visible in this view.
{
// dispatch any pending events
if (mInputCallback) {
// drop the input callback and its stream reference.
mInputCallback = 0;
mInputStream = 0;
}
if (mOutputCallback) {
// drop the output callback and its stream reference.
mOutputCallback = 0;
mOutputStream = 0;
}
}
//-----------------------------------------------------------------------------
// nsPipeInputStream methods:
//-----------------------------------------------------------------------------
// blocking wait on the input end; logs before waiting and after waking up.
// NOTE(review): the function head, the wait loop and the arguments of the
// second LOG call are not visible in this view.
{
LOG(("III pipe input: waiting for data\n"));
LOG(("III pipe input: woke up [pipe-status=%x available=%u]\n",
}
}
// NOTE(review): presumably the handler run when the input end becomes
// readable -- confirm; the function head, the opening condition and the
// statements that compute result are not visible in this view.
{
// the pending callback and its flags are cleared.
mCallback = 0;
mCallbackFlags = 0;
}
// NOTE(review): the statement controlled by this check is not visible.
else if (mBlocked)
return result;
}
// nsPipeInputStream::OnInputException (name taken from the log message
// below). reacts to an exception on the input end, given a reason code.
// NOTE(review): the function head and the statements that compute result
// are not visible in this view.
{
LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n",
this, reason));
// force count of available bytes to zero.
mAvailable = 0;
if (mCallback) {
// a pending callback and its flags are cleared.
mCallback = 0;
mCallbackFlags = 0;
}
// NOTE(review): the statement controlled by this check is not visible.
else if (mBlocked)
return result;
}
// reference counting for the input end.
// NOTE(review): the return-type lines, the whole AddRef body and the return
// statement of Release are not visible in this view.
nsPipeInputStream::AddRef(void)
{
}
nsPipeInputStream::Release(void)
{
// when the last reader reference goes away, close the input end.
if (--mReaderRefCnt == 0)
Close();
}
// NOTE(review): two method bodies whose heads are not visible in this view;
// presumably CloseWithStatus(reason) followed by Close() -- confirm.
{
// NOTE(review): the statement controlled by this check is not visible.
if (NS_SUCCEEDED(reason))
return NS_OK;
}
{
// closing without an explicit reason uses NS_BASE_STREAM_CLOSED.
return CloseWithStatus(NS_BASE_STREAM_CLOSED);
}
// reports the number of bytes available through *result.
// NOTE(review): the function head and the closed-pipe check announced by
// the comment below are not visible in this view.
{
// return error if pipe closed
*result = mAvailable;
return NS_OK;
}
// ReadSegments (name taken from the comment near the end of the inner
// loop): reads up to count bytes and accumulates the total in *readCount.
// NOTE(review): most of the signature, the calls that fetch a segment and
// invoke the writer, and several conditions are not visible in this view.
void *closure,
{
const char *segment;
*readCount = 0;
while (count) {
// ignore this error if we've already read something.
if (*readCount > 0) {
break;
}
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
// pipe is empty
if (!mBlocking)
break;
// wait for some data to be written to the pipe
if (NS_SUCCEEDED(rv))
continue;
}
// ignore this error, just return.
if (rv == NS_BASE_STREAM_CLOSED) {
break;
}
break;
}
// read no more than count
if (segmentLen > count)
segmentLen = count;
while (segmentLen) {
writeCount = 0;
count = 0;
// any errors returned from the writer end here: do not
// propagate to the caller of ReadSegments.
break;
}
// account for the writeCount bytes just handled.
segment += writeCount;
segmentLen -= writeCount;
count -= writeCount;
*readCount += writeCount;
}
// NOTE(review): the statement controlled by this check is not visible.
if (segmentLen < originalLen)
}
return rv;
}
// file-local helper taking a closure and a raw source segment.
// NOTE(review): the function name, the rest of the parameter list and the
// copy into the destination are not visible in this view; presumably this
// is the segment writer used by Read -- confirm.
static NS_METHOD
void *closure,
const char *fromRawSegment,
{
// reports count bytes as written.
*writeCount = count;
return NS_OK;
}
// NOTE(review): head and body of the following method are not visible.
{
}
// reports through *aNonBlocking whether this end is non-blocking, i.e. the
// inverse of mBlocking.
// NOTE(review): the function head is not visible in this view.
{
*aNonBlocking = !mBlocking;
return NS_OK;
}
// AsyncWait on the input end (name taken from the log message below).
// NOTE(review): the function head, the monitor scope, the proxying done
// when a target is given and the condition choosing between "post now" and
// "queue" are not visible in this view.
{
LOG(("III AsyncWait [this=%x]\n", this));
{
// replace a pending callback
mCallback = 0;
mCallbackFlags = 0;
if (target) {
}
// stream is already closed or readable; post event.
}
else {
// queue up callback object to be notified when data becomes available
}
}
return NS_OK;
}
// nsISeekableStream methods on the input end. the function heads are not
// visible in this view; Seek and SetEOF are named by their NS_NOTREACHED
// messages.
// Seek: not supported; asserts and returns NS_ERROR_NOT_IMPLEMENTED.
{
NS_NOTREACHED("nsPipeInputStream::Seek");
return NS_ERROR_NOT_IMPLEMENTED;
}
// NOTE(review): presumably Tell -- reports the logical offset via *offset.
{
*offset = mLogicalOffset;
return NS_OK;
}
// SetEOF: not supported; asserts and returns NS_ERROR_NOT_IMPLEMENTED.
{
NS_NOTREACHED("nsPipeInputStream::SetEOF");
return NS_ERROR_NOT_IMPLEMENTED;
}
(ignoreCase \
// searches the pipe's buffer segments for a string and reports how far the
// search got through *offsetSearchedTo.
// NOTE(review): the function head, the comparison calls, the "found" result
// and most conditions are not visible in this view; presumably this is the
// nsISearchableInputStream search method -- confirm.
{
*offsetSearchedTo = 0;
return NS_OK;
}
while (PR_TRUE) {
// check if the string is in the buffer segment
*offsetSearchedTo = offset + i;
return NS_OK;
}
}
// get the next segment
index++;
return NS_OK;
}
// check if the string is straddling the next buffer segment
for (i = 0; i < lim; ++i) {
return NS_OK;
}
}
// finally continue with the next buffer
}
// the loop above only exits through a return.
NS_NOTREACHED("can't get here");
return NS_ERROR_UNEXPECTED; // keep compiler happy
}
//-----------------------------------------------------------------------------
// nsPipeOutputStream methods:
//-----------------------------------------------------------------------------
// blocking wait on the output end; logs before waiting and after waking up.
// NOTE(review): the function head, the wait loop and the arguments of the
// second LOG call are not visible in this view.
{
LOG(("OOO pipe output: waiting for space\n"));
LOG(("OOO pipe output: woke up [pipe-status=%x writable=%u]\n",
}
}
// NOTE(review): presumably the handler run when the output end becomes
// writable -- confirm; the function head, the opening condition and the
// statements that compute result are not visible in this view.
{
// the pending callback and its flags are cleared.
mCallback = 0;
mCallbackFlags = 0;
}
// NOTE(review): the statement controlled by this check is not visible.
else if (mBlocked)
return result;
}
// nsPipeOutputStream::OnOutputException (name taken from the log message
// below). reacts to an exception on the output end, given a reason code.
// NOTE(review): the function head and the statements that compute result
// are not visible in this view.
{
LOG(("nsPipeOutputStream::OnOutputException [this=%x reason=%x]\n",
this, reason));
if (mCallback) {
// a pending callback and its flags are cleared.
mCallback = 0;
mCallbackFlags = 0;
}
// NOTE(review): the statement controlled by this check is not visible.
else if (mBlocked)
return result;
}
// NOTE(review): two method bodies whose heads are not visible in this view;
// presumably AddRef and Release of the output end -- confirm. the first
// body and the return statement of the second are not visible either.
{
}
{
// when the last writer reference goes away, close the output end.
if (--mWriterRefCnt == 0)
Close();
}
// NOTE(review): two method bodies whose heads are not visible in this view;
// presumably CloseWithStatus(reason) followed by Close() -- confirm.
{
// NOTE(review): the statement controlled by this check, and the call the
// comment below refers to, are not visible.
if (NS_SUCCEEDED(reason))
// input stream may remain open
return NS_OK;
}
{
// closing without an explicit reason uses NS_BASE_STREAM_CLOSED.
return CloseWithStatus(NS_BASE_STREAM_CLOSED);
}
// WriteSegments (name taken from the comment near the end of the inner
// loop): writes up to count bytes and accumulates the total in *writeCount.
// NOTE(review): most of the signature, the calls that fetch a segment and
// invoke the reader, and several conditions are not visible in this view.
void* closure,
{
char *segment;
*writeCount = 0;
while (count) {
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
// pipe is full
if (!mBlocking) {
// ignore this error if we've already written something
if (*writeCount > 0)
break;
}
// wait for the pipe to have an empty segment.
if (NS_SUCCEEDED(rv))
continue;
}
break;
}
// write no more than count
if (segmentLen > count)
segmentLen = count;
while (segmentLen) {
readCount = 0;
count = 0;
// any errors returned from the reader end here: do not
// propagate to the caller of WriteSegments.
break;
}
// account for the readCount bytes just handled.
segmentLen -= readCount;
*writeCount += readCount;
}
// NOTE(review): the statement controlled by this check is not visible.
if (segmentLen < originalLen)
}
return rv;
}
// file-local helper taking a closure and a raw destination segment.
// NOTE(review): the function name, the rest of the parameter list and the
// copy into toRawSegment are not visible in this view; presumably this is
// the segment reader used by Write -- confirm.
static NS_METHOD
void* closure,
char* toRawSegment,
{
return NS_OK;
}
// NOTE(review): head and body of the following method are not visible.
{
}
// Flush is a no-op for the output end and always reports success.
// NOTE(review): the return-type line is not visible in this view.
nsPipeOutputStream::Flush(void)
{
// nothing to do
return NS_OK;
}
// file-local helper taking a closure and a raw destination segment.
// NOTE(review): the function name, the rest of the parameter list and the
// whole body are not visible in this view; presumably this is the segment
// reader that pulls from another input stream -- confirm.
static NS_METHOD
void* closure,
char* toRawSegment,
{
}
// NOTE(review): head and body of the following method are not visible.
{
}
// reports through *aNonBlocking whether this end is non-blocking, i.e. the
// inverse of mBlocking.
// NOTE(review): the function head is not visible in this view.
{
*aNonBlocking = !mBlocking;
return NS_OK;
}
// AsyncWait on the output end (name taken from the log message below).
// NOTE(review): the function head, the monitor scope, the proxying done
// when a target is given and the condition choosing between "post now" and
// "queue" are not visible in this view.
{
LOG(("OOO AsyncWait [this=%x]\n", this));
{
// replace a pending callback
mCallback = 0;
mCallbackFlags = 0;
if (target) {
}
// stream is already closed or writable; post event.
}
else {
// queue up callback object to be notified when the pipe becomes writable
}
}
return NS_OK;
}
// nsISeekableStream methods on the output end. the function heads are not
// visible in this view; Seek and SetEOF are named by their NS_NOTREACHED
// messages.
// Seek: not supported; asserts and returns NS_ERROR_NOT_IMPLEMENTED.
{
NS_NOTREACHED("nsPipeOutputStream::Seek");
return NS_ERROR_NOT_IMPLEMENTED;
}
// NOTE(review): presumably Tell -- reports the logical offset via *offset.
{
*offset = mLogicalOffset;
return NS_OK;
}
// SetEOF: not supported; asserts and returns NS_ERROR_NOT_IMPLEMENTED.
{
NS_NOTREACHED("nsPipeOutputStream::SetEOF");
return NS_ERROR_NOT_IMPLEMENTED;
}
////////////////////////////////////////////////////////////////////////////////
// creates a pipe object.
// NOTE(review): the function head, the allocation, the initialization call
// and the hand-off of the two stream ends are not visible in this view;
// presumably this is the public pipe factory -- confirm.
{
// NOTE(review): the statement controlled by the gPipeLog check is not
// visible; presumably the log module is created on first use -- confirm.
#if defined(PR_LOGGING)
if (!gPipeLog)
#endif
// allocation failure is reported as out-of-memory.
if (!pipe)
return NS_ERROR_OUT_OF_MEMORY;
return rv;
}
return NS_OK;
}
////////////////////////////////////////////////////////////////////////////////