/* stream.c revision aa3da23c1e735ce968058c7dfd8b0480e31d4e3d */
/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License (the "License").
* You may not use this file except in compliance with the License.
*
* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at usr/src/OPENSOLARIS.LICENSE.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information: Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*/
/* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
/*
* Copyright 2006 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
#pragma ident "%Z%%M% %I% %E% SMI"
#include <sys/sysmacros.h>
#include <sys/multidata.h>
#include <sys/multidata_impl.h>
#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif
/*
* This file contains all the STREAMS utility routines that may
* be used by modules and drivers.
*/
/*
* STREAMS message allocator: principles of operation
*
* The streams message allocator consists of all the routines that
* allocate, dup and free streams messages: allocb(), [d]esballoc[a],
* dupb(), freeb() and freemsg(). What follows is a high-level view
* of how the allocator works.
*
* Every streams message consists of one or more mblks, a dblk, and data.
* All mblks for all types of messages come from a common mblk_cache.
* The dblk and data come in several flavors, depending on how the
* message is allocated:
*
 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
 *     fixed-size dblk/data caches. For message sizes that are multiples of
 *     PAGESIZE, dblks are allocated separately from the buffer.
* The associated buffer is allocated by the constructor using kmem_alloc().
* For all other message sizes, dblk and its associated data is allocated
* as a single contiguous chunk of memory.
* Objects in these caches consist of a dblk plus its associated data.
* allocb() determines the nearest-size cache by table lookup:
* the dblk_cache[] array provides the mapping from size to dblk cache.
*
* (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
* kmem_alloc()'ing a buffer for the data and supplying that
* buffer to gesballoc(), described below.
*
* (3) The four flavors of [d]esballoc[a] are all implemented by a
* common routine, gesballoc() ("generic esballoc"). gesballoc()
* allocates a dblk from the global dblk_esb_cache and sets db_base,
* db_lim and db_frtnp to describe the caller-supplied buffer.
*
* While there are several routines to allocate messages, there is only
* one routine to free messages: freeb(). freeb() simply invokes the
* dblk's free method, dbp->db_free(), which is set at allocation time.
*
* dupb() creates a new reference to a message by allocating a new mblk,
* incrementing the dblk reference count and setting the dblk's free
* method to dblk_decref(). The dblk's original free method is retained
* in db_lastfree. dblk_decref() decrements the reference count on each
* freeb(). If this is not the last reference it just frees the mblk;
* if this *is* the last reference, it restores db_free to db_lastfree,
* sets db_mblk to the current mblk (see below), and invokes db_lastfree.
*
* The implementation makes aggressive use of kmem object caching for
* maximum performance. This makes the code simple and compact, but
* also a bit abstruse in some places. The invariants that constitute a
* message's constructed state, described below, are more subtle than usual.
*
* Every dblk has an "attached mblk" as part of its constructed state.
* The mblk is allocated by the dblk's constructor and remains attached
* until the message is either dup'ed or pulled up. In the dupb() case
* the mblk association doesn't matter until the last free, at which time
* dblk_decref() attaches the last mblk to the dblk. pullupmsg() affects
* the mblk association because it swaps the leading mblks of two messages,
* so it is responsible for swapping their db_mblk pointers accordingly.
* From a constructed-state viewpoint it doesn't matter that a dblk's
* attached mblk can change while the message is allocated; all that
* matters is that the dblk has *some* attached mblk when it's freed.
*
* The sizes of the allocb() small-message caches are not magical.
* They represent a good trade-off between internal and external
* fragmentation for current workloads. They should be reevaluated
* periodically, especially if allocations larger than DBLK_MAX_CACHE
* become common. We use 64-byte alignment so that dblks don't
* straddle cache lines unnecessarily.
*/
#define DBLK_MAX_CACHE 73728
#define DBLK_CACHE_ALIGN 64
#define DBLK_MIN_SIZE 8
#define DBLK_SIZE_SHIFT 3
#ifdef _BIG_ENDIAN
#define DBLK_RTFU_SHIFT(field) \
#else
#define DBLK_RTFU_SHIFT(field) \
#endif
static size_t dblk_sizes[] = {
#ifdef _LP64
16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456,
#else
64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968,
8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736,
40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504,
#endif
};
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
/*
*/
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;
static int
{
return (-1);
if ((msg_size & PAGEOFFSET) == 0) {
return (-1);
}
} else {
}
dbp->db_struioflag = 0;
return (0);
}
/*ARGSUSED*/
static int
{
return (-1);
dbp->db_struioflag = 0;
return (0);
}
static int
{
return (-1);
return (-1);
}
dbp->db_struioflag = 0;
return (0);
}
/*ARGSUSED*/
static void
{
if ((msg_size & PAGEOFFSET) == 0) {
}
}
static void
{
}
void
streams_msg_init(void)
{
char name[40];
struct kmem_cache *cp;
int offset;
/*
* We are in the middle of a page, dblk should
* be allocated on the same page
*/
< PAGESIZE);
} else {
/*
* buf size is multiple of page size, dblk and
* buffer are allocated separately.
*/
}
}
}
sizeof (dblk_t), DBLK_CACHE_ALIGN,
/* Initialize Multidata caches */
mmd_init();
}
/*ARGSUSED*/
mblk_t *
{
if (size != 0) {
goto out;
}
index = 0;
}
goto out;
}
MBLK_BAND_FLAG_WORD(mp) = 0;
out:
return (mp);
}
mblk_t *
{
}
return (mp);
}
mblk_t *
{
return (mp);
}
mblk_t *
{
return (mp);
}
void
{
}
void
{
while (mp) {
}
}
/*
* Reallocate a block for another use. Try hard to use the old block.
* If the old data is wanted (copy), leave b_wptr at the end of the data,
* otherwise return b_wptr = b_rptr.
*
* This routine is private and unstable.
*/
mblk_t *
{
unsigned char *old_rptr;
/*
* If the data is wanted and it will fit where it is, no
* work is required.
*/
return (mp);
/* XXX other mp state could be copied too, db_flags ... ? */
} else {
return (NULL);
}
if (copy) {
}
return (mp1);
}
static void
{
/* set credp and projid to be 'unspecified' before returning to cache */
}
/* Reset the struioflag and the checksum flag fields */
dbp->db_struioflag = 0;
/* and the COOKED flag */
}
static void
{
/*
* atomic_add_32_nv() just decremented db_ref, so we no longer
* have a reference to the dblk, which means another thread
* could free it. Therefore we cannot examine the dblk to
* determine whether ours was the last reference. Instead,
* we extract the new and minimum reference counts from rtfu.
* Note that all we're really saying is "if (ref != refmin)".
*/
return;
}
}
}
mblk_t *
{
goto out;
do {
/*
* If db_ref is maxed out we can't dup this message anymore.
*/
goto out;
}
out:
return (new_mp);
}
static void
{
/* set credp and projid to be 'unspecified' before returning to cache */
}
dbp->db_struioflag = 0;
}
/*ARGSUSED*/
/*
 * No-op free routine. Used as an frtn_t free function for esballoc'd
 * messages whose external buffer requires no cleanup when freed.
 * NOTE(review): role inferred from the name and the gesballoc() context
 * in this file -- confirm against callers.
 */
static void
frnop_func(void *arg)
{
}
/*
* Generic esballoc used to implement the four flavors: [d]esballoc[a].
*/
static mblk_t *
{
goto out;
}
MBLK_BAND_FLAG_WORD(mp) = 0;
out:
return (mp);
}
/*ARGSUSED*/
mblk_t *
{
/*
* Note that this is structured to allow the common case (i.e.
* STREAMS flowtracing disabled) to call gesballoc() with tail
* call optimization.
*/
if (!str_ftnever) {
return (mp);
}
}
/*
* Same as esballoc() but sleeps waiting for memory.
*/
/*ARGSUSED*/
mblk_t *
{
/*
* Note that this is structured to allow the common case (i.e.
* STREAMS flowtracing disabled) to call gesballoc() with tail
* call optimization.
*/
if (!str_ftnever) {
return (mp);
}
}
/*ARGSUSED*/
mblk_t *
{
/*
* Note that this is structured to allow the common case (i.e.
* STREAMS flowtracing disabled) to call gesballoc() with tail
* call optimization.
*/
if (!str_ftnever) {
return (mp);
}
}
/*ARGSUSED*/
mblk_t *
{
/*
* Note that this is structured to allow the common case (i.e.
* STREAMS flowtracing disabled) to call gesballoc() with tail
* call optimization.
*/
if (!str_ftnever) {
return (mp);
}
}
/*ARGSUSED*/
mblk_t *
{
/*
* Note that this is structured to allow the common case (i.e.
* STREAMS flowtracing disabled) to call gesballoc() with tail
* call optimization.
*/
if (!str_ftnever) {
return (mp);
}
}
static void
{
/* set credp and projid to be 'unspecified' before returning to cache */
}
dbp->db_struioflag = 0;
} else {
}
}
bcache_t *
{
char buffer[255];
NULL) {
return (NULL);
}
return (bcp);
}
void
{
} else {
}
}
/*ARGSUSED*/
mblk_t *
{
goto out;
}
goto out;
}
MBLK_BAND_FLAG_WORD(mp) = 0;
out:
return (mp);
}
static void
{
/* set credp and projid to be 'unspecified' before returning to cache */
}
dbp->db_struioflag = 0;
}
static mblk_t *
{
void *buf;
return (NULL);
return (mp);
}
mblk_t *
{
size += DBLK_CACHE_ALIGN)
return (bp);
return (NULL);
}
/*
* This routine is consolidation private for STREAMS internal use
* This routine may only be called from sync routines (i.e., not
* from put or service procedures). It is located here (rather
* than strsubr.c) so that we don't have to expose all of the
* allocb() implementation details in header files.
*/
mblk_t *
{
if (size != 0) {
FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
return (mp);
}
index = 0;
}
MBLK_BAND_FLAG_WORD(mp) = 0;
} else {
return (NULL);
}
}
return (mp);
}
/*
* Call function 'func' with 'arg' when a class zero block can
* be allocated with priority 'pri'.
*/
{
}
/*
* Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
* ioc_id, rval and error of the struct ioctl to set up an ioctl call.
* This provides consistency for all internal allocators of ioctl.
*/
mblk_t *
{
/*
* Allocate enough space for any of the ioctl related messages.
*/
return (NULL);
/*
* Set the mblk_t information and ptrs correctly.
*/
/*
* Fill in the fields.
*/
return (mp);
}
/*
* test if block of given size can be allocated with a request of
* the given priority.
* 'pri' is no longer used, but is retained for compatibility.
*/
/* ARGSUSED */
int
{
}
/*
* Call function 'func' with argument 'arg' when there is a reasonably
* good chance that a block of size 'size' can be allocated.
* 'pri' is no longer used, but is retained for compatibility.
*/
/* ARGSUSED */
{
struct strbufcall *bcp;
return (0);
/*
* After bcp is linked into strbcalls and strbcall_lock is dropped there
* should be no references to bcp since it may be freed by
* runbufcalls(). Since bcp_id field is returned, we save its value in
* the local var.
*/
/*
* add newly allocated stream event to existing
* linked list of events.
*/
} else {
}
return (bc_id);
}
/*
* Cancel a bufcall request.
*/
void
{
break;
}
if (bcp) {
goto again;
}
} else {
if (pbcp)
else
}
}
}
/*
* Duplicate a message block by block (uses dupb), returning
* a pointer to the duplicate message.
* Returns a non-NULL value only if the entire message
* was dup'd.
*/
mblk_t *
{
return (NULL);
return (NULL);
}
}
return (head);
}
#define DUPB_NOLOAN(bp) \
mblk_t *
{
return (NULL);
return (NULL);
}
}
return (head);
}
/*
* Copy data from message and data block to newly allocated message and
* data block. Returns new message block pointer, or NULL if error.
* The alignment of rptr (w.r.t. word alignment) will be the same in the copy
* as in the original even when db_base is not word aligned. (bug 1052877)
*/
mblk_t *
{
/*
* Special handling for Multidata message; this should be
* removed once a copy-callback routine is made available.
*/
return (NULL);
/* See comments below on potential issues. */
return (nbp);
}
return (NULL);
/*
* Well, here is a potential issue. If we are trying to
* trace a flow, and we copy the message, we might lose
* information about where this message might have been.
* So we should inherit the FT data. On the other hand,
* a user might be interested only in alloc to free data.
* So I guess the real answer is to provide a tunable.
*/
return (nbp);
}
/*
* Copy data from message to newly allocated message using new
* data blocks. Returns a pointer to the new message, or NULL if error.
*/
mblk_t *
{
return (NULL);
return (NULL);
}
}
return (head);
}
/*
* link a message block to tail of message
*/
void
{
;
}
/*
* unlink a message block from head of message
* return pointer to new message.
* NULL if message becomes empty.
*/
mblk_t *
{
return (bp1);
}
/*
* remove a message block "bp" from message "mp"
*
* Return pointer to new message or NULL if no message remains.
* Return -1 if bp is not found in message.
*/
mblk_t *
{
if (lastp)
else
return (mp);
}
}
return ((mblk_t *)-1);
}
/*
* Concatenate and align first len bytes of common
* message type. Len == -1, means concat everything.
* Returns 1 on success, 0 on failure
* After the pullup, mp points to the pulled up data.
*/
int
{
ssize_t n;
/*
* We won't handle Multidata message, since it contains
* metadata which this function has no knowledge of; we
* assert on DEBUG, and return failure otherwise.
*/
return (0);
if (len == -1) {
return (1);
} else {
ASSERT(first_mblk_len >= 0);
/*
* If the length is less than that of the first mblk,
* we want to pull up the message into an aligned mblk.
* Though not part of the spec, some callers assume it.
*/
if (len <= first_mblk_len) {
return (1);
return (0);
}
return (0);
do {
len -= n;
break;
return (1);
}
/*
* Concatenate and align at least the first len bytes of common message
* type. Len == -1 means concatenate everything. The original message is
* unaltered. Returns a pointer to a new message on success, otherwise
* returns NULL.
*/
mblk_t *
{
ssize_t n;
/*
* We won't handle Multidata message, since it contains
* metadata which this function has no knowledge of; we
* assert on DEBUG, and return failure otherwise.
*/
return (NULL);
return (NULL);
/*
* Copy all of the first msg type into one new mblk, then dupmsg
* and link the rest onto this.
*/
return (NULL);
while (len > 0) {
ASSERT(n >= 0); /* allow zero-length mblk_t's */
if (n > 0)
len -= n;
}
return (NULL);
}
}
return (newmp);
}
/*
* Trim bytes from message
* len > 0, trim from head
* len < 0, trim from tail
* Returns 1 on success, 0 on failure.
*/
int
{
unsigned char type;
ssize_t n;
int fromhead;
int first;
/*
* We won't handle Multidata message, since it contains
* metadata which this function has no knowledge of; we
* assert on DEBUG, and return failure otherwise.
*/
return (0);
if (len < 0) {
fromhead = 0;
} else {
fromhead = 1;
}
return (0);
if (fromhead) {
first = 1;
while (len) {
len -= n;
/*
* If this is not the first zero length
* message remove it
*/
} else {
}
first = 0;
}
} else {
while (len) {
/*
* Find the last message of same type
*/
}
break;
len -= n;
/*
* If this is not the first message
* and we have taken away everything
* from this message, remove it
*/
}
}
}
return (1);
}
/*
* get number of data bytes in message
*/
{
}
return (count);
}
/*
* Get a message off head of queue
*
* If queue has no buffers then mark queue
* with QWANTR. (queue wants to be read by
* someone when data becomes available)
*
* If there is something to take off then do so.
* If queue falls below hi water mark turn off QFULL
* flag. Decrement weighted count of queue.
* Also turn off QWANTR because queue is being read.
*
* The queue count is maintained on a per-band basis.
* Priority band 0 (normal messages) uses q_count,
* q_lowat, etc. Non-zero priority bands use the
* fields in their respective qband structures
* (qb_count, qb_lowat, etc.) All messages appear
* on the same list, linked via their b_next pointers.
* q_first is the head of the list. q_count does
* not reflect the size of all the messages on the
* queue. It only reflects those messages in the
* normal band of flow. The one exception to this
* deals with high priority messages. They are in
* their own conceptual "band", but are accounted
* against q_count.
*
* If queue count is below the lo water mark and QWANTW
* is set, enable the closest backq which has a service
* procedure and turn off the QWANTW flag.
*
* getq could be built on top of rmvq, but isn't because
* of performance considerations.
*
* A note on the use of q_count and q_mblkcnt:
* q_count is the traditional byte count for messages that
* have been put on a queue. Documentation tells us that
 * we should not rely on that count, but some drivers/modules
 * do. What was needed, however, is a mechanism to prevent
* runaway streams from consuming all of the resources,
* and particularly be able to flow control zero-length
* messages. q_mblkcnt is used for this purpose. It
* counts the number of mblk's that are being put on
* the queue. The intention here, is that each mblk should
* contain one byte of data and, for the purpose of
* flow-control, logically does. A queue will become
* full when EITHER of these values (q_count and q_mblkcnt)
* reach the highwater mark. It will clear when BOTH
* of them drop below the highwater mark. And it will
* backenable when BOTH of them drop below the lowwater
 * mark. With this algorithm, a driver/module might be able
 * to find a reasonably accurate q_count, and the
* framework can still try and limit resource usage.
*/
mblk_t *
{
bp = getq_noenab(q);
/*
* Inlined from qbackenable().
* Quick check without holding the lock.
*/
return (bp);
qbackenable(q, band);
return (bp);
}
/*
* Calculate number of data bytes in a single data message block taking
* multidata messages into account.
*/
} else { \
\
}
/*
* Like getq() but does not backenable. This is used by the stream
* head when a putback() is likely. The caller must call qbackenable()
* after it is done with accessing the queue.
*/
mblk_t *
getq_noenab(queue_t *q)
{
} else
mutex_enter(QLOCK(q));
} else {
else
/* Get message byte count for q_count accounting */
mblkcnt++;
}
}
} else {
int i;
while (--i > 0)
} else {
}
}
}
}
mutex_exit(QLOCK(q));
return (bp);
}
/*
* Determine if a backenable is needed after removing a message in the
* specified band.
* NOTE: This routine assumes that something like getq_noenab() has been
* already called.
*
* For the read side it is ok to hold sd_lock across calling this (and the
* stream head often does).
* But for the write side strwakeq might be invoked and it acquires sd_lock.
*/
void
{
int backenab = 0;
ASSERT(q);
/*
* Quick check without holding the lock.
* OK since after getq() has lowered the q_count these flags
* would not change unless either the qbackenable() is done by
* another thread (which is ok) or the queue has gotten QFULL
* in which case another backenable will take place when the queue
* drops below q_lowat.
*/
return;
} else
mutex_enter(QLOCK(q));
if (band == 0) {
}
} else {
int i;
i = band;
while (--i > 0)
}
}
if (backenab == 0) {
mutex_exit(QLOCK(q));
return;
}
/* Have to drop the lock across strwakeq and backenable */
if (backenab & QWANTWSYNC)
q->q_flag &= ~QWANTWSYNC;
if (band != 0)
else {
}
}
mutex_exit(QLOCK(q));
if (backenab & QWANTWSYNC)
strwakeq(q, QWANTWSYNC);
backenable(q, band);
}
/*
* Remove a message from a queue. The queue count and other
* flow control parameters are adjusted and the back queue
* enabled if necessary.
*
 * rmvq can be called with the stream frozen, by other utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.
*/
void
{
rmvq_noenab(q, mp);
/*
* qbackenable can handle a frozen stream but not a "random"
* qlock being held. Drop lock across qbackenable.
*/
mutex_exit(QLOCK(q));
mutex_enter(QLOCK(q));
} else {
}
}
/*
* Like rmvq() but without any backenabling.
* This exists to handle SR_CONSOL_DATA in strrput().
*/
void
{
int i;
} else if (MUTEX_HELD(QLOCK(q))) {
/* Don't drop lock on exit */
} else
mutex_enter(QLOCK(q));
while (--i > 0)
else
}
else
}
}
/*
* Remove the message from the list.
*/
else
else
/* Get the size of the message for q_count accounting */
mblkcnt++;
}
}
} else { /* Perform qb_count accounting */
}
}
mutex_exit(QLOCK(q));
}
/*
* Empty a queue.
* If flag is set, remove all messages. Otherwise, remove
* only non-control messages. If queue falls below its low
* water mark, and QWANTW is set, enable the nearest upstream
* service procedure.
*
* Historical note: when merging the M_FLUSH code in strrput with this
* code one difference was discovered. flushq did not have a check
* for q_lowat == 0 in the backenabling test.
*
* pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
* if one exists on the queue.
*/
void
{
int backenab = 0;
unsigned char bpri;
return;
mutex_enter(QLOCK(q));
q->q_count = 0;
q->q_mblkcnt = 0;
qbp->qb_mblkcnt = 0;
}
mutex_exit(QLOCK(q));
while (mp) {
else
}
bpri = 1;
mutex_enter(QLOCK(q));
backenab = 1;
} else
bpri++;
}
backenab = 1;
qbf[0] = 1;
} else
qbf[0] = 0;
/*
* If any band can now be written to, and there is a writer
* for that band, then backenable the closest service procedure.
*/
if (backenab) {
mutex_exit(QLOCK(q));
backenable(q, bpri);
if (qbf[0])
backenable(q, 0);
} else
mutex_exit(QLOCK(q));
}
/*
 * The real flushing takes place in flushq_common. This is done so that
 * a flag which specifies whether or not M_PCPROTO messages should be
 * flushed can be passed in. Currently the only place that uses this
 * flag is the stream head.
*/
void
{
flushq_common(q, flag, 0);
}
/*
* Flush the queue of messages of the given priority band.
* There is some duplication of code between flushq and flushband.
* This is because we want to optimize the code as much as possible.
* The assumption is that there will be more messages in the normal
* (priority 0) band than in any other.
*
* Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered. flushband had an extra check,
 * which the strrput flush code did not have: a check for
 * (mp->b_datap->db_type < QPCTL) in the band 0
* case. That check does not match the man page for flushband and was not
* in the strrput flush code hence it was removed.
*/
void
{
int band;
return;
}
mutex_enter(QLOCK(q));
if (pri == 0) {
q->q_count = 0;
q->q_mblkcnt = 0;
qbp->qb_mblkcnt = 0;
}
mutex_exit(QLOCK(q));
while (mp) {
else
}
mutex_enter(QLOCK(q));
mutex_exit(QLOCK(q));
backenable(q, pri);
} else
mutex_exit(QLOCK(q));
} else { /* pri != 0 */
while (--band > 0)
mutex_exit(QLOCK(q));
return;
}
/*
* rmvq_noenab() and freemsg() are called for each mblk that
* meets the criteria. The loop is executed until the last
* mblk has been processed.
*/
rmvq_noenab(q, mp);
}
}
mutex_exit(QLOCK(q));
/*
* If any mblk(s) has been freed, we know that qbackenable()
* will need to be called.
*/
if (flushed)
qbackenable(q, pri);
}
}
/*
* Return 1 if the queue is not full. If the queue is full, return
* 0 (may not put message) and set QWANTW flag (caller wants to write
* to the queue).
*/
int
{
/* this is for loopback transports, they should not do a canput */
/* Find next forward module that has a service procedure */
q = q->q_nfsrv;
return (1);
}
mutex_enter(QLOCK(q));
mutex_exit(QLOCK(q));
return (0);
}
mutex_exit(QLOCK(q));
return (1);
}
/*
* This is the new canput for use with priority bands. Return 1 if the
* band is not full. If the band is full, return 0 (may not put message)
* and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
* write to the queue).
*/
int
{
if (!q)
return (0);
/* Find next forward module that has a service procedure */
q = q->q_nfsrv;
mutex_enter(QLOCK(q));
if (pri == 0) {
mutex_exit(QLOCK(q));
"bcanput:%p %X %d", q, pri, 0);
return (0);
}
} else { /* pri != 0 */
/*
* No band exists yet, so return success.
*/
mutex_exit(QLOCK(q));
return (1);
}
while (--pri)
mutex_exit(QLOCK(q));
"bcanput:%p %X %d", q, pri, 0);
return (0);
}
}
mutex_exit(QLOCK(q));
return (1);
}
/*
* Put a message on a queue.
*
* Messages are enqueued on a priority basis. The priority classes
* are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
* and B_NORMAL (type < QPCTL && band == 0).
*
* Add appropriate weighted data block sizes to queue count.
* If queue hits high water mark then set QFULL flag.
*
* If QNOENAB is not set (putq is allowed to enable the queue),
* enable the queue only if the message is PRIORITY,
* or the QWANTR flag is set (indicating that the service procedure
* is ready to read the queue. This implies that a service
* procedure must NEVER put a high priority message back on its own
* queue, as this would result in an infinite loop (!).
*/
int
{
} else
mutex_enter(QLOCK(q));
/*
* Make sanity checks and if qband structure is not yet
* allocated, do so.
*/
int i;
/*
* The qband structure for this priority band is
* not on the queue yet, so we have to allocate
* one on the fly. It would be wasteful to
* associate the qband structures with every
* queue when the queues are allocated. This is
* because most queues will only need the normal
* band of flow which can be described entirely
* by the queue itself.
*/
while (*qbpp)
mutex_exit(QLOCK(q));
return (0);
}
q->q_nband++;
}
}
while (--i)
}
/*
* If queue is empty, add the message and initialize the pointers.
* Otherwise, adjust message pointers and queue pointers based on
* the type of the message and where it belongs on the queue. Some
* code is duplicated to minimize the number of conditionals and
* hopefully minimize the amount of time this routine takes.
*/
if (!q->q_first) {
if (qbp) {
}
} else if (!qbp) { /* bp->b_band == 0 */
/*
* If queue class of message is less than or equal to
* that of the last one on the queue, tack on to the end.
*/
} else {
/*
* Insert bp before tmp.
*/
else
}
} else { /* bp->b_band != 0 */
/*
* Insert bp after the last message in this band.
*/
else
} else {
/*
* Tack bp on end of queue.
*/
} else {
/*
* Insert bp before tmp.
*/
else
}
}
}
/* Get message byte count for q_count accounting */
mblkcnt++;
}
if (qbp) {
}
} else {
}
}
qenable_locked(q);
mutex_exit(QLOCK(q));
return (1);
}
/*
* Put stuff back at beginning of Q according to priority order.
* See comment on putq above for details.
*/
int
{
} else
mutex_enter(QLOCK(q));
/*
* Make sanity checks and if qband structure is not yet
* allocated, do so.
*/
int i;
while (*qbpp)
mutex_exit(QLOCK(q));
return (0);
}
q->q_nband++;
}
}
while (--i)
}
/*
* If queue is empty or if message is high priority,
* place on the front of the queue.
*/
if (tmp)
else
if (qbp) {
}
} else if (qbp) { /* bp->b_band != 0 */
if (tmp) {
/*
* Insert bp before the first message in this band.
*/
else
} else {
/*
* Tack bp on end of queue.
*/
} else {
/*
* Insert bp before tmp.
*/
else
}
}
} else { /* bp->b_band == 0 && !QPCTL */
/*
* If the queue class or band is less than that of the last
* message on the queue, tack bp on the end of the queue.
*/
} else {
/*
* Insert bp before tmp.
*/
else
}
}
/* Get message byte count for q_count accounting */
mblkcnt++;
}
if (qbp) {
}
} else {
}
}
qenable_locked(q);
mutex_exit(QLOCK(q));
return (1);
}
/*
* Insert a message before an existing message on the queue. If the
* existing message is NULL, the new messages is placed on the end of
* the queue. The queue class of the new message is ignored. However,
* the priority band of the new message must adhere to the following
* ordering:
*
* emp->b_prev->b_band >= mp->b_band >= emp->b_band.
*
* All flow control parameters are updated.
*
 * insq can be called with the stream frozen, by other utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.
*/
int
{
} else if (MUTEX_HELD(QLOCK(q))) {
/* Don't drop lock on exit */
} else
mutex_enter(QLOCK(q));
goto badord;
}
if (emp) {
goto badord;
}
} else {
"insq: attempt to insert message out of order "
"on q %p", (void *)q);
mutex_exit(QLOCK(q));
return (0);
}
}
int i;
while (*qbpp)
mutex_exit(QLOCK(q));
return (0);
}
q->q_nband++;
}
}
while (--i)
}
else
} else {
else
}
/* Get mblk and byte count for q_count accounting */
mblkcnt++;
}
if (qbp) { /* adjust qband pointers and count */
} else {
}
}
} else {
}
}
qenable_locked(q);
mutex_exit(QLOCK(q));
return (1);
}
/*
* Create and put a control message on queue.
*/
int
{
return (0);
return (1);
}
/*
* Control message with a single-byte parameter
*/
int
{
return (0);
return (1);
}
int
{
return (0);
return (1);
}
int
{
return (0);
return (1);
}
/*
* Return the queue upstream from this one
*/
queue_t *
{
q = _OTHERQ(q);
if (q->q_next) {
q = q->q_next;
return (_OTHERQ(q));
}
return (NULL);
}
/*
* Send a block back up the queue in reverse from this
* one (e.g. to respond to ioctls)
*/
void
{
}
/*
* Streams Queue Scheduling
*
* Queues are enabled through qenable() when they have messages to
* process. They are serviced by queuerun(), which runs each enabled
* queue's service procedure. The call to queuerun() is processor
* dependent - the general principle is that it be run whenever a queue
* is enabled but before returning to user level. For system calls,
* the function runqueues() is called if their action causes a queue
* to be enabled. For device interrupts, queuerun() should be
* called before returning from the last level of interrupt. Beyond
* this, no timing assumptions should be made about queue scheduling.
*/
/*
* Enable a queue: put it on list of those whose service procedures are
* ready to run and set up the scheduling mechanism.
* The broadcast is done outside the mutex -> to avoid the woken thread
* from contending with the mutex. This is OK 'cos the queue has been
* enqueued on the runlist and flagged safely at this point.
*/
void
{
mutex_enter(QLOCK(q));
qenable_locked(q);
mutex_exit(QLOCK(q));
}
/*
* Return number of messages on queue
*/
int
{
int count = 0;
count++;
return (count);
}
/*
* noenable - set queue so that putq() will not enable it.
* enableok - set queue so that putq() can enable it.
*/
void
{
mutex_enter(QLOCK(q));
mutex_exit(QLOCK(q));
}
void
{
mutex_enter(QLOCK(q));
mutex_exit(QLOCK(q));
}
/*
* Set queue fields.
*/
int
{
int error = 0;
} else
mutex_enter(QLOCK(q));
goto done;
}
if (pri != 0) {
int i;
while (*qbpp)
goto done;
}
q->q_nband++;
}
}
i = pri;
while (--i)
}
switch (what) {
case QHIWAT:
if (qbp)
else
break;
case QLOWAT:
if (qbp)
else
break;
case QMAXPSZ:
if (qbp)
else
/*
* Performance concern, strwrite looks at the module below
* the stream head for the maxpsz each time it does a write
* we now cache it at the stream head. Check to see if this
* queue is sitting directly below the stream head.
*/
break;
/*
* If the stream is not frozen drop the current QLOCK and
* acquire the sd_wrq QLOCK which protects sd_qn_*
*/
mutex_exit(QLOCK(q));
}
if (strmsgsz != 0) {
else {
else
}
}
mutex_enter(QLOCK(q));
}
break;
case QMINPSZ:
if (qbp)
else
/*
* Performance concern, strwrite looks at the module below
* the stream head for the maxpsz each time it does a write
* we now cache it at the stream head. Check to see if this
* queue is sitting directly below the stream head.
*/
break;
/*
* If the stream is not frozen drop the current QLOCK and
* acquire the sd_wrq QLOCK which protects sd_qn_*
*/
mutex_exit(QLOCK(q));
}
mutex_enter(QLOCK(q));
}
break;
case QSTRUIOT:
if (qbp)
else
break;
case QCOUNT:
case QFIRST:
case QLAST:
case QFLAG:
break;
default:
break;
}
done:
mutex_exit(QLOCK(q));
return (error);
}
/*
* Get queue fields.
*/
int
{
int error = 0;
} else
mutex_enter(QLOCK(q));
goto done;
}
if (pri != 0) {
int i;
while (*qbpp)
goto done;
}
q->q_nband++;
}
}
i = pri;
while (--i)
}
switch (what) {
case QHIWAT:
if (qbp)
else
break;
case QLOWAT:
if (qbp)
else
break;
case QMAXPSZ:
if (qbp)
else
break;
case QMINPSZ:
if (qbp)
else
break;
case QCOUNT:
if (qbp)
else
break;
case QFIRST:
if (qbp)
else
break;
case QLAST:
if (qbp)
else
break;
case QFLAG:
if (qbp)
else
break;
case QSTRUIOT:
if (qbp)
else
break;
default:
break;
}
done:
mutex_exit(QLOCK(q));
return (error);
}
/*
* QWANTWSYNC or QWANTR or QWANTW,
*
* Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
* deferred wakeup will be done. Also if strpoll() in progress then a
* deferred pollwakeup will be done.
*/
void
{
pollhead_t *pl;
if (flag & QWANTWSYNC) {
} else {
}
} else {
}
{
if (events)
}
} else {
}
}
}
int
{
int typ = STRUIOT_STANDARD;
unsigned char *ptr;
int error = 0;
/*
* Plumbing may change while taking the type so store the
* queue in a temporary variable. It doesn't matter even
* if the we take the type from the previous plumbing,
* that's because if the plumbing has changed when we were
* holding the queue in a temporary variable, we can continue
* processing the message the way it would have been processed
* in the old plumbing, without any side effects but a bit
* extra processing for partial ip header checksum.
*
* This has been done to avoid holding the sd_lock which is
* very hot.
*/
if (stwrq)
/*
* Either this mblk has already been processed
* or there is no more room in this mblk (?).
*/
continue;
}
switch (typ) {
case STRUIOT_STANDARD:
if (noblock) {
no_trap();
error = EWOULDBLOCK;
goto out;
}
}
if (noblock)
no_trap();
goto out;
}
if (noblock)
no_trap();
break;
default:
goto out;
}
}
out:
/*
* A fault has occured and some bytes were moved to the
* current mblk, the uio_t has already been updated by
* the appropriate uio routine, so also update the mblk
* to reflect this in case this same mblk chain is used
* again (after the fault has been handled).
*/
}
return (error);
}
/*
* Try to enter queue synchronously. Any attempt to enter a closing queue will
* fails. The qp->q_rwcnt keeps track of the number of successful entries so
* that removeq() will not try to close the queue while a thread is inside the
* queue.
*/
static boolean_t
{
return (B_FALSE);
}
return (B_TRUE);
}
/*
* Decrease the count of threads running in sync stream queue and wake up any
* threads blocked in removeq().
*/
static void
{
}
}
/*
* The purpose of rwnext() is to call the rw procedure of the next
* (downstream) modules queue.
*
* treated as put entrypoint for perimeter syncronization.
*
* There's no need to grab sq_putlocks here (which only exist for CIPUT
* sync queues). If it is CIPUT sync queue sq_count is incremented and it does
* not matter if any regular put entrypoints have been already entered. We
* can't increment one of the sq_putcounts (instead of sq_count) because
* qwait_rw won't know which counter to decrement.
*
* It would be reasonable to add the lockless FASTPUT logic.
*/
int
{
int (*proc)();
int isread;
int rval;
/*
* Prevent q_next from changing by holding sd_lock until acquiring
* SQLOCK. Note that a read-side rwnext from the streamhead will
* already have sd_lock acquired. In either case sd_lock is always
* released after acquiring SQLOCK.
*
* The streamhead read-side holding sd_lock when calling rwnext is
* required to prevent a race condition were M_DATA mblks flowing
* up the read-side of the stream could be bypassed by a rwnext()
* down-call. In this case sd_lock acts as the streamhead perimeter.
*/
isread = 0;
} else {
isread = 1;
/* Not streamhead */
}
/*
* Not a synchronous module or no r/w procedure for this
* queue, so just return EINVAL and let the caller handle it.
*/
return (EINVAL);
}
return (EINVAL);
}
/*
* if this queue is being closed, return.
*/
return (EINVAL);
}
/*
* Wait until we can enter the inner perimeter.
*/
}
/*
* Stream plumbing changed while waiting for inner perimeter
* so just return EINVAL and let the caller handle it.
*/
return (EINVAL);
}
/*
* Note: The only message ordering guarantee that rwnext() makes is
* for the write queue flow-control case. All others (r/w queue
* with q_count > 0 (or q_first != 0)) are the resposibilty of
* the queue's rw procedure. This could be genralized here buy
* running the queue's service procedure, but that wouldn't be
* the most efficent for all cases.
*/
/*
* Write queue may be flow controlled. If so,
* mark the queue for wakeup when it's not.
*/
rval = EWOULDBLOCK;
goto out;
}
}
out:
/*
* The queue is protected from being freed by sq_count, so it is
* safe to call rwnext_exit and reacquire SQLOCK(sq).
*/
/*
* The only purpose of this ASSERT is to preserve calling stack
* in DEBUG kernel.
*/
return (rval);
}
/*
* Safe to always drop SQ_EXCL:
* Not SQ_CIPUT means we set SQ_EXCL above
* For SQ_CIPUT SQ_EXCL will only be set if the put procedure
* did a qwriter(INNER) in which case nobody else
* is in the inner perimeter and we are exiting.
*
* I would like to make the following assertion:
*
* ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
* sq->sq_count == 0);
*
* which indicates that if we are both putshared and exclusive,
* we became exclusive while executing the putproc, and the only
* claim on the syncq was the one we dropped a few lines above.
* But other threads that enter putnext while the syncq is exclusive
* need to make a claim as they may need to drop SQLOCK in the
* has_writers case to avoid deadlocks. If these threads are
* delayed or preempted, it is possible that the writer thread can
* find out that there are other claims making the (sq_count == 0)
* test invalid.
*/
}
return (rval);
}
/*
* The purpose of infonext() is to call the info procedure of the next
* (downstream) modules queue.
*
* treated as put entrypoint for perimeter syncronization.
*
* There's no need to grab sq_putlocks here (which only exist for CIPUT
* sync queues). If it is CIPUT sync queue regular sq_count is incremented and
* it does not matter if any regular put entrypoints have been already
* entered.
*/
int
{
int (*proc)();
int rval;
/*
* Prevent q_next from changing by holding sd_lock until
* acquiring SQLOCK.
*/
} else {
}
return (EINVAL);
}
/*
* Wait until we can enter the inner perimeter.
*/
}
/*
* The only purpose of this ASSERT is to preserve calling stack
* in DEBUG kernel.
*/
return (rval);
}
/*
* XXXX
* I am not certain the next comment is correct here. I need to consider
* why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
* might cause other problems. It just might be safer to drop it if
* !SQ_CIPUT because that is when we set it.
*/
/*
* Safe to always drop SQ_EXCL:
* Not SQ_CIPUT means we set SQ_EXCL above
* For SQ_CIPUT SQ_EXCL will only be set if the put procedure
* did a qwriter(INNER) in which case nobody else
* is in the inner perimeter and we are exiting.
*
* I would like to make the following assertion:
*
* ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
* sq->sq_count == 0);
*
* which indicates that if we are both putshared and exclusive,
* we became exclusive while executing the putproc, and the only
* claim on the syncq was the one we dropped a few lines above.
* But other threads that enter putnext while the syncq is exclusive
* need to make a claim as they may need to drop SQLOCK in the
* has_writers case to avoid deadlocks. If these threads are
* delayed or preempted, it is possible that the writer thread can
* find out that there are other claims making the (sq_count == 0)
* test invalid.
*/
return (rval);
}
/*
* Return nonzero if the queue is responsible for struio(), else return 0.
*/
int
{
return (STREAM(q)->sd_struiordq == q);
else
return (STREAM(q)->sd_struiowrq == q);
}
#if defined(__sparc)
int disable_putlocks = 0;
#else
int disable_putlocks = 1;
#endif
/*
* called by create_putlock.
*/
static void
{
int i;
ASSERT(disable_putlocks == 0);
return;
for (i = 0; i <= 1; i++) {
} else {
/*
* putnext checks sq_ciputctrl without holding
* SQLOCK. if it is not NULL putnext assumes
* sq_nciputctrl is initialized. membar below
* insures that.
*/
}
}
if (i == 1)
break;
q = _OTHERQ(q);
break;
}
}
}
/*
* If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
* syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
* the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
* starting from q and down to the driver.
*
* This should be called after the affected queues are part of stream
* qprocson() call. It is also called from nfs syscall where it is known that
* stream is configured and won't change its geometry during create_putlock
* call.
*
* caller normally uses 0 value for the stream argument to speed up MT putnext
* into the perimeter of q for example because its perimeter is per module
* (e.g. IP).
*
* caller normally uses non 0 value for the stream argument to hint the system
* that the stream of q is a very contended global system stream
* particularly MT hot.
*
* Caller insures stream plumbing won't happen while we are here and therefore
* q_next can be safely used.
*/
void
{
q = _WR(q);
if (disable_putlocks != 0)
return;
if (n_ciputctrl < min_n_ciputctrl)
return;
} else {
/*
* putnext checks sd_ciputctrl without holding
* sd_lock. if it is not NULL putnext assumes
* sd_nciputctrl is initialized. membar below
* insures that.
*/
}
}
while (_SAMESTR(q)) {
if (stream == 0)
return;
q = q->q_next;
}
}
/*
* STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
* through a stream.
*
* Data currently record per event is a hrtime stamp, queue address, event
* type, and a per type datum. Much of the STREAMS framework is instrumented
* for automatic flow tracing (when enabled). Events can be defined and used
* by STREAMS modules and drivers.
*
* Global objects:
*
* str_ftevent() - Add a flow-trace event to a dblk.
* str_ftfree() - Free flow-trace data
*
* Local objects:
*
* fthdr_cache - pointer to the kmem cache for trace header.
* ftblk_cache - pointer to the kmem cache for trace data blocks.
*/
void
{
for (;;) {
/*
* Tail doesn't have room, so need a new tail.
*
* To make this MT safe, first, allocate a new
* ftblk, and initialize it. To make life a
* little easier, reserve the first slot (mostly
* by making ix = 1). When we are finished with
* the initialization, CAS this pointer to the
* tail. If this succeeds, this is the new
* "next" block. Otherwise, another thread
* got here first, so free the block and start
* again.
*/
KM_NOSLEEP))) {
/* no mem, so punt */
str_ftnever++;
/* free up all flow data? */
return;
}
/*
* Just in case there is another thread about
* to get the next index, we need to make sure
* the value is there for it.
*/
/* CAS was successful */
ix = 0;
goto cas_good;
} else {
continue;
}
}
}
}
break;
}
}
if (evnt & FTEV_QMASK) {
/*
* It is possible that the module info is broke
* (as is logsubr.c at this comment writing).
* Instead of panicing or doing other unmentionables,
* we shall put a dummy name as the mid, and continue.
*/
else
} else {
}
}
/*
* Free flow-trace data.
*/
void
{
/*
* Clear out the hash, have the tail point to itself, and free
* any continuation blocks.
*/
}
}
}