stream.c revision 17169044f903cb92234f23d0ba0ce43449614a4d
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson/*
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * CDDL HEADER START
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson *
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * The contents of this file are subject to the terms of the
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * Common Development and Distribution License (the "License").
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * You may not use this file except in compliance with the License.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson *
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * or http://www.opensolaris.org/os/licensing.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * See the License for the specific language governing permissions
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * and limitations under the License.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson *
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * When distributing Covered Code, include this CDDL HEADER in each
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * If applicable, add the following below this CDDL HEADER, with the
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * fields enclosed by brackets "[]" replaced with your own identifying
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * information: Portions Copyright [yyyy] [name of copyright owner]
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson *
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * CDDL HEADER END
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson */
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson/* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson/* All Rights Reserved */
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson/*
9a686fbc186e8e2a64e9a5094d44c7d6fa0ea167Paul Dagnelie * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * Use is subject to license terms.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson */
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#pragma ident "%Z%%M% %I% %E% SMI"
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/types.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/param.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/thread.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/sysmacros.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/stropts.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/stream.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/strsubr.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/strsun.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/conf.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/debug.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/cmn_err.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/kmem.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/atomic.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/errno.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/vtrace.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/ftrace.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/ontrap.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/multidata.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/multidata_impl.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/sdt.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/strft.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#ifdef DEBUG
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#include <sys/kmem_impl.h>
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#endif
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson/*
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * This file contains all the STREAMS utility routines that may
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * be used by modules and drivers.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson */
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson/*
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * STREAMS message allocator: principles of operation
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson *
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * The streams message allocator consists of all the routines that
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * dupb(), freeb() and freemsg(). What follows is a high-level view
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * of how the allocator works.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson *
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * Every streams message consists of one or more mblks, a dblk, and data.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * All mblks for all types of messages come from a common mblk_cache.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * The dblk and data come in several flavors, depending on how the
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * message is allocated:
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson *
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * fixed-size dblk/data caches. For message sizes that are multiples of
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * PAGESIZE, dblks are allocated separately from the buffer.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * The associated buffer is allocated by the constructor using kmem_alloc().
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * For all other message sizes, dblk and its associated data is allocated
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * as a single contiguous chunk of memory.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * Objects in these caches consist of a dblk plus its associated data.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * allocb() determines the nearest-size cache by table lookup:
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * the dblk_cache[] array provides the mapping from size to dblk cache.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson *
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * kmem_alloc()'ing a buffer for the data and supplying that
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * buffer to gesballoc(), described below.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson *
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * (3) The four flavors of [d]esballoc[a] are all implemented by a
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * common routine, gesballoc() ("generic esballoc"). gesballoc()
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * allocates a dblk from the global dblk_esb_cache and sets db_base,
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * db_lim and db_frtnp to describe the caller-supplied buffer.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson *
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * While there are several routines to allocate messages, there is only
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * one routine to free messages: freeb(). freeb() simply invokes the
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * dblk's free method, dbp->db_free(), which is set at allocation time.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson *
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * dupb() creates a new reference to a message by allocating a new mblk,
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * incrementing the dblk reference count and setting the dblk's free
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * method to dblk_decref(). The dblk's original free method is retained
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * in db_lastfree. dblk_decref() decrements the reference count on each
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * freeb(). If this is not the last reference it just frees the mblk;
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * if this *is* the last reference, it restores db_free to db_lastfree,
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
9a686fbc186e8e2a64e9a5094d44c7d6fa0ea167Paul Dagnelie *
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * The implementation makes aggressive use of kmem object caching for
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * maximum performance. This makes the code simple and compact, but
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * also a bit abstruse in some places. The invariants that constitute a
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * message's constructed state, described below, are more subtle than usual.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson *
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * Every dblk has an "attached mblk" as part of its constructed state.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * The mblk is allocated by the dblk's constructor and remains attached
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * until the message is either dup'ed or pulled up. In the dupb() case
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * the mblk association doesn't matter until the last free, at which time
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * dblk_decref() attaches the last mblk to the dblk. pullupmsg() affects
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * the mblk association because it swaps the leading mblks of two messages,
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * so it is responsible for swapping their db_mblk pointers accordingly.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * From a constructed-state viewpoint it doesn't matter that a dblk's
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * attached mblk can change while the message is allocated; all that
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * matters is that the dblk has *some* attached mblk when it's freed.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson *
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * The sizes of the allocb() small-message caches are not magical.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * They represent a good trade-off between internal and external
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * fragmentation for current workloads. They should be reevaluated
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * periodically, especially if allocations larger than DBLK_MAX_CACHE
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * become common. We use 64-byte alignment so that dblks don't
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson * straddle cache lines unnecessarily.
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson */
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#define DBLK_MAX_CACHE 73728
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#define DBLK_CACHE_ALIGN 64
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#define DBLK_MIN_SIZE 8
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#define DBLK_SIZE_SHIFT 3
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#ifdef _BIG_ENDIAN
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#define DBLK_RTFU_SHIFT(field) \
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson (8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#else
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#define DBLK_RTFU_SHIFT(field) \
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson (8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#endif
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#define DBLK_RTFU(ref, type, flags, uioflag) \
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson (((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson ((type) << DBLK_RTFU_SHIFT(db_type)) | \
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson (((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson ((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#define DBLK_RTFU_REF_MASK (DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#define DBLK_RTFU_WORD(dbp) (*((uint32_t *)&(dbp)->db_ref))
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#define MBLK_BAND_FLAG_WORD(mp) (*((uint32_t *)&(mp)->b_band))
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilsonstatic size_t dblk_sizes[] = {
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#ifdef _LP64
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson 16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson 8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson 40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456,
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson#else
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson 64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968,
0713e232b7712cd27d99e1e935ebb8d5de61c57dGeorge Wilson 8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736,
40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504,
#endif
DBLK_MAX_CACHE, 0
};
static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;
static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);
/*
* Patchable mblk/dblk kmem_cache flags.
*/
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;
static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
dblk_t *dbp = buf;
ssize_t msg_size = (ssize_t)cdrarg;
size_t index;
ASSERT(msg_size != 0);
index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
return (-1);
if ((msg_size & PAGEOFFSET) == 0) {
dbp->db_base = kmem_alloc(msg_size, kmflags);
if (dbp->db_base == NULL) {
kmem_cache_free(mblk_cache, dbp->db_mblk);
return (-1);
}
} else {
dbp->db_base = (unsigned char *)&dbp[1];
}
dbp->db_mblk->b_datap = dbp;
dbp->db_cache = dblk_cache[index];
dbp->db_lim = dbp->db_base + msg_size;
dbp->db_free = dbp->db_lastfree = dblk_lastfree;
dbp->db_frtnp = NULL;
dbp->db_fthdr = NULL;
dbp->db_credp = NULL;
dbp->db_cpid = -1;
dbp->db_struioflag = 0;
dbp->db_struioun.cksum.flags = 0;
return (0);
}
/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
dblk_t *dbp = buf;
if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
return (-1);
dbp->db_mblk->b_datap = dbp;
dbp->db_cache = dblk_esb_cache;
dbp->db_fthdr = NULL;
dbp->db_credp = NULL;
dbp->db_cpid = -1;
dbp->db_struioflag = 0;
dbp->db_struioun.cksum.flags = 0;
return (0);
}
static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
dblk_t *dbp = buf;
bcache_t *bcp = (bcache_t *)cdrarg;
if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
return (-1);
if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache,
kmflags)) == NULL) {
kmem_cache_free(mblk_cache, dbp->db_mblk);
return (-1);
}
dbp->db_mblk->b_datap = dbp;
dbp->db_cache = (void *)bcp;
dbp->db_lim = dbp->db_base + bcp->size;
dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
dbp->db_frtnp = NULL;
dbp->db_fthdr = NULL;
dbp->db_credp = NULL;
dbp->db_cpid = -1;
dbp->db_struioflag = 0;
dbp->db_struioun.cksum.flags = 0;
return (0);
}
/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
dblk_t *dbp = buf;
ssize_t msg_size = (ssize_t)cdrarg;
ASSERT(dbp->db_mblk->b_datap == dbp);
ASSERT(msg_size != 0);
ASSERT(dbp->db_struioflag == 0);
ASSERT(dbp->db_struioun.cksum.flags == 0);
if ((msg_size & PAGEOFFSET) == 0) {
kmem_free(dbp->db_base, msg_size);
}
kmem_cache_free(mblk_cache, dbp->db_mblk);
}
static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
dblk_t *dbp = buf;
bcache_t *bcp = (bcache_t *)cdrarg;
kmem_cache_free(bcp->buffer_cache, dbp->db_base);
ASSERT(dbp->db_mblk->b_datap == dbp);
ASSERT(dbp->db_struioflag == 0);
ASSERT(dbp->db_struioun.cksum.flags == 0);
kmem_cache_free(mblk_cache, dbp->db_mblk);
}
void
streams_msg_init(void)
{
char name[40];
size_t size;
size_t lastsize = DBLK_MIN_SIZE;
size_t *sizep;
struct kmem_cache *cp;
size_t tot_size;
int offset;
mblk_cache = kmem_cache_create("streams_mblk",
sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL,
mblk_kmem_flags);
for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
if ((offset = (size & PAGEOFFSET)) != 0) {
/*
* We are in the middle of a page, dblk should
* be allocated on the same page
*/
tot_size = size + sizeof (dblk_t);
ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
< PAGESIZE);
ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
} else {
/*
* buf size is multiple of page size, dblk and
* buffer are allocated separately.
*/
ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
tot_size = sizeof (dblk_t);
}
(void) sprintf(name, "streams_dblk_%ld", size);
cp = kmem_cache_create(name, tot_size,
DBLK_CACHE_ALIGN, dblk_constructor,
dblk_destructor, NULL,
(void *)(size), NULL, dblk_kmem_flags);
while (lastsize <= size) {
dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
lastsize += DBLK_MIN_SIZE;
}
}
dblk_esb_cache = kmem_cache_create("streams_dblk_esb",
sizeof (dblk_t), DBLK_CACHE_ALIGN,
dblk_esb_constructor, dblk_destructor, NULL,
(void *) sizeof (dblk_t), NULL, dblk_kmem_flags);
fthdr_cache = kmem_cache_create("streams_fthdr",
sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
ftblk_cache = kmem_cache_create("streams_ftblk",
sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
/* Initialize Multidata caches */
mmd_init();
/* initialize throttling queue for esballoc */
esballoc_queue_init();
}
/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
dblk_t *dbp;
mblk_t *mp;
size_t index;
index = (size - 1) >> DBLK_SIZE_SHIFT;
if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
if (size != 0) {
mp = allocb_oversize(size, KM_NOSLEEP);
goto out;
}
index = 0;
}
if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
mp = NULL;
goto out;
}
mp = dbp->db_mblk;
DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
mp->b_next = mp->b_prev = mp->b_cont = NULL;
mp->b_rptr = mp->b_wptr = dbp->db_base;
mp->b_queue = NULL;
MBLK_BAND_FLAG_WORD(mp) = 0;
STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
return (mp);
}
mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
mblk_t *mp = allocb(size, 0);
if (mp != NULL) {
cred_t *cr = DB_CRED(tmpl);
if (cr != NULL)
crhold(mp->b_datap->db_credp = cr);
DB_CPID(mp) = DB_CPID(tmpl);
DB_TYPE(mp) = DB_TYPE(tmpl);
}
return (mp);
}
mblk_t *
allocb_cred(size_t size, cred_t *cr)
{
mblk_t *mp = allocb(size, 0);
if (mp != NULL && cr != NULL)
crhold(mp->b_datap->db_credp = cr);
return (mp);
}
mblk_t *
allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
{
mblk_t *mp = allocb_wait(size, 0, flags, error);
if (mp != NULL && cr != NULL)
crhold(mp->b_datap->db_credp = cr);
return (mp);
}
void
freeb(mblk_t *mp)
{
dblk_t *dbp = mp->b_datap;
ASSERT(dbp->db_ref > 0);
ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
dbp->db_free(mp, dbp);
}
void
freemsg(mblk_t *mp)
{
FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
while (mp) {
dblk_t *dbp = mp->b_datap;
mblk_t *mp_cont = mp->b_cont;
ASSERT(dbp->db_ref > 0);
ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
dbp->db_free(mp, dbp);
mp = mp_cont;
}
}
/*
* Reallocate a block for another use. Try hard to use the old block.
* If the old data is wanted (copy), leave b_wptr at the end of the data,
* otherwise return b_wptr = b_rptr.
*
* This routine is private and unstable.
*/
mblk_t *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
mblk_t *mp1;
unsigned char *old_rptr;
ptrdiff_t cur_size;
if (mp == NULL)
return (allocb(size, BPRI_HI));
cur_size = mp->b_wptr - mp->b_rptr;
old_rptr = mp->b_rptr;
ASSERT(mp->b_datap->db_ref != 0);
if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
/*
* If the data is wanted and it will fit where it is, no
* work is required.
*/
if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
return (mp);
mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
mp1 = mp;
} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
/* XXX other mp state could be copied too, db_flags ... ? */
mp1->b_cont = mp->b_cont;
} else {
return (NULL);
}
if (copy) {
bcopy(old_rptr, mp1->b_rptr, cur_size);
mp1->b_wptr = mp1->b_rptr + cur_size;
}
if (mp != mp1)
freeb(mp);
return (mp1);
}
static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
ASSERT(dbp->db_mblk == mp);
if (dbp->db_fthdr != NULL)
str_ftfree(dbp);
/* set credp and projid to be 'unspecified' before returning to cache */
if (dbp->db_credp != NULL) {
crfree(dbp->db_credp);
dbp->db_credp = NULL;
}
dbp->db_cpid = -1;
/* Reset the struioflag and the checksum flag fields */
dbp->db_struioflag = 0;
dbp->db_struioun.cksum.flags = 0;
/* and the COOKED and/or UIOA flag(s) */
dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
kmem_cache_free(dbp->db_cache, dbp);
}
static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
if (dbp->db_ref != 1) {
uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
-(1 << DBLK_RTFU_SHIFT(db_ref)));
/*
* atomic_add_32_nv() just decremented db_ref, so we no longer
* have a reference to the dblk, which means another thread
* could free it. Therefore we cannot examine the dblk to
* determine whether ours was the last reference. Instead,
* we extract the new and minimum reference counts from rtfu.
* Note that all we're really saying is "if (ref != refmin)".
*/
if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
kmem_cache_free(mblk_cache, mp);
return;
}
}
dbp->db_mblk = mp;
dbp->db_free = dbp->db_lastfree;
dbp->db_lastfree(mp, dbp);
}
mblk_t *
dupb(mblk_t *mp)
{
dblk_t *dbp = mp->b_datap;
mblk_t *new_mp;
uint32_t oldrtfu, newrtfu;
if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
goto out;
new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
new_mp->b_rptr = mp->b_rptr;
new_mp->b_wptr = mp->b_wptr;
new_mp->b_datap = dbp;
new_mp->b_queue = NULL;
MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
dbp->db_free = dblk_decref;
do {
ASSERT(dbp->db_ref > 0);
oldrtfu = DBLK_RTFU_WORD(dbp);
newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
/*
* If db_ref is maxed out we can't dup this message anymore.
*/
if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
kmem_cache_free(mblk_cache, new_mp);
new_mp = NULL;
goto out;
}
} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);
out:
FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
return (new_mp);
}
static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
frtn_t *frp = dbp->db_frtnp;
ASSERT(dbp->db_mblk == mp);
frp->free_func(frp->free_arg);
if (dbp->db_fthdr != NULL)
str_ftfree(dbp);
/* set credp and projid to be 'unspecified' before returning to cache */
if (dbp->db_credp != NULL) {
crfree(dbp->db_credp);
dbp->db_credp = NULL;
}
dbp->db_cpid = -1;
dbp->db_struioflag = 0;
dbp->db_struioun.cksum.flags = 0;
kmem_cache_free(dbp->db_cache, dbp);
}
/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}
/*
* Generic esballoc used to implement the four flavors: [d]esballoc[a].
*/
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
dblk_t *dbp;
mblk_t *mp;
ASSERT(base != NULL && frp != NULL);
if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
mp = NULL;
goto out;
}
mp = dbp->db_mblk;
dbp->db_base = base;
dbp->db_lim = base + size;
dbp->db_free = dbp->db_lastfree = lastfree;
dbp->db_frtnp = frp;
DBLK_RTFU_WORD(dbp) = db_rtfu;
mp->b_next = mp->b_prev = mp->b_cont = NULL;
mp->b_rptr = mp->b_wptr = base;
mp->b_queue = NULL;
MBLK_BAND_FLAG_WORD(mp) = 0;
out:
FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
return (mp);
}
/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
mblk_t *mp;
/*
* Note that this is structured to allow the common case (i.e.
* STREAMS flowtracing disabled) to call gesballoc() with tail
* call optimization.
*/
if (!str_ftnever) {
mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
frp, freebs_enqueue, KM_NOSLEEP);
if (mp != NULL)
STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
return (mp);
}
return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
frp, freebs_enqueue, KM_NOSLEEP));
}
/*
* Same as esballoc() but sleeps waiting for memory.
*/
/*ARGSUSED*/
mblk_t *
esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
mblk_t *mp;
/*
* Note that this is structured to allow the common case (i.e.
* STREAMS flowtracing disabled) to call gesballoc() with tail
* call optimization.
*/
if (!str_ftnever) {
mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
frp, freebs_enqueue, KM_SLEEP);
STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
return (mp);
}
return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
frp, freebs_enqueue, KM_SLEEP));
}
/*ARGSUSED*/
mblk_t *
desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
mblk_t *mp;
/*
* Note that this is structured to allow the common case (i.e.
* STREAMS flowtracing disabled) to call gesballoc() with tail
* call optimization.
*/
if (!str_ftnever) {
mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
frp, dblk_lastfree_desb, KM_NOSLEEP);
if (mp != NULL)
STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
return (mp);
}
return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
frp, dblk_lastfree_desb, KM_NOSLEEP));
}
/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
mblk_t *mp;
/*
* Note that this is structured to allow the common case (i.e.
* STREAMS flowtracing disabled) to call gesballoc() with tail
* call optimization.
*/
if (!str_ftnever) {
mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
frp, freebs_enqueue, KM_NOSLEEP);
if (mp != NULL)
STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
return (mp);
}
return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
frp, freebs_enqueue, KM_NOSLEEP));
}
/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
mblk_t *mp;
/*
* Note that this is structured to allow the common case (i.e.
* STREAMS flowtracing disabled) to call gesballoc() with tail
* call optimization.
*/
if (!str_ftnever) {
mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
frp, dblk_lastfree_desb, KM_NOSLEEP);
if (mp != NULL)
STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
return (mp);
}
return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
frp, dblk_lastfree_desb, KM_NOSLEEP));
}
static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
bcache_t *bcp = dbp->db_cache;
ASSERT(dbp->db_mblk == mp);
if (dbp->db_fthdr != NULL)
str_ftfree(dbp);
/* set credp and projid to be 'unspecified' before returning to cache */
if (dbp->db_credp != NULL) {
crfree(dbp->db_credp);
dbp->db_credp = NULL;
}
dbp->db_cpid = -1;
dbp->db_struioflag = 0;
dbp->db_struioun.cksum.flags = 0;
mutex_enter(&bcp->mutex);
kmem_cache_free(bcp->dblk_cache, dbp);
bcp->alloc--;
if (bcp->alloc == 0 && bcp->destroy != 0) {
kmem_cache_destroy(bcp->dblk_cache);
kmem_cache_destroy(bcp->buffer_cache);
mutex_exit(&bcp->mutex);
mutex_destroy(&bcp->mutex);
kmem_free(bcp, sizeof (bcache_t));
} else {
mutex_exit(&bcp->mutex);
}
}
bcache_t *
bcache_create(char *name, size_t size, uint_t align)
{
bcache_t *bcp;
char buffer[255];
ASSERT((align & (align - 1)) == 0);
if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) ==
NULL) {
return (NULL);
}
bcp->size = size;
bcp->align = align;
bcp->alloc = 0;
bcp->destroy = 0;
mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
(void) sprintf(buffer, "%s_buffer_cache", name);
bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
NULL, NULL, NULL, 0);
(void) sprintf(buffer, "%s_dblk_cache", name);
bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
NULL, (void *)bcp, NULL, 0);
return (bcp);
}
void
bcache_destroy(bcache_t *bcp)
{
ASSERT(bcp != NULL);
mutex_enter(&bcp->mutex);
if (bcp->alloc == 0) {
kmem_cache_destroy(bcp->dblk_cache);
kmem_cache_destroy(bcp->buffer_cache);
mutex_exit(&bcp->mutex);
mutex_destroy(&bcp->mutex);
kmem_free(bcp, sizeof (bcache_t));
} else {
bcp->destroy++;
mutex_exit(&bcp->mutex);
}
}
/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
dblk_t *dbp;
mblk_t *mp = NULL;
ASSERT(bcp != NULL);
mutex_enter(&bcp->mutex);
if (bcp->destroy != 0) {
mutex_exit(&bcp->mutex);
goto out;
}
if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
mutex_exit(&bcp->mutex);
goto out;
}
bcp->alloc++;
mutex_exit(&bcp->mutex);
ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
mp = dbp->db_mblk;
DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
mp->b_next = mp->b_prev = mp->b_cont = NULL;
mp->b_rptr = mp->b_wptr = dbp->db_base;
mp->b_queue = NULL;
MBLK_BAND_FLAG_WORD(mp) = 0;
STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
return (mp);
}
static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
ASSERT(dbp->db_mblk == mp);
if (dbp->db_fthdr != NULL)
str_ftfree(dbp);
/* set credp and projid to be 'unspecified' before returning to cache */
if (dbp->db_credp != NULL) {
crfree(dbp->db_credp);
dbp->db_credp = NULL;
}
dbp->db_cpid = -1;
dbp->db_struioflag = 0;
dbp->db_struioun.cksum.flags = 0;
kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
kmem_cache_free(dbp->db_cache, dbp);
}
static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
mblk_t *mp;
void *buf;
size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
if ((buf = kmem_alloc(size, kmflags)) == NULL)
return (NULL);
if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
&frnop, dblk_lastfree_oversize, kmflags)) == NULL)
kmem_free(buf, size);
if (mp != NULL)
STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
return (mp);
}
mblk_t *
allocb_tryhard(size_t target_size)
{
size_t size;
mblk_t *bp;
for (size = target_size; size < target_size + 512;
size += DBLK_CACHE_ALIGN)
if ((bp = allocb(size, BPRI_HI)) != NULL)
return (bp);
allocb_tryhard_fails++;
return (NULL);
}
/*
* This routine is consolidation private for STREAMS internal use
* This routine may only be called from sync routines (i.e., not
* from put or service procedures). It is located here (rather
* than strsubr.c) so that we don't have to expose all of the
* allocb() implementation details in header files.
*/
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
dblk_t *dbp;
mblk_t *mp;
size_t index;
index = (size -1) >> DBLK_SIZE_SHIFT;
if (flags & STR_NOSIG) {
if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
if (size != 0) {
mp = allocb_oversize(size, KM_SLEEP);
FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
(uintptr_t)mp);
return (mp);
}
index = 0;
}
dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
mp = dbp->db_mblk;
DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
mp->b_next = mp->b_prev = mp->b_cont = NULL;
mp->b_rptr = mp->b_wptr = dbp->db_base;
mp->b_queue = NULL;
MBLK_BAND_FLAG_WORD(mp) = 0;
STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
} else {
while ((mp = allocb(size, pri)) == NULL) {
if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
return (NULL);
}
}
return (mp);
}
/*
* Call function 'func' with 'arg' when a class zero block can
* be allocated with priority 'pri'.
*/
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
return (bufcall(1, pri, func, arg));
}
/*
* Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
* ioc_id, rval and error of the struct ioctl to set up an ioctl call.
* This provides consistency for all internal allocators of ioctl.
*/
mblk_t *
mkiocb(uint_t cmd)
{
struct iocblk *ioc;
mblk_t *mp;
/*
* Allocate enough space for any of the ioctl related messages.
*/
if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
return (NULL);
bzero(mp->b_rptr, sizeof (union ioctypes));
/*
* Set the mblk_t information and ptrs correctly.
*/
mp->b_wptr += sizeof (struct iocblk);
mp->b_datap->db_type = M_IOCTL;
/*
* Fill in the fields.
*/
ioc = (struct iocblk *)mp->b_rptr;
ioc->ioc_cmd = cmd;
ioc->ioc_cr = kcred;
ioc->ioc_id = getiocseqno();
ioc->ioc_flag = IOC_NATIVE;
return (mp);
}
/*
* test if block of given size can be allocated with a request of
* the given priority.
* 'pri' is no longer used, but is retained for compatibility.
*/
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
return ((size + sizeof (dblk_t)) <= kmem_avail());
}
/*
* Call function 'func' with argument 'arg' when there is a reasonably
* good chance that a block of size 'size' can be allocated.
* 'pri' is no longer used, but is retained for compatibility.
*/
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
static long bid = 1; /* always odd to save checking for zero */
bufcall_id_t bc_id;
struct strbufcall *bcp;
if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
return (0);
bcp->bc_func = func;
bcp->bc_arg = arg;
bcp->bc_size = size;
bcp->bc_next = NULL;
bcp->bc_executor = NULL;
mutex_enter(&strbcall_lock);
/*
* After bcp is linked into strbcalls and strbcall_lock is dropped there
* should be no references to bcp since it may be freed by
* runbufcalls(). Since bcp_id field is returned, we save its value in
* the local var.
*/
bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2); /* keep it odd */
/*
* add newly allocated stream event to existing
* linked list of events.
*/
if (strbcalls.bc_head == NULL) {
strbcalls.bc_head = strbcalls.bc_tail = bcp;
} else {
strbcalls.bc_tail->bc_next = bcp;
strbcalls.bc_tail = bcp;
}
cv_signal(&strbcall_cv);
mutex_exit(&strbcall_lock);
return (bc_id);
}
/*
* Cancel a bufcall request.
*/
void
unbufcall(bufcall_id_t id)
{
strbufcall_t *bcp, *pbcp;
mutex_enter(&strbcall_lock);
again:
pbcp = NULL;
for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
if (id == bcp->bc_id)
break;
pbcp = bcp;
}
if (bcp) {
if (bcp->bc_executor != NULL) {
if (bcp->bc_executor != curthread) {
cv_wait(&bcall_cv, &strbcall_lock);
goto again;
}
} else {
if (pbcp)
pbcp->bc_next = bcp->bc_next;
else
strbcalls.bc_head = bcp->bc_next;
if (bcp == strbcalls.bc_tail)
strbcalls.bc_tail = pbcp;
kmem_free(bcp, sizeof (strbufcall_t));
}
}
mutex_exit(&strbcall_lock);
}
/*
* Duplicate a message block by block (uses dupb), returning
* a pointer to the duplicate message.
* Returns a non-NULL value only if the entire message
* was dup'd.
*/
mblk_t *
dupmsg(mblk_t *bp)
{
mblk_t *head, *nbp;
if (!bp || !(nbp = head = dupb(bp)))
return (NULL);
while (bp->b_cont) {
if (!(nbp->b_cont = dupb(bp->b_cont))) {
freemsg(head);
return (NULL);
}
nbp = nbp->b_cont;
bp = bp->b_cont;
}
return (head);
}
#define DUPB_NOLOAN(bp) \
((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
copyb((bp)) : dupb((bp)))
mblk_t *
dupmsg_noloan(mblk_t *bp)
{
mblk_t *head, *nbp;
if (bp == NULL || DB_TYPE(bp) != M_DATA ||
((nbp = head = DUPB_NOLOAN(bp)) == NULL))
return (NULL);
while (bp->b_cont) {
if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
freemsg(head);
return (NULL);
}
nbp = nbp->b_cont;
bp = bp->b_cont;
}
return (head);
}
/*
* Copy data from message and data block to newly allocated message and
* data block. Returns new message block pointer, or NULL if error.
* The alignment of rptr (w.r.t. word alignment) will be the same in the copy
* as in the original even when db_base is not word aligned. (bug 1052877)
*/
mblk_t *
copyb(mblk_t *bp)
{
mblk_t *nbp;
dblk_t *dp, *ndp;
uchar_t *base;
size_t size;
size_t unaligned;
ASSERT(bp->b_wptr >= bp->b_rptr);
dp = bp->b_datap;
if (dp->db_fthdr != NULL)
STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
/*
* Special handling for Multidata message; this should be
* removed once a copy-callback routine is made available.
*/
if (dp->db_type == M_MULTIDATA) {
cred_t *cr;
if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
return (NULL);
nbp->b_flag = bp->b_flag;
nbp->b_band = bp->b_band;
ndp = nbp->b_datap;
/* See comments below on potential issues. */
STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
ASSERT(ndp->db_type == dp->db_type);
cr = dp->db_credp;
if (cr != NULL)
crhold(ndp->db_credp = cr);
ndp->db_cpid = dp->db_cpid;
return (nbp);
}
size = dp->db_lim - dp->db_base;
unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
return (NULL);
nbp->b_flag = bp->b_flag;
nbp->b_band = bp->b_band;
ndp = nbp->b_datap;
/*
* Well, here is a potential issue. If we are trying to
* trace a flow, and we copy the message, we might lose
* information about where this message might have been.
* So we should inherit the FT data. On the other hand,
* a user might be interested only in alloc to free data.
* So I guess the real answer is to provide a tunable.
*/
STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
base = ndp->db_base + unaligned;
bcopy(dp->db_base, ndp->db_base + unaligned, size);
nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
return (nbp);
}
/*
* Copy data from message to newly allocated message using new
* data blocks. Returns a pointer to the new message, or NULL if error.
*/
mblk_t *
copymsg(mblk_t *bp)
{
mblk_t *head, *nbp;
if (!bp || !(nbp = head = copyb(bp)))
return (NULL);
while (bp->b_cont) {
if (!(nbp->b_cont = copyb(bp->b_cont))) {
freemsg(head);
return (NULL);
}
nbp = nbp->b_cont;
bp = bp->b_cont;
}
return (head);
}
/*
* link a message block to tail of message
*/
void
linkb(mblk_t *mp, mblk_t *bp)
{
ASSERT(mp && bp);
for (; mp->b_cont; mp = mp->b_cont)
;
mp->b_cont = bp;
}
/*
* unlink a message block from head of message
* return pointer to new message.
* NULL if message becomes empty.
*/
mblk_t *
unlinkb(mblk_t *bp)
{
mblk_t *bp1;
bp1 = bp->b_cont;
bp->b_cont = NULL;
return (bp1);
}
/*
* remove a message block "bp" from message "mp"
*
* Return pointer to new message or NULL if no message remains.
* Return -1 if bp is not found in message.
*/
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
mblk_t *tmp;
mblk_t *lastp = NULL;
ASSERT(mp && bp);
for (tmp = mp; tmp; tmp = tmp->b_cont) {
if (tmp == bp) {
if (lastp)
lastp->b_cont = tmp->b_cont;
else
mp = tmp->b_cont;
tmp->b_cont = NULL;
return (mp);
}
lastp = tmp;
}
return ((mblk_t *)-1);
}
/*
* Concatenate and align first len bytes of common
* message type. Len == -1, means concat everything.
* Returns 1 on success, 0 on failure
* After the pullup, mp points to the pulled up data.
*/
int
pullupmsg(mblk_t *mp, ssize_t len)
{
mblk_t *bp, *b_cont;
dblk_t *dbp;
ssize_t n;
ASSERT(mp->b_datap->db_ref > 0);
ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
/*
* We won't handle Multidata message, since it contains
* metadata which this function has no knowledge of; we
* assert on DEBUG, and return failure otherwise.
*/
ASSERT(mp->b_datap->db_type != M_MULTIDATA);
if (mp->b_datap->db_type == M_MULTIDATA)
return (0);
if (len == -1) {
if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
return (1);
len = xmsgsize(mp);
} else {
ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
ASSERT(first_mblk_len >= 0);
/*
* If the length is less than that of the first mblk,
* we want to pull up the message into an aligned mblk.
* Though not part of the spec, some callers assume it.
*/
if (len <= first_mblk_len) {
if (str_aligned(mp->b_rptr))
return (1);
len = first_mblk_len;
} else if (xmsgsize(mp) < len)
return (0);
}
if ((bp = allocb_tmpl(len, mp)) == NULL)
return (0);
dbp = bp->b_datap;
*bp = *mp; /* swap mblks so bp heads the old msg... */
mp->b_datap = dbp; /* ... and mp heads the new message */
mp->b_datap->db_mblk = mp;
bp->b_datap->db_mblk = bp;
mp->b_rptr = mp->b_wptr = dbp->db_base;
do {
ASSERT(bp->b_datap->db_ref > 0);
ASSERT(bp->b_wptr >= bp->b_rptr);
n = MIN(bp->b_wptr - bp->b_rptr, len);
bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
mp->b_wptr += n;
bp->b_rptr += n;
len -= n;
if (bp->b_rptr != bp->b_wptr)
break;
b_cont = bp->b_cont;
freeb(bp);
bp = b_cont;
} while (len && bp);
mp->b_cont = bp; /* tack on whatever wasn't pulled up */
return (1);
}
/*
* Concatenate and align at least the first len bytes of common message
* type. Len == -1 means concatenate everything. The original message is
* unaltered. Returns a pointer to a new message on success, otherwise
* returns NULL.
*/
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
mblk_t *newmp;
ssize_t totlen;
ssize_t n;
/*
* We won't handle Multidata message, since it contains
* metadata which this function has no knowledge of; we
* assert on DEBUG, and return failure otherwise.
*/
ASSERT(mp->b_datap->db_type != M_MULTIDATA);
if (mp->b_datap->db_type == M_MULTIDATA)
return (NULL);
totlen = xmsgsize(mp);
if ((len > 0) && (len > totlen))
return (NULL);
/*
* Copy all of the first msg type into one new mblk, then dupmsg
* and link the rest onto this.
*/
len = totlen;
if ((newmp = allocb_tmpl(len, mp)) == NULL)
return (NULL);
newmp->b_flag = mp->b_flag;
newmp->b_band = mp->b_band;
while (len > 0) {
n = mp->b_wptr - mp->b_rptr;
ASSERT(n >= 0); /* allow zero-length mblk_t's */
if (n > 0)
bcopy(mp->b_rptr, newmp->b_wptr, n);
newmp->b_wptr += n;
len -= n;
mp = mp->b_cont;
}
if (mp != NULL) {
newmp->b_cont = dupmsg(mp);
if (newmp->b_cont == NULL) {
freemsg(newmp);
return (NULL);
}
}
return (newmp);
}
/*
* Trim bytes from message
* len > 0, trim from head
* len < 0, trim from tail
* Returns 1 on success, 0 on failure.
*/
int
adjmsg(mblk_t *mp, ssize_t len)
{
mblk_t *bp;
mblk_t *save_bp = NULL;
mblk_t *prev_bp;
mblk_t *bcont;
unsigned char type;
ssize_t n;
int fromhead;
int first;
ASSERT(mp != NULL);
/*
* We won't handle Multidata message, since it contains
* metadata which this function has no knowledge of; we
* assert on DEBUG, and return failure otherwise.
*/
ASSERT(mp->b_datap->db_type != M_MULTIDATA);
if (mp->b_datap->db_type == M_MULTIDATA)
return (0);
if (len < 0) {
fromhead = 0;
len = -len;
} else {
fromhead = 1;
}
if (xmsgsize(mp) < len)
return (0);
if (fromhead) {
first = 1;
while (len) {
ASSERT(mp->b_wptr >= mp->b_rptr);
n = MIN(mp->b_wptr - mp->b_rptr, len);
mp->b_rptr += n;
len -= n;
/*
* If this is not the first zero length
* message remove it
*/
if (!first && (mp->b_wptr == mp->b_rptr)) {
bcont = mp->b_cont;
freeb(mp);
mp = save_bp->b_cont = bcont;
} else {
save_bp = mp;
mp = mp->b_cont;
}
first = 0;
}
} else {
type = mp->b_datap->db_type;
while (len) {
bp = mp;
save_bp = NULL;
/*
* Find the last message of same type
*/
while (bp && bp->b_datap->db_type == type) {
ASSERT(bp->b_wptr >= bp->b_rptr);
prev_bp = save_bp;
save_bp = bp;
bp = bp->b_cont;
}
if (save_bp == NULL)
break;
n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
save_bp->b_wptr -= n;
len -= n;
/*
* If this is not the first message
* and we have taken away everything
* from this message, remove it
*/
if ((save_bp != mp) &&
(save_bp->b_wptr == save_bp->b_rptr)) {
bcont = save_bp->b_cont;
freeb(save_bp);
prev_bp->b_cont = bcont;
}
}
}
return (1);
}
/*
* get number of data bytes in message
*/
size_t
msgdsize(mblk_t *bp)
{
size_t count = 0;
for (; bp; bp = bp->b_cont)
if (bp->b_datap->db_type == M_DATA) {
ASSERT(bp->b_wptr >= bp->b_rptr);
count += bp->b_wptr - bp->b_rptr;
}
return (count);
}
/*
* Get a message off head of queue
*
* If queue has no buffers then mark queue
* with QWANTR. (queue wants to be read by
* someone when data becomes available)
*
* If there is something to take off then do so.
* If queue falls below hi water mark turn off QFULL
* flag. Decrement weighted count of queue.
* Also turn off QWANTR because queue is being read.
*
* The queue count is maintained on a per-band basis.
* Priority band 0 (normal messages) uses q_count,
* q_lowat, etc. Non-zero priority bands use the
* fields in their respective qband structures
* (qb_count, qb_lowat, etc.) All messages appear
* on the same list, linked via their b_next pointers.
* q_first is the head of the list. q_count does
* not reflect the size of all the messages on the
* queue. It only reflects those messages in the
* normal band of flow. The one exception to this
* deals with high priority messages. They are in
* their own conceptual "band", but are accounted
* against q_count.
*
* If queue count is below the lo water mark and QWANTW
* is set, enable the closest backq which has a service
* procedure and turn off the QWANTW flag.
*
* getq could be built on top of rmvq, but isn't because
* of performance considerations.
*
* A note on the use of q_count and q_mblkcnt:
* q_count is the traditional byte count for messages that
* have been put on a queue. Documentation tells us that
* we shouldn't rely on that count, but some drivers/modules
* do. What was needed, however, is a mechanism to prevent
* runaway streams from consuming all of the resources,
* and particularly be able to flow control zero-length
* messages. q_mblkcnt is used for this purpose. It
* counts the number of mblk's that are being put on
* the queue. The intention here, is that each mblk should
* contain one byte of data and, for the purpose of
* flow-control, logically does. A queue will become
* full when EITHER of these values (q_count and q_mblkcnt)
* reach the highwater mark. It will clear when BOTH
* of them drop below the highwater mark. And it will
* backenable when BOTH of them drop below the lowwater
* mark.
* With this algorithm, a driver/module might be able
* to find a reasonably accurate q_count, and the
* framework can still try and limit resource usage.
*/
mblk_t *
getq(queue_t *q)
{
mblk_t *bp;
uchar_t band = 0;
bp = getq_noenab(q);
if (bp != NULL)
band = bp->b_band;
/*
* Inlined from qbackenable().
* Quick check without holding the lock.
*/
if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
return (bp);
qbackenable(q, band);
return (bp);
}
/*
* Calculate number of data bytes in a single data message block taking
* multidata messages into account.
*/
#define ADD_MBLK_SIZE(mp, size) \
if (DB_TYPE(mp) != M_MULTIDATA) { \
(size) += MBLKL(mp); \
} else { \
uint_t pinuse; \
\
mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse); \
(size) += pinuse; \
}
/*
* Like getq() but does not backenable. This is used by the stream
* head when a putback() is likely. The caller must call qbackenable()
* after it is done with accessing the queue.
*/
mblk_t *
getq_noenab(queue_t *q)
{
mblk_t *bp;
mblk_t *tmp;
qband_t *qbp;
kthread_id_t freezer;
int bytecnt = 0, mblkcnt = 0;
/* freezestr should allow its caller to call getq/putq */
freezer = STREAM(q)->sd_freezer;
if (freezer == curthread) {
ASSERT(frozenstr(q));
ASSERT(MUTEX_HELD(QLOCK(q)));
} else
mutex_enter(QLOCK(q));
if ((bp = q->q_first) == 0) {
q->q_flag |= QWANTR;
} else {
if ((q->q_first = bp->b_next) == NULL)
q->q_last = NULL;
else
q->q_first->b_prev = NULL;
/* Get message byte count for q_count accounting */
for (tmp = bp; tmp; tmp = tmp->b_cont) {
ADD_MBLK_SIZE(tmp, bytecnt);
mblkcnt++;
}
if (bp->b_band == 0) {
q->q_count -= bytecnt;
q->q_mblkcnt -= mblkcnt;
if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
(q->q_mblkcnt < q->q_hiwat))) {
q->q_flag &= ~QFULL;
}
} else {
int i;
ASSERT(bp->b_band <= q->q_nband);
ASSERT(q->q_bandp != NULL);
ASSERT(MUTEX_HELD(QLOCK(q)));
qbp = q->q_bandp;
i = bp->b_band;
while (--i > 0)
qbp = qbp->qb_next;
if (qbp->qb_first == qbp->qb_last) {
qbp->qb_first = NULL;
qbp->qb_last = NULL;
} else {
qbp->qb_first = bp->b_next;
}
qbp->qb_count -= bytecnt;
qbp->qb_mblkcnt -= mblkcnt;
if (qbp->qb_mblkcnt == 0 ||
((qbp->qb_count < qbp->qb_hiwat) &&
(qbp->qb_mblkcnt < qbp->qb_hiwat))) {
qbp->qb_flag &= ~QB_FULL;
}
}
q->q_flag &= ~QWANTR;
bp->b_next = NULL;
bp->b_prev = NULL;
}
if (freezer != curthread)
mutex_exit(QLOCK(q));
STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
return (bp);
}
/*
* Determine if a backenable is needed after removing a message in the
* specified band.
* NOTE: This routine assumes that something like getq_noenab() has been
* already called.
*
* For the read side it is ok to hold sd_lock across calling this (and the
* stream head often does).
* But for the write side strwakeq might be invoked and it acquires sd_lock.
*/
void
qbackenable(queue_t *q, uchar_t band)
{
int backenab = 0;
qband_t *qbp;
kthread_id_t freezer;
ASSERT(q);
ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
/*
* Quick check without holding the lock.
* OK since after getq() has lowered the q_count these flags
* would not change unless either the qbackenable() is done by
* another thread (which is ok) or the queue has gotten QFULL
* in which case another backenable will take place when the queue
* drops below q_lowat.
*/
if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
return;
/* freezestr should allow its caller to call getq/putq */
freezer = STREAM(q)->sd_freezer;
if (freezer == curthread) {
ASSERT(frozenstr(q));
ASSERT(MUTEX_HELD(QLOCK(q)));
} else
mutex_enter(QLOCK(q));
if (band == 0) {
if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
q->q_mblkcnt < q->q_lowat)) {
backenab = q->q_flag & (QWANTW|QWANTWSYNC);
}
} else {
int i;
ASSERT((unsigned)band <= q->q_nband);
ASSERT(q->q_bandp != NULL);
qbp = q->q_bandp;
i = band;
while (--i > 0)
qbp = qbp->qb_next;
if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
qbp->qb_mblkcnt < qbp->qb_lowat)) {
backenab = qbp->qb_flag & QB_WANTW;
}
}
if (backenab == 0) {
if (freezer != curthread)
mutex_exit(QLOCK(q));
return;
}
/* Have to drop the lock across strwakeq and backenable */
if (backenab & QWANTWSYNC)
q->q_flag &= ~QWANTWSYNC;
if (backenab & (QWANTW|QB_WANTW)) {
if (band != 0)
qbp->qb_flag &= ~QB_WANTW;
else {
q->q_flag &= ~QWANTW;
}
}
if (freezer != curthread)
mutex_exit(QLOCK(q));
if (backenab & QWANTWSYNC)
strwakeq(q, QWANTWSYNC);
if (backenab & (QWANTW|QB_WANTW))
backenable(q, band);
}
/*
* Remove a message from a queue. The queue count and other
* flow control parameters are adjusted and the back queue
* enabled if necessary.
*
* rmvq can be called with the stream frozen, but other utility functions
* holding QLOCK, and by streams modules without any locks/frozen.
*/
void
rmvq(queue_t *q, mblk_t *mp)
{
ASSERT(mp != NULL);
rmvq_noenab(q, mp);
if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
/*
* qbackenable can handle a frozen stream but not a "random"
* qlock being held. Drop lock across qbackenable.
*/
mutex_exit(QLOCK(q));
qbackenable(q, mp->b_band);
mutex_enter(QLOCK(q));
} else {
qbackenable(q, mp->b_band);
}
}
/*
* Like rmvq() but without any backenabling.
* This exists to handle SR_CONSOL_DATA in strrput().
*/
void
rmvq_noenab(queue_t *q, mblk_t *mp)
{
mblk_t *tmp;
int i;
qband_t *qbp = NULL;
kthread_id_t freezer;
int bytecnt = 0, mblkcnt = 0;
freezer = STREAM(q)->sd_freezer;
if (freezer == curthread) {
ASSERT(frozenstr(q));
ASSERT(MUTEX_HELD(QLOCK(q)));
} else if (MUTEX_HELD(QLOCK(q))) {
/* Don't drop lock on exit */
freezer = curthread;
} else
mutex_enter(QLOCK(q));
ASSERT(mp->b_band <= q->q_nband);
if (mp->b_band != 0) { /* Adjust band pointers */
ASSERT(q->q_bandp != NULL);
qbp = q->q_bandp;
i = mp->b_band;
while (--i > 0)
qbp = qbp->qb_next;
if (mp == qbp->qb_first) {
if (mp->b_next && mp->b_band == mp->b_next->b_band)
qbp->qb_first = mp->b_next;
else
qbp->qb_first = NULL;
}
if (mp == qbp->qb_last) {
if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
qbp->qb_last = mp->b_prev;
else
qbp->qb_last = NULL;
}
}
/*
* Remove the message from the list.
*/
if (mp->b_prev)
mp->b_prev->b_next = mp->b_next;
else
q->q_first = mp->b_next;
if (mp->b_next)
mp->b_next->b_prev = mp->b_prev;
else
q->q_last = mp->b_prev;
mp->b_next = NULL;
mp->b_prev = NULL;
/* Get the size of the message for q_count accounting */
for (tmp = mp; tmp; tmp = tmp->b_cont) {
ADD_MBLK_SIZE(tmp, bytecnt);
mblkcnt++;
}
if (mp->b_band == 0) { /* Perform q_count accounting */
q->q_count -= bytecnt;
q->q_mblkcnt -= mblkcnt;
if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
(q->q_mblkcnt < q->q_hiwat))) {
q->q_flag &= ~QFULL;
}
} else { /* Perform qb_count accounting */
qbp->qb_count -= bytecnt;
qbp->qb_mblkcnt -= mblkcnt;
if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
(qbp->qb_mblkcnt < qbp->qb_hiwat))) {
qbp->qb_flag &= ~QB_FULL;
}
}
if (freezer != curthread)
mutex_exit(QLOCK(q));
STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
}
/*
* Empty a queue.
* If flag is set, remove all messages. Otherwise, remove
* only non-control messages. If queue falls below its low
* water mark, and QWANTW is set, enable the nearest upstream
* service procedure.
*
* Historical note: when merging the M_FLUSH code in strrput with this
* code one difference was discovered. flushq did not have a check
* for q_lowat == 0 in the backenabling test.
*
* pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
* if one exists on the queue.
*/
void
flushq_common(queue_t *q, int flag, int pcproto_flag)
{
mblk_t *mp, *nmp;
qband_t *qbp;
int backenab = 0;
unsigned char bpri;
unsigned char qbf[NBAND]; /* band flushing backenable flags */
if (q->q_first == NULL)
return;
mutex_enter(QLOCK(q));
mp = q->q_first;
q->q_first = NULL;
q->q_last = NULL;
q->q_count = 0;
q->q_mblkcnt = 0;
for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
qbp->qb_first = NULL;
qbp->qb_last = NULL;
qbp->qb_count = 0;
qbp->qb_mblkcnt = 0;
qbp->qb_flag &= ~QB_FULL;
}
q->q_flag &= ~QFULL;
mutex_exit(QLOCK(q));
while (mp) {
nmp = mp->b_next;
mp->b_next = mp->b_prev = NULL;
STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
(void) putq(q, mp);
else if (flag || datamsg(mp->b_datap->db_type))
freemsg(mp);
else
(void) putq(q, mp);
mp = nmp;
}
bpri = 1;
mutex_enter(QLOCK(q));
for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
if ((qbp->qb_flag & QB_WANTW) &&
(((qbp->qb_count < qbp->qb_lowat) &&
(qbp->qb_mblkcnt < qbp->qb_lowat)) ||
qbp->qb_lowat == 0)) {
qbp->qb_flag &= ~QB_WANTW;
backenab = 1;
qbf[bpri] = 1;
} else
qbf[bpri] = 0;
bpri++;
}
ASSERT(bpri == (unsigned char)(q->q_nband + 1));
if ((q->q_flag & QWANTW) &&
(((q->q_count < q->q_lowat) &&
(q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
q->q_flag &= ~QWANTW;
backenab = 1;
qbf[0] = 1;
} else
qbf[0] = 0;
/*
* If any band can now be written to, and there is a writer
* for that band, then backenable the closest service procedure.
*/
if (backenab) {
mutex_exit(QLOCK(q));
for (bpri = q->q_nband; bpri != 0; bpri--)
if (qbf[bpri])
backenable(q, bpri);
if (qbf[0])
backenable(q, 0);
} else
mutex_exit(QLOCK(q));
}
/*
* The real flushing takes place in flushq_common. This is done so that
* a flag which specifies whether or not M_PCPROTO messages should be flushed
* or not. Currently the only place that uses this flag is the stream head.
*/
void
flushq(queue_t *q, int flag)
{
flushq_common(q, flag, 0);
}
/*
* Flush the queue of messages of the given priority band.
* There is some duplication of code between flushq and flushband.
* This is because we want to optimize the code as much as possible.
* The assumption is that there will be more messages in the normal
* (priority 0) band than in any other.
*
* Historical note: when merging the M_FLUSH code in strrput with this
* code one difference was discovered. flushband had an extra check for
* did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
* case. That check does not match the man page for flushband and was not
* in the strrput flush code hence it was removed.
*/
void
flushband(queue_t *q, unsigned char pri, int flag)
{
mblk_t *mp;
mblk_t *nmp;
mblk_t *last;
qband_t *qbp;
int band;
ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
if (pri > q->q_nband) {
return;
}
mutex_enter(QLOCK(q));
if (pri == 0) {
mp = q->q_first;
q->q_first = NULL;
q->q_last = NULL;
q->q_count = 0;
q->q_mblkcnt = 0;
for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
qbp->qb_first = NULL;
qbp->qb_last = NULL;
qbp->qb_count = 0;
qbp->qb_mblkcnt = 0;
qbp->qb_flag &= ~QB_FULL;
}
q->q_flag &= ~QFULL;
mutex_exit(QLOCK(q));
while (mp) {
nmp = mp->b_next;
mp->b_next = mp->b_prev = NULL;
if ((mp->b_band == 0) &&
((flag == FLUSHALL) ||
datamsg(mp->b_datap->db_type)))
freemsg(mp);
else
(void) putq(q, mp);
mp = nmp;
}
mutex_enter(QLOCK(q));
if ((q->q_flag & QWANTW) &&
(((q->q_count < q->q_lowat) &&
(q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
q->q_flag &= ~QWANTW;
mutex_exit(QLOCK(q));
backenable(q, pri);
} else
mutex_exit(QLOCK(q));
} else { /* pri != 0 */
boolean_t flushed = B_FALSE;
band = pri;
ASSERT(MUTEX_HELD(QLOCK(q)));
qbp = q->q_bandp;
while (--band > 0)
qbp = qbp->qb_next;
mp = qbp->qb_first;
if (mp == NULL) {
mutex_exit(QLOCK(q));
return;
}
last = qbp->qb_last->b_next;
/*
* rmvq_noenab() and freemsg() are called for each mblk that
* meets the criteria. The loop is executed until the last
* mblk has been processed.
*/
while (mp != last) {
ASSERT(mp->b_band == pri);
nmp = mp->b_next;
if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
rmvq_noenab(q, mp);
freemsg(mp);
flushed = B_TRUE;
}
mp = nmp;
}
mutex_exit(QLOCK(q));
/*
* If any mblk(s) has been freed, we know that qbackenable()
* will need to be called.
*/
if (flushed)
qbackenable(q, pri);
}
}
/*
* Return 1 if the queue is not full. If the queue is full, return
* 0 (may not put message) and set QWANTW flag (caller wants to write
* to the queue).
*/
int
canput(queue_t *q)
{
TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
/* this is for loopback transports, they should not do a canput */
ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
/* Find next forward module that has a service procedure */
q = q->q_nfsrv;
if (!(q->q_flag & QFULL)) {
TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
return (1);
}
mutex_enter(QLOCK(q));
if (q->q_flag & QFULL) {
q->q_flag |= QWANTW;
mutex_exit(QLOCK(q));
TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
return (0);
}
mutex_exit(QLOCK(q));
TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
return (1);
}
/*
* This is the new canput for use with priority bands. Return 1 if the
* band is not full. If the band is full, return 0 (may not put message)
* and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
* write to the queue).
*/
int
bcanput(queue_t *q, unsigned char pri)
{
qband_t *qbp;
TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
if (!q)
return (0);
/* Find next forward module that has a service procedure */
q = q->q_nfsrv;
mutex_enter(QLOCK(q));
if (pri == 0) {
if (q->q_flag & QFULL) {
q->q_flag |= QWANTW;
mutex_exit(QLOCK(q));
TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
"bcanput:%p %X %d", q, pri, 0);
return (0);
}
} else { /* pri != 0 */
if (pri > q->q_nband) {
/*
* No band exists yet, so return success.
*/
mutex_exit(QLOCK(q));
TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
"bcanput:%p %X %d", q, pri, 1);
return (1);
}
qbp = q->q_bandp;
while (--pri)
qbp = qbp->qb_next;
if (qbp->qb_flag & QB_FULL) {
qbp->qb_flag |= QB_WANTW;
mutex_exit(QLOCK(q));
TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
"bcanput:%p %X %d", q, pri, 0);
return (0);
}
}
mutex_exit(QLOCK(q));
TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
"bcanput:%p %X %d", q, pri, 1);
return (1);
}
/*
* Put a message on a queue.
*
* Messages are enqueued on a priority basis. The priority classes
* are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
* and B_NORMAL (type < QPCTL && band == 0).
*
* Add appropriate weighted data block sizes to queue count.
* If queue hits high water mark then set QFULL flag.
*
* If QNOENAB is not set (putq is allowed to enable the queue),
* enable the queue only if the message is PRIORITY,
* or the QWANTR flag is set (indicating that the service procedure
* is ready to read the queue. This implies that a service
* procedure must NEVER put a high priority message back on its own
* queue, as this would result in an infinite loop (!).
*/
int
putq(queue_t *q, mblk_t *bp)
{
mblk_t *tmp;
qband_t *qbp = NULL;
int mcls = (int)queclass(bp);
kthread_id_t freezer;
int bytecnt = 0, mblkcnt = 0;
freezer = STREAM(q)->sd_freezer;
if (freezer == curthread) {
ASSERT(frozenstr(q));
ASSERT(MUTEX_HELD(QLOCK(q)));
} else
mutex_enter(QLOCK(q));
/*
* Make sanity checks and if qband structure is not yet
* allocated, do so.
*/
if (mcls == QPCTL) {
if (bp->b_band != 0)
bp->b_band = 0; /* force to be correct */
} else if (bp->b_band != 0) {
int i;
qband_t **qbpp;
if (bp->b_band > q->q_nband) {
/*
* The qband structure for this priority band is
* not on the queue yet, so we have to allocate
* one on the fly. It would be wasteful to
* associate the qband structures with every
* queue when the queues are allocated. This is
* because most queues will only need the normal
* band of flow which can be described entirely
* by the queue itself.
*/
qbpp = &q->q_bandp;
while (*qbpp)
qbpp = &(*qbpp)->qb_next;
while (bp->b_band > q->q_nband) {
if ((*qbpp = allocband()) == NULL) {
if (freezer != curthread)
mutex_exit(QLOCK(q));
return (0);
}
(*qbpp)->qb_hiwat = q->q_hiwat;
(*qbpp)->qb_lowat = q->q_lowat;
q->q_nband++;
qbpp = &(*qbpp)->qb_next;
}
}
ASSERT(MUTEX_HELD(QLOCK(q)));
qbp = q->q_bandp;
i = bp->b_band;
while (--i)
qbp = qbp->qb_next;
}
/*
* If queue is empty, add the message and initialize the pointers.
* Otherwise, adjust message pointers and queue pointers based on
* the type of the message and where it belongs on the queue. Some
* code is duplicated to minimize the number of conditionals and
* hopefully minimize the amount of time this routine takes.
*/
if (!q->q_first) {
bp->b_next = NULL;
bp->b_prev = NULL;
q->q_first = bp;
q->q_last = bp;
if (qbp) {
qbp->qb_first = bp;
qbp->qb_last = bp;
}
} else if (!qbp) { /* bp->b_band == 0 */
/*
* If queue class of message is less than or equal to
* that of the last one on the queue, tack on to the end.
*/
tmp = q->q_last;
if (mcls <= (int)queclass(tmp)) {
bp->b_next = NULL;
bp->b_prev = tmp;
tmp->b_next = bp;
q->q_last = bp;
} else {
tmp = q->q_first;
while ((int)queclass(tmp) >= mcls)
tmp = tmp->b_next;
/*
* Insert bp before tmp.
*/
bp->b_next = tmp;
bp->b_prev = tmp->b_prev;
if (tmp->b_prev)
tmp->b_prev->b_next = bp;
else
q->q_first = bp;
tmp->b_prev = bp;
}
} else { /* bp->b_band != 0 */
if (qbp->qb_first) {
tmp = qbp->qb_last;
/*
* Insert bp after the last message in this band.
*/
bp->b_next = tmp->b_next;
if (tmp->b_next)
tmp->b_next->b_prev = bp;
else
q->q_last = bp;
bp->b_prev = tmp;
tmp->b_next = bp;
} else {
tmp = q->q_last;
if ((mcls < (int)queclass(tmp)) ||
(bp->b_band <= tmp->b_band)) {
/*
* Tack bp on end of queue.
*/
bp->b_next = NULL;
bp->b_prev = tmp;
tmp->b_next = bp;
q->q_last = bp;
} else {
tmp = q->q_first;
while (tmp->b_datap->db_type >= QPCTL)
tmp = tmp->b_next;
while (tmp->b_band >= bp->b_band)
tmp = tmp->b_next;
/*
* Insert bp before tmp.
*/
bp->b_next = tmp;
bp->b_prev = tmp->b_prev;
if (tmp->b_prev)
tmp->b_prev->b_next = bp;
else
q->q_first = bp;
tmp->b_prev = bp;
}
qbp->qb_first = bp;
}
qbp->qb_last = bp;
}
/* Get message byte count for q_count accounting */
for (tmp = bp; tmp; tmp = tmp->b_cont) {
ADD_MBLK_SIZE(tmp, bytecnt);
mblkcnt++;
}
if (qbp) {
qbp->qb_count += bytecnt;
qbp->qb_mblkcnt += mblkcnt;
if ((qbp->qb_count >= qbp->qb_hiwat) ||
(qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
qbp->qb_flag |= QB_FULL;
}
} else {
q->q_count += bytecnt;
q->q_mblkcnt += mblkcnt;
if ((q->q_count >= q->q_hiwat) ||
(q->q_mblkcnt >= q->q_hiwat)) {
q->q_flag |= QFULL;
}
}
STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
if ((mcls > QNORM) ||
(canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
qenable_locked(q);
ASSERT(MUTEX_HELD(QLOCK(q)));
if (freezer != curthread)
mutex_exit(QLOCK(q));
return (1);
}
/*
* Put stuff back at beginning of Q according to priority order.
* See comment on putq above for details.
*/
int
putbq(queue_t *q, mblk_t *bp)
{
mblk_t *tmp;
qband_t *qbp = NULL;
int mcls = (int)queclass(bp);
kthread_id_t freezer;
int bytecnt = 0, mblkcnt = 0;
ASSERT(q && bp);
ASSERT(bp->b_next == NULL);
freezer = STREAM(q)->sd_freezer;
if (freezer == curthread) {
ASSERT(frozenstr(q));
ASSERT(MUTEX_HELD(QLOCK(q)));
} else
mutex_enter(QLOCK(q));
/*
* Make sanity checks and if qband structure is not yet
* allocated, do so.
*/
if (mcls == QPCTL) {
if (bp->b_band != 0)
bp->b_band = 0; /* force to be correct */
} else if (bp->b_band != 0) {
int i;
qband_t **qbpp;
if (bp->b_band > q->q_nband) {
qbpp = &q->q_bandp;
while (*qbpp)
qbpp = &(*qbpp)->qb_next;
while (bp->b_band > q->q_nband) {
if ((*qbpp = allocband()) == NULL) {
if (freezer != curthread)
mutex_exit(QLOCK(q));
return (0);
}
(*qbpp)->qb_hiwat = q->q_hiwat;
(*qbpp)->qb_lowat = q->q_lowat;
q->q_nband++;
qbpp = &(*qbpp)->qb_next;
}
}
qbp = q->q_bandp;
i = bp->b_band;
while (--i)
qbp = qbp->qb_next;
}
/*
* If queue is empty or if message is high priority,
* place on the front of the queue.
*/
tmp = q->q_first;
if ((!tmp) || (mcls == QPCTL)) {
bp->b_next = tmp;
if (tmp)
tmp->b_prev = bp;
else
q->q_last = bp;
q->q_first = bp;
bp->b_prev = NULL;
if (qbp) {
qbp->qb_first = bp;
qbp->qb_last = bp;
}
} else if (qbp) { /* bp->b_band != 0 */
tmp = qbp->qb_first;
if (tmp) {
/*
* Insert bp before the first message in this band.
*/
bp->b_next = tmp;
bp->b_prev = tmp->b_prev;
if (tmp->b_prev)
tmp->b_prev->b_next = bp;
else
q->q_first = bp;
tmp->b_prev = bp;
} else {
tmp = q->q_last;
if ((mcls < (int)queclass(tmp)) ||
(bp->b_band < tmp->b_band)) {
/*
* Tack bp on end of queue.
*/
bp->b_next = NULL;
bp->b_prev = tmp;
tmp->b_next = bp;
q->q_last = bp;
} else {
tmp = q->q_first;
while (tmp->b_datap->db_type >= QPCTL)
tmp = tmp->b_next;
while (tmp->b_band > bp->b_band)
tmp = tmp->b_next;
/*
* Insert bp before tmp.
*/
bp->b_next = tmp;
bp->b_prev = tmp->b_prev;
if (tmp->b_prev)
tmp->b_prev->b_next = bp;
else
q->q_first = bp;
tmp->b_prev = bp;
}
qbp->qb_last = bp;
}
qbp->qb_first = bp;
} else { /* bp->b_band == 0 && !QPCTL */
/*
* If the queue class or band is less than that of the last
* message on the queue, tack bp on the end of the queue.
*/
tmp = q->q_last;
if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
bp->b_next = NULL;
bp->b_prev = tmp;
tmp->b_next = bp;
q->q_last = bp;
} else {
tmp = q->q_first;
while (tmp->b_datap->db_type >= QPCTL)
tmp = tmp->b_next;
while (tmp->b_band > bp->b_band)
tmp = tmp->b_next;
/*
* Insert bp before tmp.
*/
bp->b_next = tmp;
bp->b_prev = tmp->b_prev;
if (tmp->b_prev)
tmp->b_prev->b_next = bp;
else
q->q_first = bp;
tmp->b_prev = bp;
}
}
/* Get message byte count for q_count accounting */
for (tmp = bp; tmp; tmp = tmp->b_cont) {
ADD_MBLK_SIZE(tmp, bytecnt);
mblkcnt++;
}
if (qbp) {
qbp->qb_count += bytecnt;
qbp->qb_mblkcnt += mblkcnt;
if ((qbp->qb_count >= qbp->qb_hiwat) ||
(qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
qbp->qb_flag |= QB_FULL;
}
} else {
q->q_count += bytecnt;
q->q_mblkcnt += mblkcnt;
if ((q->q_count >= q->q_hiwat) ||
(q->q_mblkcnt >= q->q_hiwat)) {
q->q_flag |= QFULL;
}
}
STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
qenable_locked(q);
ASSERT(MUTEX_HELD(QLOCK(q)));
if (freezer != curthread)
mutex_exit(QLOCK(q));
return (1);
}
/*
* Insert a message before an existing message on the queue. If the
* existing message is NULL, the new messages is placed on the end of
* the queue. The queue class of the new message is ignored. However,
* the priority band of the new message must adhere to the following
* ordering:
*
* emp->b_prev->b_band >= mp->b_band >= emp->b_band.
*
* All flow control parameters are updated.
*
* insq can be called with the stream frozen, but other utility functions
* holding QLOCK, and by streams modules without any locks/frozen.
*/
int
insq(queue_t *q, mblk_t *emp, mblk_t *mp)
{
mblk_t *tmp;
qband_t *qbp = NULL;
int mcls = (int)queclass(mp);
kthread_id_t freezer;
int bytecnt = 0, mblkcnt = 0;
freezer = STREAM(q)->sd_freezer;
if (freezer == curthread) {
ASSERT(frozenstr(q));
ASSERT(MUTEX_HELD(QLOCK(q)));
} else if (MUTEX_HELD(QLOCK(q))) {
/* Don't drop lock on exit */
freezer = curthread;
} else
mutex_enter(QLOCK(q));
if (mcls == QPCTL) {
if (mp->b_band != 0)
mp->b_band = 0; /* force to be correct */
if (emp && emp->b_prev &&
(emp->b_prev->b_datap->db_type < QPCTL))
goto badord;
}
if (emp) {
if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
(emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
(emp->b_prev->b_band < mp->b_band))) {
goto badord;
}
} else {
tmp = q->q_last;
if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
badord:
cmn_err(CE_WARN,
"insq: attempt to insert message out of order "
"on q %p", (void *)q);
if (freezer != curthread)
mutex_exit(QLOCK(q));
return (0);
}
}
if (mp->b_band != 0) {
int i;
qband_t **qbpp;
if (mp->b_band > q->q_nband) {
qbpp = &q->q_bandp;
while (*qbpp)
qbpp = &(*qbpp)->qb_next;
while (mp->b_band > q->q_nband) {
if ((*qbpp = allocband()) == NULL) {
if (freezer != curthread)
mutex_exit(QLOCK(q));
return (0);
}
(*qbpp)->qb_hiwat = q->q_hiwat;
(*qbpp)->qb_lowat = q->q_lowat;
q->q_nband++;
qbpp = &(*qbpp)->qb_next;
}
}
qbp = q->q_bandp;
i = mp->b_band;
while (--i)
qbp = qbp->qb_next;
}
if ((mp->b_next = emp) != NULL) {
if ((mp->b_prev = emp->b_prev) != NULL)
emp->b_prev->b_next = mp;
else
q->q_first = mp;
emp->b_prev = mp;
} else {
if ((mp->b_prev = q->q_last) != NULL)
q->q_last->b_next = mp;
else
q->q_first = mp;
q->q_last = mp;
}
/* Get mblk and byte count for q_count accounting */
for (tmp = mp; tmp; tmp = tmp->b_cont) {
ADD_MBLK_SIZE(tmp, bytecnt);
mblkcnt++;
}
if (qbp) { /* adjust qband pointers and count */
if (!qbp->qb_first) {
qbp->qb_first = mp;
qbp->qb_last = mp;
} else {
if (mp->b_prev == NULL || (mp->b_prev != NULL &&
(mp->b_prev->b_band != mp->b_band)))
qbp->qb_first = mp;
else if (mp->b_next == NULL || (mp->b_next != NULL &&
(mp->b_next->b_band != mp->b_band)))
qbp->qb_last = mp;
}
qbp->qb_count += bytecnt;
qbp->qb_mblkcnt += mblkcnt;
if ((qbp->qb_count >= qbp->qb_hiwat) ||
(qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
qbp->qb_flag |= QB_FULL;
}
} else {
q->q_count += bytecnt;
q->q_mblkcnt += mblkcnt;
if ((q->q_count >= q->q_hiwat) ||
(q->q_mblkcnt >= q->q_hiwat)) {
q->q_flag |= QFULL;
}
}
STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
if (canenable(q) && (q->q_flag & QWANTR))
qenable_locked(q);
ASSERT(MUTEX_HELD(QLOCK(q)));
if (freezer != curthread)
mutex_exit(QLOCK(q));
return (1);
}
/*
* Create and put a control message on queue.
*/
int
putctl(queue_t *q, int type)
{
mblk_t *bp;
if ((datamsg(type) && (type != M_DELAY)) ||
(bp = allocb_tryhard(0)) == NULL)
return (0);
bp->b_datap->db_type = (unsigned char) type;
put(q, bp);
return (1);
}
/*
* Control message with a single-byte parameter
*/
int
putctl1(queue_t *q, int type, int param)
{
mblk_t *bp;
if ((datamsg(type) && (type != M_DELAY)) ||
(bp = allocb_tryhard(1)) == NULL)
return (0);
bp->b_datap->db_type = (unsigned char)type;
*bp->b_wptr++ = (unsigned char)param;
put(q, bp);
return (1);
}
int
putnextctl1(queue_t *q, int type, int param)
{
mblk_t *bp;
if ((datamsg(type) && (type != M_DELAY)) ||
((bp = allocb_tryhard(1)) == NULL))
return (0);
bp->b_datap->db_type = (unsigned char)type;
*bp->b_wptr++ = (unsigned char)param;
putnext(q, bp);
return (1);
}
int
putnextctl(queue_t *q, int type)
{
mblk_t *bp;
if ((datamsg(type) && (type != M_DELAY)) ||
((bp = allocb_tryhard(0)) == NULL))
return (0);
bp->b_datap->db_type = (unsigned char)type;
putnext(q, bp);
return (1);
}
/*
* Return the queue upstream from this one
*/
queue_t *
backq(queue_t *q)
{
q = _OTHERQ(q);
if (q->q_next) {
q = q->q_next;
return (_OTHERQ(q));
}
return (NULL);
}
/*
* Send a block back up the queue in reverse from this
* one (e.g. to respond to ioctls)
*/
void
qreply(queue_t *q, mblk_t *bp)
{
ASSERT(q && bp);
putnext(_OTHERQ(q), bp);
}
/*
* Streams Queue Scheduling
*
* Queues are enabled through qenable() when they have messages to
* process. They are serviced by queuerun(), which runs each enabled
* queue's service procedure. The call to queuerun() is processor
* dependent - the general principle is that it be run whenever a queue
* is enabled but before returning to user level. For system calls,
* the function runqueues() is called if their action causes a queue
* to be enabled. For device interrupts, queuerun() should be
* called before returning from the last level of interrupt. Beyond
* this, no timing assumptions should be made about queue scheduling.
*/
/*
* Enable a queue: put it on list of those whose service procedures are
* ready to run and set up the scheduling mechanism.
* The broadcast is done outside the mutex -> to avoid the woken thread
* from contending with the mutex. This is OK 'cos the queue has been
* enqueued on the runlist and flagged safely at this point.
*/
void
qenable(queue_t *q)
{
mutex_enter(QLOCK(q));
qenable_locked(q);
mutex_exit(QLOCK(q));
}
/*
* Return number of messages on queue
*/
int
qsize(queue_t *qp)
{
int count = 0;
mblk_t *mp;
mutex_enter(QLOCK(qp));
for (mp = qp->q_first; mp; mp = mp->b_next)
count++;
mutex_exit(QLOCK(qp));
return (count);
}
/*
* noenable - set queue so that putq() will not enable it.
* enableok - set queue so that putq() can enable it.
*/
void
noenable(queue_t *q)
{
mutex_enter(QLOCK(q));
q->q_flag |= QNOENB;
mutex_exit(QLOCK(q));
}
void
enableok(queue_t *q)
{
mutex_enter(QLOCK(q));
q->q_flag &= ~QNOENB;
mutex_exit(QLOCK(q));
}
/*
* Set queue fields.
*/
int
strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
{
qband_t *qbp = NULL;
queue_t *wrq;
int error = 0;
kthread_id_t freezer;
freezer = STREAM(q)->sd_freezer;
if (freezer == curthread) {
ASSERT(frozenstr(q));
ASSERT(MUTEX_HELD(QLOCK(q)));
} else
mutex_enter(QLOCK(q));
if (what >= QBAD) {
error = EINVAL;
goto done;
}
if (pri != 0) {
int i;
qband_t **qbpp;
if (pri > q->q_nband) {
qbpp = &q->q_bandp;
while (*qbpp)
qbpp = &(*qbpp)->qb_next;
while (pri > q->q_nband) {
if ((*qbpp = allocband()) == NULL) {
error = EAGAIN;
goto done;
}
(*qbpp)->qb_hiwat = q->q_hiwat;
(*qbpp)->qb_lowat = q->q_lowat;
q->q_nband++;
qbpp = &(*qbpp)->qb_next;
}
}
qbp = q->q_bandp;
i = pri;
while (--i)
qbp = qbp->qb_next;
}
switch (what) {
case QHIWAT:
if (qbp)
qbp->qb_hiwat = (size_t)val;
else
q->q_hiwat = (size_t)val;
break;
case QLOWAT:
if (qbp)
qbp->qb_lowat = (size_t)val;
else
q->q_lowat = (size_t)val;
break;
case QMAXPSZ:
if (qbp)
error = EINVAL;
else
q->q_maxpsz = (ssize_t)val;
/*
* Performance concern, strwrite looks at the module below
* the stream head for the maxpsz each time it does a write
* we now cache it at the stream head. Check to see if this
* queue is sitting directly below the stream head.
*/
wrq = STREAM(q)->sd_wrq;
if (q != wrq->q_next)
break;
/*
* If the stream is not frozen drop the current QLOCK and
* acquire the sd_wrq QLOCK which protects sd_qn_*
*/
if (freezer != curthread) {
mutex_exit(QLOCK(q));
mutex_enter(QLOCK(wrq));
}
ASSERT(MUTEX_HELD(QLOCK(wrq)));
if (strmsgsz != 0) {
if (val == INFPSZ)
val = strmsgsz;
else {
if (STREAM(q)->sd_vnode->v_type == VFIFO)
val = MIN(PIPE_BUF, val);
else
val = MIN(strmsgsz, val);
}
}
STREAM(q)->sd_qn_maxpsz = val;
if (freezer != curthread) {
mutex_exit(QLOCK(wrq));
mutex_enter(QLOCK(q));
}
break;
case QMINPSZ:
if (qbp)
error = EINVAL;
else
q->q_minpsz = (ssize_t)val;
/*
* Performance concern, strwrite looks at the module below
* the stream head for the maxpsz each time it does a write
* we now cache it at the stream head. Check to see if this
* queue is sitting directly below the stream head.
*/
wrq = STREAM(q)->sd_wrq;
if (q != wrq->q_next)
break;
/*
* If the stream is not frozen drop the current QLOCK and
* acquire the sd_wrq QLOCK which protects sd_qn_*
*/
if (freezer != curthread) {
mutex_exit(QLOCK(q));
mutex_enter(QLOCK(wrq));
}
STREAM(q)->sd_qn_minpsz = (ssize_t)val;
if (freezer != curthread) {
mutex_exit(QLOCK(wrq));
mutex_enter(QLOCK(q));
}
break;
case QSTRUIOT:
if (qbp)
error = EINVAL;
else
q->q_struiot = (ushort_t)val;
break;
case QCOUNT:
case QFIRST:
case QLAST:
case QFLAG:
error = EPERM;
break;
default:
error = EINVAL;
break;
}
done:
if (freezer != curthread)
mutex_exit(QLOCK(q));
return (error);
}
/*
* Get queue fields.
*/
int
strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
{
qband_t *qbp = NULL;
int error = 0;
kthread_id_t freezer;
freezer = STREAM(q)->sd_freezer;
if (freezer == curthread) {
ASSERT(frozenstr(q));
ASSERT(MUTEX_HELD(QLOCK(q)));
} else
mutex_enter(QLOCK(q));
if (what >= QBAD) {
error = EINVAL;
goto done;
}
if (pri != 0) {
int i;
qband_t **qbpp;
if (pri > q->q_nband) {
qbpp = &q->q_bandp;
while (*qbpp)
qbpp = &(*qbpp)->qb_next;
while (pri > q->q_nband) {
if ((*qbpp = allocband()) == NULL) {
error = EAGAIN;
goto done;
}
(*qbpp)->qb_hiwat = q->q_hiwat;
(*qbpp)->qb_lowat = q->q_lowat;
q->q_nband++;
qbpp = &(*qbpp)->qb_next;
}
}
qbp = q->q_bandp;
i = pri;
while (--i)
qbp = qbp->qb_next;
}
switch (what) {
case QHIWAT:
if (qbp)
*(size_t *)valp = qbp->qb_hiwat;
else
*(size_t *)valp = q->q_hiwat;
break;
case QLOWAT:
if (qbp)
*(size_t *)valp = qbp->qb_lowat;
else
*(size_t *)valp = q->q_lowat;
break;
case QMAXPSZ:
if (qbp)
error = EINVAL;
else
*(ssize_t *)valp = q->q_maxpsz;
break;
case QMINPSZ:
if (qbp)
error = EINVAL;
else
*(ssize_t *)valp = q->q_minpsz;
break;
case QCOUNT:
if (qbp)
*(size_t *)valp = qbp->qb_count;
else
*(size_t *)valp = q->q_count;
break;
case QFIRST:
if (qbp)
*(mblk_t **)valp = qbp->qb_first;
else
*(mblk_t **)valp = q->q_first;
break;
case QLAST:
if (qbp)
*(mblk_t **)valp = qbp->qb_last;
else
*(mblk_t **)valp = q->q_last;
break;
case QFLAG:
if (qbp)
*(uint_t *)valp = qbp->qb_flag;
else
*(uint_t *)valp = q->q_flag;
break;
case QSTRUIOT:
if (qbp)
error = EINVAL;
else
*(short *)valp = q->q_struiot;
break;
default:
error = EINVAL;
break;
}
done:
if (freezer != curthread)
mutex_exit(QLOCK(q));
return (error);
}
/*
* Function awakes all in cvwait/sigwait/pollwait, on one of:
* QWANTWSYNC or QWANTR or QWANTW,
*
* Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
* deferred wakeup will be done. Also if strpoll() in progress then a
* deferred pollwakeup will be done.
*/
void
strwakeq(queue_t *q, int flag)
{
stdata_t *stp = STREAM(q);
pollhead_t *pl;
mutex_enter(&stp->sd_lock);
pl = &stp->sd_pollist;
if (flag & QWANTWSYNC) {
ASSERT(!(q->q_flag & QREADR));
if (stp->sd_flag & WSLEEP) {
stp->sd_flag &= ~WSLEEP;
cv_broadcast(&stp->sd_wrq->q_wait);
} else {
stp->sd_wakeq |= WSLEEP;
}
mutex_exit(&stp->sd_lock);
pollwakeup(pl, POLLWRNORM);
mutex_enter(&stp->sd_lock);
if (stp->sd_sigflags & S_WRNORM)
strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
} else if (flag & QWANTR) {
if (stp->sd_flag & RSLEEP) {
stp->sd_flag &= ~RSLEEP;
cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
} else {
stp->sd_wakeq |= RSLEEP;
}
mutex_exit(&stp->sd_lock);
pollwakeup(pl, POLLIN | POLLRDNORM);
mutex_enter(&stp->sd_lock);
{
int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
if (events)
strsendsig(stp->sd_siglist, events, 0, 0);
}
} else {
if (stp->sd_flag & WSLEEP) {
stp->sd_flag &= ~WSLEEP;
cv_broadcast(&stp->sd_wrq->q_wait);
}
mutex_exit(&stp->sd_lock);
pollwakeup(pl, POLLWRNORM);
mutex_enter(&stp->sd_lock);
if (stp->sd_sigflags & S_WRNORM)
strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
}
mutex_exit(&stp->sd_lock);
}
int
struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
{
stdata_t *stp = STREAM(q);
int typ = STRUIOT_STANDARD;
uio_t *uiop = &dp->d_uio;
dblk_t *dbp;
ssize_t uiocnt;
ssize_t cnt;
unsigned char *ptr;
ssize_t resid;
int error = 0;
on_trap_data_t otd;
queue_t *stwrq;
/*
* Plumbing may change while taking the type so store the
* queue in a temporary variable. It doesn't matter even
* if the we take the type from the previous plumbing,
* that's because if the plumbing has changed when we were
* holding the queue in a temporary variable, we can continue
* processing the message the way it would have been processed
* in the old plumbing, without any side effects but a bit
* extra processing for partial ip header checksum.
*
* This has been done to avoid holding the sd_lock which is
* very hot.
*/
stwrq = stp->sd_struiowrq;
if (stwrq)
typ = stwrq->q_struiot;
for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
dbp = mp->b_datap;
ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
cnt = MIN(uiocnt, uiop->uio_resid);
if (!(dbp->db_struioflag & STRUIO_SPEC) ||
(dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
/*
* Either this mblk has already been processed
* or there is no more room in this mblk (?).
*/
continue;
}
switch (typ) {
case STRUIOT_STANDARD:
if (noblock) {
if (on_trap(&otd, OT_DATA_ACCESS)) {
no_trap();
error = EWOULDBLOCK;
goto out;
}
}
if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
if (noblock)
no_trap();
goto out;
}
if (noblock)
no_trap();
break;
default:
error = EIO;
goto out;
}
dbp->db_struioflag |= STRUIO_DONE;
dbp->db_cksumstuff += cnt;
}
out:
if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
/*
* A fault has occured and some bytes were moved to the
* current mblk, the uio_t has already been updated by
* the appropriate uio routine, so also update the mblk
* to reflect this in case this same mblk chain is used
* again (after the fault has been handled).
*/
uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
if (uiocnt >= resid)
dbp->db_cksumstuff += resid;
}
return (error);
}
/*
* Try to enter queue synchronously. Any attempt to enter a closing queue will
* fails. The qp->q_rwcnt keeps track of the number of successful entries so
* that removeq() will not try to close the queue while a thread is inside the
* queue.
*/
static boolean_t
rwnext_enter(queue_t *qp)
{
mutex_enter(QLOCK(qp));
if (qp->q_flag & QWCLOSE) {
mutex_exit(QLOCK(qp));
return (B_FALSE);
}
qp->q_rwcnt++;
ASSERT(qp->q_rwcnt != 0);
mutex_exit(QLOCK(qp));
return (B_TRUE);
}
/*
* Decrease the count of threads running in sync stream queue and wake up any
* threads blocked in removeq().
*/
static void
rwnext_exit(queue_t *qp)
{
mutex_enter(QLOCK(qp));
qp->q_rwcnt--;
if (qp->q_flag & QWANTRMQSYNC) {
qp->q_flag &= ~QWANTRMQSYNC;
cv_broadcast(&qp->q_wait);
}
mutex_exit(QLOCK(qp));
}
/*
* The purpose of rwnext() is to call the rw procedure of the next
* (downstream) modules queue.
*
* treated as put entrypoint for perimeter syncronization.
*
* There's no need to grab sq_putlocks here (which only exist for CIPUT
* sync queues). If it is CIPUT sync queue sq_count is incremented and it does
* not matter if any regular put entrypoints have been already entered. We
* can't increment one of the sq_putcounts (instead of sq_count) because
* qwait_rw won't know which counter to decrement.
*
* It would be reasonable to add the lockless FASTPUT logic.
*/
int
rwnext(queue_t *qp, struiod_t *dp)
{
queue_t *nqp;
syncq_t *sq;
uint16_t count;
uint16_t flags;
struct qinit *qi;
int (*proc)();
struct stdata *stp;
int isread;
int rval;
stp = STREAM(qp);
/*
* Prevent q_next from changing by holding sd_lock until acquiring
* SQLOCK. Note that a read-side rwnext from the streamhead will
* already have sd_lock acquired. In either case sd_lock is always
* released after acquiring SQLOCK.
*
* The streamhead read-side holding sd_lock when calling rwnext is
* required to prevent a race condition were M_DATA mblks flowing
* up the read-side of the stream could be bypassed by a rwnext()
* down-call. In this case sd_lock acts as the streamhead perimeter.
*/
if ((nqp = _WR(qp)) == qp) {
isread = 0;
mutex_enter(&stp->sd_lock);
qp = nqp->q_next;
} else {
isread = 1;
if (nqp != stp->sd_wrq)
/* Not streamhead */
mutex_enter(&stp->sd_lock);
qp = _RD(nqp->q_next);
}
qi = qp->q_qinfo;
if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
/*
* Not a synchronous module or no r/w procedure for this
* queue, so just return EINVAL and let the caller handle it.
*/
mutex_exit(&stp->sd_lock);
return (EINVAL);
}
if (rwnext_enter(qp) == B_FALSE) {
mutex_exit(&stp->sd_lock);
return (EINVAL);
}
sq = qp->q_syncq;
mutex_enter(SQLOCK(sq));
mutex_exit(&stp->sd_lock);
count = sq->sq_count;
flags = sq->sq_flags;
ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
/*
* if this queue is being closed, return.
*/
if (qp->q_flag & QWCLOSE) {
mutex_exit(SQLOCK(sq));
rwnext_exit(qp);
return (EINVAL);
}
/*
* Wait until we can enter the inner perimeter.
*/
sq->sq_flags = flags | SQ_WANTWAKEUP;
cv_wait(&sq->sq_wait, SQLOCK(sq));
count = sq->sq_count;
flags = sq->sq_flags;
}
if (isread == 0 && stp->sd_struiowrq == NULL ||
isread == 1 && stp->sd_struiordq == NULL) {
/*
* Stream plumbing changed while waiting for inner perimeter
* so just return EINVAL and let the caller handle it.
*/
mutex_exit(SQLOCK(sq));
rwnext_exit(qp);
return (EINVAL);
}
if (!(flags & SQ_CIPUT))
sq->sq_flags = flags | SQ_EXCL;
sq->sq_count = count + 1;
ASSERT(sq->sq_count != 0); /* Wraparound */
/*
* Note: The only message ordering guarantee that rwnext() makes is
* for the write queue flow-control case. All others (r/w queue
* with q_count > 0 (or q_first != 0)) are the resposibilty of
* the queue's rw procedure. This could be genralized here buy
* running the queue's service procedure, but that wouldn't be
* the most efficent for all cases.
*/
mutex_exit(SQLOCK(sq));
if (! isread && (qp->q_flag & QFULL)) {
/*
* Write queue may be flow controlled. If so,
* mark the queue for wakeup when it's not.
*/
mutex_enter(QLOCK(qp));
if (qp->q_flag & QFULL) {
qp->q_flag |= QWANTWSYNC;
mutex_exit(QLOCK(qp));
rval = EWOULDBLOCK;
goto out;
}
mutex_exit(QLOCK(qp));
}
if (! isread && dp->d_mp)
STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
dp->d_mp->b_datap->db_base);
rval = (*proc)(qp, dp);
if (isread && dp->d_mp)
STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
out:
/*
* The queue is protected from being freed by sq_count, so it is
* safe to call rwnext_exit and reacquire SQLOCK(sq).
*/
rwnext_exit(qp);
mutex_enter(SQLOCK(sq));
flags = sq->sq_flags;
ASSERT(sq->sq_count != 0);
sq->sq_count--;
if (flags & SQ_TAIL) {
putnext_tail(sq, qp, flags);
/*
* The only purpose of this ASSERT is to preserve calling stack
* in DEBUG kernel.
*/
ASSERT(flags & SQ_TAIL);
return (rval);
}
ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
/*
* Safe to always drop SQ_EXCL:
* Not SQ_CIPUT means we set SQ_EXCL above
* For SQ_CIPUT SQ_EXCL will only be set if the put procedure
* did a qwriter(INNER) in which case nobody else
* is in the inner perimeter and we are exiting.
*
* I would like to make the following assertion:
*
* ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
* sq->sq_count == 0);
*
* which indicates that if we are both putshared and exclusive,
* we became exclusive while executing the putproc, and the only
* claim on the syncq was the one we dropped a few lines above.
* But other threads that enter putnext while the syncq is exclusive
* need to make a claim as they may need to drop SQLOCK in the
* has_writers case to avoid deadlocks. If these threads are
* delayed or preempted, it is possible that the writer thread can
* find out that there are other claims making the (sq_count == 0)
* test invalid.
*/
sq->sq_flags = flags & ~SQ_EXCL;
if (sq->sq_flags & SQ_WANTWAKEUP) {
sq->sq_flags &= ~SQ_WANTWAKEUP;
cv_broadcast(&sq->sq_wait);
}
mutex_exit(SQLOCK(sq));
return (rval);
}
/*
* The purpose of infonext() is to call the info procedure of the next
* (downstream) modules queue.
*
* treated as put entrypoint for perimeter syncronization.
*
* There's no need to grab sq_putlocks here (which only exist for CIPUT
* sync queues). If it is CIPUT sync queue regular sq_count is incremented and
* it does not matter if any regular put entrypoints have been already
* entered.
*/
int
infonext(queue_t *qp, infod_t *idp)
{
queue_t *nqp;
syncq_t *sq;
uint16_t count;
uint16_t flags;
struct qinit *qi;
int (*proc)();
struct stdata *stp;
int rval;
stp = STREAM(qp);
/*
* Prevent q_next from changing by holding sd_lock until
* acquiring SQLOCK.
*/
mutex_enter(&stp->sd_lock);
if ((nqp = _WR(qp)) == qp) {
qp = nqp->q_next;
} else {
qp = _RD(nqp->q_next);
}
qi = qp->q_qinfo;
if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
mutex_exit(&stp->sd_lock);
return (EINVAL);
}
sq = qp->q_syncq;
mutex_enter(SQLOCK(sq));
mutex_exit(&stp->sd_lock);
count = sq->sq_count;
flags = sq->sq_flags;
ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
/*
* Wait until we can enter the inner perimeter.
*/
sq->sq_flags = flags | SQ_WANTWAKEUP;
cv_wait(&sq->sq_wait, SQLOCK(sq));
count = sq->sq_count;
flags = sq->sq_flags;
}
if (! (flags & SQ_CIPUT))
sq->sq_flags = flags | SQ_EXCL;
sq->sq_count = count + 1;
ASSERT(sq->sq_count != 0); /* Wraparound */
mutex_exit(SQLOCK(sq));
rval = (*proc)(qp, idp);
mutex_enter(SQLOCK(sq));
flags = sq->sq_flags;
ASSERT(sq->sq_count != 0);
sq->sq_count--;
if (flags & SQ_TAIL) {
putnext_tail(sq, qp, flags);
/*
* The only purpose of this ASSERT is to preserve calling stack
* in DEBUG kernel.
*/
ASSERT(flags & SQ_TAIL);
return (rval);
}
ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
/*
* XXXX
* I am not certain the next comment is correct here. I need to consider
* why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
* might cause other problems. It just might be safer to drop it if
* !SQ_CIPUT because that is when we set it.
*/
/*
* Safe to always drop SQ_EXCL:
* Not SQ_CIPUT means we set SQ_EXCL above
* For SQ_CIPUT SQ_EXCL will only be set if the put procedure
* did a qwriter(INNER) in which case nobody else
* is in the inner perimeter and we are exiting.
*
* I would like to make the following assertion:
*
* ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
* sq->sq_count == 0);
*
* which indicates that if we are both putshared and exclusive,
* we became exclusive while executing the putproc, and the only
* claim on the syncq was the one we dropped a few lines above.
* But other threads that enter putnext while the syncq is exclusive
* need to make a claim as they may need to drop SQLOCK in the
* has_writers case to avoid deadlocks. If these threads are
* delayed or preempted, it is possible that the writer thread can
* find out that there are other claims making the (sq_count == 0)
* test invalid.
*/
sq->sq_flags = flags & ~SQ_EXCL;
mutex_exit(SQLOCK(sq));
return (rval);
}
/*
* Return nonzero if the queue is responsible for struio(), else return 0.
*/
int
isuioq(queue_t *q)
{
if (q->q_flag & QREADR)
return (STREAM(q)->sd_struiordq == q);
else
return (STREAM(q)->sd_struiowrq == q);
}
#if defined(__sparc)
int disable_putlocks = 0;
#else
int disable_putlocks = 1;
#endif
/*
* called by create_putlock.
*/
static void
create_syncq_putlocks(queue_t *q)
{
syncq_t *sq = q->q_syncq;
ciputctrl_t *cip;
int i;
ASSERT(sq != NULL);
ASSERT(disable_putlocks == 0);
ASSERT(n_ciputctrl >= min_n_ciputctrl);
ASSERT(ciputctrl_cache != NULL);
if (!(sq->sq_type & SQ_CIPUT))
return;
for (i = 0; i <= 1; i++) {
if (sq->sq_ciputctrl == NULL) {
cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
mutex_enter(SQLOCK(sq));
if (sq->sq_ciputctrl != NULL) {
mutex_exit(SQLOCK(sq));
kmem_cache_free(ciputctrl_cache, cip);
} else {
ASSERT(sq->sq_nciputctrl == 0);
sq->sq_nciputctrl = n_ciputctrl - 1;
/*
* putnext checks sq_ciputctrl without holding
* SQLOCK. if it is not NULL putnext assumes
* sq_nciputctrl is initialized. membar below
* insures that.
*/
membar_producer();
sq->sq_ciputctrl = cip;
mutex_exit(SQLOCK(sq));
}
}
ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
if (i == 1)
break;
q = _OTHERQ(q);
if (!(q->q_flag & QPERQ)) {
ASSERT(sq == q->q_syncq);
break;
}
ASSERT(q->q_syncq != NULL);
ASSERT(sq != q->q_syncq);
sq = q->q_syncq;
ASSERT(sq->sq_type & SQ_CIPUT);
}
}
/*
* If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
* syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
* the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
* starting from q and down to the driver.
*
* This should be called after the affected queues are part of stream
* geometry. It should be called from driver/module open routine after
* qprocson() call. It is also called from nfs syscall where it is known that
* stream is configured and won't change its geometry during create_putlock
* call.
*
* caller normally uses 0 value for the stream argument to speed up MT putnext
* into the perimeter of q for example because its perimeter is per module
* (e.g. IP).
*
* caller normally uses non 0 value for the stream argument to hint the system
* that the stream of q is a very contended global system stream
* (e.g. NFS/UDP) and the part of the stream from q to the driver is
* particularly MT hot.
*
* Caller insures stream plumbing won't happen while we are here and therefore
* q_next can be safely used.
*/
void
create_putlocks(queue_t *q, int stream)
{
ciputctrl_t *cip;
struct stdata *stp = STREAM(q);
q = _WR(q);
ASSERT(stp != NULL);
if (disable_putlocks != 0)
return;
if (n_ciputctrl < min_n_ciputctrl)
return;
ASSERT(ciputctrl_cache != NULL);
if (stream != 0 && stp->sd_ciputctrl == NULL) {
cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
mutex_enter(&stp->sd_lock);
if (stp->sd_ciputctrl != NULL) {
mutex_exit(&stp->sd_lock);
kmem_cache_free(ciputctrl_cache, cip);
} else {
ASSERT(stp->sd_nciputctrl == 0);
stp->sd_nciputctrl = n_ciputctrl - 1;
/*
* putnext checks sd_ciputctrl without holding
* sd_lock. if it is not NULL putnext assumes
* sd_nciputctrl is initialized. membar below
* insures that.
*/
membar_producer();
stp->sd_ciputctrl = cip;
mutex_exit(&stp->sd_lock);
}
}
ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
while (_SAMESTR(q)) {
create_syncq_putlocks(q);
if (stream == 0)
return;
q = q->q_next;
}
ASSERT(q != NULL);
create_syncq_putlocks(q);
}
/*
* STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
* through a stream.
*
* Data currently record per event is a hrtime stamp, queue address, event
* type, and a per type datum. Much of the STREAMS framework is instrumented
* for automatic flow tracing (when enabled). Events can be defined and used
* by STREAMS modules and drivers.
*
* Global objects:
*
* str_ftevent() - Add a flow-trace event to a dblk.
* str_ftfree() - Free flow-trace data
*
* Local objects:
*
* fthdr_cache - pointer to the kmem cache for trace header.
* ftblk_cache - pointer to the kmem cache for trace data blocks.
*/
int str_ftnever = 1; /* Don't do STREAMS flow tracing */
void
str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
{
ftblk_t *bp = hp->tail;
ftblk_t *nbp;
ftevnt_t *ep;
int ix, nix;
ASSERT(hp != NULL);
for (;;) {
if ((ix = bp->ix) == FTBLK_EVNTS) {
/*
* Tail doesn't have room, so need a new tail.
*
* To make this MT safe, first, allocate a new
* ftblk, and initialize it. To make life a
* little easier, reserve the first slot (mostly
* by making ix = 1). When we are finished with
* the initialization, CAS this pointer to the
* tail. If this succeeds, this is the new
* "next" block. Otherwise, another thread
* got here first, so free the block and start
* again.
*/
if (!(nbp = kmem_cache_alloc(ftblk_cache,
KM_NOSLEEP))) {
/* no mem, so punt */
str_ftnever++;
/* free up all flow data? */
return;
}
nbp->nxt = NULL;
nbp->ix = 1;
/*
* Just in case there is another thread about
* to get the next index, we need to make sure
* the value is there for it.
*/
membar_producer();
if (casptr(&hp->tail, bp, nbp) == bp) {
/* CAS was successful */
bp->nxt = nbp;
membar_producer();
bp = nbp;
ix = 0;
goto cas_good;
} else {
kmem_cache_free(ftblk_cache, nbp);
bp = hp->tail;
continue;
}
}
nix = ix + 1;
if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
cas_good:
if (curthread != hp->thread) {
hp->thread = curthread;
evnt |= FTEV_CS;
}
if (CPU->cpu_seqid != hp->cpu_seqid) {
hp->cpu_seqid = CPU->cpu_seqid;
evnt |= FTEV_PS;
}
ep = &bp->ev[ix];
break;
}
}
if (evnt & FTEV_QMASK) {
queue_t *qp = p;
/*
* It is possible that the module info is broke
* (as is logsubr.c at this comment writing).
* Instead of panicing or doing other unmentionables,
* we shall put a dummy name as the mid, and continue.
*/
if (qp->q_qinfo == NULL)
ep->mid = "NONAME";
else
ep->mid = qp->q_qinfo->qi_minfo->mi_idname;
if (!(qp->q_flag & QREADR))
evnt |= FTEV_ISWR;
} else {
ep->mid = (char *)p;
}
ep->ts = gethrtime();
ep->evnt = evnt;
ep->data = data;
hp->hash = (hp->hash << 9) + hp->hash;
hp->hash += (evnt << 16) | data;
hp->hash += (uintptr_t)ep->mid;
}
/*
* Free flow-trace data.
*/
void
str_ftfree(dblk_t *dbp)
{
fthdr_t *hp = dbp->db_fthdr;
ftblk_t *bp = &hp->first;
ftblk_t *nbp;
if (bp != hp->tail || bp->ix != 0) {
/*
* Clear out the hash, have the tail point to itself, and free
* any continuation blocks.
*/
bp = hp->first.nxt;
hp->tail = &hp->first;
hp->hash = 0;
hp->first.nxt = NULL;
hp->first.ix = 0;
while (bp != NULL) {
nbp = bp->nxt;
kmem_cache_free(ftblk_cache, bp);
bp = nbp;
}
}
kmem_cache_free(fthdr_cache, hp);
dbp->db_fthdr = NULL;
}