msg.c revision e5994f965f428546c8808ba1aec167c12f0612be
/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License (the "License").
* You may not use this file except in compliance with the License.
*
* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at usr/src/OPENSOLARIS.LICENSE.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information: Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*/
/*
* Copyright 2008 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
/* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
#pragma ident "%Z%%M% %I% %E% SMI"
/*
* Inter-Process Communication Message Facility.
*
*
* Resource controls
* -----------------
*
* Control: zone.max-msg-ids (rc_zone_msgmni)
* Description: Maximum number of message queue ids allowed a zone.
*
* When msgget() is used to allocate a message queue, one id is
* allocated. If the id allocation doesn't succeed, msgget() fails
* and errno is set to ENOSPC. Upon successful msgctl(, IPC_RMID)
* the id is deallocated.
*
* Control: project.max-msg-ids (rc_project_msgmni)
* Description: Maximum number of message queue ids allowed a project.
*
* When msgget() is used to allocate a message queue, one id is
* allocated. If the id allocation doesn't succeed, msgget() fails
* and errno is set to ENOSPC. Upon successful msgctl(, IPC_RMID)
* the id is deallocated.
*
* Control: process.max-msg-qbytes (rc_process_msgmnb)
* Description: Maximum number of bytes of messages on a message queue.
*
* When msgget() successfully allocates a message queue, the minimum
* enforced value of this limit is used to initialize msg_qbytes.
*
* Control: process.max-msg-messages (rc_process_msgtql)
* Description: Maximum number of messages on a message queue.
*
* When msgget() successfully allocates a message queue, the minimum
* enforced value of this limit is used to initialize a per-queue
* limit on the number of messages.
*/
#include <sys/ipc_impl.h>
#include <sys/msg_impl.h>
#include <sys/sysmacros.h>
/*
* The following tunables are obsolete. Though for compatibility we
* still read and interpret msginfo_msgmnb, msginfo_msgmni, and
* msginfo_msgtql (see os/project.c and os/rctl_proc.c), the preferred
* mechanism for administering the IPC Message facility is through the
* resource controls described at the top of this file.
*/
int msginfo_msgmap = 0; /* (obsolete) */
extern rctl_hndl_t rc_zone_msgmni;
extern rctl_hndl_t rc_project_msgmni;
extern rctl_hndl_t rc_process_msgmnb;
extern rctl_hndl_t rc_process_msgtql;
static ipc_service_t *msq_svc;
static zone_key_t msg_zone_key;
static void msg_dtor(kipc_perm_t *);
static void msg_rmid(kipc_perm_t *);
static void msg_remove_zone(zoneid_t, void *);
/*
* Module linkage information for the kernel.
*/
/*
* System call table entry for the message facility; its handler is
* msgsys, cast to the generic int (*)() handler type.
*
* NOTE(review): the leading 6 is presumably the argument count, and the
* values normally selected by the _LP64 conditional are not visible in
* this excerpt -- confirm against the full source.
*/
static struct sysent ipcmsg_sysent = {
6,
#ifdef _LP64
#else
#endif
(int (*)())msgsys
};
#ifdef _SYSCALL32_IMPL
/*
* System call table entry whose handler is msgsys32, the entry point for
* 32-bit callers. Only compiled when _SYSCALL32_IMPL is defined.
*
* NOTE(review): the leading 6 is presumably the argument count -- confirm.
*/
static struct sysent ipcmsg_sysent32 = {
6,
(int (*)())msgsys32
};
#endif /* _SYSCALL32_IMPL */
};
#ifdef _SYSCALL32_IMPL
};
#endif
/*
* Big Theory statement for message queue correctness
*
* The msgrcv and msgsnd functions no longer use cv_broadcast to wake up
* receivers who are waiting for an event. Using the cv_broadcast method
* resulted in negative scaling when the number of waiting receivers is large
* (the thundering herd problem). Instead, the receivers waiting to receive a
* message are now linked in a queue-like fashion and awakened one at a time in
* a controlled manner.
*
* Receivers can block on two different classes of waiting list:
* 1) "sendwait" list, which is the more complex list of the two. The
* receiver will be awakened by a sender posting a new message. There
* are two types of "sendwait" list used:
* a) msg_wait_snd: handles all receivers who are looking for
* a message type >= 0, but were unable to locate a match.
*
* slot 0: reserved for receivers that have designated they
* will take any message type.
* rest: consist of receivers requesting a specific type
* but the type was not present. The entries are
* hashed into a bucket in an attempt to keep
* any list search relatively short.
* b) msg_wait_snd_ngt: handles all receivers that have designated
* a negative message type. Unlike msg_wait_snd, the hash bucket
* serves a range of negative message types (-1 to -5, -6 to -10
* and so forth), where the last bucket is reserved for all the
* negative message types that hash outside of MSG_MAX_QNUM - 1.
* This is done this way to simplify the operation of locating a
* negative message type.
*
* 2) "copyout" list, where the receiver is awakened by another
* receiver after a message is copied out. This is a linked list
* of waiters that are awakened one at a time. Although the solution is
* not optimal, the complexity that would be added in for waking
* up the right entry far exceeds any potential pay back (too many
* correctness and corner case issues).
*
* The lists are doubly linked. In the case of the "sendwait"
* list, this allows the thread to remove itself from the list without having
* to traverse the list. In the case of the "copyout" list it simply allows
* us to use common functions with the "sendwait" list.
*
* To make sure receivers are not hung out to dry, we must guarantee:
* 1. If any queued message matches any receiver, then at least one
* matching receiver must be processing the request.
* 2. Blocking on the copyout queue is only temporary while messages
* are being copied out. The process is guaranteed to wake up
* when it gets to the front of the queue (copyout is a FIFO).
*
* Rules for blocking and waking up:
* 1. A receiver entering msgrcv must examine all messages for a match
* before blocking on a sendwait queue.
* 2. If the receiver blocks because the message it chose is already
* being copied out, then when it wakes up it needs to start
* checking the messages from the beginning.
* 3) Whenever a process returns from msgrcv for any reason, if it
* had attempted to copy a message or blocked waiting for a copy
* to complete, it needs to wake up the next receiver blocked on
* a copy out.
* 4) When a message is sent, the sender selects a process waiting
* for that type of message. This selection process rotates between
* receiver types of 0, negative and positive to prevent starvation of
* any one particular receiver type.
* 5) The following are the scenarios for processes that are awakened
* by a msgsnd:
* a) The process finds the message and is able to copy
* it out. Once complete, the process returns.
* b) The message that was sent that triggered the wakeup is no
* longer available (another process found the message first).
* We issue a wakeup on copy queue and then go back to
* sleep waiting for another matching message to be sent.
* c) The message that was supposed to be processed was
* already serviced by another process. However a different
* message is present which we can service. The message
* is copied and the process returns.
* d) The message is found, but some sort of error occurs that
* prevents the message from being copied. The receiver
* wakes up the next sender that can service this message
* type and returns an error to the caller.
* e) The message is found, but it is marked as being copied
* out. The receiver then goes to sleep on the copyout
* queue where it will be awakened again sometime in the future.
*
*
* 6) Whenever a message is found that matches the message type designated,
* but is being copied out, we have to block on the copyout queue.
* After the copying process finishes the copy out, it must wake up (either
* directly or indirectly) all receivers who blocked on its copyout,
* so they are guaranteed a chance to examine the remaining messages.
* This is implemented via a chain of wakeups: Y wakes X, who wakes Z,
* and so on. The chain cannot be broken. This leads to the following
* cases:
* a) When a receiver has finished copying the message (or
* encountered an error), the first entry on the copyout queue
* is woken up.
* b) When the receiver is woken up, it attempts to locate
* a message type match.
* c) If a message type is found and
* -- MSG_RCVCOPY flag is not set, the message is
* marked for copying out. Regardless of the copyout
* success the next entry on the copyout queue is
* awakened and the operation is completed.
* -- MSG_RCVCOPY is set, we simply go back to sleep again
* on the copyout queue.
* d) If the message type is not found then we wakeup the next
* process on the copyout queue.
*/
static uint_t msg_type_hash(long);
kmsqid_t *);
static void msg_rcvq_wakeup_all(list_t *);
/*
* Selector table pairing a finder routine with a pointer to another
* element of the same table. The only entry visible here uses
* msg_fnd_neg_snd and points back at element 0.
*
* NOTE(review): presumably the second member is the next selector to try,
* giving the rotation between receiver types described in the theory
* statement above; earlier entries are not visible in this excerpt --
* confirm against the full source.
*/
msg_select_t msg_fnd_sndr[] = {
{ msg_fnd_neg_snd, &msg_fnd_sndr[0] }
};
{ msg_fnd_any_rdr, &msg_fnd_rdr[0] },
};
/*
* Module linkage structure for this module; it references modlsys.
*
* NOTE(review): any members preceding &modlsys and the entry guarded by
* _SYSCALL32_IMPL are not visible in this excerpt -- confirm against the
* full source.
*/
static struct modlinkage modlinkage = {
&modlsys,
#ifdef _SYSCALL32_IMPL
#endif
};
/*
* _init - presumably the loadable-module load entry point.
*
* NOTE(review): only fragments of the body are visible in this excerpt: an
* early "return (0)" followed by a zone_key_delete(msg_zone_key) call and
* "return (result)". Presumably the former is the success return and the
* latter the failure cleanup, with the statements that set "result" and
* guard the early return not shown -- confirm against the full source.
*/
int
_init(void)
{
int result;
return (0);
/* Release the zone key; the return value is explicitly discarded. */
(void) zone_key_delete(msg_zone_key);
return (result);
}
/*
* _fini - unconditionally returns EBUSY.
*
* NOTE(review): presumably this is the module unload hook, in which case
* always returning EBUSY means the module refuses to be unloaded --
* confirm.
*/
int
_fini(void)
{
return (EBUSY);
}
int
{
}
static void
{
int ii;
}
}
/*
* msg_rele - decrement the reference count on the message. When count
* reaches zero, free message header and contents.
*/
static void
{
}
}
/*
* msgunlink - Unlink msg from queue, decrement byte count and wake up anyone
* waiting for free bytes on queue.
*
* Called with queue locked.
*/
static void
{
/* Wake up waiting writers */
if (qp->msg_snd_cnt)
}
static void
{
int ii;
/*
* Wake up everyone who is in a wait state of some sort
* for this message queue.
*/
}
if (qp->msg_snd_cnt)
}
/*
* msgctl system call.
*
* gets q lock (via ipc_lookup), releases before return.
* may call users of msg_lock
*/
static int
{
int error;
struct msqid_ds64 ds64;
/*
* Perform pre- or non-lookup actions (e.g. copyins, RMID).
*/
switch (cmd) {
case IPC_SET:
break;
case IPC_SET64:
break;
case IPC_RMID:
return (0);
}
/*
* get msqid_ds for this msgid
*/
switch (cmd) {
case IPC_SET:
secpolicy_ipc_config(cr) != 0) {
}
}
break;
case IPC_STAT:
}
if (qp->msg_rcv_cnt)
if (qp->msg_snd_cnt)
break;
case IPC_SET64:
secpolicy_ipc_config(cr) != 0 &&
}
}
break;
case IPC_STAT64:
if (qp->msg_rcv_cnt)
if (qp->msg_snd_cnt)
break;
default:
}
/*
* Do copyout last (after releasing mutex).
*/
switch (cmd) {
case IPC_STAT:
break;
case IPC_STAT64:
break;
}
return (0);
}
/*
* Remove all message queues associated with a given zone. Called by
* zone_shutdown when the zone is halted.
*/
/*ARGSUSED1*/
static void
{
}
/*
* msgget system call.
*/
static int
{
int ii;
top:
qp->msg_ngt_cnt = 0;
qp->msg_neg_copy = 0;
sizeof (msgq_wakeup_t),
sizeof (msgq_wakeup_t),
}
/*
* The proper initialization of msg_lowest_type is to the
* highest possible value. By doing this we guarantee that
* when the first send happens, the lowest type will be set
* properly.
*/
sizeof (msgq_wakeup_t),
qp->msg_rcv_cnt = 0;
qp->msg_snd_cnt = 0;
(kipc_perm_t *)qp)) {
goto top;
}
}
if (audit_active)
return (id);
}
static ssize_t
{
int error = 0;
int cvres;
}
goto msgrcv_out;
}
/*
* Various information (including the condvar_t) required for the
* process to sleep is provided by its stack.
*/
msg_entry.msgw_snd_wake = 0;
if (smp) {
/*
* We found a possible message to copy out.
*/
long t = msg_entry.msgw_snd_wake;
/*
* It is available, attempt to copy it.
*/
/*
* It is possible to consume a different message
* type than the one we were originally awakened for
* (negative types). If this happens a check must be done
* to determine if another receiver is available
* for the waking message type. Failure to do this
* can result in a message on the queue that can be
* serviced by a sleeping receiver.
*/
/*
* Don't forget to wakeup a sleeper that blocked because
* we were copying things out.
*/
goto msgrcv_out;
}
/*
* The selected message is being copied out, so block. We do
* not need to wake the next person up on the msg_cpy_block list
* due to the fact some one is copying out and they will get
* things moving again once the copy is completed.
*/
if (error) {
goto msgrcv_out;
}
goto findmsg;
}
/*
* There isn't a message to copy out that matches the designated
* criteria.
*/
if (msgflg & IPC_NOWAIT) {
goto msgrcv_out;
}
/*
* Wait for new message. We keep the negative and positive types
* separate for performance reasons.
*/
msg_entry.msgw_snd_wake = 0;
if (msgtyp >= 0) {
} else {
qp->msg_ngt_cnt++;
qp->msg_ngt_cnt--;
}
goto findmsg;
}
if (error) {
if (msg_entry.msgw_snd_wake) {
}
}
}
static int
{
return (EIDRM);
}
if (cvres == 0) {
return (EINTR);
}
return (0);
}
static int
{
int copyerror = 0;
if ((msgflg & MSG_NOERROR) == 0) {
return (E2BIG);
} else {
}
} else {
}
/*
* To prevent a DOS attack we mark the message as being
* copied out and release mutex. When the copy is completed
* we need to acquire the mutex and make the appropriate updates.
*/
if (msgtyp < 0) {
}
mutex_exit(*lock);
if (mdl == DATAMODEL_NATIVE) {
} else {
/*
* 32-bit callers need an imploded msg type.
*/
sizeof (msg_type32));
}
}
/*
* Reclaim the mutex and make sure the message queue still exists.
*/
if (msgtyp < 0) {
qp->msg_neg_copy = 0;
}
return (EIDRM);
}
if (copyerror) {
return (EFAULT);
}
return (0);
}
static struct msg *
{
long qp_low;
long low_msgtype;
static struct msg neg_copy_smp;
if (msgtyp == 0) {
} else {
if (msgtyp > 0) {
/*
* If our lowest possible message type is larger than
* the message type desired, then we know there is
* no entry present.
*/
return (NULL);
}
break;
}
}
} else {
/*
* We have kept track of the lowest possible message
* type on the send queue. This allows us to terminate
* the search early if we find a message of that
* type. Note, the lowest type may not be the actual
* lowest value in the system, it is only guaranteed
* that there isn't a value lower than that.
*/
low_msgtype = -msgtyp;
if (low_msgtype < qp_low) {
return (NULL);
}
if (qp->msg_neg_copy) {
return (&neg_copy_smp);
}
if (low_msgtype == qp_low) {
break;
}
}
}
if (smp) {
/*
* Update the lowest message type.
*/
}
}
}
return (smp);
}
/*
* msgids system call.
*/
static int
{
int error;
return (0);
}
/*
* msgsnap system call.
*/
static int
{
int error, i;
}
/*
* First compute the required buffer size and
* the number of messages on the queue.
*/
if (msgtyp == 0 ||
nmsg++;
if (mdl == DATAMODEL_NATIVE)
else
}
}
nmsg = 0;
if (nmsg > 0) {
/*
* Mark the messages as being copied.
*/
i = 0;
if (msgtyp == 0 ||
i++;
}
}
}
/*
* Copy out the buffer header.
*/
/*
* Now copy out the messages one by one.
*/
for (i = 0; i < nmsg; i++) {
if (error == 0) {
if (error == 0 &&
if (mdl == DATAMODEL_NATIVE)
else
}
/* Check for msg q deleted or reallocated */
}
if (nmsg > 0)
if (error)
return (0);
}
#define MSG_PREALLOC_LIMIT 8192
/*
* msgsnd system call.
*/
static int
{
long type;
int error = 0;
if (mdl == DATAMODEL_NATIVE) {
} else {
}
if (type < 1)
/*
* We want the value here large enough that most of the
* message operations will use the "lockless" path,
* but small enough that a user can not reserve large
* chunks of kernel memory unless they have a valid
* reason to.
*/
if (msgsz <= MSG_PREALLOC_LIMIT) {
/*
* We are small enough that we can afford to do the
* allocation now. This saves dropping the lock
* and then reacquiring the lock.
*/
if (msgsz) {
goto msgsnd_out;
}
}
}
goto msgsnd_out;
}
goto msgsnd_out;
}
goto msgsnd_out;
top:
/*
* Allocate space on q, message header, & buffer space.
*/
int cvres;
if (msgflg & IPC_NOWAIT) {
goto msgsnd_out;
}
qp->msg_snd_cnt++;
qp->msg_snd_cnt--;
goto msgsnd_out;
}
}
int failure;
goto msgsnd_out;
}
if (failure) {
goto msgsnd_out;
}
goto top;
}
/*
* Everything is available, put msg on q.
*/
/*
* Get the proper receiver going.
*/
if (lock)
if (error) {
if (mp)
}
return (0);
}
static void
{
do {
if (wakeup) {
if (type) {
}
}
}
/*
* msg_type_hash - map a message type to a hash bucket index.
*
* msg_type: the message type to hash; may be negative, zero or positive.
*
* Returns the bucket index. For a negative type the index is capped at
* MSG_MAX_QNUM. Per the comments in the body, bucket 0 is reserved for
* receivers of type 0 and the other buckets are hashed into.
*
* NOTE(review): the declaration and computation of "hash", and the value
* returned for positive types, are not visible in this excerpt -- confirm
* against the full source before relying on the exact mapping.
*/
static uint_t
msg_type_hash(long msg_type)
{
if (msg_type < 0) {
/*
* Negative message types are hashed over an
* interval. Any message type that hashes
* beyond MSG_MAX_QNUM is automatically placed
* in the last bucket.
*/
/* Clamp an out-of-range hash to the last bucket. */
if (hash > MSG_MAX_QNUM)
hash = MSG_MAX_QNUM;
return (hash);
}
/*
* 0 or positive message type. The first bucket is reserved for
* message receivers of type 0, the other buckets we hash into.
*/
if (msg_type)
return (0);
}
/*
* Routines to see if we have a receiver of type 0 blocked waiting
* for a message. Simply return the first one on the list.
*/
static msgq_wakeup_t *
/* ARGSUSED */
{
if (walker)
return (walker);
}
static msgq_wakeup_t *
/* ARGSUSED */
{
if (walker)
return (walker);
}
static msgq_wakeup_t *
{
if (walker)
return (walker);
}
/* ARGSUSED */
static msgq_wakeup_t *
{
int count;
int check_index;
int neg_index;
int nbuckets;
if (!qp->msg_ngt_cnt) {
return (NULL);
}
/*
* Check for a match among the negative type queues. Any buckets
* at neg_index or larger can match the type. Use the last send
* time to randomize the starting bucket to prevent starvation.
* Search all buckets from neg_index to MSG_MAX_QNUM, starting
* from the random starting point, and wrapping around after
* MSG_MAX_QNUM.
*/
while (qptr) {
/*
* The lowest hash bucket may actually contain
* message types that are not valid for this
* request. This can happen due to the fact that
* the message buckets actually contain a consecutive
* range of types.
*/
qptr);
return (qptr);
}
qptr);
}
if (++check_index > MSG_MAX_QNUM) {
}
}
return (NULL);
}
static int
{
int cvres;
qp->msg_rcv_cnt++;
qp->msg_rcv_cnt--;
/*
* We woke up unexpectedly, remove ourself.
*/
}
return (cvres);
}
static void
{
}
}
/*
* msgsys - System entry point for msgctl, msgget, msgrcv, and msgsnd
* system calls.
*/
static ssize_t
{
switch (opcode) {
case MSGGET:
break;
case MSGCTL:
break;
case MSGRCV:
break;
case MSGSND:
break;
case MSGIDS:
break;
case MSGSNAP:
break;
default:
break;
}
return (error);
}
#ifdef _SYSCALL32_IMPL
/*
* msgsys32 - System entry point for msgctl, msgget, msgrcv, and msgsnd
* system calls for 32-bit callers on LP64 kernel.
*/
static ssize32_t
{
switch (opcode) {
case MSGGET:
break;
case MSGCTL:
break;
case MSGRCV:
break;
case MSGSND:
break;
case MSGIDS:
break;
case MSGSNAP:
break;
default:
break;
}
return (error);
}
#endif /* SYSCALL32_IMPL */