/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
* or http://www.opensolaris.org/os/licensing.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at usr/src/OPENSOLARIS.LICENSE.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information: Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*/
/*
* Copyright 2004 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
#pragma ident "%Z%%M% %I% %E% SMI"
/*
* STREAMS Buffering module
*
* This streams module collects incoming messages from modules below
* it on the stream and buffers them up into a smaller number of
* aggregated messages. Its main purpose is to reduce overhead by
* cutting down on the number of read (or getmsg) calls its client
* user process makes.
* - only M_DATA is buffered.
 * - multithreading assumes the module is configured as D_MTQPAIR.
 * - packets are lost only if flag SB_NO_HEADER is clear and header
 *   buffer allocation fails (drops due to flow control are governed
 *   by flag SB_NO_DROPS; see FLAGS below).
 * - in-order message transmission; this is enforced for all messages
 *   other than high-priority messages.
* - zero length messages on the read side are not passed up the
* stream but used internally for synchronization.
* FLAGS:
* - SB_NO_PROTO_CVT - no conversion of M_PROTO messages to M_DATA.
 * (conversion is the default for backwards compatibility,
 * hence the negative logic).
* - SB_NO_HEADER - no headers in buffered data.
 * (adding headers is the default for backwards compatibility,
 * hence the negative logic).
* - SB_DEFER_CHUNK - provides improved response time in question-answer
* applications. Buffering is not enabled until the second message
* is received on the read side within the sb_ticks interval.
* This option will often be used in combination with flag SB_SEND_ON_WRITE.
* - SB_SEND_ON_WRITE - a write message results in any pending buffered read
* data being immediately sent upstream.
 * - SB_NO_DROPS - bufmod behaves transparently in flow control and
 *   propagates the blocked flow condition downstream. If this flag is
 *   clear (the default), messages are dropped if the upstream flow is
 *   blocked.
*/
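/*
 * Usage sketch (userland, not part of this module): a packet-capture
 * client might configure bufmod roughly as follows. The descriptor fd,
 * the 50 ms timeout and the 16 KB chunk size are illustrative values
 * only; the SBIOC* ioctls are declared in <sys/bufmod.h>. Error
 * handling is elided.
 *
 *	struct timeval timeout;
 *	uint_t chunksz = 16384;
 *
 *	timeout.tv_sec = 0;
 *	timeout.tv_usec = 50000;
 *	if (ioctl(fd, I_PUSH, "bufmod") < 0 ||
 *	    ioctl(fd, SBIOCSTIME, &timeout) < 0 ||
 *	    ioctl(fd, SBIOCSCHUNK, &chunksz) < 0)
 *		(handle the error)
 */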
#include <sys/types.h>
#include <sys/errno.h>
#include <sys/debug.h>
#include <sys/stropts.h>
#include <sys/time.h>
#include <sys/stream.h>
#include <sys/conf.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>
#include <sys/kmem.h>
#include <sys/strsun.h>
#include <sys/bufmod.h>
#include <sys/modctl.h>
#include <sys/isa_defs.h>
/*
* Per-Stream state information.
*
* If sb_ticks is negative, we don't deliver chunks until they're
* full. If it's zero, we deliver every packet as it arrives. (In
* this case we force sb_chunk to zero, to make the implementation
* easier.) Otherwise, sb_ticks gives the number of ticks in a
 * buffering interval. The interval begins when a read-side data
 * message is received and no timeout is active. If sb_snap is
 * zero, messages are not truncated.
*/
struct sb {
queue_t *sb_rq; /* our rq */
mblk_t *sb_mp; /* partial chunk */
mblk_t *sb_head; /* pre-allocated space for the next header */
mblk_t *sb_tail; /* first mblk of last message appended */
uint_t sb_mlen; /* sb_mp length */
uint_t sb_mcount; /* input msg count in sb_mp */
uint_t sb_chunk; /* max chunk size */
clock_t sb_ticks; /* timeout interval */
timeout_id_t sb_timeoutid; /* qtimeout() id */
uint_t sb_drops; /* cumulative # discarded msgs */
uint_t sb_snap; /* snapshot length */
uint_t sb_flags; /* flags field */
uint_t sb_state; /* state variable */
};
/*
* Function prototypes.
*/
static int sbopen(queue_t *, dev_t *, int, int, cred_t *);
static int sbclose(queue_t *, int, cred_t *);
static void sbwput(queue_t *, mblk_t *);
static void sbrput(queue_t *, mblk_t *);
static void sbrsrv(queue_t *);
static void sbioctl(queue_t *, mblk_t *);
static void sbaddmsg(queue_t *, mblk_t *);
static void sbtick(void *);
static void sbclosechunk(struct sb *);
static void sbsendit(queue_t *, mblk_t *);
static struct module_info sb_minfo = {
21, /* mi_idnum */
"bufmod", /* mi_idname */
0, /* mi_minpsz */
INFPSZ, /* mi_maxpsz */
1, /* mi_hiwat */
0 /* mi_lowat */
};
static struct qinit sb_rinit = {
(int (*)())sbrput, /* qi_putp */
(int (*)())sbrsrv, /* qi_srvp */
sbopen, /* qi_qopen */
sbclose, /* qi_qclose */
NULL, /* qi_qadmin */
&sb_minfo, /* qi_minfo */
NULL /* qi_mstat */
};
static struct qinit sb_winit = {
(int (*)())sbwput, /* qi_putp */
NULL, /* qi_srvp */
NULL, /* qi_qopen */
NULL, /* qi_qclose */
NULL, /* qi_qadmin */
&sb_minfo, /* qi_minfo */
NULL /* qi_mstat */
};
static struct streamtab sb_info = {
&sb_rinit, /* st_rdinit */
&sb_winit, /* st_wrinit */
NULL, /* st_muxrinit */
NULL /* st_muxwinit */
};
/*
* This is the loadable module wrapper.
*/
static struct fmodsw fsw = {
"bufmod",
&sb_info,
D_MTQPAIR | D_MP
};
/*
* Module linkage information for the kernel.
*/
static struct modlstrmod modlstrmod = {
&mod_strmodops, "streams buffer mod", &fsw
};
static struct modlinkage modlinkage = {
MODREV_1, &modlstrmod, NULL
};
int
_init(void)
{
return (mod_install(&modlinkage));
}
int
_fini(void)
{
return (mod_remove(&modlinkage));
}
int
_info(struct modinfo *modinfop)
{
return (mod_info(&modlinkage, modinfop));
}
/* ARGSUSED */
static int
sbopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp)
{
struct sb *sbp;
ASSERT(rq);
if (sflag != MODOPEN)
return (EINVAL);
if (rq->q_ptr)
return (0);
/*
* Allocate and initialize per-Stream structure.
*/
sbp = kmem_alloc(sizeof (struct sb), KM_SLEEP);
sbp->sb_rq = rq;
sbp->sb_ticks = -1;
sbp->sb_chunk = SB_DFLT_CHUNK;
sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
sbp->sb_mlen = 0;
sbp->sb_mcount = 0;
sbp->sb_timeoutid = 0;
sbp->sb_drops = 0;
sbp->sb_snap = 0;
sbp->sb_flags = 0;
sbp->sb_state = 0;
rq->q_ptr = WR(rq)->q_ptr = sbp;
qprocson(rq);
return (0);
}
/* ARGSUSED1 */
static int
sbclose(queue_t *rq, int flag, cred_t *credp)
{
struct sb *sbp = (struct sb *)rq->q_ptr;
ASSERT(sbp);
qprocsoff(rq);
/*
* Cancel an outstanding timeout
*/
if (sbp->sb_timeoutid != 0) {
(void) quntimeout(rq, sbp->sb_timeoutid);
sbp->sb_timeoutid = 0;
}
/*
* Free the current chunk.
*/
if (sbp->sb_mp) {
freemsg(sbp->sb_mp);
sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
sbp->sb_mlen = 0;
}
/*
* Free the per-Stream structure.
*/
kmem_free((caddr_t)sbp, sizeof (struct sb));
rq->q_ptr = WR(rq)->q_ptr = NULL;
return (0);
}
/*
 * The correction factor is introduced to compensate for whatever
 * assumptions the modules below have made about how much traffic is
 * flowing through the stream, and for the fact that bufmod may be
 * truncating messages to the sb_snap length.
*/
#define	SNIT_HIWAT(msgsize, fudge)	((4 * (msgsize) * (fudge)) + 512)
#define	SNIT_LOWAT(msgsize, fudge)	((2 * (msgsize) * (fudge)) + 256)
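/*
 * For example, with a chunk size of 8192 bytes and a fudge factor of 1,
 * SNIT_HIWAT yields 4 * 8192 * 1 + 512 = 33280 bytes and SNIT_LOWAT
 * yields 2 * 8192 * 1 + 256 = 16640 bytes.
 */
/*
 * Handle M_IOCDATA messages for the SBIOC* ioctls, i.e. the second
 * stage of a transparent ioctl, after the stream head has completed
 * the copyin or copyout of the user's argument.
 */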
static void
sbioc(queue_t *wq, mblk_t *mp)
{
struct iocblk *iocp;
struct sb *sbp = (struct sb *)wq->q_ptr;
clock_t ticks;
mblk_t *mop;
iocp = (struct iocblk *)mp->b_rptr;
switch (iocp->ioc_cmd) {
case SBIOCGCHUNK:
case SBIOCGSNAP:
case SBIOCGFLAGS:
case SBIOCGTIME:
miocack(wq, mp, 0, 0);
return;
case SBIOCSTIME:
#ifdef _SYSCALL32_IMPL
if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
struct timeval32 *t32;
t32 = (struct timeval32 *)mp->b_cont->b_rptr;
if (t32->tv_sec < 0 || t32->tv_usec < 0) {
miocnak(wq, mp, 0, EINVAL);
break;
}
ticks = TIMEVAL_TO_TICK(t32);
} else
#endif /* _SYSCALL32_IMPL */
{
struct timeval *tb;
tb = (struct timeval *)mp->b_cont->b_rptr;
if (tb->tv_sec < 0 || tb->tv_usec < 0) {
miocnak(wq, mp, 0, EINVAL);
break;
}
ticks = TIMEVAL_TO_TICK(tb);
}
sbp->sb_ticks = ticks;
if (ticks == 0)
sbp->sb_chunk = 0;
miocack(wq, mp, 0, 0);
sbclosechunk(sbp);
return;
case SBIOCSCHUNK:
/*
		 * Set up hi/lo water marks on the stream head read queue.
		 * We are unlikely to run out of resources here; fix at a
		 * later date.
*/
if ((mop = allocb(sizeof (struct stroptions),
BPRI_MED)) != NULL) {
struct stroptions *sop;
uint_t chunk;
chunk = *(uint_t *)mp->b_cont->b_rptr;
mop->b_datap->db_type = M_SETOPTS;
mop->b_wptr += sizeof (struct stroptions);
sop = (struct stroptions *)mop->b_rptr;
sop->so_flags = SO_HIWAT | SO_LOWAT;
sop->so_hiwat = SNIT_HIWAT(chunk, 1);
sop->so_lowat = SNIT_LOWAT(chunk, 1);
qreply(wq, mop);
}
sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
miocack(wq, mp, 0, 0);
sbclosechunk(sbp);
return;
case SBIOCSFLAGS:
sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
miocack(wq, mp, 0, 0);
return;
case SBIOCSSNAP:
/*
		 * If chunking, don't worry about the effect of snapshot
		 * truncation on stream head flow control, since it has
		 * relatively little bearing on the data rate onto the
		 * stream head.
*/
if (!sbp->sb_chunk) {
/*
			 * Set up hi/lo water marks on the stream head read
			 * queue. We are unlikely to run out of resources
			 * here; fix at a later date.
*/
if ((mop = allocb(sizeof (struct stroptions),
BPRI_MED)) != NULL) {
struct stroptions *sop;
uint_t snap;
int fudge;
snap = *(uint_t *)mp->b_cont->b_rptr;
mop->b_datap->db_type = M_SETOPTS;
mop->b_wptr += sizeof (struct stroptions);
sop = (struct stroptions *)mop->b_rptr;
sop->so_flags = SO_HIWAT | SO_LOWAT;
fudge = snap <= 100 ? 4 :
snap <= 400 ? 2 :
1;
sop->so_hiwat = SNIT_HIWAT(snap, fudge);
sop->so_lowat = SNIT_LOWAT(snap, fudge);
qreply(wq, mop);
}
}
sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
miocack(wq, mp, 0, 0);
return;
default:
ASSERT(0);
return;
}
}
/*
* Write-side put procedure. Its main task is to detect ioctls
* for manipulating the buffering state and hand them to sbioctl.
* Other message types are passed on through.
*/
static void
sbwput(queue_t *wq, mblk_t *mp)
{
struct sb *sbp = (struct sb *)wq->q_ptr;
struct copyresp *resp;
if (sbp->sb_flags & SB_SEND_ON_WRITE)
sbclosechunk(sbp);
switch (mp->b_datap->db_type) {
case M_IOCTL:
sbioctl(wq, mp);
break;
case M_IOCDATA:
resp = (struct copyresp *)mp->b_rptr;
if (resp->cp_rval) {
/*
* Just free message on failure.
*/
freemsg(mp);
break;
}
switch (resp->cp_cmd) {
case SBIOCSTIME:
case SBIOCSCHUNK:
case SBIOCSFLAGS:
case SBIOCSSNAP:
case SBIOCGTIME:
case SBIOCGCHUNK:
case SBIOCGSNAP:
case SBIOCGFLAGS:
sbioc(wq, mp);
break;
default:
putnext(wq, mp);
break;
}
break;
default:
putnext(wq, mp);
break;
}
}
/*
* Read-side put procedure. It's responsible for buffering up incoming
* messages and grouping them into aggregates according to the current
* buffering parameters.
*/
static void
sbrput(queue_t *rq, mblk_t *mp)
{
struct sb *sbp = (struct sb *)rq->q_ptr;
ASSERT(sbp);
switch (mp->b_datap->db_type) {
case M_PROTO:
if (sbp->sb_flags & SB_NO_PROTO_CVT) {
sbclosechunk(sbp);
sbsendit(rq, mp);
break;
} else {
/*
* Convert M_PROTO to M_DATA.
*/
mp->b_datap->db_type = M_DATA;
}
/* FALLTHRU */
case M_DATA:
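		/*
		 * With SB_DEFER_CHUNK set, the first message received
		 * after an idle period (SB_FRCVD clear) is passed up
		 * immediately and chunking starts with the second
		 * message; this keeps response time low for
		 * question-answer traffic.
		 */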
if ((sbp->sb_flags & SB_DEFER_CHUNK) &&
!(sbp->sb_state & SB_FRCVD)) {
sbclosechunk(sbp);
sbsendit(rq, mp);
sbp->sb_state |= SB_FRCVD;
} else
sbaddmsg(rq, mp);
if ((sbp->sb_ticks > 0) && !(sbp->sb_timeoutid))
sbp->sb_timeoutid = qtimeout(sbp->sb_rq, sbtick,
sbp, sbp->sb_ticks);
break;
case M_FLUSH:
if (*mp->b_rptr & FLUSHR) {
/*
* Reset timeout, flush the chunk currently in
* progress, and start a new chunk.
*/
if (sbp->sb_timeoutid) {
(void) quntimeout(sbp->sb_rq,
sbp->sb_timeoutid);
sbp->sb_timeoutid = 0;
}
if (sbp->sb_mp) {
freemsg(sbp->sb_mp);
sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
sbp->sb_mlen = 0;
sbp->sb_mcount = 0;
}
flushq(rq, FLUSHALL);
}
putnext(rq, mp);
break;
case M_CTL:
/*
* Zero-length M_CTL means our timeout() popped.
*/
if (MBLKL(mp) == 0) {
freemsg(mp);
sbclosechunk(sbp);
} else {
sbclosechunk(sbp);
sbsendit(rq, mp);
}
break;
default:
if (mp->b_datap->db_type <= QPCTL) {
sbclosechunk(sbp);
sbsendit(rq, mp);
} else {
/* Note: out of band */
putnext(rq, mp);
}
break;
}
}
/*
 * Read-side service procedure. Drains queued messages, honoring
 * downstream flow control for all but high-priority messages.
*/
/* ARGSUSED */
static void
sbrsrv(queue_t *rq)
{
mblk_t *mp;
/*
	 * High-priority messages shouldn't get here, but if one
	 * does, jam it through to avoid an infinite loop.
*/
while ((mp = getq(rq)) != NULL) {
if (!canputnext(rq) && (mp->b_datap->db_type <= QPCTL)) {
			/* should only get here if SB_NO_DROPS */
(void) putbq(rq, mp);
return;
}
putnext(rq, mp);
}
}
/*
* Handle write-side M_IOCTL messages.
*/
static void
sbioctl(queue_t *wq, mblk_t *mp)
{
struct sb *sbp = (struct sb *)wq->q_ptr;
struct iocblk *iocp = (struct iocblk *)mp->b_rptr;
struct timeval *t;
clock_t ticks;
mblk_t *mop;
int transparent = iocp->ioc_count;
mblk_t *datamp;
int error;
switch (iocp->ioc_cmd) {
case SBIOCSTIME:
if (iocp->ioc_count == TRANSPARENT) {
#ifdef _SYSCALL32_IMPL
if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
mcopyin(mp, NULL, sizeof (struct timeval32),
NULL);
} else
#endif /* _SYSCALL32_IMPL */
{
mcopyin(mp, NULL, sizeof (*t), NULL);
}
qreply(wq, mp);
} else {
/*
* Verify argument length.
*/
#ifdef _SYSCALL32_IMPL
if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
struct timeval32 *t32;
error = miocpullup(mp,
sizeof (struct timeval32));
if (error != 0) {
miocnak(wq, mp, 0, error);
break;
}
t32 = (struct timeval32 *)mp->b_cont->b_rptr;
if (t32->tv_sec < 0 || t32->tv_usec < 0) {
miocnak(wq, mp, 0, EINVAL);
break;
}
ticks = TIMEVAL_TO_TICK(t32);
} else
#endif /* _SYSCALL32_IMPL */
{
error = miocpullup(mp, sizeof (struct timeval));
if (error != 0) {
miocnak(wq, mp, 0, error);
break;
}
t = (struct timeval *)mp->b_cont->b_rptr;
if (t->tv_sec < 0 || t->tv_usec < 0) {
miocnak(wq, mp, 0, EINVAL);
break;
}
ticks = TIMEVAL_TO_TICK(t);
}
sbp->sb_ticks = ticks;
if (ticks == 0)
sbp->sb_chunk = 0;
miocack(wq, mp, 0, 0);
sbclosechunk(sbp);
}
break;
case SBIOCGTIME: {
struct timeval *t;
/*
* Verify argument length.
*/
if (transparent != TRANSPARENT) {
#ifdef _SYSCALL32_IMPL
if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
error = miocpullup(mp,
sizeof (struct timeval32));
if (error != 0) {
miocnak(wq, mp, 0, error);
break;
}
} else
#endif /* _SYSCALL32_IMPL */
error = miocpullup(mp, sizeof (struct timeval));
if (error != 0) {
miocnak(wq, mp, 0, error);
break;
}
}
/*
* If infinite timeout, return range error
* for the ioctl.
*/
if (sbp->sb_ticks < 0) {
miocnak(wq, mp, 0, ERANGE);
break;
}
#ifdef _SYSCALL32_IMPL
if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
struct timeval32 *t32;
if (transparent == TRANSPARENT) {
datamp = allocb(sizeof (*t32), BPRI_MED);
if (datamp == NULL) {
miocnak(wq, mp, 0, EAGAIN);
break;
}
mcopyout(mp, NULL, sizeof (*t32), NULL, datamp);
}
t32 = (struct timeval32 *)mp->b_cont->b_rptr;
TICK_TO_TIMEVAL32(sbp->sb_ticks, t32);
if (transparent == TRANSPARENT)
qreply(wq, mp);
else
miocack(wq, mp, sizeof (*t32), 0);
} else
#endif /* _SYSCALL32_IMPL */
{
if (transparent == TRANSPARENT) {
datamp = allocb(sizeof (*t), BPRI_MED);
if (datamp == NULL) {
miocnak(wq, mp, 0, EAGAIN);
break;
}
mcopyout(mp, NULL, sizeof (*t), NULL, datamp);
}
t = (struct timeval *)mp->b_cont->b_rptr;
TICK_TO_TIMEVAL(sbp->sb_ticks, t);
if (transparent == TRANSPARENT)
qreply(wq, mp);
else
miocack(wq, mp, sizeof (*t), 0);
}
break;
}
case SBIOCCTIME:
sbp->sb_ticks = -1;
miocack(wq, mp, 0, 0);
break;
case SBIOCSCHUNK:
if (iocp->ioc_count == TRANSPARENT) {
mcopyin(mp, NULL, sizeof (uint_t), NULL);
qreply(wq, mp);
} else {
/*
* Verify argument length.
*/
error = miocpullup(mp, sizeof (uint_t));
if (error != 0) {
miocnak(wq, mp, 0, error);
break;
}
/*
			 * Set up hi/lo water marks on the stream head read
			 * queue. We are unlikely to run out of resources
			 * here; fix at a later date.
*/
if ((mop = allocb(sizeof (struct stroptions),
BPRI_MED)) != NULL) {
struct stroptions *sop;
uint_t chunk;
chunk = *(uint_t *)mp->b_cont->b_rptr;
mop->b_datap->db_type = M_SETOPTS;
mop->b_wptr += sizeof (struct stroptions);
sop = (struct stroptions *)mop->b_rptr;
sop->so_flags = SO_HIWAT | SO_LOWAT;
sop->so_hiwat = SNIT_HIWAT(chunk, 1);
sop->so_lowat = SNIT_LOWAT(chunk, 1);
qreply(wq, mop);
}
sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
miocack(wq, mp, 0, 0);
sbclosechunk(sbp);
}
break;
case SBIOCGCHUNK:
/*
* Verify argument length.
*/
if (transparent != TRANSPARENT) {
error = miocpullup(mp, sizeof (uint_t));
if (error != 0) {
miocnak(wq, mp, 0, error);
break;
}
}
if (transparent == TRANSPARENT) {
datamp = allocb(sizeof (uint_t), BPRI_MED);
if (datamp == NULL) {
miocnak(wq, mp, 0, EAGAIN);
break;
}
mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
}
*(uint_t *)mp->b_cont->b_rptr = sbp->sb_chunk;
if (transparent == TRANSPARENT)
qreply(wq, mp);
else
miocack(wq, mp, sizeof (uint_t), 0);
break;
case SBIOCSSNAP:
if (iocp->ioc_count == TRANSPARENT) {
mcopyin(mp, NULL, sizeof (uint_t), NULL);
qreply(wq, mp);
} else {
/*
* Verify argument length.
*/
error = miocpullup(mp, sizeof (uint_t));
if (error != 0) {
miocnak(wq, mp, 0, error);
break;
}
/*
			 * If chunking, don't worry about the effect of
			 * snapshot truncation on stream head flow control,
			 * since it has relatively little bearing on the
			 * data rate onto the stream head.
*/
if (!sbp->sb_chunk) {
/*
				 * Set up hi/lo water marks on the stream
				 * head read queue. We are unlikely to run
				 * out of resources here; fix at a later
				 * date.
*/
if ((mop = allocb(sizeof (struct stroptions),
BPRI_MED)) != NULL) {
struct stroptions *sop;
uint_t snap;
int fudge;
snap = *(uint_t *)mp->b_cont->b_rptr;
mop->b_datap->db_type = M_SETOPTS;
mop->b_wptr += sizeof (*sop);
sop = (struct stroptions *)mop->b_rptr;
sop->so_flags = SO_HIWAT | SO_LOWAT;
fudge = (snap <= 100) ? 4 :
(snap <= 400) ? 2 : 1;
sop->so_hiwat = SNIT_HIWAT(snap, fudge);
sop->so_lowat = SNIT_LOWAT(snap, fudge);
qreply(wq, mop);
}
}
sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
miocack(wq, mp, 0, 0);
}
break;
case SBIOCGSNAP:
/*
* Verify argument length
*/
if (transparent != TRANSPARENT) {
error = miocpullup(mp, sizeof (uint_t));
if (error != 0) {
miocnak(wq, mp, 0, error);
break;
}
}
if (transparent == TRANSPARENT) {
datamp = allocb(sizeof (uint_t), BPRI_MED);
if (datamp == NULL) {
miocnak(wq, mp, 0, EAGAIN);
break;
}
mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
}
*(uint_t *)mp->b_cont->b_rptr = sbp->sb_snap;
if (transparent == TRANSPARENT)
qreply(wq, mp);
else
miocack(wq, mp, sizeof (uint_t), 0);
break;
case SBIOCSFLAGS:
/*
* set the flags.
*/
if (iocp->ioc_count == TRANSPARENT) {
mcopyin(mp, NULL, sizeof (uint_t), NULL);
qreply(wq, mp);
} else {
error = miocpullup(mp, sizeof (uint_t));
if (error != 0) {
miocnak(wq, mp, 0, error);
break;
}
sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
miocack(wq, mp, 0, 0);
}
break;
case SBIOCGFLAGS:
/*
* Verify argument length
*/
if (transparent != TRANSPARENT) {
error = miocpullup(mp, sizeof (uint_t));
if (error != 0) {
miocnak(wq, mp, 0, error);
break;
}
}
if (transparent == TRANSPARENT) {
datamp = allocb(sizeof (uint_t), BPRI_MED);
if (datamp == NULL) {
miocnak(wq, mp, 0, EAGAIN);
break;
}
mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
}
*(uint_t *)mp->b_cont->b_rptr = sbp->sb_flags;
if (transparent == TRANSPARENT)
qreply(wq, mp);
else
miocack(wq, mp, sizeof (uint_t), 0);
break;
default:
putnext(wq, mp);
break;
}
}
/*
* Given a length l, calculate the amount of extra storage
* required to round it up to the next multiple of the alignment a.
*/
#define RoundUpAmt(l, a) ((l) % (a) ? (a) - ((l) % (a)) : 0)
/*
* Calculate additional amount of space required for alignment.
*/
#define Align(l) RoundUpAmt(l, sizeof (ulong_t))
/*
* Smallest possible message size when headers are enabled.
* This is used to calculate whether a chunk is nearly full.
*/
#define	SMALLEST_MESSAGE	(sizeof (struct sb_hdr) + _POINTER_ALIGNMENT)
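/*
 * For example, on a kernel where sizeof (ulong_t) is 8, Align(13) is 3
 * (a 13-byte message rounds up to 16 bytes) and Align(16) is 0, since
 * 16 is already a multiple of 8.
 */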
/*
* Process a read-side M_DATA message.
*
* If the currently accumulating chunk doesn't have enough room
* for the message, close off the chunk, pass it upward, and start
* a new one. Then add the message to the current chunk, taking
* account of the possibility that the message's size exceeds the
* chunk size.
*
* If headers are enabled add an sb_hdr header and trailing alignment padding.
*
 * To optimise performance, the total number of msgbs should be kept
 * to a minimum. This is achieved by using any remaining space in message N
 * for both its own padding and the header of message N+1 if possible.
* If there's insufficient space we allocate one message to hold this 'wrapper'.
* (there's likely to be space beyond message N, since allocb would have
* rounded up the required size to one of the dblk_sizes).
*
*/
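/*
 * With headers enabled, a chunk delivered to the user looks roughly
 * like this (a sketch; sbh_totlen counts the header, data and padding
 * of one message):
 *
 *	+--------+------------+-----+--------+------------+-----+--
 *	| sb_hdr | msg 1 data | pad | sb_hdr | msg 2 data | pad | ...
 *	+--------+------------+-----+--------+------------+-----+--
 */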
static void
sbaddmsg(queue_t *rq, mblk_t *mp)
{
struct sb *sbp;
struct timeval t;
struct sb_hdr hp;
mblk_t *wrapper; /* padding for msg N, header for msg N+1 */
mblk_t *last; /* last mblk of current message */
size_t wrapperlen; /* length of header + padding */
size_t origlen; /* data length before truncation */
size_t pad; /* bytes required to align header */
sbp = (struct sb *)rq->q_ptr;
origlen = msgdsize(mp);
/*
* Truncate the message.
*/
if ((sbp->sb_snap > 0) && (origlen > sbp->sb_snap) &&
(adjmsg(mp, -(origlen - sbp->sb_snap)) == 1))
hp.sbh_totlen = hp.sbh_msglen = sbp->sb_snap;
else
hp.sbh_totlen = hp.sbh_msglen = origlen;
if (sbp->sb_flags & SB_NO_HEADER) {
/*
* Would the inclusion of this message overflow the current
* chunk? If so close the chunk off and start a new one.
*/
if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
sbclosechunk(sbp);
/*
* First message too big for chunk - just send it up.
* This will always be true when we're not chunking.
*/
if (hp.sbh_totlen > sbp->sb_chunk) {
sbsendit(rq, mp);
return;
}
/*
* We now know that the msg will fit in the chunk.
* Link it onto the end of the chunk.
		 * Since linkb() walks the entire chain, we keep a pointer
		 * to the first mblk of the last msgb added and call
		 * linkb() on that last message, rather than performing
		 * the O(n) linkb() operation on the whole chain.
* sb_head isn't needed in this SB_NO_HEADER mode.
*/
if (sbp->sb_mp)
linkb(sbp->sb_tail, mp);
else
sbp->sb_mp = mp;
sbp->sb_tail = mp;
sbp->sb_mlen += hp.sbh_totlen;
sbp->sb_mcount++;
} else {
/* Timestamp must be done immediately */
uniqtime(&t);
TIMEVAL_TO_TIMEVAL32(&hp.sbh_timestamp, &t);
pad = Align(hp.sbh_totlen);
hp.sbh_totlen += sizeof (hp);
hp.sbh_totlen += pad;
/*
* Would the inclusion of this message overflow the current
* chunk? If so close the chunk off and start a new one.
*/
if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
sbclosechunk(sbp);
if (sbp->sb_head == NULL) {
/* Allocate leading header of new chunk */
sbp->sb_head = allocb(sizeof (hp), BPRI_MED);
if (sbp->sb_head == NULL) {
/*
* Memory allocation failure.
* This will need to be revisited
* since using certain flag combinations
* can result in messages being dropped
* silently.
*/
freemsg(mp);
sbp->sb_drops++;
return;
}
sbp->sb_mp = sbp->sb_head;
}
/*
* Copy header into message
*/
hp.sbh_drops = sbp->sb_drops;
hp.sbh_origlen = origlen;
(void) memcpy(sbp->sb_head->b_wptr, (char *)&hp, sizeof (hp));
sbp->sb_head->b_wptr += sizeof (hp);
ASSERT(sbp->sb_head->b_wptr <= sbp->sb_head->b_datap->db_lim);
/*
* Join message to the chunk
*/
linkb(sbp->sb_head, mp);
sbp->sb_mcount++;
sbp->sb_mlen += hp.sbh_totlen;
/*
* If the first message alone is too big for the chunk close
* the chunk now.
* If the next message would immediately cause the chunk to
* overflow we may as well close the chunk now. The next
* message is certain to be at least SMALLEST_MESSAGE size.
*/
if (hp.sbh_totlen + SMALLEST_MESSAGE > sbp->sb_chunk) {
sbclosechunk(sbp);
return;
}
/*
* Find space for the wrapper. The wrapper consists of:
*
* 1) Padding for this message (this is to ensure each header
* begins on an 8 byte boundary in the userland buffer).
*
		 * 2) Space for the next message's header, in case the
		 *    next message will fit in this chunk.
*
* It may be possible to append the wrapper to the last mblk
* of the message, but only if we 'own' the data. If the dblk
* has been shared through dupmsg() we mustn't alter it.
*/
wrapperlen = (sizeof (hp) + pad);
/* Is there space for the wrapper beyond the message's data ? */
for (last = mp; last->b_cont; last = last->b_cont)
;
if ((wrapperlen <= MBLKTAIL(last)) &&
(last->b_datap->db_ref == 1)) {
if (pad > 0) {
/*
* Pad with zeroes to the next pointer boundary
* (we don't want to disclose kernel data to
* users), then advance wptr.
*/
(void) memset(last->b_wptr, 0, pad);
last->b_wptr += pad;
}
/* Remember where to write the header information */
sbp->sb_head = last;
} else {
/* Have to allocate additional space for the wrapper */
wrapper = allocb(wrapperlen, BPRI_MED);
if (wrapper == NULL) {
sbclosechunk(sbp);
return;
}
if (pad > 0) {
/*
* Pad with zeroes (we don't want to disclose
* kernel data to users).
*/
(void) memset(wrapper->b_wptr, 0, pad);
wrapper->b_wptr += pad;
}
/* Link the wrapper msg onto the end of the chunk */
linkb(mp, wrapper);
/* Remember to write the next header in this wrapper */
sbp->sb_head = wrapper;
}
}
}
/*
* Called from timeout().
 * Signal a timeout by passing a zero-length M_CTL msg up the read side
 * to synchronize with any active module threads (open, close, wput,
 * rput).
*/
static void
sbtick(void *arg)
{
struct sb *sbp = arg;
queue_t *rq;
ASSERT(sbp);
rq = sbp->sb_rq;
sbp->sb_timeoutid = 0; /* timeout has fired */
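	/*
	 * If the zero-length M_CTL can't be sent (allocation failure),
	 * rearm the timeout so the chunk is eventually closed.
	 */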
if (putctl(rq, M_CTL) == 0) /* failure */
sbp->sb_timeoutid = qtimeout(rq, sbtick, sbp, sbp->sb_ticks);
}
/*
* Close off the currently accumulating chunk and pass
* it upward. Takes care of resetting timers as well.
*
* This routine is called both directly and as a result
* of the chunk timeout expiring.
*/
static void
sbclosechunk(struct sb *sbp)
{
mblk_t *mp;
queue_t *rq;
ASSERT(sbp);
if (sbp->sb_timeoutid) {
(void) quntimeout(sbp->sb_rq, sbp->sb_timeoutid);
sbp->sb_timeoutid = 0;
}
mp = sbp->sb_mp;
rq = sbp->sb_rq;
/*
* If there's currently a chunk in progress, close it off
* and try to send it up.
*/
if (mp) {
sbsendit(rq, mp);
}
/*
* Clear old chunk. Ready for new msgs.
*/
sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
sbp->sb_mlen = 0;
sbp->sb_mcount = 0;
if (sbp->sb_flags & SB_DEFER_CHUNK)
sbp->sb_state &= ~SB_FRCVD;
}
static void
sbsendit(queue_t *rq, mblk_t *mp)
{
struct sb *sbp = (struct sb *)rq->q_ptr;
if (!canputnext(rq)) {
if (sbp->sb_flags & SB_NO_DROPS)
(void) putq(rq, mp);
else {
freemsg(mp);
sbp->sb_drops += sbp->sb_mcount;
}
return;
}
/*
* If there are messages on the q already, keep
* queueing them since they need to be processed in order.
*/
if (qsize(rq) > 0) {
/* should only get here if SB_NO_DROPS */
(void) putq(rq, mp);
	} else
		putnext(rq, mp);
}
}