squeue.c revision 4c06cebaab828bdb4d928a3a8ad3baf973b3e1a3
/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License (the "License").
* You may not use this file except in compliance with the License.
*
* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at usr/src/OPENSOLARIS.LICENSE.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information: Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*/
/*
* Copyright 2007 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
#pragma ident "%Z%%M% %I% %E% SMI"
/*
*
* This is a general purpose high-performance serialization mechanism. It is
* similar to a taskq with a single worker thread, the difference is that it
* does not imply a context switch - the thread placing a request may actually
* process it. It is also biased for processing requests in interrupt context.
*
* Each squeue has a worker thread which may optionally be bound to a CPU.
*
* Only one thread may process requests from a given squeue at any time. This is
* called "entering" squeue.
*
* Each dispatched request is processed either by
*
* a) Dispatching thread or
* b) Some other thread that is currently processing squeue at the time of
* request or
* c) worker thread.
*
* INTERFACES:
*
* squeue_t *squeue_create(name, bind, wait, pri)
*
* name: symbolic name for squeue.
 * wait: time to wait before waking the worker thread after queueing
* request.
* bind: preferred CPU binding for the worker thread.
* pri: thread priority for the worker thread.
*
* This function never fails and may sleep. It returns a transparent pointer
* to the squeue_t structure that is passed to all other squeue operations.
*
* void squeue_bind(sqp, bind)
*
* Bind squeue worker thread to a CPU specified by the 'bind' argument. The
* 'bind' value of -1 binds to the preferred thread specified for
* squeue_create.
*
 * NOTE: Any value of 'bind' other than -1 is not supported currently, but the
* API is present - in the future it may be useful to specify different
* binding.
*
* void squeue_unbind(sqp)
*
* Unbind the worker thread from its preferred CPU.
*
* void squeue_enter(*sqp, *mp, proc, arg, tag)
*
* Post a single request for processing. Each request consists of mblock 'mp',
* function 'proc' to execute and an argument 'arg' to pass to this
* function. The function is called as (*proc)(arg, mp, sqp); The tag is an
* arbitrary number from 0 to 255 which will be stored in mp to track exact
* caller of squeue_enter. The combination of function name and the tag should
* provide enough information to identify the caller.
*
* If no one is processing the squeue, squeue_enter() will call the function
* immediately. Otherwise it will add the request to the queue for later
* processing. Once the function is executed, the thread may continue
* executing all other requests pending on the queue.
*
* NOTE: The tagging information is only used when SQUEUE_DEBUG is set to 1.
* NOTE: The argument can be conn_t only. Ideally we'd like to have generic
* argument, but we want to drop connection reference count here - this
* improves tail-call optimizations.
* XXX: The arg should have type conn_t.
*
* void squeue_enter_nodrain(*sqp, *mp, proc, arg, tag)
*
* Same as squeue_enter(), but the entering thread will only try to execute a
* single request. It will not continue executing any pending requests.
*
* void squeue_fill(*sqp, *mp, proc, arg, tag)
*
* Just place the request on the queue without trying to execute it. Arrange
* for the worker thread to process the request.
*
* void squeue_profile_enable(sqp)
* void squeue_profile_disable(sqp)
*
* Enable or disable profiling for specified 'sqp'. Profiling is only
* available when SQUEUE_PROFILE is set.
*
* void squeue_profile_reset(sqp)
*
* Reset all profiling information to zero. Profiling is only
* available when SQUEUE_PROFILE is set.
*
* void squeue_profile_start()
* void squeue_profile_stop()
*
 * Globally enable or disable profiling for all squeues.
*
* uintptr_t *squeue_getprivate(sqp, p)
*
* Each squeue keeps small amount of private data space available for various
* consumers. Current consumers include TCP and NCA. Other consumers need to
* add their private tag to the sqprivate_t enum. The private information is
* limited to an uintptr_t value. The squeue has no knowledge of its content
* and does not manage it in any way.
*
* The typical use may be a breakdown of data structures per CPU (since
* squeues are usually per CPU). See NCA for examples of use.
* Currently 'p' may have one legal value SQPRIVATE_TCP.
*
* processorid_t squeue_binding(sqp)
*
* Returns the CPU binding for a given squeue.
*
 * TUNABLES:
*
* squeue_intrdrain_ms: Maximum time in ms interrupts spend draining any
* squeue. Note that this is approximation - squeues have no control on the
* time it takes to process each request. This limit is only checked
* between processing individual messages.
* Default: 20 ms.
*
* squeue_writerdrain_ms: Maximum time in ms non-interrupts spend draining any
* squeue. Note that this is approximation - squeues have no control on the
* time it takes to process each request. This limit is only checked
* between processing individual messages.
* Default: 10 ms.
*
* squeue_workerdrain_ms: Maximum time in ms worker thread spends draining any
* squeue. Note that this is approximation - squeues have no control on the
* time it takes to process each request. This limit is only checked
* between processing individual messages.
* Default: 10 ms.
*
* squeue_workerwait_ms: When worker thread is interrupted because workerdrain
* expired, how much time to wait before waking worker thread again.
* Default: 10 ms.
*/
#include <sys/condvar_impl.h>
#include <inet/ipclassifier.h>
#include <inet/udp_impl.h>
/*
* State flags.
* Note: The MDB IP module depends on the values of these flags.
*/
#include <sys/squeue_impl.h>
static void squeue_fire(void *);
#if SQUEUE_PROFILE
static kmutex_t squeue_kstat_lock;
static int squeue_kstat_update(kstat_t *, int);
#endif
#define SQUEUE_MSEC_TO_NSEC 1000000
int squeue_intrdrain_ms = 20;
int squeue_writerdrain_ms = 10;
int squeue_workerdrain_ms = 10;
int squeue_workerwait_ms = 10;
/* The values above converted to ticks or nano seconds */
static int squeue_intrdrain_ns = 0;
static int squeue_writerdrain_ns = 0;
static int squeue_workerdrain_ns = 0;
static int squeue_workerwait_tick = 0;
/*
 * The minimum number of packets queued for a worker-thread drain to
 * trigger polling (if the squeue allows it). The choice of 3 is
 * arbitrary. You definitely don't want it to be 1, since that would
 * trigger polling on very low loads as well (ssh seems to be one such
 * example, where packet flow was very low yet somehow 1 packet ended
 * up getting queued, the worker thread fired every 10 ms, and blanking
 * also got triggered).
 */
int squeue_worker_poll_min = 3;
#if SQUEUE_PROFILE
/*
* Set to B_TRUE to enable profiling.
*/
static int squeue_profile = B_FALSE;
struct squeue_kstat {
} squeue_kstat = {
{ "count", KSTAT_DATA_UINT64 },
{ "max_qlen", KSTAT_DATA_UINT64 },
{ "packets_worker", KSTAT_DATA_UINT64 },
{ "packets_intr", KSTAT_DATA_UINT64 },
{ "packets_other", KSTAT_DATA_UINT64 },
{ "queued_intr", KSTAT_DATA_UINT64 },
{ "queued_other", KSTAT_DATA_UINT64 },
{ "ndrains_worker", KSTAT_DATA_UINT64 },
{ "ndrains_intr", KSTAT_DATA_UINT64 },
{ "ndrains_other", KSTAT_DATA_UINT64 },
{ "time_worker", KSTAT_DATA_UINT64 },
{ "time_intr", KSTAT_DATA_UINT64 },
{ "time_other", KSTAT_DATA_UINT64 },
};
#endif
#define SQUEUE_WORKER_WAKEUP(sqp) { \
\
/* \
* Queue isn't being processed, so take \
* any post enqueue actions needed before leaving. \
*/ \
if (tid != 0) { \
/* \
* Waiting for an enter() to process mblk(s). \
*/ \
\
/* \
* Times up and have a worker thread \
* waiting for work, so schedule it. \
*/ \
return; \
} \
return; \
return; \
/* \
* Wait up to sqp->sq_wait ms for an \
* enter() to process this queue. We \
* don't want to contend on timeout locks \
* with sq_lock held for performance reasons, \
* so drop the sq_lock before calling timeout \
* but we need to check if timeout is required \
	 * after reacquiring the sq_lock. Once		\
* the sq_lock is dropped, someone else could \
* have processed the packet or the timeout could \
* have already fired. \
*/ \
/* Check again if we still need the timeout */ \
return; \
} else { \
} else { \
/* \
* The timer fired before we could \
* reacquire the sq_lock. squeue_fire \
* removes the SQS_TMO_PROG flag \
* and we don't need to do anything \
* else. \
*/ \
} \
} \
} else { \
/* \
* Schedule the worker thread. \
*/ \
} \
}
/* \
	 * Enqueue our mblk.				\
*/ \
\
else \
}
/* \
* Enqueue our mblk chain. \
*/ \
\
else \
\
}
rx_ring->rr_max_blank_time), \
rx_ring->rr_max_pkt_cnt); \
}
rx_ring->rr_min_pkt_cnt); \
}
void
squeue_init(void)
{
}
/* ARGSUSED */
squeue_t *
{
#if SQUEUE_PROFILE
"net", KSTAT_TYPE_NAMED,
sizeof (squeue_kstat) / sizeof (kstat_named_t),
KSTAT_FLAG_VIRTUAL)) != NULL) {
}
#endif
return (sqp);
}
/* ARGSUSED */
void
{
return;
}
}
void
{
return;
}
}
/*
* squeue_enter() - enter squeue sqp with mblk mp (which can be
* a chain), while tail points to the end and cnt in number of
* mblks in the chain.
*
* For a chain of single packet (i.e. mp == tail), go through the
* fast path if no one is processing the squeue and nothing is queued.
*
* The proc and arg for each mblk is already stored in the mblk in
* appropriate places.
*/
void
{
int interrupt = servicing_interrupt();
void *arg;
#if SQUEUE_PROFILE
#endif
/*
* See if anything is already queued. If we are the
* first packet, do inline processing else queue the
* packet and do the drain.
*/
/*
* Fast-path, ok to process and nothing queued.
*/
/*
* We are the chain of 1 packet so
* go through this fast path.
*/
#if SQUEUE_DEBUG
#endif
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
if (interrupt)
else
}
#endif
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
if (interrupt)
else
}
#endif
#if SQUEUE_DEBUG
#endif
/*
* We processed inline our packet and
* nothing new has arrived. We are done.
*/
return;
/*
* If the current thread is not running
* on the CPU to which this squeue is bound,
* then don't allow it to drain.
*/
return;
}
} else {
#if SQUEUE_DEBUG
#endif
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
if (servicing_interrupt())
else
}
#endif
}
/*
* We are here because either we couldn't do inline
* processing (because something was already queued),
	 * or we had a chain of more than one packet,
* or something else arrived after we were done with
* inline processing.
*/
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
}
#endif
#if SQUEUE_DEBUG
#endif
if (interrupt) {
} else {
}
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
if (interrupt)
else
}
#endif
#if SQUEUE_DEBUG
#endif
/*
* If we didn't do a complete drain, the worker
* thread was already signalled by squeue_drain.
*/
return;
} else {
/*
* Queue is already being processed. Just enqueue
* the packet and go away.
*/
#if SQUEUE_DEBUG
#endif
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
if (servicing_interrupt())
else
}
#endif
return;
}
}
/*
* squeue_enter() - enter squeue *sqp with mblk *mp with argument of *arg.
*/
void
{
int interrupt = servicing_interrupt();
#if SQUEUE_PROFILE
#endif
#if SQUEUE_DEBUG
#endif
/*
* See if anything is already queued. If we are the
* first packet, do inline processing else queue the
* packet and do the drain.
*/
/*
* Fast-path, ok to process and nothing queued.
*/
#if SQUEUE_DEBUG
#endif
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
if (interrupt)
else
}
#endif
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
if (interrupt)
else
}
#endif
#if SQUEUE_DEBUG
#endif
/*
* We processed inline our packet and
* nothing new has arrived. We are done.
*/
return;
/*
* If the current thread is not running
* on the CPU to which this squeue is bound,
* then don't allow it to drain.
*/
return;
}
} else {
#if SQUEUE_DEBUG
#endif
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
if (servicing_interrupt())
else
}
#endif
}
/*
* We are here because either we couldn't do inline
* processing (because something was already queued)
* or something else arrived after we were done with
* inline processing.
*/
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
}
#endif
#if SQUEUE_DEBUG
#endif
if (interrupt) {
} else {
}
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
if (interrupt)
else
}
#endif
#if SQUEUE_DEBUG
#endif
/*
* If we didn't do a complete drain, the worker
* thread was already signalled by squeue_drain.
*/
return;
} else {
/*
* We let a thread processing a squeue reenter only
* once. This helps the case of incoming connection
* where a SYN-ACK-ACK that triggers the conn_ind
* doesn't have to queue the packet if listener and
* eager are on the same squeue. Also helps the
* loopback connection where the two ends are bound
* to the same squeue (which is typical on single
* CPU machines).
* We let the thread reenter only once for the fear
* of stack getting blown with multiple traversal.
*/
return;
}
/*
* Queue is already being processed. Just enqueue
* the packet and go away.
*/
#if SQUEUE_DEBUG
#endif
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
if (servicing_interrupt())
else
}
#endif
return;
}
}
void
{
int interrupt = servicing_interrupt();
#if SQUEUE_DEBUG
#endif
#if SQUEUE_PROFILE
#endif
/*
* Fast-path, ok to process and nothing queued.
*/
#if SQUEUE_DEBUG
#endif
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
if (interrupt)
else
}
#endif
#if SQUEUE_DEBUG
#endif
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
if (interrupt)
else
}
#endif
/*
* We processed inline our packet and
* nothing new has arrived. We are done.
*/
} else {
}
return;
} else {
/*
* We let a thread processing a squeue reenter only
* once. This helps the case of incoming connection
* where a SYN-ACK-ACK that triggers the conn_ind
* doesn't have to queue the packet if listener and
* eager are on the same squeue. Also helps the
* loopback connection where the two ends are bound
* to the same squeue (which is typical on single
* CPU machines).
* We let the thread reenter only once for the fear
* of stack getting blown with multiple traversal.
*/
return;
}
#if SQUEUE_DEBUG
#endif
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
if (servicing_interrupt())
else
}
#endif
if (being_processed) {
/*
* Queue is already being processed.
* No need to do anything.
*/
return;
}
}
}
/*
* squeue_fill() - fill squeue *sqp with mblk *mp with argument of *arg
* without processing the squeue.
*/
/* ARGSUSED */
void
{
#if SQUEUE_DEBUG
#endif
#if SQUEUE_DEBUG
#endif
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
if (servicing_interrupt())
else
}
#endif
/*
* If queue is already being processed. No need to do anything.
*/
return;
}
}
/*
* PRIVATE FUNCTIONS
*/
static void
squeue_fire(void *arg)
{
return;
}
/*
* The timeout fired before we got a chance to set it.
* Process it anyway but remove the SQS_TMO_PROG so that
* the guy trying to set the timeout knows that it has
* already been processed.
*/
if (state & SQS_TMO_PROG)
}
}
static void
{
int interrupt = servicing_interrupt();
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
if (interrupt)
else if (!(proc_type & SQS_WORKER))
else
}
#endif
/*
* We have backlog built up. Switch to polling mode if the
* device underneath allows it. Need to do it only for
* drain by non-interrupt thread so interrupts don't
* come and disrupt us in between. If its a interrupt thread,
* no need because most devices will not issue another
* interrupt till this one returns.
*/
}
if (tid != 0)
#if SQUEUE_DEBUG
#endif
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
if (interrupt)
else if (!(proc_type & SQS_WORKER))
else
}
#endif
}
#if SQUEUE_DEBUG
#endif
/* More arrived and time not expired */
goto again;
}
/*
* If we are not worker thread and we
* reached our time limit to do drain,
* signal the worker thread to pick
* up the work.
* If we were the worker thread, then
* we take a break to allow an interrupt
* or writer to pick up the load.
*/
if (proc_type != SQS_WORKER) {
}
}
/*
* Try to see if we can get a time estimate to process a packet.
* Do it only in interrupt context since less chance of context
* switch or pinning etc. to get a better estimate.
*/
/*
* If polling was turned on, turn it off and reduce the default
* interrupt blank interval as well to bring new packets in faster
* (reduces the latency when there is no backlog).
*/
}
}
static void
{
#if SQUEUE_PROFILE
#endif
for (;;) {
goto still_wait;
}
}
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
}
#endif
ASSERT(squeue_workerdrain_ns != 0);
/*
* Doing too much processing by worker thread
	 * in presence of interrupts can be suboptimal.
* Instead, once a drain is done by worker thread
* for squeue_writerdrain_ns (the reason we are
* here), we force wait for squeue_workerwait_tick
* before doing more processing even if sq_wait is
* set to 0.
*
* This can be counterproductive for performance
* if worker thread is the only means to process
* the packets (interrupts or writers are not
* allowed inside the squeue).
*/
/*
* Check again if we still need
* the timeout
*/
/* timeout not needed */
}
}
}
#if SQUEUE_PROFILE
if (SQ_PROFILING(sqp)) {
}
#endif
}
}
#if SQUEUE_PROFILE
static int
{
if (rw == KSTAT_WRITE)
return (EACCES);
#if SQUEUE_DEBUG
#endif
return (0);
}
#endif
void
{
}
void
{
}
void
{
#if SQUEUE_PROFILE
#endif
}
void
squeue_profile_start(void)
{
#if SQUEUE_PROFILE
#endif
}
void
squeue_profile_stop(void)
{
#if SQUEUE_PROFILE
#endif
}
{
ASSERT(p < SQPRIVATE_MAX);
return (&sqp->sq_private[p]);
}
{
}