Lines Matching refs:stream

61  * The following assertions should hold anytime the stream's mutex is not held
63 #define STREAM_INVARIANT(stream) \
65 SHMEM_ASSERT((stream->shared->readOffset < SHARED_BUFFER_SIZE) \
66 && (stream->shared->readOffset >= 0)); \
67 SHMEM_ASSERT((stream->shared->writeOffset < SHARED_BUFFER_SIZE) \
68 && (stream->shared->writeOffset >= 0)); \
119 * stream. access to hasData and hasSpace synchronized
273 #define FULL(stream) (stream->shared->isFull)
274 #define EMPTY(stream) ((stream->shared->writeOffset == stream->shared->readOffset) \
275 && !stream->shared->isFull)
278 leaveMutex(Stream *stream)
280 return sysIPMutexExit(stream->mutex);
283 /* enter the stream's mutex and (optionally) check for a closed stream */
285 enterMutex(Stream *stream, sys_event_t event)
287 jint ret = sysIPMutexEnter(stream->mutex, event);
289 if (IS_STATE_CLOSED(stream->state)) {
290 setLastErrorMsg("stream closed");
294 if (IS_STATE_CLOSED(stream->state)) {
295 setLastErrorMsg("stream closed");
296 (void)leaveMutex(stream);
303 * Enter/exit with stream mutex held.
304 * On error, does not hold the stream mutex.
307 waitForSpace(SharedMemoryConnection *connection, Stream *stream)
312 while ((error == SYS_OK) && FULL(stream)) {
313 CHECK_ERROR(leaveMutex(stream));
314 error = sysEventWait(connection->otherProcess, stream->hasSpace, 0);
316 CHECK_ERROR(enterMutex(stream, connection->shutdown));
325 signalSpace(Stream *stream)
327 return sysEventSignal(stream->hasSpace);
331 * Enter/exit with stream mutex held.
332 * On error, does not hold the stream mutex.
335 waitForData(SharedMemoryConnection *connection, Stream *stream)
340 while ((error == SYS_OK) && EMPTY(stream)) {
341 CHECK_ERROR(leaveMutex(stream));
342 error = sysEventWait(connection->otherProcess, stream->hasData, 0);
344 CHECK_ERROR(enterMutex(stream, connection->shutdown));
353 signalData(Stream *stream)
355 return sysEventSignal(stream->hasData);
360 closeStream(Stream *stream, jboolean linger)
363 * Lock stream during close - ignore shutdown event as we are
366 CHECK_ERROR(enterMutex(stream, NULL));
368 /* mark the stream as closed */
369 stream->state = STATE_CLOSED;
371 sysEventSignal(stream->hasData);
372 sysEventClose(stream->hasData);
374 sysEventSignal(stream->hasSpace);
375 sysEventClose(stream->hasSpace);
378 * If linger requested then give the stream a few seconds to
383 while (!EMPTY(stream) && attempts>0) {
384 CHECK_ERROR(leaveMutex(stream));
386 CHECK_ERROR(enterMutex(stream, NULL));
391 CHECK_ERROR(leaveMutex(stream));
392 sysIPMutexClose(stream->mutex);
397 * Server creates stream.
400 createStream(char *name, Stream *stream)
406 error = createWithGeneratedName(prefix, stream->shared->mutexName,
407 createMutex, &stream->mutex);
413 error = createWithGeneratedName(prefix, stream->shared->hasDataEventName,
414 createEvent, &stream->hasData);
416 (void)closeStream(stream, JNI_FALSE);
421 error = createWithGeneratedName(prefix, stream->shared->hasSpaceEventName,
422 createEvent, &stream->hasSpace);
424 (void)closeStream(stream, JNI_FALSE);
428 stream->shared->readOffset = 0;
429 stream->shared->writeOffset = 0;
430 stream->shared->isFull = JNI_FALSE;
431 stream->state = STATE_OPEN;
437 * Initialization for the stream opened by the other process
440 openStream(Stream *stream)
444 CHECK_ERROR(sysIPMutexOpen(stream->shared->mutexName, &stream->mutex));
446 error = sysEventOpen(stream->shared->hasDataEventName,
447 &stream->hasData);
450 (void)closeStream(stream, JNI_FALSE);
454 error = sysEventOpen(stream->shared->hasSpaceEventName,
455 &stream->hasSpace);
458 (void)closeStream(stream, JNI_FALSE);
462 stream->state = STATE_OPEN;
940 Stream *stream = &connection->outgoing;
941 SharedStream *shared = stream->shared;
946 CHECK_ERROR(enterMutex(stream, connection->shutdown));
947 CHECK_ERROR(waitForSpace(connection, stream));
948 SHMEM_ASSERT(!FULL(stream));
954 STREAM_INVARIANT(stream);
955 CHECK_ERROR(leaveMutex(stream));
957 CHECK_ERROR(signalData(stream));
965 Stream *stream = &connection->incoming;
966 SharedStream *shared = stream->shared;
971 CHECK_ERROR(enterMutex(stream, connection->shutdown));
972 CHECK_ERROR(waitForData(connection, stream));
973 SHMEM_ASSERT(!EMPTY(stream));
979 STREAM_INVARIANT(stream);
980 CHECK_ERROR(leaveMutex(stream));
982 CHECK_ERROR(signalSpace(stream));
990 Stream *stream = &connection->outgoing;
991 SharedStream *shared = stream->shared;
999 CHECK_ERROR(enterMutex(stream, connection->shutdown));
1001 CHECK_ERROR(waitForSpace(connection, stream));
1002 SHMEM_ASSERT(!FULL(stream));
1018 STREAM_INVARIANT(stream);
1019 CHECK_ERROR(signalData(stream));
1022 CHECK_ERROR(leaveMutex(stream));
1062 Stream *stream = &connection->incoming;
1063 SharedStream *shared = stream->shared;
1071 CHECK_ERROR(enterMutex(stream, connection->shutdown));
1073 CHECK_ERROR(waitForData(connection, stream));
1074 SHMEM_ASSERT(!EMPTY(stream));
1089 STREAM_INVARIANT(stream);
1090 CHECK_ERROR(signalSpace(stream));
1092 CHECK_ERROR(leaveMutex(stream));