stream.c revision e4506d67152cff4f3b9e808f5acdbc4417dc2197
/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
* or http://www.opensolaris.org/os/licensing.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at usr/src/OPENSOLARIS.LICENSE.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information: Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*/
/* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
/*
* Copyright 2005 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
#pragma ident "%Z%%M% %I% %E% SMI"
#include <sys/types.h>
#include <sys/param.h>
#include <sys/thread.h>
#include <sys/sysmacros.h>
#include <sys/stropts.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/conf.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/atomic.h>
#include <sys/errno.h>
#include <sys/vtrace.h>
#include <sys/ftrace.h>
#include <sys/ontrap.h>
#include <sys/multidata.h>
#include <sys/multidata_impl.h>
#include <sys/sdt.h>
#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif
/*
* This file contains all the STREAMS utility routines that may
* be used by modules and drivers.
*/
/*
* STREAMS message allocator: principles of operation
*
* The streams message allocator consists of all the routines that
* allocate, dup and free streams messages: allocb(), [d]esballoc[a],
* dupb(), freeb() and freemsg(). What follows is a high-level view
* of how the allocator works.
*
* Every streams message consists of one or more mblks, a dblk, and data.
* All mblks for all types of messages come from a common mblk_cache.
* The dblk and data come in several flavors, depending on how the
* message is allocated:
*
* (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
* fixed-size dblk/data caches. For message sizes that are multiples of
* PAGESIZE, dblks are allocated separately from the buffer.
* The associated buffer is allocated by the constructor using kmem_alloc().
* For all other message sizes, dblk and its associated data is allocated
* as a single contiguous chunk of memory.
* Objects in these caches consist of a dblk plus its associated data.
* allocb() determines the nearest-size cache by table lookup:
* the dblk_cache[] array provides the mapping from size to dblk cache.
*
* (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
* kmem_alloc()'ing a buffer for the data and supplying that
* buffer to gesballoc(), described below.
*
* (3) The four flavors of [d]esballoc[a] are all implemented by a
* common routine, gesballoc() ("generic esballoc"). gesballoc()
* allocates a dblk from the global dblk_esb_cache and sets db_base,
* db_lim and db_frtnp to describe the caller-supplied buffer.
*
* While there are several routines to allocate messages, there is only
* one routine to free messages: freeb(). freeb() simply invokes the
* dblk's free method, dbp->db_free(), which is set at allocation time.
*
* dupb() creates a new reference to a message by allocating a new mblk,
* incrementing the dblk reference count and setting the dblk's free
* method to dblk_decref(). The dblk's original free method is retained
* in db_lastfree. dblk_decref() decrements the reference count on each
* freeb(). If this is not the last reference it just frees the mblk;
* if this *is* the last reference, it restores db_free to db_lastfree,
* sets db_mblk to the current mblk (see below), and invokes db_lastfree.
*
* The implementation makes aggressive use of kmem object caching for
* maximum performance. This makes the code simple and compact, but
* also a bit abstruse in some places. The invariants that constitute a
* message's constructed state, described below, are more subtle than usual.
*
* Every dblk has an "attached mblk" as part of its constructed state.
* The mblk is allocated by the dblk's constructor and remains attached
* until the message is either dup'ed or pulled up. In the dupb() case
* the mblk association doesn't matter until the last free, at which time
* dblk_decref() attaches the last mblk to the dblk. pullupmsg() affects
* the mblk association because it swaps the leading mblks of two messages,
* so it is responsible for swapping their db_mblk pointers accordingly.
* From a constructed-state viewpoint it doesn't matter that a dblk's
* attached mblk can change while the message is allocated; all that
* matters is that the dblk has *some* attached mblk when it's freed.
*
* The sizes of the allocb() small-message caches are not magical.
* They represent a good trade-off between internal and external
* fragmentation for current workloads. They should be reevaluated
* periodically, especially if allocations larger than DBLK_MAX_CACHE
* become common. We use 64-byte alignment so that dblks don't
* straddle cache lines unnecessarily.
*/
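/*
 * A minimal usage sketch of the routines described above, from the point
 * of view of a driver or module; "src" and MY_LEN are hypothetical and
 * the sequence is illustrative only, not an interface defined here:
 *
 *	mblk_t *mp = allocb(MY_LEN, BPRI_MED);	served from a dblk cache
 *	if (mp == NULL)
 *		return;
 *	bcopy(src, mp->b_wptr, MY_LEN);
 *	mp->b_wptr += MY_LEN;
 *	...
 *	freemsg(mp);		walks b_cont, invoking each dblk's db_free
 *
 * esballoc() and its variants follow the same pattern but describe a
 * caller-supplied buffer (plus free routine) instead of cache memory.
 */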
#define DBLK_MAX_CACHE 73728
#define DBLK_CACHE_ALIGN 64
#define DBLK_MIN_SIZE 8
#define DBLK_SIZE_SHIFT 3
#ifdef _BIG_ENDIAN
#define DBLK_RTFU_SHIFT(field) \
(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define DBLK_RTFU_SHIFT(field) \
(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif
#define DBLK_RTFU(ref, type, flags, uioflag) \
(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
((type) << DBLK_RTFU_SHIFT(db_type)) | \
(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define DBLK_RTFU_REF_MASK (DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define DBLK_RTFU_WORD(dbp) (*((uint32_t *)&(dbp)->db_ref))
#define MBLK_BAND_FLAG_WORD(mp) (*((uint32_t *)&(mp)->b_band))
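/*
 * Worked example of the packing above, assuming the little-endian variant
 * and that db_ref, db_type, db_flags and db_struioflag are the four
 * consecutive bytes addressed by DBLK_RTFU_WORD():
 *
 *	DBLK_RTFU(1, M_DATA, 0, 0) ==
 *	    (1 << 0) | (M_DATA << 8) | ((0 | (1 - 1)) << 16) | (0 << 24)
 *
 * so a single 32-bit store initializes db_ref, db_type, db_flags and
 * db_struioflag at once.  Note that (ref - 1) is folded into the flags
 * byte: the esballoca()/desballoca() variants start with ref == 2 and
 * thereby record a non-zero minimum reference count for dblk_decref()
 * to compare against.
 */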
static size_t dblk_sizes[] = {
#ifdef _LP64
16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456,
#else
64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968,
8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736,
40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504,
#endif
DBLK_MAX_CACHE, 0
};
static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;
static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);
/*
* Patchable mblk/dblk kmem_cache flags.
*/
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;
static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
dblk_t *dbp = buf;
ssize_t msg_size = (ssize_t)cdrarg;
size_t index;
ASSERT(msg_size != 0);
index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
return (-1);
if ((msg_size & PAGEOFFSET) == 0) {
dbp->db_base = kmem_alloc(msg_size, kmflags);
if (dbp->db_base == NULL) {
kmem_cache_free(mblk_cache, dbp->db_mblk);
return (-1);
}
} else {
dbp->db_base = (unsigned char *)&dbp[1];
}
dbp->db_mblk->b_datap = dbp;
dbp->db_cache = dblk_cache[index];
dbp->db_lim = dbp->db_base + msg_size;
dbp->db_free = dbp->db_lastfree = dblk_lastfree;
dbp->db_frtnp = NULL;
dbp->db_fthdr = NULL;
dbp->db_credp = NULL;
dbp->db_cpid = -1;
dbp->db_struioflag = 0;
dbp->db_struioun.cksum.flags = 0;
return (0);
}
/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
dblk_t *dbp = buf;
if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
return (-1);
dbp->db_mblk->b_datap = dbp;
dbp->db_cache = dblk_esb_cache;
dbp->db_fthdr = NULL;
dbp->db_credp = NULL;
dbp->db_cpid = -1;
dbp->db_struioflag = 0;
dbp->db_struioun.cksum.flags = 0;
return (0);
}
static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
dblk_t *dbp = buf;
bcache_t *bcp = (bcache_t *)cdrarg;
if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
return (-1);
if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache,
kmflags)) == NULL) {
kmem_cache_free(mblk_cache, dbp->db_mblk);
return (-1);
}
dbp->db_mblk->b_datap = dbp;
dbp->db_cache = (void *)bcp;
dbp->db_lim = dbp->db_base + bcp->size;
dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
dbp->db_frtnp = NULL;
dbp->db_fthdr = NULL;
dbp->db_credp = NULL;
dbp->db_cpid = -1;
dbp->db_struioflag = 0;
dbp->db_struioun.cksum.flags = 0;
return (0);
}
/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
dblk_t *dbp = buf;
ssize_t msg_size = (ssize_t)cdrarg;
ASSERT(dbp->db_mblk->b_datap == dbp);
ASSERT(msg_size != 0);
ASSERT(dbp->db_struioflag == 0);
ASSERT(dbp->db_struioun.cksum.flags == 0);
if ((msg_size & PAGEOFFSET) == 0) {
kmem_free(dbp->db_base, msg_size);
}
kmem_cache_free(mblk_cache, dbp->db_mblk);
}
static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
dblk_t *dbp = buf;
bcache_t *bcp = (bcache_t *)cdrarg;
kmem_cache_free(bcp->buffer_cache, dbp->db_base);
ASSERT(dbp->db_mblk->b_datap == dbp);
ASSERT(dbp->db_struioflag == 0);
ASSERT(dbp->db_struioun.cksum.flags == 0);
kmem_cache_free(mblk_cache, dbp->db_mblk);
}
void
streams_msg_init(void)
{
char name[40];
size_t size;
size_t lastsize = DBLK_MIN_SIZE;
size_t *sizep;
struct kmem_cache *cp;
size_t tot_size;
int offset;
mblk_cache = kmem_cache_create("streams_mblk",
sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL,
mblk_kmem_flags);
for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
if ((offset = (size & PAGEOFFSET)) != 0) {
/*
* We are in the middle of a page; the dblk should
* be allocated on the same page
*/
tot_size = size + sizeof (dblk_t);
ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
< PAGESIZE);
ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
} else {
/*
* buf size is a multiple of page size; dblk and
* buffer are allocated separately.
*/
ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
tot_size = sizeof (dblk_t);
}
(void) sprintf(name, "streams_dblk_%ld", size);
cp = kmem_cache_create(name, tot_size,
DBLK_CACHE_ALIGN, dblk_constructor,
dblk_destructor, NULL,
(void *)(size), NULL, dblk_kmem_flags);
while (lastsize <= size) {
dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
lastsize += DBLK_MIN_SIZE;
}
}
dblk_esb_cache = kmem_cache_create("streams_dblk_esb",
sizeof (dblk_t), DBLK_CACHE_ALIGN,
dblk_esb_constructor, dblk_destructor, NULL,
(void *) sizeof (dblk_t), NULL, dblk_kmem_flags);
fthdr_cache = kmem_cache_create("streams_fthdr",
sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
ftblk_cache = kmem_cache_create("streams_ftblk",
sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
/* Initialize Multidata caches */
mmd_init();
}
/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
dblk_t *dbp;
mblk_t *mp;
size_t index;
index = (size - 1) >> DBLK_SIZE_SHIFT;
if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
if (size != 0) {
mp = allocb_oversize(size, KM_NOSLEEP);
goto out;
}
index = 0;
}
if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
mp = NULL;
goto out;
}
mp = dbp->db_mblk;
DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
mp->b_next = mp->b_prev = mp->b_cont = NULL;
mp->b_rptr = mp->b_wptr = dbp->db_base;
mp->b_queue = NULL;
MBLK_BAND_FLAG_WORD(mp) = 0;
STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
return (mp);
}
mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
mblk_t *mp = allocb(size, 0);
if (mp != NULL) {
cred_t *cr = DB_CRED(tmpl);
if (cr != NULL)
crhold(mp->b_datap->db_credp = cr);
DB_CPID(mp) = DB_CPID(tmpl);
DB_TYPE(mp) = DB_TYPE(tmpl);
}
return (mp);
}
mblk_t *
allocb_cred(size_t size, cred_t *cr)
{
mblk_t *mp = allocb(size, 0);
if (mp != NULL && cr != NULL)
crhold(mp->b_datap->db_credp = cr);
return (mp);
}
mblk_t *
allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
{
mblk_t *mp = allocb_wait(size, 0, flags, error);
if (mp != NULL && cr != NULL)
crhold(mp->b_datap->db_credp = cr);
return (mp);
}
void
freeb(mblk_t *mp)
{
dblk_t *dbp = mp->b_datap;
ASSERT(dbp->db_ref > 0);
ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
dbp->db_free(mp, dbp);
}
void
freemsg(mblk_t *mp)
{
FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
while (mp) {
dblk_t *dbp = mp->b_datap;
mblk_t *mp_cont = mp->b_cont;
ASSERT(dbp->db_ref > 0);
ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
dbp->db_free(mp, dbp);
mp = mp_cont;
}
}
/*
* Reallocate a block for another use. Try hard to use the old block.
* If the old data is wanted (copy), leave b_wptr at the end of the data,
* otherwise return b_wptr = b_rptr.
*
* This routine is private and unstable.
*/
mblk_t *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
mblk_t *mp1;
unsigned char *old_rptr;
ptrdiff_t cur_size;
if (mp == NULL)
return (allocb(size, BPRI_HI));
cur_size = mp->b_wptr - mp->b_rptr;
old_rptr = mp->b_rptr;
ASSERT(mp->b_datap->db_ref != 0);
if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
/*
* If the data is wanted and it will fit where it is, no
* work is required.
*/
if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
return (mp);
mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
mp1 = mp;
} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
/* XXX other mp state could be copied too, db_flags ... ? */
mp1->b_cont = mp->b_cont;
} else {
return (NULL);
}
if (copy) {
bcopy(old_rptr, mp1->b_rptr, cur_size);
mp1->b_wptr = mp1->b_rptr + cur_size;
}
if (mp != mp1)
freeb(mp);
return (mp1);
}
static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
ASSERT(dbp->db_mblk == mp);
if (dbp->db_fthdr != NULL)
str_ftfree(dbp);
/* set credp and projid to be 'unspecified' before returning to cache */
if (dbp->db_credp != NULL) {
crfree(dbp->db_credp);
dbp->db_credp = NULL;
}
dbp->db_cpid = -1;
/* Reset the struioflag and the checksum flag fields */
dbp->db_struioflag = 0;
dbp->db_struioun.cksum.flags = 0;
kmem_cache_free(dbp->db_cache, dbp);
}
static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
if (dbp->db_ref != 1) {
uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
-(1 << DBLK_RTFU_SHIFT(db_ref)));
/*
* atomic_add_32_nv() just decremented db_ref, so we no longer
* have a reference to the dblk, which means another thread
* could free it. Therefore we cannot examine the dblk to
* determine whether ours was the last reference. Instead,
* we extract the new and minimum reference counts from rtfu.
* Note that all we're really saying is "if (ref != refmin)".
*/
if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
kmem_cache_free(mblk_cache, mp);
return;
}
}
dbp->db_mblk = mp;
dbp->db_free = dbp->db_lastfree;
dbp->db_lastfree(mp, dbp);
}
mblk_t *
dupb(mblk_t *mp)
{
dblk_t *dbp = mp->b_datap;
mblk_t *new_mp;
uint32_t oldrtfu, newrtfu;
if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
goto out;
new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
new_mp->b_rptr = mp->b_rptr;
new_mp->b_wptr = mp->b_wptr;
new_mp->b_datap = dbp;
new_mp->b_queue = NULL;
MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
/*
* First-dup optimization. The enabling assumption is that there
* can never be a race (in correct code) to dup the first copy
* of a message. Therefore we don't need to do it atomically.
*/
if (dbp->db_free != dblk_decref) {
dbp->db_free = dblk_decref;
dbp->db_ref++;
goto out;
}
do {
ASSERT(dbp->db_ref > 0);
oldrtfu = DBLK_RTFU_WORD(dbp);
newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
/*
* If db_ref is maxed out we can't dup this message anymore.
*/
if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
kmem_cache_free(mblk_cache, new_mp);
new_mp = NULL;
goto out;
}
} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);
out:
FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
return (new_mp);
}
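/*
 * A sketch of the reference counting performed by dupb()/freeb(); "orig"
 * is a hypothetical single-block message owned by the caller:
 *
 *	mblk_t *copy = dupb(orig);	db_ref: 1 -> 2, db_free = dblk_decref
 *	if (copy != NULL) {
 *		freeb(orig);		not the last reference; only the
 *					mblk returns to mblk_cache
 *		freeb(copy);		last reference; db_lastfree runs and
 *					the dblk and data are released
 *	}
 *
 * Both mblks share one dblk and one data buffer, so callers must not
 * modify the data while db_ref > 1 (use copyb() to get a private copy).
 */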
static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
frtn_t *frp = dbp->db_frtnp;
ASSERT(dbp->db_mblk == mp);
frp->free_func(frp->free_arg);
if (dbp->db_fthdr != NULL)
str_ftfree(dbp);
/* set credp and projid to be 'unspecified' before returning to cache */
if (dbp->db_credp != NULL) {
crfree(dbp->db_credp);
dbp->db_credp = NULL;
}
dbp->db_cpid = -1;
dbp->db_struioflag = 0;
dbp->db_struioun.cksum.flags = 0;
kmem_cache_free(dbp->db_cache, dbp);
}
/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}
/*
* Generic esballoc used to implement the four flavors: [d]esballoc[a].
*/
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
dblk_t *dbp;
mblk_t *mp;
ASSERT(base != NULL && frp != NULL);
if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
mp = NULL;
goto out;
}
mp = dbp->db_mblk;
dbp->db_base = base;
dbp->db_lim = base + size;
dbp->db_free = dbp->db_lastfree = lastfree;
dbp->db_frtnp = frp;
DBLK_RTFU_WORD(dbp) = db_rtfu;
mp->b_next = mp->b_prev = mp->b_cont = NULL;
mp->b_rptr = mp->b_wptr = base;
mp->b_queue = NULL;
MBLK_BAND_FLAG_WORD(mp) = 0;
out:
FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
return (mp);
}
/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
mblk_t *mp;
/*
* Note that this is structured to allow the common case (i.e.
* STREAMS flowtracing disabled) to call gesballoc() with tail
* call optimization.
*/
if (!str_ftnever) {
mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
frp, freebs_enqueue, KM_NOSLEEP);
if (mp != NULL)
STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
return (mp);
}
return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
frp, freebs_enqueue, KM_NOSLEEP));
}
/*
* Same as esballoc() but sleeps waiting for memory.
*/
/*ARGSUSED*/
mblk_t *
esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
mblk_t *mp;
/*
* Note that this is structured to allow the common case (i.e.
* STREAMS flowtracing disabled) to call gesballoc() with tail
* call optimization.
*/
if (!str_ftnever) {
mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
frp, freebs_enqueue, KM_SLEEP);
STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
return (mp);
}
return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
frp, freebs_enqueue, KM_SLEEP));
}
/*ARGSUSED*/
mblk_t *
desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
mblk_t *mp;
/*
* Note that this is structured to allow the common case (i.e.
* STREAMS flowtracing disabled) to call gesballoc() with tail
* call optimization.
*/
if (!str_ftnever) {
mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
frp, dblk_lastfree_desb, KM_NOSLEEP);
if (mp != NULL)
STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
return (mp);
}
return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
frp, dblk_lastfree_desb, KM_NOSLEEP));
}
/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
mblk_t *mp;
/*
* Note that this is structured to allow the common case (i.e.
* STREAMS flowtracing disabled) to call gesballoc() with tail
* call optimization.
*/
if (!str_ftnever) {
mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
frp, freebs_enqueue, KM_NOSLEEP);
if (mp != NULL)
STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
return (mp);
}
return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
frp, freebs_enqueue, KM_NOSLEEP));
}
/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
mblk_t *mp;
/*
* Note that this is structured to allow the common case (i.e.
* STREAMS flowtracing disabled) to call gesballoc() with tail
* call optimization.
*/
if (!str_ftnever) {
mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
frp, dblk_lastfree_desb, KM_NOSLEEP);
if (mp != NULL)
STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
return (mp);
}
return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
frp, dblk_lastfree_desb, KM_NOSLEEP));
}
static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
bcache_t *bcp = dbp->db_cache;
ASSERT(dbp->db_mblk == mp);
if (dbp->db_fthdr != NULL)
str_ftfree(dbp);
/* set credp and projid to be 'unspecified' before returning to cache */
if (dbp->db_credp != NULL) {
crfree(dbp->db_credp);
dbp->db_credp = NULL;
}
dbp->db_cpid = -1;
dbp->db_struioflag = 0;
dbp->db_struioun.cksum.flags = 0;
mutex_enter(&bcp->mutex);
kmem_cache_free(bcp->dblk_cache, dbp);
bcp->alloc--;
if (bcp->alloc == 0 && bcp->destroy != 0) {
kmem_cache_destroy(bcp->dblk_cache);
kmem_cache_destroy(bcp->buffer_cache);
mutex_exit(&bcp->mutex);
mutex_destroy(&bcp->mutex);
kmem_free(bcp, sizeof (bcache_t));
} else {
mutex_exit(&bcp->mutex);
}
}
bcache_t *
bcache_create(char *name, size_t size, uint_t align)
{
bcache_t *bcp;
char buffer[255];
ASSERT((align & (align - 1)) == 0);
if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) ==
NULL) {
return (NULL);
}
bcp->size = size;
bcp->align = align;
bcp->alloc = 0;
bcp->destroy = 0;
mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
(void) sprintf(buffer, "%s_buffer_cache", name);
bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
NULL, NULL, NULL, 0);
(void) sprintf(buffer, "%s_dblk_cache", name);
bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
NULL, (void *)bcp, NULL, 0);
return (bcp);
}
void
bcache_destroy(bcache_t *bcp)
{
ASSERT(bcp != NULL);
mutex_enter(&bcp->mutex);
if (bcp->alloc == 0) {
kmem_cache_destroy(bcp->dblk_cache);
kmem_cache_destroy(bcp->buffer_cache);
mutex_exit(&bcp->mutex);
mutex_destroy(&bcp->mutex);
kmem_free(bcp, sizeof (bcache_t));
} else {
bcp->destroy++;
mutex_exit(&bcp->mutex);
}
}
/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
dblk_t *dbp;
mblk_t *mp = NULL;
ASSERT(bcp != NULL);
mutex_enter(&bcp->mutex);
if (bcp->destroy != 0) {
mutex_exit(&bcp->mutex);
goto out;
}
if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
mutex_exit(&bcp->mutex);
goto out;
}
bcp->alloc++;
mutex_exit(&bcp->mutex);
ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
mp = dbp->db_mblk;
DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
mp->b_next = mp->b_prev = mp->b_cont = NULL;
mp->b_rptr = mp->b_wptr = dbp->db_base;
mp->b_queue = NULL;
MBLK_BAND_FLAG_WORD(mp) = 0;
STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
return (mp);
}
static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
ASSERT(dbp->db_mblk == mp);
if (dbp->db_fthdr != NULL)
str_ftfree(dbp);
/* set credp and projid to be 'unspecified' before returning to cache */
if (dbp->db_credp != NULL) {
crfree(dbp->db_credp);
dbp->db_credp = NULL;
}
dbp->db_cpid = -1;
dbp->db_struioflag = 0;
dbp->db_struioun.cksum.flags = 0;
kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
kmem_cache_free(dbp->db_cache, dbp);
}
static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
mblk_t *mp;
void *buf;
size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
if ((buf = kmem_alloc(size, kmflags)) == NULL)
return (NULL);
if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
&frnop, dblk_lastfree_oversize, kmflags)) == NULL)
kmem_free(buf, size);
if (mp != NULL)
STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
return (mp);
}
mblk_t *
allocb_tryhard(size_t target_size)
{
size_t size;
mblk_t *bp;
for (size = target_size; size < target_size + 512;
size += DBLK_CACHE_ALIGN)
if ((bp = allocb(size, BPRI_HI)) != NULL)
return (bp);
allocb_tryhard_fails++;
return (NULL);
}
/*
* This routine is consolidation private for STREAMS internal use.
* It may only be called from sync routines (i.e., not
* from put or service procedures). It is located here (rather
* than strsubr.c) so that we don't have to expose all of the
* allocb() implementation details in header files.
*/
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
dblk_t *dbp;
mblk_t *mp;
size_t index;
index = (size - 1) >> DBLK_SIZE_SHIFT;
if (flags & STR_NOSIG) {
if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
if (size != 0) {
mp = allocb_oversize(size, KM_SLEEP);
FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
(uintptr_t)mp);
return (mp);
}
index = 0;
}
dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
mp = dbp->db_mblk;
DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
mp->b_next = mp->b_prev = mp->b_cont = NULL;
mp->b_rptr = mp->b_wptr = dbp->db_base;
mp->b_queue = NULL;
MBLK_BAND_FLAG_WORD(mp) = 0;
STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
} else {
while ((mp = allocb(size, pri)) == NULL) {
if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
return (NULL);
}
}
return (mp);
}
/*
* Call function 'func' with 'arg' when a class zero block can
* be allocated with priority 'pri'.
*/
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
return (bufcall(1, pri, func, arg));
}
/*
* Allocates an iocblk (M_IOCTL) block. Properly sets the credentials,
* ioc_id, rval and error of the struct iocblk to set up an ioctl call.
* This provides consistency for all internal allocators of ioctl.
*/
mblk_t *
mkiocb(uint_t cmd)
{
struct iocblk *ioc;
mblk_t *mp;
/*
* Allocate enough space for any of the ioctl related messages.
*/
if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
return (NULL);
bzero(mp->b_rptr, sizeof (union ioctypes));
/*
* Set the mblk_t information and ptrs correctly.
*/
mp->b_wptr += sizeof (struct iocblk);
mp->b_datap->db_type = M_IOCTL;
/*
* Fill in the fields.
*/
ioc = (struct iocblk *)mp->b_rptr;
ioc->ioc_cmd = cmd;
ioc->ioc_cr = kcred;
ioc->ioc_id = getiocseqno();
ioc->ioc_flag = IOC_NATIVE;
return (mp);
}
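/*
 * A sketch of typical internal use of mkiocb(); MY_CMD and wq are
 * hypothetical, and any ioc_count/M_DATA payload setup is left to the
 * caller:
 *
 *	mblk_t *mp = mkiocb(MY_CMD);
 *	if (mp == NULL)
 *		return (ENOMEM);
 *	putnext(wq, mp);	the module below replies with M_IOCACK
 *				or M_IOCNAK carrying the same ioc_id
 */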
/*
* Test if a block of the given size can be allocated with a request of
* the given priority.
* 'pri' is no longer used, but is retained for compatibility.
*/
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
return ((size + sizeof (dblk_t)) <= kmem_avail());
}
/*
* Call function 'func' with argument 'arg' when there is a reasonably
* good chance that a block of size 'size' can be allocated.
* 'pri' is no longer used, but is retained for compatibility.
*/
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
static long bid = 1; /* always odd to save checking for zero */
bufcall_id_t bc_id;
struct strbufcall *bcp;
if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
return (0);
bcp->bc_func = func;
bcp->bc_arg = arg;
bcp->bc_size = size;
bcp->bc_next = NULL;
bcp->bc_executor = NULL;
mutex_enter(&strbcall_lock);
/*
* After bcp is linked into strbcalls and strbcall_lock is dropped there
* should be no references to bcp since it may be freed by
* runbufcalls(). Since the bc_id field is returned, we save its value in
* the local var.
*/
bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2); /* keep it odd */
/*
* add newly allocated stream event to existing
* linked list of events.
*/
if (strbcalls.bc_head == NULL) {
strbcalls.bc_head = strbcalls.bc_tail = bcp;
} else {
strbcalls.bc_tail->bc_next = bcp;
strbcalls.bc_tail = bcp;
}
cv_signal(&strbcall_cv);
mutex_exit(&strbcall_lock);
return (bc_id);
}
/*
* Cancel a bufcall request.
*/
void
unbufcall(bufcall_id_t id)
{
strbufcall_t *bcp, *pbcp;
mutex_enter(&strbcall_lock);
again:
pbcp = NULL;
for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
if (id == bcp->bc_id)
break;
pbcp = bcp;
}
if (bcp) {
if (bcp->bc_executor != NULL) {
if (bcp->bc_executor != curthread) {
cv_wait(&bcall_cv, &strbcall_lock);
goto again;
}
} else {
if (pbcp)
pbcp->bc_next = bcp->bc_next;
else
strbcalls.bc_head = bcp->bc_next;
if (bcp == strbcalls.bc_tail)
strbcalls.bc_tail = pbcp;
kmem_free(bcp, sizeof (strbufcall_t));
}
}
mutex_exit(&strbcall_lock);
}
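/*
 * A common recovery pattern built on bufcall()/unbufcall(), sketched with
 * hypothetical per-instance state (myq->my_bufcall_id); the classic
 * callback is qenable(), which reschedules the service procedure once an
 * allocation of the given size is likely to succeed:
 *
 *	if ((mp = allocb(size, BPRI_MED)) == NULL) {
 *		myq->my_bufcall_id = bufcall(size, BPRI_MED,
 *		    (void (*)(void *))qenable, q);
 *		return;
 *	}
 *	...
 *	and in close():
 *	if (myq->my_bufcall_id != 0)
 *		unbufcall(myq->my_bufcall_id);
 */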
/*
* Duplicate a message block by block (uses dupb), returning
* a pointer to the duplicate message.
* Returns a non-NULL value only if the entire message
* was dup'd.
*/
mblk_t *
dupmsg(mblk_t *bp)
{
mblk_t *head, *nbp;
if (!bp || !(nbp = head = dupb(bp)))
return (NULL);
while (bp->b_cont) {
if (!(nbp->b_cont = dupb(bp->b_cont))) {
freemsg(head);
return (NULL);
}
nbp = nbp->b_cont;
bp = bp->b_cont;
}
return (head);
}
#define DUPB_NOLOAN(bp) \
((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
copyb((bp)) : dupb((bp)))
mblk_t *
dupmsg_noloan(mblk_t *bp)
{
mblk_t *head, *nbp;
if (bp == NULL || DB_TYPE(bp) != M_DATA ||
((nbp = head = DUPB_NOLOAN(bp)) == NULL))
return (NULL);
while (bp->b_cont) {
if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
freemsg(head);
return (NULL);
}
nbp = nbp->b_cont;
bp = bp->b_cont;
}
return (head);
}
/*
* Copy data from message and data block to newly allocated message and
* data block. Returns new message block pointer, or NULL if error.
* The alignment of rptr (w.r.t. word alignment) will be the same in the copy
* as in the original even when db_base is not word aligned. (bug 1052877)
*/
mblk_t *
copyb(mblk_t *bp)
{
mblk_t *nbp;
dblk_t *dp, *ndp;
uchar_t *base;
size_t size;
size_t unaligned;
ASSERT(bp->b_wptr >= bp->b_rptr);
dp = bp->b_datap;
if (dp->db_fthdr != NULL)
STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
/*
* Special handling for Multidata message; this should be
* removed once a copy-callback routine is made available.
*/
if (dp->db_type == M_MULTIDATA) {
cred_t *cr;
if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
return (NULL);
nbp->b_flag = bp->b_flag;
nbp->b_band = bp->b_band;
ndp = nbp->b_datap;
/* See comments below on potential issues. */
STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
ASSERT(ndp->db_type == dp->db_type);
cr = dp->db_credp;
if (cr != NULL)
crhold(ndp->db_credp = cr);
ndp->db_cpid = dp->db_cpid;
return (nbp);
}
size = dp->db_lim - dp->db_base;
unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
return (NULL);
nbp->b_flag = bp->b_flag;
nbp->b_band = bp->b_band;
ndp = nbp->b_datap;
/*
* Well, here is a potential issue. If we are trying to
* trace a flow, and we copy the message, we might lose
* information about where this message might have been.
* So we should inherit the FT data. On the other hand,
* a user might be interested only in alloc to free data.
* So I guess the real answer is to provide a tunable.
*/
STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
base = ndp->db_base + unaligned;
bcopy(dp->db_base, ndp->db_base + unaligned, size);
nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
return (nbp);
}
/*
* Copy data from message to newly allocated message using new
* data blocks. Returns a pointer to the new message, or NULL if error.
*/
mblk_t *
copymsg(mblk_t *bp)
{
mblk_t *head, *nbp;
if (!bp || !(nbp = head = copyb(bp)))
return (NULL);
while (bp->b_cont) {
if (!(nbp->b_cont = copyb(bp->b_cont))) {
freemsg(head);
return (NULL);
}
nbp = nbp->b_cont;
bp = bp->b_cont;
}
return (head);
}
/*
* link a message block to tail of message
*/
void
linkb(mblk_t *mp, mblk_t *bp)
{
ASSERT(mp && bp);
for (; mp->b_cont; mp = mp->b_cont)
;
mp->b_cont = bp;
}
/*
* unlink a message block from head of message
* return pointer to new message.
* NULL if message becomes empty.
*/
mblk_t *
unlinkb(mblk_t *bp)
{
mblk_t *bp1;
bp1 = bp->b_cont;
bp->b_cont = NULL;
return (bp1);
}
/*
* remove a message block "bp" from message "mp"
*
* Return pointer to new message or NULL if no message remains.
* Return -1 if bp is not found in message.
*/
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
mblk_t *tmp;
mblk_t *lastp = NULL;
ASSERT(mp && bp);
for (tmp = mp; tmp; tmp = tmp->b_cont) {
if (tmp == bp) {
if (lastp)
lastp->b_cont = tmp->b_cont;
else
mp = tmp->b_cont;
tmp->b_cont = NULL;
return (mp);
}
lastp = tmp;
}
return ((mblk_t *)-1);
}
/*
* Concatenate and align the first len bytes of a common
* message type. Len == -1 means concat everything.
* Returns 1 on success, 0 on failure.
* After the pullup, mp points to the pulled up data.
*/
int
pullupmsg(mblk_t *mp, ssize_t len)
{
mblk_t *bp, *b_cont;
dblk_t *dbp;
ssize_t n;
ASSERT(mp->b_datap->db_ref > 0);
ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
/*
* We won't handle a Multidata message, since it contains
* metadata which this function has no knowledge of; we
* assert on DEBUG, and return failure otherwise.
*/
ASSERT(mp->b_datap->db_type != M_MULTIDATA);
if (mp->b_datap->db_type == M_MULTIDATA)
return (0);
if (len == -1) {
if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
return (1);
len = xmsgsize(mp);
} else {
ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
ASSERT(first_mblk_len >= 0);
/*
* If the length is less than that of the first mblk,
* we want to pull up the message into an aligned mblk.
* Though not part of the spec, some callers assume it.
*/
if (len <= first_mblk_len) {
if (str_aligned(mp->b_rptr))
return (1);
len = first_mblk_len;
} else if (xmsgsize(mp) < len)
return (0);
}
if ((bp = allocb_tmpl(len, mp)) == NULL)
return (0);
dbp = bp->b_datap;
*bp = *mp; /* swap mblks so bp heads the old msg... */
mp->b_datap = dbp; /* ... and mp heads the new message */
mp->b_datap->db_mblk = mp;
bp->b_datap->db_mblk = bp;
mp->b_rptr = mp->b_wptr = dbp->db_base;
do {
ASSERT(bp->b_datap->db_ref > 0);
ASSERT(bp->b_wptr >= bp->b_rptr);
n = MIN(bp->b_wptr - bp->b_rptr, len);
bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
mp->b_wptr += n;
bp->b_rptr += n;
len -= n;
if (bp->b_rptr != bp->b_wptr)
break;
b_cont = bp->b_cont;
freeb(bp);
bp = b_cont;
} while (len && bp);
mp->b_cont = bp; /* tack on whatever wasn't pulled up */
return (1);
}
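/*
 * Typical use of pullupmsg(): make a protocol header contiguous before
 * taking a pointer to it.  my_hdr_t is a hypothetical header type:
 *
 *	if (MBLKL(mp) < sizeof (my_hdr_t) &&
 *	    !pullupmsg(mp, sizeof (my_hdr_t))) {
 *		freemsg(mp);
 *		return;
 *	}
 *	hdr = (my_hdr_t *)mp->b_rptr;
 *
 * Note that pullupmsg() modifies mp in place (via the mblk swap above),
 * whereas msgpullup() below builds a new message and leaves the original
 * untouched.
 */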
/*
* Concatenate and align at least the first len bytes of common message
* type. Len == -1 means concatenate everything. The original message is
* unaltered. Returns a pointer to a new message on success, otherwise
* returns NULL.
*/
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
mblk_t *newmp;
ssize_t totlen;
ssize_t n;
/*
* We won't handle a Multidata message, since it contains
* metadata which this function has no knowledge of; we
* assert on DEBUG, and return failure otherwise.
*/
ASSERT(mp->b_datap->db_type != M_MULTIDATA);
if (mp->b_datap->db_type == M_MULTIDATA)
return (NULL);
totlen = xmsgsize(mp);
if ((len > 0) && (len > totlen))
return (NULL);
/*
* Copy all of the first msg type into one new mblk, then dupmsg
* and link the rest onto this.
*/
len = totlen;
if ((newmp = allocb_tmpl(len, mp)) == NULL)
return (NULL);
newmp->b_flag = mp->b_flag;
newmp->b_band = mp->b_band;
while (len > 0) {
n = mp->b_wptr - mp->b_rptr;
ASSERT(n >= 0); /* allow zero-length mblk_t's */
if (n > 0)
bcopy(mp->b_rptr, newmp->b_wptr, n);
newmp->b_wptr += n;
len -= n;
mp = mp->b_cont;
}
if (mp != NULL) {
newmp->b_cont = dupmsg(mp);
if (newmp->b_cont == NULL) {
freemsg(newmp);
return (NULL);
}
}
return (newmp);
}
/*
* Trim bytes from message
* len > 0, trim from head
* len < 0, trim from tail
* Returns 1 on success, 0 on failure.
*/
int
adjmsg(mblk_t *mp, ssize_t len)
{
mblk_t *bp;
mblk_t *save_bp = NULL;
mblk_t *prev_bp;
mblk_t *bcont;
unsigned char type;
ssize_t n;
int fromhead;
int first;
ASSERT(mp != NULL);
/*
* We won't handle a Multidata message, since it contains
* metadata which this function has no knowledge of; we
* assert on DEBUG, and return failure otherwise.
*/
ASSERT(mp->b_datap->db_type != M_MULTIDATA);
if (mp->b_datap->db_type == M_MULTIDATA)
return (0);
if (len < 0) {
fromhead = 0;
len = -len;
} else {
fromhead = 1;
}
if (xmsgsize(mp) < len)
return (0);
if (fromhead) {
first = 1;
while (len) {
ASSERT(mp->b_wptr >= mp->b_rptr);
n = MIN(mp->b_wptr - mp->b_rptr, len);
mp->b_rptr += n;
len -= n;
/*
* If this is not the first mblk and it is now
* zero length, remove it
*/
if (!first && (mp->b_wptr == mp->b_rptr)) {
bcont = mp->b_cont;
freeb(mp);
mp = save_bp->b_cont = bcont;
} else {
save_bp = mp;
mp = mp->b_cont;
}
first = 0;
}
} else {
type = mp->b_datap->db_type;
while (len) {
bp = mp;
save_bp = NULL;
/*
* Find the last message of same type
*/
while (bp && bp->b_datap->db_type == type) {
ASSERT(bp->b_wptr >= bp->b_rptr);
prev_bp = save_bp;
save_bp = bp;
bp = bp->b_cont;
}
if (save_bp == NULL)
break;
n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
save_bp->b_wptr -= n;
len -= n;
/*
* If this is not the first message
* and we have taken away everything
* from this message, remove it
*/
if ((save_bp != mp) &&
(save_bp->b_wptr == save_bp->b_rptr)) {
bcont = save_bp->b_cont;
freeb(save_bp);
prev_bp->b_cont = bcont;
}
}
}
return (1);
}
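/*
 * Examples of the adjmsg() sign convention, assuming a hypothetical
 * 14-byte link-level header and a 4-byte trailer on message mp:
 *
 *	(void) adjmsg(mp, 14);		trim the header from the front
 *	(void) adjmsg(mp, -4);		trim the trailer from the tail
 *
 * Either call returns 0 and leaves mp unchanged if the message holds
 * fewer bytes than requested.
 */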
/*
* get number of data bytes in message
*/
size_t
msgdsize(mblk_t *bp)
{
size_t count = 0;
for (; bp; bp = bp->b_cont)
if (bp->b_datap->db_type == M_DATA) {
ASSERT(bp->b_wptr >= bp->b_rptr);
count += bp->b_wptr - bp->b_rptr;
}
return (count);
}
/*
* Get a message off head of queue
*
* If queue has no buffers then mark queue
* with QWANTR. (queue wants to be read by
* someone when data becomes available)
*
* If there is something to take off then do so.
* If queue falls below hi water mark turn off QFULL
* flag. Decrement weighted count of queue.
* Also turn off QWANTR because queue is being read.
*
* The queue count is maintained on a per-band basis.
* Priority band 0 (normal messages) uses q_count,
* q_lowat, etc. Non-zero priority bands use the
* fields in their respective qband structures
* (qb_count, qb_lowat, etc.) All messages appear
* on the same list, linked via their b_next pointers.
* q_first is the head of the list. q_count does
* not reflect the size of all the messages on the
* queue. It only reflects those messages in the
* normal band of flow. The one exception to this
* deals with high priority messages. They are in
* their own conceptual "band", but are accounted
* against q_count.
*
* If queue count is below the lo water mark and QWANTW
* is set, enable the closest backq which has a service
* procedure and turn off the QWANTW flag.
*
* getq could be built on top of rmvq, but isn't because
* of performance considerations.
*
* A note on the use of q_count and q_mblkcnt:
* q_count is the traditional byte count for messages that
* have been put on a queue. Documentation tells us that
* we shouldn't rely on that count, but some drivers/modules
* do. What was needed, however, was a mechanism to prevent
* runaway streams from consuming all of the resources,
* and particularly be able to flow control zero-length
* messages. q_mblkcnt is used for this purpose. It
* counts the number of mblk's that are being put on
* the queue. The intention here is that each mblk should
* contain one byte of data and, for the purpose of
* flow-control, logically does. A queue will become
* full when EITHER of these values (q_count and q_mblkcnt)
* reach the highwater mark. It will clear when BOTH
* of them drop below the highwater mark. And it will
* backenable when BOTH of them drop below the lowwater
* mark.
* With this algorithm, a driver/module might be able
* to find a reasonably accurate q_count, and the
* framework can still try and limit resource usage.
*/
mblk_t *
getq(queue_t *q)
{
mblk_t *bp;
uchar_t band = 0;
bp = getq_noenab(q);
if (bp != NULL)
band = bp->b_band;
/*
* Inlined from qbackenable().
* Quick check without holding the lock.
*/
if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
return (bp);
qbackenable(q, band);
return (bp);
}
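/*
 * A skeletal service procedure showing the getq()/putbq() flow-control
 * contract described above; this is an illustrative sketch, not an
 * interface of this file:
 *
 *	int
 *	mysrv(queue_t *q)
 *	{
 *		mblk_t *mp;
 *
 *		while ((mp = getq(q)) != NULL) {
 *			if (!canputnext(q)) {
 *				(void) putbq(q, mp);	requeue; we will be
 *							backenabled later
 *				break;
 *			}
 *			putnext(q, mp);
 *		}
 *		return (0);
 *	}
 */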
/*
* Like getq() but does not backenable. This is used by the stream
* head when a putback() is likely. The caller must call qbackenable()
* after it is done with accessing the queue.
*/
mblk_t *
getq_noenab(queue_t *q)
{
mblk_t *bp;
mblk_t *tmp;
qband_t *qbp;
kthread_id_t freezer;
int bytecnt = 0, mblkcnt = 0;
/* freezestr should allow its caller to call getq/putq */
freezer = STREAM(q)->sd_freezer;
if (freezer == curthread) {
ASSERT(frozenstr(q));
ASSERT(MUTEX_HELD(QLOCK(q)));
} else
mutex_enter(QLOCK(q));
if ((bp = q->q_first) == 0) {
q->q_flag |= QWANTR;
} else {
if ((q->q_first = bp->b_next) == NULL)
q->q_last = NULL;
else
q->q_first->b_prev = NULL;
/* Get message byte count for q_count accounting */
for (tmp = bp; tmp; tmp = tmp->b_cont) {
bytecnt += (tmp->b_wptr - tmp->b_rptr);
mblkcnt++;
}
if (bp->b_band == 0) {
q->q_count -= bytecnt;
q->q_mblkcnt -= mblkcnt;
if ((q->q_count < q->q_hiwat) &&
(q->q_mblkcnt < q->q_hiwat)) {
q->q_flag &= ~QFULL;
}
} else {
int i;
ASSERT(bp->b_band <= q->q_nband);
ASSERT(q->q_bandp != NULL);
ASSERT(MUTEX_HELD(QLOCK(q)));
qbp = q->q_bandp;
i = bp->b_band;
while (--i > 0)
qbp = qbp->qb_next;
if (qbp->qb_first == qbp->qb_last) {
qbp->qb_first = NULL;
qbp->qb_last = NULL;
} else {
qbp->qb_first = bp->b_next;
}
qbp->qb_count -= bytecnt;
qbp->qb_mblkcnt -= mblkcnt;
if ((qbp->qb_count < qbp->qb_hiwat) &&
(qbp->qb_mblkcnt < qbp->qb_hiwat)) {
qbp->qb_flag &= ~QB_FULL;
}
}
q->q_flag &= ~QWANTR;
bp->b_next = NULL;
bp->b_prev = NULL;
}
if (freezer != curthread)
mutex_exit(QLOCK(q));
STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
return (bp);
}
/*
* Determine if a backenable is needed after removing a message in the
* specified band.
* NOTE: This routine assumes that something like getq_noenab() has
* already been called.
*
* For the read side it is ok to hold sd_lock across calling this (and the
* stream head often does).
* But for the write side strwakeq might be invoked and it acquires sd_lock.
*/
void
qbackenable(queue_t *q, uchar_t band)
{
int backenab = 0;
qband_t *qbp;
kthread_id_t freezer;
ASSERT(q);
ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
/*
* Quick check without holding the lock.
* OK since after getq() has lowered the q_count these flags
* would not change unless either the qbackenable() is done by
* another thread (which is ok) or the queue has gotten QFULL
* in which case another backenable will take place when the queue
* drops below q_lowat.
*/
if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
return;
/* freezestr should allow its caller to call getq/putq */
freezer = STREAM(q)->sd_freezer;
if (freezer == curthread) {
ASSERT(frozenstr(q));
ASSERT(MUTEX_HELD(QLOCK(q)));
} else
mutex_enter(QLOCK(q));
if (band == 0) {
if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
q->q_mblkcnt < q->q_lowat)) {
backenab = q->q_flag & (QWANTW|QWANTWSYNC);
}
} else {
int i;
ASSERT((unsigned)band <= q->q_nband);
ASSERT(q->q_bandp != NULL);
qbp = q->q_bandp;
i = band;
while (--i > 0)
qbp = qbp->qb_next;
if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
qbp->qb_mblkcnt < qbp->qb_lowat)) {
backenab = qbp->qb_flag & QB_WANTW;
}
}
if (backenab == 0) {
if (freezer != curthread)
mutex_exit(QLOCK(q));
return;
}
/* Have to drop the lock across strwakeq and backenable */
if (backenab & QWANTWSYNC)
q->q_flag &= ~QWANTWSYNC;
if (backenab & (QWANTW|QB_WANTW)) {
if (band != 0)
qbp->qb_flag &= ~QB_WANTW;
else {
q->q_flag &= ~QWANTW;
}
}
if (freezer != curthread)
mutex_exit(QLOCK(q));
if (backenab & QWANTWSYNC)
strwakeq(q, QWANTWSYNC);
if (backenab & (QWANTW|QB_WANTW))
backenable(q, band);
}
/*
* Remove a message from a queue. The queue count and other
* flow control parameters are adjusted and the back queue
* enabled if necessary.
*
* rmvq can be called with the stream frozen, but other utility functions
* holding QLOCK, and by streams modules without any locks/frozen.
*/
void
rmvq(queue_t *q, mblk_t *mp)
{
ASSERT(mp != NULL);
rmvq_noenab(q, mp);
if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
/*
* qbackenable can handle a frozen stream but not a "random"
* qlock being held. Drop lock across qbackenable.
*/
mutex_exit(QLOCK(q));
qbackenable(q, mp->b_band);
mutex_enter(QLOCK(q));
} else {
qbackenable(q, mp->b_band);
}
}
/*
* Like rmvq() but without any backenabling.
* This exists to handle SR_CONSOL_DATA in strrput().
*/
void
rmvq_noenab(queue_t *q, mblk_t *mp)
{
mblk_t *tmp;
int i;
qband_t *qbp = NULL;
kthread_id_t freezer;
int bytecnt = 0, mblkcnt = 0;
freezer = STREAM(q)->sd_freezer;
if (freezer == curthread) {
ASSERT(frozenstr(q));
ASSERT(MUTEX_HELD(QLOCK(q)));
} else if (MUTEX_HELD(QLOCK(q))) {
/* Don't drop lock on exit */
freezer = curthread;
} else
mutex_enter(QLOCK(q));
ASSERT(mp->b_band <= q->q_nband);
if (mp->b_band != 0) { /* Adjust band pointers */
ASSERT(q->q_bandp != NULL);
qbp = q->q_bandp;
i = mp->b_band;
while (--i > 0)
qbp = qbp->qb_next;
if (mp == qbp->qb_first) {
if (mp->b_next && mp->b_band == mp->b_next->b_band)
qbp->qb_first = mp->b_next;
else
qbp->qb_first = NULL;
}
if (mp == qbp->qb_last) {
if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
qbp->qb_last = mp->b_prev;
else
qbp->qb_last = NULL;
}
}
/*
* Remove the message from the list.
*/
if (mp->b_prev)
mp->b_prev->b_next = mp->b_next;
else
q->q_first = mp->b_next;
if (mp->b_next)
mp->b_next->b_prev = mp->b_prev;
else
q->q_last = mp->b_prev;
mp->b_next = NULL;
mp->b_prev = NULL;
/* Get the size of the message for q_count accounting */
for (tmp = mp; tmp; tmp = tmp->b_cont) {
bytecnt += (tmp->b_wptr - tmp->b_rptr);
mblkcnt++;
}
if (mp->b_band == 0) { /* Perform q_count accounting */
q->q_count -= bytecnt;
q->q_mblkcnt -= mblkcnt;
if ((q->q_count < q->q_hiwat) &&
(q->q_mblkcnt < q->q_hiwat)) {
q->q_flag &= ~QFULL;
}
} else { /* Perform qb_count accounting */
qbp->qb_count -= bytecnt;
qbp->qb_mblkcnt -= mblkcnt;
if ((qbp->qb_count < qbp->qb_hiwat) &&
(qbp->qb_mblkcnt < qbp->qb_hiwat)) {
qbp->qb_flag &= ~QB_FULL;
}
}
if (freezer != curthread)
mutex_exit(QLOCK(q));
STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
}
/*
* Empty a queue.
* If flag is set, remove all messages. Otherwise, remove
* only non-control messages. If queue falls below its low
* water mark, and QWANTW is set, enable the nearest upstream
* service procedure.
*
* Historical note: when merging the M_FLUSH code in strrput with this
* code one difference was discovered. flushq did not have a check
* for q_lowat == 0 in the backenabling test.
*
* pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
* if one exists on the queue.
*/
void
flushq_common(queue_t *q, int flag, int pcproto_flag)
{
mblk_t *mp, *nmp;
qband_t *qbp;
int backenab = 0;
unsigned char bpri;
unsigned char qbf[NBAND]; /* band flushing backenable flags */
if (q->q_first == NULL)
return;
mutex_enter(QLOCK(q));
mp = q->q_first;
q->q_first = NULL;
q->q_last = NULL;
q->q_count = 0;
q->q_mblkcnt = 0;
for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
qbp->qb_first = NULL;
qbp->qb_last = NULL;
qbp->qb_count = 0;
qbp->qb_mblkcnt = 0;
qbp->qb_flag &= ~QB_FULL;
}
q->q_flag &= ~QFULL;
mutex_exit(QLOCK(q));
while (mp) {
nmp = mp->b_next;
mp->b_next = mp->b_prev = NULL;
STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
(void) putq(q, mp);
else if (flag || datamsg(mp->b_datap->db_type))
freemsg(mp);
else
(void) putq(q, mp);
mp = nmp;
}
bpri = 1;
mutex_enter(QLOCK(q));
for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
if ((qbp->qb_flag & QB_WANTW) &&
(((qbp->qb_count < qbp->qb_lowat) &&
(qbp->qb_mblkcnt < qbp->qb_lowat)) ||
qbp->qb_lowat == 0)) {
qbp->qb_flag &= ~QB_WANTW;
backenab = 1;
qbf[bpri] = 1;
} else
qbf[bpri] = 0;
bpri++;
}
ASSERT(bpri == (unsigned char)(q->q_nband + 1));
if ((q->q_flag & QWANTW) &&
(((q->q_count < q->q_lowat) &&
(q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
q->q_flag &= ~QWANTW;
backenab = 1;
qbf[0] = 1;
} else
qbf[0] = 0;
/*
* If any band can now be written to, and there is a writer
* for that band, then backenable the closest service procedure.
*/
if (backenab) {
mutex_exit(QLOCK(q));
for (bpri = q->q_nband; bpri != 0; bpri--)
if (qbf[bpri])
backenable(q, bpri);
if (qbf[0])
backenable(q, 0);
} else
mutex_exit(QLOCK(q));
}
/*
* The real flushing takes place in flushq_common. This is done so that
* a flag can specify whether or not M_PCPROTO messages should be
* flushed. Currently the only place that uses this flag is the stream
* head.
*/
void
flushq(queue_t *q, int flag)
{
flushq_common(q, flag, 0);
}
/*
* Flush the queue of messages of the given priority band.
* There is some duplication of code between flushq and flushband.
* This is because we want to optimize the code as much as possible.
* The assumption is that there will be more messages in the normal
* (priority 0) band than in any other.
*
* Historical note: when merging the M_FLUSH code in strrput with this
* code one difference was discovered: flushband had an extra check
* for (mp->b_datap->db_type < QPCTL) in the band 0 case. That check
* does not match the man page for flushband and was not in the strrput
* flush code, hence it was removed.
*/
void
flushband(queue_t *q, unsigned char pri, int flag)
{
mblk_t *mp;
mblk_t *nmp;
mblk_t *last;
qband_t *qbp;
int band;
ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
if (pri > q->q_nband) {
return;
}
mutex_enter(QLOCK(q));
if (pri == 0) {
mp = q->q_first;
q->q_first = NULL;
q->q_last = NULL;
q->q_count = 0;
q->q_mblkcnt = 0;
for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
qbp->qb_first = NULL;
qbp->qb_last = NULL;
qbp->qb_count = 0;
qbp->qb_mblkcnt = 0;
qbp->qb_flag &= ~QB_FULL;
}
q->q_flag &= ~QFULL;
mutex_exit(QLOCK(q));
while (mp) {
nmp = mp->b_next;
mp->b_next = mp->b_prev = NULL;
if ((mp->b_band == 0) &&
((flag == FLUSHALL) ||
datamsg(mp->b_datap->db_type)))
freemsg(mp);
else
(void) putq(q, mp);
mp = nmp;
}
mutex_enter(QLOCK(q));
if ((q->q_flag & QWANTW) &&
(((q->q_count < q->q_lowat) &&
(q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
q->q_flag &= ~QWANTW;
mutex_exit(QLOCK(q));
backenable(q, pri);
} else
mutex_exit(QLOCK(q));
} else { /* pri != 0 */
boolean_t flushed = B_FALSE;
band = pri;
ASSERT(MUTEX_HELD(QLOCK(q)));
qbp = q->q_bandp;
while (--band > 0)
qbp = qbp->qb_next;
mp = qbp->qb_first;
if (mp == NULL) {
mutex_exit(QLOCK(q));
return;
}
last = qbp->qb_last->b_next;
/*
* rmvq_noenab() and freemsg() are called for each mblk that
* meets the criteria. The loop is executed until the last
* mblk has been processed.
*/
while (mp != last) {
ASSERT(mp->b_band == pri);
nmp = mp->b_next;
if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
rmvq_noenab(q, mp);
freemsg(mp);
flushed = B_TRUE;
}
mp = nmp;
}
mutex_exit(QLOCK(q));
/*
* If any mblk(s) has been freed, we know that qbackenable()
* will need to be called.
*/
if (flushed)
qbackenable(q, pri);
}
}
/*
* Return 1 if the queue is not full. If the queue is full, return
* 0 (may not put message) and set QWANTW flag (caller wants to write
* to the queue).
*/
int
canput(queue_t *q)
{
TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
/* This is for loopback transports; they should not do a canput */
ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
/* Find next forward module that has a service procedure */
q = q->q_nfsrv;
if (!(q->q_flag & QFULL)) {
TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
return (1);
}
mutex_enter(QLOCK(q));
if (q->q_flag & QFULL) {
q->q_flag |= QWANTW;
mutex_exit(QLOCK(q));
TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
return (0);
}
mutex_exit(QLOCK(q));
TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
return (1);
}
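/*
 * Typical use of canput() in a put procedure that defers work to its
 * service procedure when the downstream queue is full (a sketch; the
 * surrounding module logic is hypothetical):
 *
 *	if (canput(q->q_next))
 *		putnext(q, mp);
 *	else
 *		(void) putq(q, mp);	QWANTW was set by canput(), so the
 *					queue will be backenabled later
 */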
/*
* This is the new canput for use with priority bands. Return 1 if the
* band is not full. If the band is full, return 0 (may not put message)
* and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
* write to the queue).
*/
int
bcanput(queue_t *q, unsigned char pri)
{
qband_t *qbp;
TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
if (!q)
return (0);
/* Find next forward module that has a service procedure */
q = q->q_nfsrv;
mutex_enter(QLOCK(q));
if (pri == 0) {
if (q->q_flag & QFULL) {
q->q_flag |= QWANTW;
mutex_exit(QLOCK(q));
TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
"bcanput:%p %X %d", q, pri, 0);
return (0);
}
} else { /* pri != 0 */
if (pri > q->q_nband) {
/*
* No band exists yet, so return success.
*/
mutex_exit(QLOCK(q));
TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
"bcanput:%p %X %d", q, pri, 1);
return (1);
}
qbp = q->q_bandp;
while (--pri)
qbp = qbp->qb_next;
if (qbp->qb_flag & QB_FULL) {
qbp->qb_flag |= QB_WANTW;
mutex_exit(QLOCK(q));
TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
"bcanput:%p %X %d", q, pri, 0);
return (0);
}
}
mutex_exit(QLOCK(q));
TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
"bcanput:%p %X %d", q, pri, 1);
return (1);
}
/*
* Put a message on a queue.
*
* Messages are enqueued on a priority basis. The priority classes
* are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
* and B_NORMAL (type < QPCTL && band == 0).
*
* Add appropriate weighted data block sizes to queue count.
* If queue hits high water mark then set QFULL flag.
*
* If QNOENAB is not set (putq is allowed to enable the queue),
* enable the queue only if the message is PRIORITY,
* or the QWANTR flag is set (indicating that the service procedure
* is ready to read the queue). This implies that a service
* procedure must NEVER put a high priority message back on its own
* queue, as this would result in an infinite loop (!).
*/
int
putq(queue_t *q, mblk_t *bp)
{
mblk_t *tmp;
qband_t *qbp = NULL;
int mcls = (int)queclass(bp);
kthread_id_t freezer;
int bytecnt = 0, mblkcnt = 0;
freezer = STREAM(q)->sd_freezer;
if (freezer == curthread) {
ASSERT(frozenstr(q));
ASSERT(MUTEX_HELD(QLOCK(q)));
} else
mutex_enter(QLOCK(q));
/*
* Make sanity checks and if qband structure is not yet
* allocated, do so.
*/
if (mcls == QPCTL) {
if (bp->b_band != 0)
bp->b_band = 0; /* force to be correct */
} else if (bp->b_band != 0) {
int i;
qband_t **qbpp;
if (bp->b_band > q->q_nband) {
/*
* The qband structure for this priority band is
* not on the queue yet, so we have to allocate
* one on the fly. It would be wasteful to
* associate the qband structures with every
* queue when the queues are allocated. This is
* because most queues will only need the normal
* band of flow which can be described entirely
* by the queue itself.
*/
qbpp = &q->q_bandp;
while (*qbpp)
qbpp = &(*qbpp)->qb_next;
while (bp->b_band > q->q_nband) {
if ((*qbpp = allocband()) == NULL) {
if (freezer != curthread)
mutex_exit(QLOCK(q));
return (0);
}
(*qbpp)->qb_hiwat = q->q_hiwat;
(*qbpp)->qb_lowat = q->q_lowat;
q->q_nband++;
qbpp = &(*qbpp)->qb_next;
}
}
ASSERT(MUTEX_HELD(QLOCK(q)));
qbp = q->q_bandp;
i = bp->b_band;
while (--i)
qbp = qbp->qb_next;
}
/*
* If queue is empty, add the message and initialize the pointers.
* Otherwise, adjust message pointers and queue pointers based on
* the type of the message and where it belongs on the queue. Some
* code is duplicated to minimize the number of conditionals and
* hopefully minimize the amount of time this routine takes.
*/
if (!q->q_first) {
bp->b_next = NULL;
bp->b_prev = NULL;
q->q_first = bp;
q->q_last = bp;
if (qbp) {
qbp->qb_first = bp;
qbp->qb_last = bp;
}
} else if (!qbp) { /* bp->b_band == 0 */
/*
* If queue class of message is less than or equal to
* that of the last one on the queue, tack on to the end.
*/
tmp = q->q_last;
if (mcls <= (int)queclass(tmp)) {
bp->b_next = NULL;
bp->b_prev = tmp;
tmp->b_next = bp;
q->q_last = bp;
} else {
tmp = q->q_first;
while ((int)queclass(tmp) >= mcls)
tmp = tmp->b_next;
/*
* Insert bp before tmp.
*/
bp->b_next = tmp;
bp->b_prev = tmp->b_prev;
if (tmp->b_prev)
tmp->b_prev->b_next = bp;
else
q->q_first = bp;
tmp->b_prev = bp;
}
} else { /* bp->b_band != 0 */
if (qbp->qb_first) {
tmp = qbp->qb_last;
/*
* Insert bp after the last message in this band.
*/
bp->b_next = tmp->b_next;
if (tmp->b_next)
tmp->b_next->b_prev = bp;
else
q->q_last = bp;
bp->b_prev = tmp;
tmp->b_next = bp;
} else {
tmp = q->q_last;
if ((mcls < (int)queclass(tmp)) ||
(bp->b_band <= tmp->b_band)) {
/*
* Tack bp on end of queue.
*/
bp->b_next = NULL;
bp->b_prev = tmp;
tmp->b_next = bp;
q->q_last = bp;
} else {
tmp = q->q_first;
while (tmp->b_datap->db_type >= QPCTL)
tmp = tmp->b_next;
while (tmp->b_band >= bp->b_band)
tmp = tmp->b_next;
/*
* Insert bp before tmp.
*/
bp->b_next = tmp;
bp->b_prev = tmp->b_prev;
if (tmp->b_prev)
tmp->b_prev->b_next = bp;
else
q->q_first = bp;
tmp->b_prev = bp;
}
qbp->qb_first = bp;
}
qbp->qb_last = bp;
}
/* Get message byte count for q_count accounting */
for (tmp = bp; tmp; tmp = tmp->b_cont) {
bytecnt += (tmp->b_wptr - tmp->b_rptr);
mblkcnt++;
}
if (qbp) {
qbp->qb_count += bytecnt;
qbp->qb_mblkcnt += mblkcnt;
if ((qbp->qb_count >= qbp->qb_hiwat) ||
(qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
qbp->qb_flag |= QB_FULL;
}
} else {
q->q_count += bytecnt;
q->q_mblkcnt += mblkcnt;
if ((q->q_count >= q->q_hiwat) ||
(q->q_mblkcnt >= q->q_hiwat)) {
q->q_flag |= QFULL;
}
}
STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
if ((mcls > QNORM) ||
(canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
qenable_locked(q);
ASSERT(MUTEX_HELD(QLOCK(q)));
if (freezer != curthread)
mutex_exit(QLOCK(q));
return (1);
}
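/*
* Illustrative sketch, not part of this file: a module's write-side put
* procedure commonly defers work to its service procedure with putq().
* The xxwput name and the policy shown are hypothetical.
*
*	static int
*	xxwput(queue_t *q, mblk_t *mp)
*	{
*		if (queclass(mp) == QPCTL || canputnext(q))
*			putnext(q, mp);
*		else
*			(void) putq(q, mp);
*		return (0);
*	}
*
* A zero return from putq() (qband allocation failure) leaves the message
* with the caller, which must free or otherwise dispose of it.
*/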
/*
* Put a message back at the beginning of the queue in priority order.
* See the comment on putq() above for details.
*/
int
putbq(queue_t *q, mblk_t *bp)
{
mblk_t *tmp;
qband_t *qbp = NULL;
int mcls = (int)queclass(bp);
kthread_id_t freezer;
int bytecnt = 0, mblkcnt = 0;
ASSERT(q && bp);
ASSERT(bp->b_next == NULL);
freezer = STREAM(q)->sd_freezer;
if (freezer == curthread) {
ASSERT(frozenstr(q));
ASSERT(MUTEX_HELD(QLOCK(q)));
} else
mutex_enter(QLOCK(q));
/*
* Perform sanity checks and, if the qband structure is not
* yet allocated, allocate it.
*/
if (mcls == QPCTL) {
if (bp->b_band != 0)
bp->b_band = 0; /* force to be correct */
} else if (bp->b_band != 0) {
int i;
qband_t **qbpp;
if (bp->b_band > q->q_nband) {
qbpp = &q->q_bandp;
while (*qbpp)
qbpp = &(*qbpp)->qb_next;
while (bp->b_band > q->q_nband) {
if ((*qbpp = allocband()) == NULL) {
if (freezer != curthread)
mutex_exit(QLOCK(q));
return (0);
}
(*qbpp)->qb_hiwat = q->q_hiwat;
(*qbpp)->qb_lowat = q->q_lowat;
q->q_nband++;
qbpp = &(*qbpp)->qb_next;
}
}
qbp = q->q_bandp;
i = bp->b_band;
while (--i)
qbp = qbp->qb_next;
}
/*
* If queue is empty or if message is high priority,
* place on the front of the queue.
*/
tmp = q->q_first;
if ((!tmp) || (mcls == QPCTL)) {
bp->b_next = tmp;
if (tmp)
tmp->b_prev = bp;
else
q->q_last = bp;
q->q_first = bp;
bp->b_prev = NULL;
if (qbp) {
qbp->qb_first = bp;
qbp->qb_last = bp;
}
} else if (qbp) { /* bp->b_band != 0 */
tmp = qbp->qb_first;
if (tmp) {
/*
* Insert bp before the first message in this band.
*/
bp->b_next = tmp;
bp->b_prev = tmp->b_prev;
if (tmp->b_prev)
tmp->b_prev->b_next = bp;
else
q->q_first = bp;
tmp->b_prev = bp;
} else {
tmp = q->q_last;
if ((mcls < (int)queclass(tmp)) ||
(bp->b_band < tmp->b_band)) {
/*
* Tack bp on end of queue.
*/
bp->b_next = NULL;
bp->b_prev = tmp;
tmp->b_next = bp;
q->q_last = bp;
} else {
tmp = q->q_first;
while (tmp->b_datap->db_type >= QPCTL)
tmp = tmp->b_next;
while (tmp->b_band > bp->b_band)
tmp = tmp->b_next;
/*
* Insert bp before tmp.
*/
bp->b_next = tmp;
bp->b_prev = tmp->b_prev;
if (tmp->b_prev)
tmp->b_prev->b_next = bp;
else
q->q_first = bp;
tmp->b_prev = bp;
}
qbp->qb_last = bp;
}
qbp->qb_first = bp;
} else { /* bp->b_band == 0 && !QPCTL */
/*
* If the queue class or band is less than that of the last
* message on the queue, tack bp on the end of the queue.
*/
tmp = q->q_last;
if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
bp->b_next = NULL;
bp->b_prev = tmp;
tmp->b_next = bp;
q->q_last = bp;
} else {
tmp = q->q_first;
while (tmp->b_datap->db_type >= QPCTL)
tmp = tmp->b_next;
while (tmp->b_band > bp->b_band)
tmp = tmp->b_next;
/*
* Insert bp before tmp.
*/
bp->b_next = tmp;
bp->b_prev = tmp->b_prev;
if (tmp->b_prev)
tmp->b_prev->b_next = bp;
else
q->q_first = bp;
tmp->b_prev = bp;
}
}
/* Get message byte count for q_count accounting */
for (tmp = bp; tmp; tmp = tmp->b_cont) {
bytecnt += (tmp->b_wptr - tmp->b_rptr);
mblkcnt++;
}
if (qbp) {
qbp->qb_count += bytecnt;
qbp->qb_mblkcnt += mblkcnt;
if ((qbp->qb_count >= qbp->qb_hiwat) ||
(qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
qbp->qb_flag |= QB_FULL;
}
} else {
q->q_count += bytecnt;
q->q_mblkcnt += mblkcnt;
if ((q->q_count >= q->q_hiwat) ||
(q->q_mblkcnt >= q->q_hiwat)) {
q->q_flag |= QFULL;
}
}
STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
qenable_locked(q);
ASSERT(MUTEX_HELD(QLOCK(q)));
if (freezer != curthread)
mutex_exit(QLOCK(q));
return (1);
}
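/*
* Illustrative sketch, not part of this file: the canonical service
* procedure loop pairs getq() with putbq(). When the downstream queue is
* flow-controlled the message is put back at the head of the queue and the
* loop exits; back-enabling reschedules the service procedure later. Per
* the warning above putq(), a high-priority message must never be put back
* on the queue it came from, so it is passed along unconditionally here.
* The xxwsrv name is hypothetical.
*
*	static int
*	xxwsrv(queue_t *q)
*	{
*		mblk_t *mp;
*
*		while ((mp = getq(q)) != NULL) {
*			if (queclass(mp) == QPCTL || canputnext(q)) {
*				putnext(q, mp);
*			} else {
*				(void) putbq(q, mp);
*				break;
*			}
*		}
*		return (0);
*	}
*/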
/*
* Insert a message before an existing message on the queue. If the
* existing message is NULL, the new message is placed on the end of
* the queue. The queue class of the new message is ignored. However,
* the priority band of the new message must adhere to the following
* ordering:
*
* emp->b_prev->b_band >= mp->b_band >= emp->b_band.
*
* All flow control parameters are updated.
*
* insq() can be called with the stream frozen, by other utility functions
* while holding QLOCK, or by streams modules without any locks and with
* the stream not frozen.
*/
int
insq(queue_t *q, mblk_t *emp, mblk_t *mp)
{
mblk_t *tmp;
qband_t *qbp = NULL;
int mcls = (int)queclass(mp);
kthread_id_t freezer;
int bytecnt = 0, mblkcnt = 0;
freezer = STREAM(q)->sd_freezer;
if (freezer == curthread) {
ASSERT(frozenstr(q));
ASSERT(MUTEX_HELD(QLOCK(q)));
} else if (MUTEX_HELD(QLOCK(q))) {
/* Don't drop lock on exit */
freezer = curthread;
} else
mutex_enter(QLOCK(q));
if (mcls == QPCTL) {
if (mp->b_band != 0)
mp->b_band = 0; /* force to be correct */
if (emp && emp->b_prev &&
(emp->b_prev->b_datap->db_type < QPCTL))
goto badord;
}
if (emp) {
if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
(emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
(emp->b_prev->b_band < mp->b_band))) {
goto badord;
}
} else {
tmp = q->q_last;
if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
badord:
cmn_err(CE_WARN,
"insq: attempt to insert message out of order "
"on q %p", (void *)q);
if (freezer != curthread)
mutex_exit(QLOCK(q));
return (0);
}
}
if (mp->b_band != 0) {
int i;
qband_t **qbpp;
if (mp->b_band > q->q_nband) {
qbpp = &q->q_bandp;
while (*qbpp)
qbpp = &(*qbpp)->qb_next;
while (mp->b_band > q->q_nband) {
if ((*qbpp = allocband()) == NULL) {
if (freezer != curthread)
mutex_exit(QLOCK(q));
return (0);
}
(*qbpp)->qb_hiwat = q->q_hiwat;
(*qbpp)->qb_lowat = q->q_lowat;
q->q_nband++;
qbpp = &(*qbpp)->qb_next;
}
}
qbp = q->q_bandp;
i = mp->b_band;
while (--i)
qbp = qbp->qb_next;
}
if ((mp->b_next = emp) != NULL) {
if ((mp->b_prev = emp->b_prev) != NULL)
emp->b_prev->b_next = mp;
else
q->q_first = mp;
emp->b_prev = mp;
} else {
if ((mp->b_prev = q->q_last) != NULL)
q->q_last->b_next = mp;
else
q->q_first = mp;
q->q_last = mp;
}
/* Get mblk and byte count for q_count accounting */
for (tmp = mp; tmp; tmp = tmp->b_cont) {
bytecnt += (tmp->b_wptr - tmp->b_rptr);
mblkcnt++;
}
if (qbp) { /* adjust qband pointers and count */
if (!qbp->qb_first) {
qbp->qb_first = mp;
qbp->qb_last = mp;
} else {
if (mp->b_prev == NULL || (mp->b_prev != NULL &&
(mp->b_prev->b_band != mp->b_band)))
qbp->qb_first = mp;
else if (mp->b_next == NULL || (mp->b_next != NULL &&
(mp->b_next->b_band != mp->b_band)))
qbp->qb_last = mp;
}
qbp->qb_count += bytecnt;
qbp->qb_mblkcnt += mblkcnt;
if ((qbp->qb_count >= qbp->qb_hiwat) ||
(qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
qbp->qb_flag |= QB_FULL;
}
} else {
q->q_count += bytecnt;
q->q_mblkcnt += mblkcnt;
if ((q->q_count >= q->q_hiwat) ||
(q->q_mblkcnt >= q->q_hiwat)) {
q->q_flag |= QFULL;
}
}
STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
if (canenable(q) && (q->q_flag & QWANTR))
qenable_locked(q);
ASSERT(MUTEX_HELD(QLOCK(q)));
if (freezer != curthread)
mutex_exit(QLOCK(q));
return (1);
}
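/*
* Illustrative sketch, not part of this file: insq() is normally used on a
* frozen stream, for example to place a message ahead of everything already
* queued; passing emp == NULL appends to the tail instead.
*
*	freezestr(q);
*	if (!insq(q, q->q_first, mp))
*		freemsg(mp);
*	unfreezestr(q);
*
* On failure (out-of-order insert, or no memory for a qband) the message is
* still owned by the caller; the sketch above simply frees it.
*/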
/*
* Create and put a control message on queue.
*/
int
putctl(queue_t *q, int type)
{
mblk_t *bp;
if ((datamsg(type) && (type != M_DELAY)) ||
(bp = allocb_tryhard(0)) == NULL)
return (0);
bp->b_datap->db_type = (unsigned char) type;
put(q, bp);
return (1);
}
/*
* Control message with a single-byte parameter
*/
int
putctl1(queue_t *q, int type, int param)
{
mblk_t *bp;
if ((datamsg(type) && (type != M_DELAY)) ||
(bp = allocb_tryhard(1)) == NULL)
return (0);
bp->b_datap->db_type = (unsigned char)type;
*bp->b_wptr++ = (unsigned char)param;
put(q, bp);
return (1);
}
int
putnextctl1(queue_t *q, int type, int param)
{
mblk_t *bp;
if ((datamsg(type) && (type != M_DELAY)) ||
((bp = allocb_tryhard(1)) == NULL))
return (0);
bp->b_datap->db_type = (unsigned char)type;
*bp->b_wptr++ = (unsigned char)param;
putnext(q, bp);
return (1);
}
int
putnextctl(queue_t *q, int type)
{
mblk_t *bp;
if ((datamsg(type) && (type != M_DELAY)) ||
((bp = allocb_tryhard(0)) == NULL))
return (0);
bp->b_datap->db_type = (unsigned char)type;
putnext(q, bp);
return (1);
}
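/*
* Illustrative sketch, not part of this file: these helpers build and send
* zero- and one-byte control messages. For example, a driver's read-side
* code might report a fatal device error upstream (rq is a hypothetical
* pointer to the driver's read queue); a zero return means the allocation
* failed and no message was sent.
*
*	(void) putnextctl1(rq, M_ERROR, EIO);
*
* putctl() and putctl1() differ only in that they call put() on the given
* queue rather than putnext().
*/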
/*
* Return the queue upstream from this one
*/
queue_t *
backq(queue_t *q)
{
q = _OTHERQ(q);
if (q->q_next) {
q = q->q_next;
return (_OTHERQ(q));
}
return (NULL);
}
/*
* Send a block back up the queue in reverse from this
* one (e.g. to respond to ioctls)
*/
void
qreply(queue_t *q, mblk_t *bp)
{
ASSERT(q && bp);
putnext(_OTHERQ(q), bp);
}
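/*
* Illustrative sketch, not part of this file: qreply() is the usual way for
* a write-side put procedure to answer an M_IOCTL; the reply travels back
* up the read side of the same stream. Here an unsupported ioctl is NAKed:
*
*	struct iocblk *iocp = (struct iocblk *)mp->b_rptr;
*
*	mp->b_datap->db_type = M_IOCNAK;
*	iocp->ioc_error = EINVAL;
*	qreply(q, mp);
*/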
/*
* Streams Queue Scheduling
*
* Queues are enabled through qenable() when they have messages to
* process. They are serviced by queuerun(), which runs each enabled
* queue's service procedure. The call to queuerun() is processor
* dependent - the general principle is that it be run whenever a queue
* is enabled but before returning to user level. For system calls,
* the function runqueues() is called if their action causes a queue
* to be enabled. For device interrupts, queuerun() should be
* called before returning from the last level of interrupt. Beyond
* this, no timing assumptions should be made about queue scheduling.
*/
/*
* Enable a queue: put it on the list of those whose service procedures are
* ready to run, and set up the scheduling mechanism.
* The broadcast is done outside the mutex so that the woken thread does
* not immediately contend for the mutex. This is safe because the queue
* has already been enqueued on the run list and flagged by that point.
*/
void
qenable(queue_t *q)
{
mutex_enter(QLOCK(q));
qenable_locked(q);
mutex_exit(QLOCK(q));
}
/*
* Return number of messages on queue
*/
int
qsize(queue_t *qp)
{
int count = 0;
mblk_t *mp;
mutex_enter(QLOCK(qp));
for (mp = qp->q_first; mp; mp = mp->b_next)
count++;
mutex_exit(QLOCK(qp));
return (count);
}
/*
* noenable - set queue so that putq() will not enable it.
* enableok - set queue so that putq() can enable it.
*/
void
noenable(queue_t *q)
{
mutex_enter(QLOCK(q));
q->q_flag |= QNOENB;
mutex_exit(QLOCK(q));
}
void
enableok(queue_t *q)
{
mutex_enter(QLOCK(q));
q->q_flag &= ~QNOENB;
mutex_exit(QLOCK(q));
}
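/*
* Illustrative sketch, not part of this file: a module that wants to batch
* queued messages without having putq() schedule its service procedure can
* bracket the quiet period with noenable()/enableok() and then schedule the
* queue explicitly once the batch is complete:
*
*	noenable(q);
*	... calls to putq(q, mp) queue data but do not enable q ...
*	enableok(q);
*	qenable(q);
*/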
/*
* Set queue fields.
*/
int
strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
{
qband_t *qbp = NULL;
queue_t *wrq;
int error = 0;
kthread_id_t freezer;
freezer = STREAM(q)->sd_freezer;
if (freezer == curthread) {
ASSERT(frozenstr(q));
ASSERT(MUTEX_HELD(QLOCK(q)));
} else
mutex_enter(QLOCK(q));
if (what >= QBAD) {
error = EINVAL;
goto done;
}
if (pri != 0) {
int i;
qband_t **qbpp;
if (pri > q->q_nband) {
qbpp = &q->q_bandp;
while (*qbpp)
qbpp = &(*qbpp)->qb_next;
while (pri > q->q_nband) {
if ((*qbpp = allocband()) == NULL) {
error = EAGAIN;
goto done;
}
(*qbpp)->qb_hiwat = q->q_hiwat;
(*qbpp)->qb_lowat = q->q_lowat;
q->q_nband++;
qbpp = &(*qbpp)->qb_next;
}
}
qbp = q->q_bandp;
i = pri;
while (--i)
qbp = qbp->qb_next;
}
switch (what) {
case QHIWAT:
if (qbp)
qbp->qb_hiwat = (size_t)val;
else
q->q_hiwat = (size_t)val;
break;
case QLOWAT:
if (qbp)
qbp->qb_lowat = (size_t)val;
else
q->q_lowat = (size_t)val;
break;
case QMAXPSZ:
if (qbp)
error = EINVAL;
else
q->q_maxpsz = (ssize_t)val;
/*
* Performance concern: strwrite() looks at the maxpsz of the module
* below the stream head each time it does a write, so we now cache it
* at the stream head. Check whether this queue is sitting directly
* below the stream head.
*/
wrq = STREAM(q)->sd_wrq;
if (q != wrq->q_next)
break;
/*
* If the stream is not frozen drop the current QLOCK and
* acquire the sd_wrq QLOCK which protects sd_qn_*
*/
if (freezer != curthread) {
mutex_exit(QLOCK(q));
mutex_enter(QLOCK(wrq));
}
ASSERT(MUTEX_HELD(QLOCK(wrq)));
if (strmsgsz != 0) {
if (val == INFPSZ)
val = strmsgsz;
else {
if (STREAM(q)->sd_vnode->v_type == VFIFO)
val = MIN(PIPE_BUF, val);
else
val = MIN(strmsgsz, val);
}
}
STREAM(q)->sd_qn_maxpsz = val;
if (freezer != curthread) {
mutex_exit(QLOCK(wrq));
mutex_enter(QLOCK(q));
}
break;
case QMINPSZ:
if (qbp)
error = EINVAL;
else
q->q_minpsz = (ssize_t)val;
/*
* Performance concern: strwrite() looks at the packet size limits of
* the module below the stream head each time it does a write, so we
* cache them at the stream head (sd_qn_*). Check whether this queue
* is sitting directly below the stream head.
*/
wrq = STREAM(q)->sd_wrq;
if (q != wrq->q_next)
break;
/*
* If the stream is not frozen drop the current QLOCK and
* acquire the sd_wrq QLOCK which protects sd_qn_*
*/
if (freezer != curthread) {
mutex_exit(QLOCK(q));
mutex_enter(QLOCK(wrq));
}
STREAM(q)->sd_qn_minpsz = (ssize_t)val;
if (freezer != curthread) {
mutex_exit(QLOCK(wrq));
mutex_enter(QLOCK(q));
}
break;
case QSTRUIOT:
if (qbp)
error = EINVAL;
else
q->q_struiot = (ushort_t)val;
break;
case QCOUNT:
case QFIRST:
case QLAST:
case QFLAG:
error = EPERM;
break;
default:
error = EINVAL;
break;
}
done:
if (freezer != curthread)
mutex_exit(QLOCK(q));
return (error);
}
/*
* Get queue fields.
*/
int
strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
{
qband_t *qbp = NULL;
int error = 0;
kthread_id_t freezer;
freezer = STREAM(q)->sd_freezer;
if (freezer == curthread) {
ASSERT(frozenstr(q));
ASSERT(MUTEX_HELD(QLOCK(q)));
} else
mutex_enter(QLOCK(q));
if (what >= QBAD) {
error = EINVAL;
goto done;
}
if (pri != 0) {
int i;
qband_t **qbpp;
if (pri > q->q_nband) {
qbpp = &q->q_bandp;
while (*qbpp)
qbpp = &(*qbpp)->qb_next;
while (pri > q->q_nband) {
if ((*qbpp = allocband()) == NULL) {
error = EAGAIN;
goto done;
}
(*qbpp)->qb_hiwat = q->q_hiwat;
(*qbpp)->qb_lowat = q->q_lowat;
q->q_nband++;
qbpp = &(*qbpp)->qb_next;
}
}
qbp = q->q_bandp;
i = pri;
while (--i)
qbp = qbp->qb_next;
}
switch (what) {
case QHIWAT:
if (qbp)
*(size_t *)valp = qbp->qb_hiwat;
else
*(size_t *)valp = q->q_hiwat;
break;
case QLOWAT:
if (qbp)
*(size_t *)valp = qbp->qb_lowat;
else
*(size_t *)valp = q->q_lowat;
break;
case QMAXPSZ:
if (qbp)
error = EINVAL;
else
*(ssize_t *)valp = q->q_maxpsz;
break;
case QMINPSZ:
if (qbp)
error = EINVAL;
else
*(ssize_t *)valp = q->q_minpsz;
break;
case QCOUNT:
if (qbp)
*(size_t *)valp = qbp->qb_count;
else
*(size_t *)valp = q->q_count;
break;
case QFIRST:
if (qbp)
*(mblk_t **)valp = qbp->qb_first;
else
*(mblk_t **)valp = q->q_first;
break;
case QLAST:
if (qbp)
*(mblk_t **)valp = qbp->qb_last;
else
*(mblk_t **)valp = q->q_last;
break;
case QFLAG:
if (qbp)
*(uint_t *)valp = qbp->qb_flag;
else
*(uint_t *)valp = q->q_flag;
break;
case QSTRUIOT:
if (qbp)
error = EINVAL;
else
*(short *)valp = q->q_struiot;
break;
default:
error = EINVAL;
break;
}
done:
if (freezer != curthread)
mutex_exit(QLOCK(q));
return (error);
}
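/*
* Illustrative sketch, not part of this file: strqget() and strqset() are
* typically used together to read and adjust queue fields. Doubling the
* band-0 high-water mark as below is a hypothetical policy:
*
*	size_t hiwat;
*
*	if (strqget(q, QHIWAT, 0, &hiwat) == 0)
*		(void) strqset(q, QHIWAT, 0, (intptr_t)(2 * hiwat));
*
* Both routines return 0 on success or an errno value (EINVAL, EAGAIN,
* EPERM) on failure.
*/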
/*
* Wake up everything waiting in cv_wait/sigwait/pollwait on one of
* QWANTWSYNC, QWANTR or QWANTW.
*
* Note: for QWANTWSYNC/QWANTW and QWANTR, if there is no WSLEEPer or
* RSLEEPer then a deferred wakeup will be done. Also, if strpoll() is in
* progress then a deferred pollwakeup will be done.
*/
void
strwakeq(queue_t *q, int flag)
{
stdata_t *stp = STREAM(q);
pollhead_t *pl;
mutex_enter(&stp->sd_lock);
pl = &stp->sd_pollist;
if (flag & QWANTWSYNC) {
ASSERT(!(q->q_flag & QREADR));
if (stp->sd_flag & WSLEEP) {
stp->sd_flag &= ~WSLEEP;
cv_broadcast(&stp->sd_wrq->q_wait);
} else {
stp->sd_wakeq |= WSLEEP;
}
mutex_exit(&stp->sd_lock);
pollwakeup(pl, POLLWRNORM);
mutex_enter(&stp->sd_lock);
if (stp->sd_sigflags & S_WRNORM)
strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
} else if (flag & QWANTR) {
if (stp->sd_flag & RSLEEP) {
stp->sd_flag &= ~RSLEEP;
cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
} else {
stp->sd_wakeq |= RSLEEP;
}
mutex_exit(&stp->sd_lock);
pollwakeup(pl, POLLIN | POLLRDNORM);
mutex_enter(&stp->sd_lock);
{
int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
if (events)
strsendsig(stp->sd_siglist, events, 0, 0);
}
} else {
if (stp->sd_flag & WSLEEP) {
stp->sd_flag &= ~WSLEEP;
cv_broadcast(&stp->sd_wrq->q_wait);
}
mutex_exit(&stp->sd_lock);
pollwakeup(pl, POLLWRNORM);
mutex_enter(&stp->sd_lock);
if (stp->sd_sigflags & S_WRNORM)
strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
}
mutex_exit(&stp->sd_lock);
}
int
struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
{
stdata_t *stp = STREAM(q);
int typ = STRUIOT_STANDARD;
uio_t *uiop = &dp->d_uio;
dblk_t *dbp;
ssize_t uiocnt;
ssize_t cnt;
unsigned char *ptr;
ssize_t resid;
int error = 0;
on_trap_data_t otd;
queue_t *stwrq;
/*
* The plumbing may change while we are fetching the struio type, so
* store the queue in a temporary variable. It does not matter if we
* end up taking the type from the previous plumbing: if the plumbing
* changed while we held the queue in the temporary variable, we can
* still process the message the way it would have been processed under
* the old plumbing, with no side effects beyond a little extra work for
* the partial IP header checksum.
*
* This is done to avoid holding sd_lock, which is very hot.
*/
stwrq = stp->sd_struiowrq;
if (stwrq)
typ = stwrq->q_struiot;
for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
dbp = mp->b_datap;
ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
cnt = MIN(uiocnt, uiop->uio_resid);
if (!(dbp->db_struioflag & STRUIO_SPEC) ||
(dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
/*
* Either this mblk has already been processed
* or there is no more room in this mblk (?).
*/
continue;
}
switch (typ) {
case STRUIOT_STANDARD:
if (noblock) {
if (on_trap(&otd, OT_DATA_ACCESS)) {
no_trap();
error = EWOULDBLOCK;
goto out;
}
}
if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
if (noblock)
no_trap();
goto out;
}
if (noblock)
no_trap();
break;
default:
error = EIO;
goto out;
}
dbp->db_struioflag |= STRUIO_DONE;
dbp->db_cksumstuff += cnt;
}
out:
if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
/*
* A fault has occurred and some bytes were moved to the
* current mblk, the uio_t has already been updated by
* the appropriate uio routine, so also update the mblk
* to reflect this in case this same mblk chain is used
* again (after the fault has been handled).
*/
uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
if (uiocnt >= resid)
dbp->db_cksumstuff += resid;
}
return (error);
}
/*
* Try to enter the queue synchronously. Any attempt to enter a closing queue
* fails. The qp->q_rwcnt keeps track of the number of successful entries so
* that removeq() will not try to close the queue while a thread is inside the
* queue.
*/
static boolean_t
rwnext_enter(queue_t *qp)
{
mutex_enter(QLOCK(qp));
if (qp->q_flag & QWCLOSE) {
mutex_exit(QLOCK(qp));
return (B_FALSE);
}
qp->q_rwcnt++;
ASSERT(qp->q_rwcnt != 0);
mutex_exit(QLOCK(qp));
return (B_TRUE);
}
/*
* Decrease the count of threads running in sync stream queue and wake up any
* threads blocked in removeq().
*/
static void
rwnext_exit(queue_t *qp)
{
mutex_enter(QLOCK(qp));
qp->q_rwcnt--;
if (qp->q_flag & QWANTRMQSYNC) {
qp->q_flag &= ~QWANTRMQSYNC;
cv_broadcast(&qp->q_wait);
}
mutex_exit(QLOCK(qp));
}
/*
* The purpose of rwnext() is to call the rw procedure of the next
* (downstream) module's queue.
*
* It is treated as a put entrypoint for perimeter synchronization.
*
* There's no need to grab sq_putlocks here (which only exist for CIPUT
* sync queues). If it is a CIPUT sync queue, sq_count is incremented and it
* does not matter if any regular put entrypoints have already been entered. We
* can't increment one of the sq_putcounts (instead of sq_count) because
* qwait_rw won't know which counter to decrement.
*
* It would be reasonable to add the lockless FASTPUT logic.
*/
int
rwnext(queue_t *qp, struiod_t *dp)
{
queue_t *nqp;
syncq_t *sq;
uint16_t count;
uint16_t flags;
struct qinit *qi;
int (*proc)();
struct stdata *stp;
int isread;
int rval;
stp = STREAM(qp);
/*
* Prevent q_next from changing by holding sd_lock until acquiring
* SQLOCK. Note that a read-side rwnext from the streamhead will
* already have sd_lock acquired. In either case sd_lock is always
* released after acquiring SQLOCK.
*
* The streamhead read-side holding sd_lock when calling rwnext is
* required to prevent a race condition where M_DATA mblks flowing
* up the read-side of the stream could be bypassed by a rwnext()
* down-call. In this case sd_lock acts as the streamhead perimeter.
*/
if ((nqp = _WR(qp)) == qp) {
isread = 0;
mutex_enter(&stp->sd_lock);
qp = nqp->q_next;
} else {
isread = 1;
if (nqp != stp->sd_wrq)
/* Not streamhead */
mutex_enter(&stp->sd_lock);
qp = _RD(nqp->q_next);
}
qi = qp->q_qinfo;
if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
/*
* Not a synchronous module or no r/w procedure for this
* queue, so just return EINVAL and let the caller handle it.
*/
mutex_exit(&stp->sd_lock);
return (EINVAL);
}
if (rwnext_enter(qp) == B_FALSE) {
mutex_exit(&stp->sd_lock);
return (EINVAL);
}
sq = qp->q_syncq;
mutex_enter(SQLOCK(sq));
mutex_exit(&stp->sd_lock);
count = sq->sq_count;
flags = sq->sq_flags;
ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
/*
* if this queue is being closed, return.
*/
if (qp->q_flag & QWCLOSE) {
mutex_exit(SQLOCK(sq));
rwnext_exit(qp);
return (EINVAL);
}
/*
* Wait until we can enter the inner perimeter.
*/
sq->sq_flags = flags | SQ_WANTWAKEUP;
cv_wait(&sq->sq_wait, SQLOCK(sq));
count = sq->sq_count;
flags = sq->sq_flags;
}
if (isread == 0 && stp->sd_struiowrq == NULL ||
isread == 1 && stp->sd_struiordq == NULL) {
/*
* Stream plumbing changed while waiting for inner perimeter
* so just return EINVAL and let the caller handle it.
*/
mutex_exit(SQLOCK(sq));
rwnext_exit(qp);
return (EINVAL);
}
if (!(flags & SQ_CIPUT))
sq->sq_flags = flags | SQ_EXCL;
sq->sq_count = count + 1;
ASSERT(sq->sq_count != 0); /* Wraparound */
/*
* Note: The only message ordering guarantee that rwnext() makes is
* for the write queue flow-control case. All others (r/w queue
* with q_count > 0 (or q_first != 0)) are the responsibility of
* the queue's rw procedure. This could be generalized here by
* running the queue's service procedure, but that wouldn't be
* the most efficient for all cases.
*/
mutex_exit(SQLOCK(sq));
if (! isread && (qp->q_flag & QFULL)) {
/*
* Write queue may be flow controlled. If so,
* mark the queue for wakeup when it's not.
*/
mutex_enter(QLOCK(qp));
if (qp->q_flag & QFULL) {
qp->q_flag |= QWANTWSYNC;
mutex_exit(QLOCK(qp));
rval = EWOULDBLOCK;
goto out;
}
mutex_exit(QLOCK(qp));
}
if (! isread && dp->d_mp)
STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
dp->d_mp->b_datap->db_base);
rval = (*proc)(qp, dp);
if (isread && dp->d_mp)
STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
out:
/*
* The queue is protected from being freed by sq_count, so it is
* safe to call rwnext_exit and reacquire SQLOCK(sq).
*/
rwnext_exit(qp);
mutex_enter(SQLOCK(sq));
flags = sq->sq_flags;
ASSERT(sq->sq_count != 0);
sq->sq_count--;
if (flags & SQ_TAIL) {
putnext_tail(sq, qp, flags);
/*
* The only purpose of this ASSERT is to preserve calling stack
* in DEBUG kernel.
*/
ASSERT(flags & SQ_TAIL);
return (rval);
}
ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
/*
* Safe to always drop SQ_EXCL:
* Not SQ_CIPUT means we set SQ_EXCL above
* For SQ_CIPUT SQ_EXCL will only be set if the put procedure
* did a qwriter(INNER) in which case nobody else
* is in the inner perimeter and we are exiting.
*
* I would like to make the following assertion:
*
* ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
* sq->sq_count == 0);
*
* which indicates that if we are both putshared and exclusive,
* we became exclusive while executing the putproc, and the only
* claim on the syncq was the one we dropped a few lines above.
* But other threads that enter putnext while the syncq is exclusive
* need to make a claim as they may need to drop SQLOCK in the
* has_writers case to avoid deadlocks. If these threads are
* delayed or preempted, it is possible that the writer thread can
* find out that there are other claims making the (sq_count == 0)
* test invalid.
*/
sq->sq_flags = flags & ~SQ_EXCL;
if (sq->sq_flags & SQ_WANTWAKEUP) {
sq->sq_flags &= ~SQ_WANTWAKEUP;
cv_broadcast(&sq->sq_wait);
}
mutex_exit(SQLOCK(sq));
return (rval);
}
/*
* The purpose of infonext() is to call the info procedure of the next
* (downstream) module's queue.
*
* It is treated as a put entrypoint for perimeter synchronization.
*
* There's no need to grab sq_putlocks here (which only exist for CIPUT
* sync queues). If it is a CIPUT sync queue, the regular sq_count is
* incremented and it does not matter if any regular put entrypoints have
* already been entered.
*/
int
infonext(queue_t *qp, infod_t *idp)
{
queue_t *nqp;
syncq_t *sq;
uint16_t count;
uint16_t flags;
struct qinit *qi;
int (*proc)();
struct stdata *stp;
int rval;
stp = STREAM(qp);
/*
* Prevent q_next from changing by holding sd_lock until
* acquiring SQLOCK.
*/
mutex_enter(&stp->sd_lock);
if ((nqp = _WR(qp)) == qp) {
qp = nqp->q_next;
} else {
qp = _RD(nqp->q_next);
}
qi = qp->q_qinfo;
if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
mutex_exit(&stp->sd_lock);
return (EINVAL);
}
sq = qp->q_syncq;
mutex_enter(SQLOCK(sq));
mutex_exit(&stp->sd_lock);
count = sq->sq_count;
flags = sq->sq_flags;
ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
/*
* Wait until we can enter the inner perimeter.
*/
sq->sq_flags = flags | SQ_WANTWAKEUP;
cv_wait(&sq->sq_wait, SQLOCK(sq));
count = sq->sq_count;
flags = sq->sq_flags;
}
if (! (flags & SQ_CIPUT))
sq->sq_flags = flags | SQ_EXCL;
sq->sq_count = count + 1;
ASSERT(sq->sq_count != 0); /* Wraparound */
mutex_exit(SQLOCK(sq));
rval = (*proc)(qp, idp);
mutex_enter(SQLOCK(sq));
flags = sq->sq_flags;
ASSERT(sq->sq_count != 0);
sq->sq_count--;
if (flags & SQ_TAIL) {
putnext_tail(sq, qp, flags);
/*
* The only purpose of this ASSERT is to preserve calling stack
* in DEBUG kernel.
*/
ASSERT(flags & SQ_TAIL);
return (rval);
}
ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
/*
* XXXX
* It is not certain that the next comment is correct here. Consider why
* infonext() is called, and whether dropping SQ_EXCL in the CIPUT case
* might cause other problems. It may be safer to drop it only if
* !SQ_CIPUT, because that is when we set it.
*/
/*
* Safe to always drop SQ_EXCL:
* Not SQ_CIPUT means we set SQ_EXCL above
* For SQ_CIPUT SQ_EXCL will only be set if the put procedure
* did a qwriter(INNER) in which case nobody else
* is in the inner perimeter and we are exiting.
*
* I would like to make the following assertion:
*
* ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
* sq->sq_count == 0);
*
* which indicates that if we are both putshared and exclusive,
* we became exclusive while executing the putproc, and the only
* claim on the syncq was the one we dropped a few lines above.
* But other threads that enter putnext while the syncq is exclusive
* need to make a claim as they may need to drop SQLOCK in the
* has_writers case to avoid deadlocks. If these threads are
* delayed or preempted, it is possible that the writer thread can
* find out that there are other claims making the (sq_count == 0)
* test invalid.
*/
sq->sq_flags = flags & ~SQ_EXCL;
mutex_exit(SQLOCK(sq));
return (rval);
}
/*
* Return nonzero if the queue is responsible for struio(), else return 0.
*/
int
isuioq(queue_t *q)
{
if (q->q_flag & QREADR)
return (STREAM(q)->sd_struiordq == q);
else
return (STREAM(q)->sd_struiowrq == q);
}
#if defined(__sparc)
int disable_putlocks = 0;
#else
int disable_putlocks = 1;
#endif
/*
* Called by create_putlocks().
*/
static void
create_syncq_putlocks(queue_t *q)
{
syncq_t *sq = q->q_syncq;
ciputctrl_t *cip;
int i;
ASSERT(sq != NULL);
ASSERT(disable_putlocks == 0);
ASSERT(n_ciputctrl >= min_n_ciputctrl);
ASSERT(ciputctrl_cache != NULL);
if (!(sq->sq_type & SQ_CIPUT))
return;
for (i = 0; i <= 1; i++) {
if (sq->sq_ciputctrl == NULL) {
cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
mutex_enter(SQLOCK(sq));
if (sq->sq_ciputctrl != NULL) {
mutex_exit(SQLOCK(sq));
kmem_cache_free(ciputctrl_cache, cip);
} else {
ASSERT(sq->sq_nciputctrl == 0);
sq->sq_nciputctrl = n_ciputctrl - 1;
/*
* putnext checks sq_ciputctrl without holding
* SQLOCK. If it is not NULL, putnext assumes
* sq_nciputctrl is initialized. The membar below
* ensures that.
*/
membar_producer();
sq->sq_ciputctrl = cip;
mutex_exit(SQLOCK(sq));
}
}
ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
if (i == 1)
break;
q = _OTHERQ(q);
if (!(q->q_flag & QPERQ)) {
ASSERT(sq == q->q_syncq);
break;
}
ASSERT(q->q_syncq != NULL);
ASSERT(sq != q->q_syncq);
sq = q->q_syncq;
ASSERT(sq->sq_type & SQ_CIPUT);
}
}
/*
* If the stream argument is 0, only create per-cpu sq_putlocks/sq_putcounts
* for the syncq of q. If the stream argument is not 0, create per-cpu
* stream_putlocks for the stream of q and per-cpu sq_putlocks/sq_putcounts
* for all syncq's starting from q and down to the driver.
*
* This should be called after the affected queues are part of the stream
* geometry. It should be called from a driver/module open routine after the
* qprocson() call. It is also called from the nfs syscall, where it is known
* that the stream is configured and won't change its geometry during the
* create_putlocks call.
*
* The caller normally uses a 0 value for the stream argument to speed up MT
* putnext into the perimeter of q, for example because its perimeter is per
* module (e.g. IP).
*
* The caller normally uses a non-0 value for the stream argument to hint to
* the system that the stream of q is a very contended global system stream
* (e.g. NFS/UDP) and that the part of the stream from q down to the driver
* is particularly MT hot.
*
* The caller ensures stream plumbing won't happen while we are here and
* therefore q_next can be used safely.
*/
void
create_putlocks(queue_t *q, int stream)
{
ciputctrl_t *cip;
struct stdata *stp = STREAM(q);
q = _WR(q);
ASSERT(stp != NULL);
if (disable_putlocks != 0)
return;
if (n_ciputctrl < min_n_ciputctrl)
return;
ASSERT(ciputctrl_cache != NULL);
if (stream != 0 && stp->sd_ciputctrl == NULL) {
cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
mutex_enter(&stp->sd_lock);
if (stp->sd_ciputctrl != NULL) {
mutex_exit(&stp->sd_lock);
kmem_cache_free(ciputctrl_cache, cip);
} else {
ASSERT(stp->sd_nciputctrl == 0);
stp->sd_nciputctrl = n_ciputctrl - 1;
/*
* putnext checks sd_ciputctrl without holding
* sd_lock. If it is not NULL, putnext assumes
* sd_nciputctrl is initialized. The membar below
* ensures that.
*/
membar_producer();
stp->sd_ciputctrl = cip;
mutex_exit(&stp->sd_lock);
}
}
ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
while (_SAMESTR(q)) {
create_syncq_putlocks(q);
if (stream == 0)
return;
q = q->q_next;
}
ASSERT(q != NULL);
create_syncq_putlocks(q);
}
/*
* STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
* through a stream.
*
* The data currently recorded per event are an hrtime stamp, a queue
* address, an event type, and a per-type datum. Much of the STREAMS
* framework is instrumented
* for automatic flow tracing (when enabled). Events can be defined and used
* by STREAMS modules and drivers.
*
* Global objects:
*
* str_ftevent() - Add a flow-trace event to a dblk.
* str_ftfree() - Free flow-trace data
*
* Local objects:
*
* fthdr_cache - pointer to the kmem cache for trace header.
* ftblk_cache - pointer to the kmem cache for trace data blocks.
*/
int str_ftnever = 1; /* Don't do STREAMS flow tracing */
void
str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
{
ftblk_t *bp = hp->tail;
ftblk_t *nbp;
ftevnt_t *ep;
int ix, nix;
ASSERT(hp != NULL);
for (;;) {
if ((ix = bp->ix) == FTBLK_EVNTS) {
/*
* Tail doesn't have room, so need a new tail.
*
* To make this MT safe, first, allocate a new
* ftblk, and initialize it. To make life a
* little easier, reserve the first slot (mostly
* by making ix = 1). When we are finished with
* the initialization, CAS this pointer to the
* tail. If this succeeds, this is the new
* "next" block. Otherwise, another thread
* got here first, so free the block and start
* again.
*/
if (!(nbp = kmem_cache_alloc(ftblk_cache,
KM_NOSLEEP))) {
/* no mem, so punt */
str_ftnever++;
/* free up all flow data? */
return;
}
nbp->nxt = NULL;
nbp->ix = 1;
/*
* Just in case there is another thread about
* to get the next index, we need to make sure
* the value is there for it.
*/
membar_producer();
if (casptr(&hp->tail, bp, nbp) == bp) {
/* CAS was successful */
bp->nxt = nbp;
membar_producer();
bp = nbp;
ix = 0;
goto cas_good;
} else {
kmem_cache_free(ftblk_cache, nbp);
bp = hp->tail;
continue;
}
}
nix = ix + 1;
if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
cas_good:
if (curthread != hp->thread) {
hp->thread = curthread;
evnt |= FTEV_CS;
}
if (CPU->cpu_seqid != hp->cpu_seqid) {
hp->cpu_seqid = CPU->cpu_seqid;
evnt |= FTEV_PS;
}
ep = &bp->ev[ix];
break;
}
}
if (evnt & FTEV_QMASK) {
queue_t *qp = p;
/*
* It is possible that the module info is broken
* (as logsubr.c was at the time of this writing).
* Instead of panicking or doing other unmentionables,
* we use a dummy name as the mid and continue.
*/
if (qp->q_qinfo == NULL)
ep->mid = "NONAME";
else
ep->mid = qp->q_qinfo->qi_minfo->mi_idname;
if (!(qp->q_flag & QREADR))
evnt |= FTEV_ISWR;
} else {
ep->mid = (char *)p;
}
ep->ts = gethrtime();
ep->evnt = evnt;
ep->data = data;
hp->hash = (hp->hash << 9) + hp->hash;
hp->hash += (evnt << 16) | data;
hp->hash += (uintptr_t)ep->mid;
}
/*
* Free flow-trace data.
*/
void
str_ftfree(dblk_t *dbp)
{
fthdr_t *hp = dbp->db_fthdr;
ftblk_t *bp = &hp->first;
ftblk_t *nbp;
if (bp != hp->tail || bp->ix != 0) {
/*
* Clear out the hash, have the tail point to itself, and free
* any continuation blocks.
*/
bp = hp->first.nxt;
hp->tail = &hp->first;
hp->hash = 0;
hp->first.nxt = NULL;
hp->first.ix = 0;
while (bp != NULL) {
nbp = bp->nxt;
kmem_cache_free(ftblk_cache, bp);
bp = nbp;
}
}
kmem_cache_free(fthdr_cache, hp);
dbp->db_fthdr = NULL;
}