mqueue.c revision 34537ab1867f3ee4fb96c044d343e33685a463aa
/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License (the "License").
* You may not use this file except in compliance with the License.
*
* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at usr/src/OPENSOLARIS.LICENSE.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information: Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*/
/*
* Copyright 2008 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
#pragma ident "%Z%%M% %I% %E% SMI"
#include "lint.h"
#include "mtlib.h"
#define _KMEMUSER
#include <mqueue.h>
#include <errno.h>
#include <stdarg.h>
#include <limits.h>
#include <pthread.h>
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <inttypes.h>
#include "sigev_thread.h"
#include "pos4obj.h"
/*
* Default values per message queue
*/
#define MQ_MAXMSG 128
#define MQ_MAXSIZE 1024
/*
* Message header which is part of messages in link list
*/
typedef struct {
} msghdr_t;
/*
* message queue description
*/
struct mq_dn {
};
/*
* message queue descriptor structure
*/
typedef struct mq_des {
int mqd_magic; /* magic # to identify mq_des */
int mqd_flags; /* operation flag per open */
int mqd_ownerdead; /* mq_exclusive is inconsistent */
} mqdes_t;
/*
* message queue common header, part of the mmap()ed file.
* Since message queues may be shared between 32- and 64-bit processes,
* care must be taken to make sure that the elements of this structure
* are identical for both _LP64 and _ILP32 cases.
*/
typedef struct mq_header {
/* first field must be mq_totsize, DO NOT insert before this */
} mqhdr_t;
/*
* The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
* If this assumption is somehow invalidated, mq_open() needs to be changed
* back to the old version which kept a count and enforced a limit.
* by checking _MQ_OPEN_MAX at compile time.
*/
#if _MQ_OPEN_MAX != -1
#error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
#endif
#ifdef DEBUG
_m->mq_totsize));
int _val; \
#else
#define MQ_ASSERT(x)
#endif
#define ABS_TIME 0
#define REL_TIME 1
static int
{
/*
* Any use of a message queue after it was closed is
* undefined. But the standard strongly favours EBADF
* returns. Before we dereference which could be fatal,
* we first do some pointer sanity checks.
*/
}
return (0);
}
static void
{
int i;
/*
* We only need to initialize the non-zero fields. The use of
* ftruncate() on the message queue file assures that the
* pages will be zero-filled.
*/
/*
* As of this writing (1997), there are 32 message queue priorities.
* If this is to change, then the size of the mq_mask will
* also have to change. If DEBUG is defined, assert that
* _MQ_PRIO_MAX hasn't changed.
*/
#if defined(DEBUG)
/* LINTED always true */
#endif
/*
* Since the message queue can be mapped into different
* virtual address ranges by different processes, we don't
* keep track of pointers, only offsets into the shared region.
*/
}
}
static size_t
{
/*
* Get the head and tail pointers for the queue of maximum
* priority. We shouldn't be here unless there is a message for
* us, so it's fair to assert that both the head and tail
* pointers are non-NULL.
*/
/*
* We just nuked the last message in this priority's queue.
* Twiddle this priority's bit, and then find the next bit
* tipped.
*/
break;
}
/*
* Copy the message, and put the buffer back on the free list.
*/
}
static void
{
/*
* Grab a free message block, and link it in. We shouldn't
* be here unless there is room in the queue for us; it's
* fair to assert that the free pointer is non-NULL.
*/
/*
* Remove a message from the free list, and copy in the new contents.
*/
if (*tailpp == 0) {
/*
* This is the first message on this queue. Set the
* head and tail pointers, and tip the appropriate bit
* in the priority mask.
*/
} else {
}
}
/*
* Send a notification and also delete the registration.
*/
static void
{
}
/*
* Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE.
* Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that
* they fail with errno == EBADMSG. Trigger any registered notification.
*/
static void
{
if (error == EOWNERDEAD) {
}
}
{
int fd;
int err;
int cr_flag = 0;
int locked = 0;
void *ptr;
return ((mqd_t)-1);
/* acquire MSGQ lock to have atomic operation */
goto out;
locked = 1;
}
goto out;
/* closing permission file */
(void) __close_nc(fd);
if (cr_flag) {
goto out;
goto out;
} else {
}
/* adjust for message size at word boundary */
total_size = sizeof (mqhdr_t) +
if (total_size > SSIZE_MAX) {
goto out;
}
/*
* who have read or write permission
*/
goto out;
/* force permissions to avoid umask effect */
goto out;
goto out;
} else {
goto out;
/* Message queue has not been initialized yet */
sizeof (total_size) || total_size == 0) {
goto out;
}
/* Message queue too big for this process to handle */
if (total_size > SSIZE_MAX) {
goto out;
}
}
goto out;
}
goto out;
cr_flag |= DFILE_MMAP;
/* closing data file */
(void) __close_nc(fd);
cr_flag &= ~DFILE_OPEN;
/*
* create, unlink, size, mmap, and close description file
* all for a flag word in anonymous shared memory
*/
0666, &err)) < 0)
goto out;
cr_flag |= DFILE_OPEN;
goto out;
goto out;
cr_flag |= MQDNP_MMAP;
(void) __close_nc(fd);
cr_flag &= ~DFILE_OPEN;
/*
* we follow the same strategy as filesystem open() routine,
*/
/* new message queue requires initialization */
if ((cr_flag & DFILE_CREATE) != 0) {
/* message queue header has to be initialized */
}
mqdp->mqd_ownerdead = 0;
if (mq_list)
}
locked = 0; /* fall into the error case */
out:
if ((cr_flag & DFILE_OPEN) != 0)
(void) __close_nc(fd);
if ((cr_flag & DFILE_CREATE) != 0)
if ((cr_flag & PFILE_CREATE) != 0)
if ((cr_flag & DFILE_MMAP) != 0)
if ((cr_flag & MQDNP_MMAP) != 0)
if (locked)
return ((mqd_t)-1);
}
static void
{
/* invalidate the descriptor before freeing it */
if (!mqdp->mqd_ownerdead)
}
int
{
int error;
if (!mq_is_valid(mqdp)) {
return (-1);
}
if (error == EOWNERDEAD)
/* carry on regardless, without holding mq_exclusive */
}
/* notification is set for this descriptor, remove it */
}
}
return (0);
}
int
{
int err;
if (__pos4obj_check(path) < 0)
return (-1);
return (-1);
}
errno = 0;
}
return (-1);
return (err);
}
static int
{
int err;
int notify = 0;
/*
* sem_*wait() does cancellation, if called.
* pthread_testcancel() ensures that cancellation takes place if
* there is a cancellation pending when mq_*send() is called.
*/
return (-1);
}
return (-1);
}
return (-1);
}
else {
/*
* We might get cancelled here...
*/
else
}
if (err == -1) {
/*
* errno has been set to EAGAIN / EINTR / ETIMEDOUT
* by sem_*wait(), so we can just return.
*/
return (-1);
}
/*
* By the time we're here, we know that we've got the capacity
* to add to the queue...now acquire the exclusive lock.
*/
return (-1);
}
/*
* Now determine if we want to kick the notification. POSIX
* requires that if a process has registered for notification,
* we must kick it when the queue makes an empty to non-empty
* transition, and there are no blocked receivers. Note that
* this mechanism does _not_ guarantee that the kicked process
* will be able to receive a message without blocking;
* another receiver could intervene in the meantime. Thus,
* the notification mechanism is inherently racy; all we can
* do is hope to minimize the window as much as possible.
* In general, we want to avoid kicking the notification when
* there are clearly receivers blocked. We'll determine if
* we want to kick the notification before the mq_putmsg(),
* but the actual signotify() won't be done until the message
* is on the queue.
*/
notify = 1;
}
if (notify) {
/* notify and also delete the registration */
}
return (0);
}
int
{
}
int
{
abs_timeout, ABS_TIME));
}
int
{
rel_timeout, REL_TIME));
}
static void
{
int cancel_state;
continue;
}
static ssize_t
{
int err;
/*
* sem_*wait() does cancellation, if called.
* pthread_testcancel() ensures that cancellation takes place if
* there is a cancellation pending when mq_*receive() is called.
*/
return (ssize_t)(-1);
}
return (ssize_t)(-1);
}
/*
* The semaphoring scheme for mq_[timed]receive is a little hairy
* thanks to POSIX.1b's arcane notification mechanism. First,
* we try to take the common case and do a sem_trywait().
* If that doesn't work, and O_NONBLOCK hasn't been set,
* then note that we're going to sleep by incrementing the rblocked
* semaphore. We decrement that semaphore after waking up.
*/
/*
* errno has been set to EAGAIN or EINTR by
* sem_trywait(), so we can just return.
*/
return (-1);
}
/*
* If we're here, then we're probably going to block...
* increment the rblocked semaphore. If we get
* cancelled, decrement_rblocked() will decrement it.
*/
else
if (err == -1) {
/*
* We took a signal or timeout while waiting
* on mq_notempty...
*/
return (-1);
}
}
return (-1);
}
return (msg_size);
}
{
}
{
abs_timeout, ABS_TIME));
}
{
rel_timeout, REL_TIME));
}
/*
* Only used below, in mq_notify().
* We already have a spawner thread.
* Verify that the attributes match; cancel it if necessary.
*/
static int
{
if (do_cancel) {
/*
* Attributes don't match, cancel the spawner thread.
*/
} else {
/*
* Reuse the existing spawner thread with possibly
* changed notification function and value.
*/
}
return (do_cancel);
}
int
{
void *userval;
int rval = -1;
int ntype;
int port;
int error;
if (!mq_is_valid(mqdp)) {
return (-1);
}
if (error == EOWNERDEAD)
/* carry on regardless, without holding mq_exclusive */
}
/* notification is set for this descriptor, remove it */
if (tcdp->tcd_msg_enabled) {
/* cancel the spawner thread */
(void) pthread_cancel(
}
}
} else {
/* notification is not set for this descriptor */
goto bad;
}
} else { /* register notification with this process */
case SIGEV_THREAD:
port = -1;
break;
case SIGEV_PORT:
goto bad;
}
break;
}
switch (ntype) {
case SIGEV_NONE:
mq_siginfo.si_signo = 0;
break;
case SIGEV_SIGNAL:
break;
case SIGEV_THREAD:
/* FALLTHROUGH */
case SIGEV_PORT:
/* we must create a spawner thread */
goto bad;
}
tcdp->tcd_msg_enabled = 0;
tcdp->tcd_msg_closing = 0;
if (launch_spawner(tcdp) != 0) {
goto bad;
}
}
mq_siginfo.si_signo = 0;
break;
default:
goto bad;
}
/* register notification */
goto bad;
switch (ntype) {
case SIGEV_THREAD:
case SIGEV_PORT:
break;
}
}
rval = 0; /* success */
bad:
if (error == 0) {
} else {
rval = -1;
}
return (rval);
}
int
{
if (!mq_is_valid(mqdp)) {
return (-1);
}
/* store current attributes */
int count;
}
/* set description attributes */
return (0);
}
int
{
int count;
if (!mq_is_valid(mqdp)) {
return (-1);
}
return (0);
}
/*
* Cleanup after fork1() in the child process.
*/
void
postfork1_child_sigev_mq(void)
{
}
}
}