mqueue.c revision 06a502e8e7b8206c327cec289485fbf7079d5a90
/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at usr/src/OPENSOLARIS.LICENSE.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information: Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*/
/*
* Copyright 2005 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
#pragma ident "%Z%%M% %I% %E% SMI"
#include "synonyms.h"
#define _KMEMUSER
#include <mqueue.h>
#include <errno.h>
#include <stdarg.h>
#include <limits.h>
#include <pthread.h>
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <inttypes.h>
#include "mqlib.h"
#include "pos4obj.h"
#include "pos4.h"
/*
* The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
* If this assumption is somehow invalidated, mq_open() needs to be changed
* back to the old version which kept a count and enforced a limit.
* by checking _MQ_OPEN_MAX at compile time.
*/
#if _MQ_OPEN_MAX != -1
#error "librt:mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
#endif
#ifdef DEBUG
#define MQ_ASSERT(x) \
assert(x);
_m->mq_totsize));
int _val; \
#else
#define MQ_ASSERT(x)
#endif
#define ABS_TIME 0
#define REL_TIME 1
static int
{
/*
* Any use of a message queue after it was closed is
* undefined. But the standard strongly favours EBADF
* returns. Before we dereference which could be fatal,
* we first do some pointer sanity checks.
*/
}
return (0);
}
static void
{
int i;
/*
* We only need to initialize the non-zero fields. The use of
* ftruncate() on the message queue file assures that the
* pages will be zfod.
*/
/*
* As of this writing (1997), there are 32 message queue priorities.
* If this is to change, then the size of the mq_mask will also
* have to change. If NDEBUG isn't defined, assert that
* _MQ_PRIO_MAX hasn't changed.
*/
/*
* Since the message queue can be mapped into different
* virtual address ranges by different processes, we don't
* keep track of pointers, only offsets into the shared region.
*/
}
}
static size_t
{
/*
* Get the head and tail pointers for the queue of maximum
* priority. We shouldn't be here unless there is a message for
* us, so it's fair to assert that both the head and tail
* pointers are non-NULL.
*/
/*
* We just nuked the last message in this priority's queue.
* Twiddle this priority's bit, and then find the next bit
* tipped.
*/
break;
}
/*
* Copy the message, and put the buffer back on the free list.
*/
}
static void
{
/*
* Grab a free message block, and link it in. We shouldn't
* be here unless there is room in the queue for us; it's
* fair to assert that the free pointer is non-NULL.
*/
/*
* Remove a message from the free list, and copy in the new contents.
*/
if (*tailpp == 0) {
/*
* This is the first message on this queue. Set the
* head and tail pointers, and tip the appropriate bit
* in the priority mask.
*/
} else {
}
}
{
int fd;
int err;
int cr_flag = 0;
int locked = 0;
void *ptr;
return ((mqd_t)-1);
/* acquire MSGQ lock to have atomic operation */
goto out;
locked = 1;
}
goto out;
/* closing permission file */
(void) __close_nc(fd);
if (cr_flag) {
goto out;
goto out;
} else {
}
/* adjust for message size at word boundary */
total_size = sizeof (mqhdr_t) +
if (total_size > SSIZE_MAX) {
goto out;
}
/*
* who have read or write permission
*/
goto out;
/* force permissions to avoid umask effect */
goto out;
goto out;
} else {
goto out;
/* Message queue has not been initialized yet */
sizeof (total_size) || total_size == 0) {
goto out;
}
/* Message queue too big for this process to handle */
if (total_size > SSIZE_MAX) {
goto out;
}
}
goto out;
}
goto out;
cr_flag |= DFILE_MMAP;
/* closing data file */
(void) __close_nc(fd);
cr_flag &= ~DFILE_OPEN;
/*
* create, unlink, size, mmap, and close description file
* all for a flag word in anonymous shared memory
*/
0666, &err)) < 0)
goto out;
cr_flag |= DFILE_OPEN;
goto out;
goto out;
cr_flag |= MQDNP_MMAP;
(void) __close_nc(fd);
cr_flag &= ~DFILE_OPEN;
/*
* we follow the same strategy as filesystem open() routine,
*/
/* new message queue requires initialization */
if ((cr_flag & DFILE_CREATE) != 0) {
/* message queue header has to be initialized */
}
locked = 0; /* fall into the error case */
out:
if ((cr_flag & DFILE_OPEN) != 0)
(void) __close_nc(fd);
if ((cr_flag & DFILE_CREATE) != 0)
if ((cr_flag & PFILE_CREATE) != 0)
if ((cr_flag & DFILE_MMAP) != 0)
if ((cr_flag & MQDNP_MMAP) != 0)
if (locked)
return ((mqd_t)-1);
}
int
{
int canstate;
if (!mq_is_valid(mqdp)) {
return (-1);
}
continue;
/* Notification is set for this descriptor, remove it */
}
/* Invalidate the descriptor before freeing it */
}
int
_mq_unlink(const char *path)
{
int err;
if (__pos4obj_check(path) < 0)
return (-1);
return (-1);
}
errno = 0;
}
return (-1);
return (err);
}
static int
{
int err;
int canstate;
int notify = 0;
/*
* sem_*wait() does cancellation, if called.
* pthread_testcancel() ensures that cancellation takes place if
* there is a cancellation pending when mq_*send() is called.
*/
return (-1);
}
return (-1);
}
return (-1);
}
else {
/*
* We might get cancelled here...
*/
else
}
if (err == -1) {
/*
* errno has been set to EAGAIN / EINTR / ETIMEDOUT
* by sem_*wait(), so we can just return.
*/
return (-1);
}
/*
* By the time we're here, we know that we've got the capacity
* to add to the queue...now acquire the exclusive lock.
*/
if (err == -1) {
/*
* We must have been interrupted by a signal.
* Post on mq_notfull so someone else can take our slot.
*/
return (-1);
}
/*
* Now determine if we want to kick the notification. POSIX
* requires that if a process has registered for notification,
* we must kick it when the queue makes an empty to non-empty
* transition, and there are no blocked receivers. Note that
* this mechanism does _not_ guarantee that the kicked process
* will be able to receive a message without blocking; another
* receiver could intervene in the meantime. Thus,
* the notification mechanism is inherently racy; all we can
* do is hope to minimize the window as much as possible. In
* general, we want to avoid kicking the notification when
* there are clearly receivers blocked. We'll determine if we
* want to kick the notification before the mq_putmsg(), but the
* actual signotify() won't be done until the message is on
* the queue.
*/
notify = 1;
}
/*
* The ordering here is important. We want to make sure that
* one has to have mq_exclusive before being able to kick
* mq_notempty.
*/
if (notify) {
}
return (0);
}
int
{
}
int
{
abs_timeout, ABS_TIME));
}
int
{
rel_timeout, REL_TIME));
}
static void
{
int canstate;
continue;
}
static ssize_t
{
int canstate;
int err;
/*
* sem_*wait() does cancellation, if called.
* pthread_testcancel() ensures that cancellation takes place if
* there is a cancellation pending when mq_*receive() is called.
*/
return (ssize_t)(-1);
}
return (ssize_t)(-1);
}
/*
* The semaphoring scheme for mq_[timed]receive is a little hairy
* thanks to POSIX.1b's arcane notification mechanism. First,
* we try to take the common case and do a sem_trywait().
* If that doesn't work, and O_NONBLOCK hasn't been set,
* then note that we're going to sleep by incrementing the rblocked
* semaphore. We decrement that semaphore after waking up.
*/
/*
* errno has been set to EAGAIN or EINTR by
* sem_trywait(), so we can just return.
*/
return (-1);
}
/*
* If we're here, then we're probably going to block...
* increment the rblocked semaphore. If we get
* cancelled, decrement_rblocked() will decrement it.
*/
else
if (err == -1) {
/*
* We took a signal or timeout while waiting
* on mq_notempty...
*/
return (-1);
}
}
if (err == -1) {
/*
* We must have been interrupted by a signal.
* Post on mq_notfull so someone else can take our message.
*/
return (-1);
}
return (msg_size);
}
{
}
{
abs_timeout, ABS_TIME));
}
{
rel_timeout, REL_TIME));
}
int
{
int canstate;
if (!mq_is_valid(mqdp)) {
return (-1);
}
continue;
if (notification == NULL) {
/*
* Remove signotify_id if queue is registered with
* this process
*/
} else {
/*
* if registered with another process or mqdes
*/
goto bad;
}
} else {
/*
* Register notification with this process.
*/
switch (notification->sigev_notify) {
case SIGEV_NONE:
mq_siginfo.si_signo = 0;
break;
case SIGEV_SIGNAL:
break;
case SIGEV_THREAD:
goto bad;
default:
goto bad;
}
/*
* Either notification is not present, or if
* notification is already present, but the process
* which registered notification does not exist then
* kernel can register notification for current process.
*/
goto bad;
}
return (0);
bad:
return (-1);
}
int
{
if (!mq_is_valid(mqdp)) {
return (-1);
}
/* store current attributes */
int count;
}
/* set description attributes */
return (0);
}
int
{
int count;
if (!mq_is_valid(mqdp)) {
return (-1);
}
return (0);
}