/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License (the "License").
* You may not use this file except in compliance with the License.
*
* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
* or http://www.opensolaris.org/os/licensing.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at usr/src/OPENSOLARIS.LICENSE.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information: Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*/
/*
* Copyright 2006 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
/*
* Copyright 1993 OpenVision Technologies, Inc., All Rights Reserved.
*/
/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
/*
* Portions of this source code were derived from Berkeley 4.3 BSD
* under license from the Regents of the University of California.
*/
#pragma ident "%Z%%M% %I% %E% SMI"
/*
* Server-side remote procedure call interface.
*
* Master transport handle (SVCMASTERXPRT).
* The master transport handle structure is shared among service
* threads processing events on the transport. Some fields in the
* master structure are protected by locks
* - xp_req_lock protects the request queue:
* xp_req_head, xp_req_tail
 * - xp_thread_lock protects the thread (clone) counts:
* xp_threads, xp_detached_threads, xp_wq
* Each master transport is registered to exactly one thread pool.
*
* Clone transport handle (SVCXPRT)
* The clone transport handle structure is a per-service-thread handle
* to the transport. The structure carries all the fields/buffers used
* for request processing. A service thread or, in other words, a clone
* structure, can be linked to an arbitrary master structure to process
* requests on this transport. The master handle keeps track of reference
* counts of threads (clones) linked to it. A service thread can switch
* to another transport by unlinking its clone handle from the current
* transport and linking to a new one. Switching is relatively inexpensive
* but it involves locking (master's xprt->xp_thread_lock).
*
* Pools.
* A pool represents a kernel RPC service (NFS, Lock Manager, etc.).
* Transports related to the service are registered to the service pool.
* Service threads can switch between different transports in the pool.
 * Thus, each service has its own pool of service threads. The maximum
 * number of threads in a pool is pool->p_maxthreads. This limit makes
 * it possible to restrict resource usage by the service. Some fields
 * are protected by locks:
* - p_req_lock protects several counts and flags:
* p_reqs, p_walkers, p_asleep, p_drowsy, p_req_cv
* - p_thread_lock governs other thread counts:
* p_threads, p_detached_threads, p_reserved_threads, p_closing
*
* In addition, each pool contains a doubly-linked list of transports,
* an `xprt-ready' queue and a creator thread (see below). Threads in
* the pool share some other parameters such as stack size and
* polling timeout.
*
* Pools are initialized through the svc_pool_create() function called from
* the nfssys() system call. However, thread creation must be done by
* the userland agent. This is done by using SVCPOOL_WAIT and
* SVCPOOL_RUN arguments to nfssys(), which call svc_wait() and
* svc_do_run(), respectively. Once the pool has been initialized,
* the userland process must set up a 'creator' thread. This thread
* should park itself in the kernel by calling svc_wait(). If
* svc_wait() returns successfully, it should fork off a new worker
* thread, which then calls svc_do_run() in order to get work. When
* that thread is complete, svc_do_run() will return, and the user
* program should call thr_exit().
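 *
 * As an illustration only (a sketch, not a prescribed implementation;
 * the `mypool_id' variable and the error handling are made up here,
 * and real daemons reach nfssys() through the private _nfssys()
 * wrapper), the userland side might look like:
 *
 *	while (_nfssys(SVCPOOL_WAIT, &mypool_id) == 0)
 *		(void) thr_create(NULL, 0, worker, &mypool_id,
 *		    THR_DETACHED, NULL);
 *
 *	void *
 *	worker(void *arg)
 *	{
 *		(void) _nfssys(SVCPOOL_RUN, arg);
 *		thr_exit(NULL);
 *	}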
*
* When we try to register a new pool and there is an old pool with
* the same id in the doubly linked pool list (this happens when we kill
* and restart nfsd or lockd), then we unlink the old pool from the list
* and mark its state as `closing'. After that the transports can still
* process requests but new transports won't be registered. When all the
* transports and service threads associated with the pool are gone the
* creator thread (see below) will clean up the pool structure and exit.
*
* svc_queuereq() and svc_run().
* The kernel RPC server is interrupt driven. The svc_queuereq() interrupt
* routine is called to deliver an RPC request. The service threads
* loop in svc_run(). The interrupt function queues a request on the
 * transport's queue and makes sure that the request is serviced.
* It may either wake up one of sleeping threads, or ask for a new thread
* to be created, or, if the previous request is just being picked up, do
* nothing. In the last case the service thread that is picking up the
* previous request will wake up or create the next thread. After a service
* thread processes a request and sends a reply it returns to svc_run()
* and svc_run() calls svc_poll() to find new input.
*
* There is an "inconsistent" but "safe" optimization in the
* svc_queuereq() code. The request is queued under the transport's
* request lock, while the `pending-requests' count is incremented
* independently under the pool request lock. Thus, a request can be picked
* up by a service thread before the counter is incremented. It may also
* happen that the service thread will win the race condition on the pool
* lock and it will decrement the count even before the interrupt thread
* increments it (so the count can be temporarily negative).
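 *
 * One possible interleaving, shown only to illustrate how the count
 * can transiently go negative:
 *
 *	interrupt thread		service thread
 *	----------------		--------------
 *	queue request (xp_req_lock)
 *					dequeue request (xp_req_lock)
 *					p_reqs-- (p_req_lock): p_reqs = -1
 *	p_reqs++ (p_req_lock): p_reqs = 0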
*
* svc_poll().
* In order to avoid unnecessary locking, which causes performance
* problems, we always look for a pending request on the current transport.
* If there is none we take a hint from the pool's `xprt-ready' queue.
 * If the queue has overflowed we switch to the `drain' mode, checking
 * each transport in the pool's transport list. Once we find a
 * master transport handle with a pending request we latch the request
 * lock on this transport and return to svc_run(). If the request
 * belongs to a transport different from the one the service thread is
 * linked to we need to unlink and link again.
*
 * A service thread goes to sleep when there are no pending
 * requests on the transports registered with the pool.
 * All the pool's threads sleep on the same condition variable.
 * If a thread has been sleeping for too long a period of time
 * (by default 5 seconds) it wakes up and exits. Also, when a transport
 * is closing, sleeping threads wake up to unlink from this transport.
*
* The `xprt-ready' queue.
 * If a service thread finds no request on the transport it is currently
 * linked to, it will look for another transport with a pending request.
 * To make this search more efficient each pool has an `xprt-ready' queue.
 * The queue is a FIFO. When the interrupt routine queues a request it also
 * inserts a pointer to the transport into the `xprt-ready' queue. A
 * thread looking for a transport with a pending request can pop a
 * transport off the queue and check it for a request. The request may
 * already be gone since it could have been taken by a thread linked to
 * that transport. In such a case we try the next hint. The `xprt-ready'
 * queue has a fixed size (by default 256 nodes). If it overflows
 * svc_poll() has to switch to the less efficient but safe `drain' mode
 * and walk through the pool's transport list.
*
 * Both the svc_poll() loop and the `xprt-ready' queue are optimized
 * for the peak-load case, that is, the situation when the queue is not
 * empty, there are always a few pending requests, and a service
 * thread that has just processed a request does not go to sleep but
 * immediately picks up the next request.
*
* Thread creator.
* Each pool has a thread creator associated with it. The creator thread
* sleeps on a condition variable and waits for a signal to create a
* service thread. The actual thread creation is done in userland by
* the method described in "Pools" above.
*
* Signaling threads should turn on the `creator signaled' flag, and
* can avoid sending signals when the flag is on. The flag is cleared
* when the thread is created.
*
 * When the pool is in closing state (i.e. it has already been unregistered
* from the pool list) the last thread on the last transport in the pool
* should turn the p_creator_exit flag on. The creator thread will
* clean up the pool structure and exit.
*
* Thread reservation; Detaching service threads.
 * A service thread can detach itself to block for an extended amount
 * of time. However, to keep the service active we need to guarantee
 * at least pool->p_redline non-detached threads that can process incoming
 * requests. Thus, the maximum number of detached and reserved threads is
 * pool->p_maxthreads - pool->p_redline. A service thread should first
 * acquire a reservation, and if the reservation was granted it can
 * detach itself. If a reservation was granted but the thread does not
 * detach itself it should cancel the reservation before it returns
 * to svc_run().
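 *
 * As an illustration only (a sketch, not a prescribed call sequence;
 * the reservation interfaces svc_reserve_thread(), svc_detach_thread()
 * and svc_unreserve_thread() are implemented further in this file):
 *
 *	if (!svc_reserve_thread(clone_xprt))
 *		return (error);
 *	if (need_to_block) {
 *		(void) svc_detach_thread(clone_xprt);
 *		... block for an extended time ...
 *	} else {
 *		svc_unreserve_thread(clone_xprt);
 *	}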
*/
#include <sys/param.h>
#include <sys/types.h>
#include <rpc/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/tiuser.h>
#include <sys/t_kuser.h>
#include <netinet/in.h>
#include <rpc/xdr.h>
#include <rpc/auth.h>
#include <rpc/clnt.h>
#include <rpc/rpc_msg.h>
#include <rpc/svc.h>
#include <sys/proc.h>
#include <sys/user.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/tihdr.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/file.h>
#include <sys/systm.h>
#include <sys/callb.h>
#include <sys/vtrace.h>
#include <sys/zone.h>
#include <nfs/nfs.h>
#include <sys/tsol/label_macro.h>
#define RQCRED_SIZE 400 /* this size is excessive */
/*
* Defines for svc_poll()
*/
#define SVC_EXPRTGONE ((SVCMASTERXPRT *)1) /* Transport is closing */
#define SVC_ETIMEDOUT ((SVCMASTERXPRT *)2) /* Timeout */
#define SVC_EINTR ((SVCMASTERXPRT *)3) /* Interrupted by signal */
/*
* Default stack size for service threads.
*/
#define DEFAULT_SVC_RUN_STKSIZE (0) /* default kernel stack */
int svc_default_stksize = DEFAULT_SVC_RUN_STKSIZE;
/*
* Default polling timeout for service threads.
* Multiplied by hz when used.
*/
#define DEFAULT_SVC_POLL_TIMEOUT (5) /* seconds */
clock_t svc_default_timeout = DEFAULT_SVC_POLL_TIMEOUT;
/*
* Size of the `xprt-ready' queue.
*/
#define DEFAULT_SVC_QSIZE (256) /* qnodes */
size_t svc_default_qsize = DEFAULT_SVC_QSIZE;
/*
* Default limit for the number of service threads.
*/
#define DEFAULT_SVC_MAXTHREADS (INT16_MAX)
int svc_default_maxthreads = DEFAULT_SVC_MAXTHREADS;
/*
* Maximum number of requests from the same transport (in `drain' mode).
*/
#define DEFAULT_SVC_MAX_SAME_XPRT (8)
int svc_default_max_same_xprt = DEFAULT_SVC_MAX_SAME_XPRT;
/*
* Default `Redline' of non-detached threads.
* Total number of detached and reserved threads in an RPC server
* thread pool is limited to pool->p_maxthreads - svc_redline.
*/
#define DEFAULT_SVC_REDLINE (1)
int svc_default_redline = DEFAULT_SVC_REDLINE;
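/*
 * Each of the svc_default_* settings above is an ordinary global
 * variable, so it can be overridden at boot time via /etc/system.
 * An illustrative (not committed) example:
 *
 *	set rpcmod:svc_default_stksize = 0x6000
 */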
/*
* A node for the `xprt-ready' queue.
* See below.
*/
struct __svcxprt_qnode {
__SVCXPRT_QNODE *q_next;
SVCMASTERXPRT *q_xprt;
};
/*
* Global SVC variables (private).
*/
struct svc_globals {
SVCPOOL *svc_pools;
kmutex_t svc_plock;
};
/*
* Debug variable to check for rdma based
 * transport startup and cleanup. Controlled
* through /etc/system. Off by default.
*/
int rdma_check = 0;
/*
* Authentication parameters list.
*/
static caddr_t rqcred_head;
static kmutex_t rqcred_lock;
/*
* Pointers to transport specific `rele' routines in rpcmod (set from rpcmod).
*/
void (*rpc_rele)(queue_t *, mblk_t *) = NULL;
void (*mir_rele)(queue_t *, mblk_t *) = NULL;
/* ARGSUSED */
void
rpc_rdma_rele(queue_t *q, mblk_t *mp)
{
}
void (*rdma_rele)(queue_t *, mblk_t *) = rpc_rdma_rele;
/*
* This macro picks which `rele' routine to use, based on the transport type.
*/
#define RELE_PROC(xprt) \
((xprt)->xp_type == T_RDMA ? rdma_rele : \
(((xprt)->xp_type == T_CLTS) ? rpc_rele : mir_rele))
/*
* If true, then keep quiet about version mismatch.
 * This macro is for broadcast RPC only. We have no broadcast RPC in
 * the kernel now, but one may define a flag in the transport structure
* and redefine this macro.
*/
#define version_keepquiet(xprt) (FALSE)
/*
* ZSD key used to retrieve zone-specific svc globals
*/
static zone_key_t svc_zone_key;
static void svc_callout_free(SVCMASTERXPRT *);
static void svc_xprt_qinit(SVCPOOL *, size_t);
static void svc_xprt_qdestroy(SVCPOOL *);
static void svc_thread_creator(SVCPOOL *);
static void svc_creator_signal(SVCPOOL *);
static void svc_creator_signalexit(SVCPOOL *);
static void svc_pool_unregister(struct svc_globals *, SVCPOOL *);
static int svc_run(SVCPOOL *);
/* ARGSUSED */
static void *
svc_zoneinit(zoneid_t zoneid)
{
struct svc_globals *svc;
svc = kmem_alloc(sizeof (*svc), KM_SLEEP);
mutex_init(&svc->svc_plock, NULL, MUTEX_DEFAULT, NULL);
svc->svc_pools = NULL;
return (svc);
}
/* ARGSUSED */
static void
svc_zoneshutdown(zoneid_t zoneid, void *arg)
{
struct svc_globals *svc = arg;
SVCPOOL *pool;
mutex_enter(&svc->svc_plock);
while ((pool = svc->svc_pools) != NULL) {
svc_pool_unregister(svc, pool);
}
mutex_exit(&svc->svc_plock);
}
/* ARGSUSED */
static void
svc_zonefini(zoneid_t zoneid, void *arg)
{
struct svc_globals *svc = arg;
ASSERT(svc->svc_pools == NULL);
mutex_destroy(&svc->svc_plock);
kmem_free(svc, sizeof (*svc));
}
/*
* Global SVC init routine.
* Initialize global generic and transport type specific structures
* used by the kernel RPC server side. This routine is called only
* once when the module is being loaded.
*/
void
svc_init()
{
zone_key_create(&svc_zone_key, svc_zoneinit, svc_zoneshutdown,
svc_zonefini);
svc_cots_init();
svc_clts_init();
}
/*
* Destroy the SVCPOOL structure.
*/
static void
svc_pool_cleanup(SVCPOOL *pool)
{
ASSERT(pool->p_threads + pool->p_detached_threads == 0);
ASSERT(pool->p_lcount == 0);
ASSERT(pool->p_closing);
/*
* Call the user supplied shutdown function. This is done
	 * here so the user of the pool will be able to clean up
	 * service-related resources.
*/
if (pool->p_shutdown != NULL)
(pool->p_shutdown)();
/* Destroy `xprt-ready' queue */
svc_xprt_qdestroy(pool);
/* Destroy transport list */
rw_destroy(&pool->p_lrwlock);
/* Destroy locks and condition variables */
mutex_destroy(&pool->p_thread_lock);
mutex_destroy(&pool->p_req_lock);
cv_destroy(&pool->p_req_cv);
/* Destroy creator's locks and condition variables */
mutex_destroy(&pool->p_creator_lock);
cv_destroy(&pool->p_creator_cv);
mutex_destroy(&pool->p_user_lock);
cv_destroy(&pool->p_user_cv);
/* Free pool structure */
kmem_free(pool, sizeof (SVCPOOL));
}
/*
* If all the transports and service threads are already gone
* signal the creator thread to clean up and exit.
*/
static bool_t
svc_pool_tryexit(SVCPOOL *pool)
{
ASSERT(MUTEX_HELD(&pool->p_thread_lock));
ASSERT(pool->p_closing);
if (pool->p_threads + pool->p_detached_threads == 0) {
rw_enter(&pool->p_lrwlock, RW_READER);
if (pool->p_lcount == 0) {
/*
* Release the locks before sending a signal.
*/
rw_exit(&pool->p_lrwlock);
mutex_exit(&pool->p_thread_lock);
/*
* Notify the creator thread to clean up and exit
*
* NOTICE: No references to the pool beyond this point!
* The pool is being destroyed.
*/
ASSERT(!MUTEX_HELD(&pool->p_thread_lock));
svc_creator_signalexit(pool);
return (TRUE);
}
rw_exit(&pool->p_lrwlock);
}
ASSERT(MUTEX_HELD(&pool->p_thread_lock));
return (FALSE);
}
/*
* Find a pool with a given id.
*/
static SVCPOOL *
svc_pool_find(struct svc_globals *svc, int id)
{
SVCPOOL *pool;
ASSERT(MUTEX_HELD(&svc->svc_plock));
/*
	 * Search the list for a pool with a matching id.
*/
for (pool = svc->svc_pools; pool; pool = pool->p_next)
if (pool->p_id == id)
return (pool);
return (NULL);
}
/*
* PSARC 2003/523 Contract Private Interface
* svc_do_run
* Changes must be reviewed by Solaris File Sharing
* Changes must be communicated to contract-2003-523@sun.com
*/
int
svc_do_run(int id)
{
SVCPOOL *pool;
int err = 0;
struct svc_globals *svc;
svc = zone_getspecific(svc_zone_key, curproc->p_zone);
mutex_enter(&svc->svc_plock);
pool = svc_pool_find(svc, id);
mutex_exit(&svc->svc_plock);
if (pool == NULL)
return (ENOENT);
/*
* Increment counter of pool threads now
* that a thread has been created.
*/
mutex_enter(&pool->p_thread_lock);
pool->p_threads++;
mutex_exit(&pool->p_thread_lock);
/* Give work to the new thread. */
err = svc_run(pool);
return (err);
}
/*
* Unregister a pool from the pool list.
* Set the closing state. If all the transports and service threads
* are already gone signal the creator thread to clean up and exit.
*/
static void
svc_pool_unregister(struct svc_globals *svc, SVCPOOL *pool)
{
SVCPOOL *next = pool->p_next;
SVCPOOL *prev = pool->p_prev;
ASSERT(MUTEX_HELD(&svc->svc_plock));
/* Remove from the list */
if (pool == svc->svc_pools)
svc->svc_pools = next;
if (next)
next->p_prev = prev;
if (prev)
prev->p_next = next;
pool->p_next = pool->p_prev = NULL;
/*
* Offline the pool. Mark the pool as closing.
* If there are no transports in this pool notify
* the creator thread to clean it up and exit.
*/
mutex_enter(&pool->p_thread_lock);
if (pool->p_offline != NULL)
(pool->p_offline)();
pool->p_closing = TRUE;
if (svc_pool_tryexit(pool))
return;
mutex_exit(&pool->p_thread_lock);
}
/*
* Register a pool with a given id in the global doubly linked pool list.
* - if there is a pool with the same id in the list then unregister it
* - insert the new pool into the list.
*/
static void
svc_pool_register(struct svc_globals *svc, SVCPOOL *pool, int id)
{
SVCPOOL *old_pool;
/*
* If there is a pool with the same id then remove it from
* the list and mark the pool as closing.
*/
mutex_enter(&svc->svc_plock);
	if ((old_pool = svc_pool_find(svc, id)) != NULL)
svc_pool_unregister(svc, old_pool);
/* Insert into the doubly linked list */
pool->p_id = id;
pool->p_next = svc->svc_pools;
pool->p_prev = NULL;
if (svc->svc_pools)
svc->svc_pools->p_prev = pool;
svc->svc_pools = pool;
mutex_exit(&svc->svc_plock);
}
/*
* Initialize a newly created pool structure
*/
static int
svc_pool_init(SVCPOOL *pool, uint_t maxthreads, uint_t redline,
uint_t qsize, uint_t timeout, uint_t stksize, uint_t max_same_xprt)
{
klwp_t *lwp = ttolwp(curthread);
ASSERT(pool);
if (maxthreads == 0)
maxthreads = svc_default_maxthreads;
if (redline == 0)
redline = svc_default_redline;
if (qsize == 0)
qsize = svc_default_qsize;
if (timeout == 0)
timeout = svc_default_timeout;
if (stksize == 0)
stksize = svc_default_stksize;
if (max_same_xprt == 0)
max_same_xprt = svc_default_max_same_xprt;
if (maxthreads < redline)
return (EINVAL);
/* Allocate and initialize the `xprt-ready' queue */
svc_xprt_qinit(pool, qsize);
/* Initialize doubly-linked xprt list */
rw_init(&pool->p_lrwlock, NULL, RW_DEFAULT, NULL);
/*
* Setting lwp_childstksz on the current lwp so that
* descendants of this lwp get the modified stacksize, if
* it is defined. It is important that either this lwp or
* one of its descendants do the actual servicepool thread
* creation to maintain the stacksize inheritance.
*/
if (lwp != NULL)
lwp->lwp_childstksz = stksize;
/* Initialize thread limits, locks and condition variables */
pool->p_maxthreads = maxthreads;
pool->p_redline = redline;
pool->p_timeout = timeout * hz;
pool->p_stksize = stksize;
pool->p_max_same_xprt = max_same_xprt;
mutex_init(&pool->p_thread_lock, NULL, MUTEX_DEFAULT, NULL);
mutex_init(&pool->p_req_lock, NULL, MUTEX_DEFAULT, NULL);
cv_init(&pool->p_req_cv, NULL, CV_DEFAULT, NULL);
/* Initialize userland creator */
pool->p_user_exit = FALSE;
pool->p_signal_create_thread = FALSE;
pool->p_user_waiting = FALSE;
mutex_init(&pool->p_user_lock, NULL, MUTEX_DEFAULT, NULL);
cv_init(&pool->p_user_cv, NULL, CV_DEFAULT, NULL);
/* Initialize the creator and start the creator thread */
pool->p_creator_exit = FALSE;
mutex_init(&pool->p_creator_lock, NULL, MUTEX_DEFAULT, NULL);
cv_init(&pool->p_creator_cv, NULL, CV_DEFAULT, NULL);
(void) zthread_create(NULL, pool->p_stksize, svc_thread_creator,
pool, 0, minclsyspri);
return (0);
}
/*
* PSARC 2003/523 Contract Private Interface
* svc_pool_create
* Changes must be reviewed by Solaris File Sharing
* Changes must be communicated to contract-2003-523@sun.com
*
 * Create a kernel RPC server-side thread/transport pool.
 *
 * This is the public interface for creation of a server RPC thread pool
* for a given service provider. Transports registered with the pool's id
* will be served by a pool's threads. This function is called from the
* nfssys() system call.
*/
int
svc_pool_create(struct svcpool_args *args)
{
SVCPOOL *pool;
int error;
struct svc_globals *svc;
/*
* Caller should check credentials in a way appropriate
* in the context of the call.
*/
svc = zone_getspecific(svc_zone_key, curproc->p_zone);
/* Allocate a new pool */
pool = kmem_zalloc(sizeof (SVCPOOL), KM_SLEEP);
/*
* Initialize the pool structure and create a creator thread.
*/
error = svc_pool_init(pool, args->maxthreads, args->redline,
args->qsize, args->timeout, args->stksize, args->max_same_xprt);
if (error) {
kmem_free(pool, sizeof (SVCPOOL));
return (error);
}
/* Register the pool with the global pool list */
svc_pool_register(svc, pool, args->id);
return (0);
}
int
svc_pool_control(int id, int cmd, void *arg)
{
SVCPOOL *pool;
struct svc_globals *svc;
svc = zone_getspecific(svc_zone_key, curproc->p_zone);
switch (cmd) {
case SVCPSET_SHUTDOWN_PROC:
/*
* Search the list for a pool with a matching id
		 * and register the shutdown procedure with that pool.
*/
mutex_enter(&svc->svc_plock);
if ((pool = svc_pool_find(svc, id)) == NULL) {
mutex_exit(&svc->svc_plock);
return (ENOENT);
}
/*
* Grab the transport list lock before releasing the
* pool list lock
*/
rw_enter(&pool->p_lrwlock, RW_WRITER);
mutex_exit(&svc->svc_plock);
pool->p_shutdown = *((void (*)())arg);
rw_exit(&pool->p_lrwlock);
return (0);
case SVCPSET_UNREGISTER_PROC:
/*
* Search the list for a pool with a matching id
		 * and register the unregister (offline) callback with that pool.
*/
mutex_enter(&svc->svc_plock);
if ((pool = svc_pool_find(svc, id)) == NULL) {
mutex_exit(&svc->svc_plock);
return (ENOENT);
}
/*
* Grab the transport list lock before releasing the
* pool list lock
*/
rw_enter(&pool->p_lrwlock, RW_WRITER);
mutex_exit(&svc->svc_plock);
pool->p_offline = *((void (*)())arg);
rw_exit(&pool->p_lrwlock);
return (0);
default:
return (EINVAL);
}
}
/*
* Pool's transport list manipulation routines.
* - svc_xprt_register()
* - svc_xprt_unregister()
*
* svc_xprt_register() is called from svc_tli_kcreate() to
* insert a new master transport handle into the doubly linked
* list of server transport handles (one list per pool).
*
* The list is used by svc_poll(), when it operates in `drain'
 * mode, to search for the next transport with a pending request.
*/
int
svc_xprt_register(SVCMASTERXPRT *xprt, int id)
{
SVCMASTERXPRT *prev, *next;
SVCPOOL *pool;
struct svc_globals *svc;
svc = zone_getspecific(svc_zone_key, curproc->p_zone);
/*
* Search the list for a pool with a matching id
* and register the transport handle with that pool.
*/
mutex_enter(&svc->svc_plock);
if ((pool = svc_pool_find(svc, id)) == NULL) {
mutex_exit(&svc->svc_plock);
return (ENOENT);
}
/* Grab the transport list lock before releasing the pool list lock */
rw_enter(&pool->p_lrwlock, RW_WRITER);
mutex_exit(&svc->svc_plock);
/* Don't register new transports when the pool is in closing state */
if (pool->p_closing) {
rw_exit(&pool->p_lrwlock);
return (EBUSY);
}
/*
* Initialize xp_pool to point to the pool.
* We don't want to go through the pool list every time.
*/
xprt->xp_pool = pool;
/*
* Insert a transport handle into the list.
* The list head points to the most recently inserted transport.
*/
if (pool->p_lhead == NULL)
pool->p_lhead = xprt->xp_prev = xprt->xp_next = xprt;
else {
next = pool->p_lhead;
prev = pool->p_lhead->xp_prev;
xprt->xp_next = next;
xprt->xp_prev = prev;
pool->p_lhead = prev->xp_next = next->xp_prev = xprt;
}
/* Increment the transports count */
pool->p_lcount++;
rw_exit(&pool->p_lrwlock);
return (0);
}
/*
* Called from svc_xprt_cleanup() to remove a master transport handle
* from the pool's list of server transports (when a transport is
* being destroyed).
*/
void
svc_xprt_unregister(SVCMASTERXPRT *xprt)
{
SVCPOOL *pool = xprt->xp_pool;
/*
* Unlink xprt from the list.
* If the list head points to this xprt then move it
* to the next xprt or reset to NULL if this is the last
* xprt in the list.
*/
rw_enter(&pool->p_lrwlock, RW_WRITER);
if (xprt == xprt->xp_next)
pool->p_lhead = NULL;
else {
SVCMASTERXPRT *next = xprt->xp_next;
SVCMASTERXPRT *prev = xprt->xp_prev;
next->xp_prev = prev;
prev->xp_next = next;
if (pool->p_lhead == xprt)
pool->p_lhead = next;
}
xprt->xp_next = xprt->xp_prev = NULL;
/* Decrement list count */
pool->p_lcount--;
rw_exit(&pool->p_lrwlock);
}
static void
svc_xprt_qdestroy(SVCPOOL *pool)
{
mutex_destroy(&pool->p_qend_lock);
kmem_free(pool->p_qbody, pool->p_qsize * sizeof (__SVCXPRT_QNODE));
}
/*
* Initialize an `xprt-ready' queue for a given pool.
*/
static void
svc_xprt_qinit(SVCPOOL *pool, size_t qsize)
{
int i;
pool->p_qsize = qsize;
pool->p_qbody = kmem_zalloc(pool->p_qsize * sizeof (__SVCXPRT_QNODE),
KM_SLEEP);
for (i = 0; i < pool->p_qsize - 1; i++)
pool->p_qbody[i].q_next = &(pool->p_qbody[i+1]);
pool->p_qbody[pool->p_qsize-1].q_next = &(pool->p_qbody[0]);
pool->p_qtop = &(pool->p_qbody[0]);
pool->p_qend = &(pool->p_qbody[0]);
mutex_init(&pool->p_qend_lock, NULL, MUTEX_DEFAULT, NULL);
}
/*
* Called from the svc_queuereq() interrupt routine to queue
 * a hint for svc_poll() about which transport has a pending request.
* - insert a pointer to xprt into the xprt-ready queue (FIFO)
* - if the xprt-ready queue is full turn the overflow flag on.
*
 * NOTICE: pool->p_qtop is protected by the pool's request lock
* and the caller (svc_queuereq()) must hold the lock.
*/
static void
svc_xprt_qput(SVCPOOL *pool, SVCMASTERXPRT *xprt)
{
ASSERT(MUTEX_HELD(&pool->p_req_lock));
	/* If the overflow flag is set there is nothing we can do */
if (pool->p_qoverflow)
return;
/* If the queue is full turn the overflow flag on and exit */
if (pool->p_qtop->q_next == pool->p_qend) {
mutex_enter(&pool->p_qend_lock);
if (pool->p_qtop->q_next == pool->p_qend) {
pool->p_qoverflow = TRUE;
mutex_exit(&pool->p_qend_lock);
return;
}
mutex_exit(&pool->p_qend_lock);
}
/* Insert a hint and move pool->p_qtop */
pool->p_qtop->q_xprt = xprt;
pool->p_qtop = pool->p_qtop->q_next;
}
/*
 * Called from svc_poll() to get a hint about which transport has a
* pending request. Returns a pointer to a transport or NULL if the
* `xprt-ready' queue is empty.
*
* Since we do not acquire the pool's request lock while checking if
* the queue is empty we may miss a request that is just being delivered.
* However this is ok since svc_poll() will retry again until the
* count indicates that there are pending requests for this pool.
*/
static SVCMASTERXPRT *
svc_xprt_qget(SVCPOOL *pool)
{
SVCMASTERXPRT *xprt;
mutex_enter(&pool->p_qend_lock);
do {
/*
* If the queue is empty return NULL.
* Since we do not acquire the pool's request lock which
		 * protects pool->p_qtop this is not an exact check. However,
* this is safe - if we miss a request here svc_poll()
* will retry again.
*/
if (pool->p_qend == pool->p_qtop) {
mutex_exit(&pool->p_qend_lock);
return (NULL);
}
/* Get a hint and move pool->p_qend */
xprt = pool->p_qend->q_xprt;
pool->p_qend = pool->p_qend->q_next;
		/* Skip hints deleted by svc_xprt_qdelete() */
} while (xprt == NULL);
mutex_exit(&pool->p_qend_lock);
return (xprt);
}
/*
* Reset an overflow in the xprt-ready queue after
 * all the pending requests have been drained.
* This switches svc_poll back to getting hints from the
* xprt-ready queue.
*
 * NOTICE: pool->p_qtop is protected by the pool's request lock
* and the caller (svc_poll()) must hold the lock.
*/
static void
svc_xprt_qreset(SVCPOOL *pool)
{
ASSERT(MUTEX_HELD(&pool->p_req_lock));
pool->p_qend = pool->p_qtop;
pool->p_qoverflow = FALSE;
}
/*
* Delete all the references to a transport handle that
* is being destroyed from the xprt-ready queue.
* Deleted pointers are replaced with NULLs.
*/
static void
svc_xprt_qdelete(SVCPOOL *pool, SVCMASTERXPRT *xprt)
{
__SVCXPRT_QNODE *q = pool->p_qend;
__SVCXPRT_QNODE *qtop = pool->p_qtop;
/*
* Delete all the references to xprt between the current
* position of pool->p_qend and current pool->p_qtop.
*/
for (;;) {
if (q->q_xprt == xprt)
q->q_xprt = NULL;
if (q == qtop)
return;
q = q->q_next;
}
}
/*
* Destructor for a master server transport handle.
* - if there are no more non-detached threads linked to this transport
* then, if requested, call xp_closeproc (we don't wait for detached
* threads linked to this transport to complete).
* - if there are no more threads linked to this
* transport then
* a) remove references to this transport from the xprt-ready queue
* b) remove a reference to this transport from the pool's transport list
* c) call a transport specific `destroy' function
* d) cancel remaining thread reservations.
*
* NOTICE: Caller must hold the transport's thread lock.
*/
static void
svc_xprt_cleanup(SVCMASTERXPRT *xprt, bool_t detached)
{
ASSERT(MUTEX_HELD(&xprt->xp_thread_lock));
ASSERT(xprt->xp_wq == NULL);
/*
* If called from the last non-detached thread
* it should call the closeproc on this transport.
*/
if (!detached && xprt->xp_threads == 0 && xprt->xp_closeproc) {
(*(xprt->xp_closeproc)) (xprt);
}
if (xprt->xp_threads + xprt->xp_detached_threads > 0)
mutex_exit(&xprt->xp_thread_lock);
else {
/* Remove references to xprt from the `xprt-ready' queue */
svc_xprt_qdelete(xprt->xp_pool, xprt);
/* Unregister xprt from the pool's transport list */
svc_xprt_unregister(xprt);
svc_callout_free(xprt);
SVC_DESTROY(xprt);
}
}
/*
* Find a dispatch routine for a given prog/vers pair.
* This function is called from svc_getreq() to search the callout
* table for an entry with a matching RPC program number `prog'
* and a version range that covers `vers'.
 * - if it finds a matching entry it returns a pointer to the dispatch
 *   routine
 * - otherwise it returns NULL and fills `vers_min' and `vers_max' with,
 *   respectively, the lowest and the highest version supported for
 *   the program `prog'
*/
static SVC_DISPATCH *
svc_callout_find(SVCXPRT *xprt, rpcprog_t prog, rpcvers_t vers,
rpcvers_t *vers_min, rpcvers_t *vers_max)
{
SVC_CALLOUT_TABLE *sct = xprt->xp_sct;
int i;
*vers_min = ~(rpcvers_t)0;
*vers_max = 0;
for (i = 0; i < sct->sct_size; i++) {
SVC_CALLOUT *sc = &sct->sct_sc[i];
if (prog == sc->sc_prog) {
if (vers >= sc->sc_versmin && vers <= sc->sc_versmax)
return (sc->sc_dispatch);
if (*vers_max < sc->sc_versmax)
*vers_max = sc->sc_versmax;
if (*vers_min > sc->sc_versmin)
*vers_min = sc->sc_versmin;
}
}
return (NULL);
}
/*
* Optionally free callout table allocated for this transport by
* the service provider.
*/
static void
svc_callout_free(SVCMASTERXPRT *xprt)
{
SVC_CALLOUT_TABLE *sct = xprt->xp_sct;
if (sct->sct_free) {
kmem_free(sct->sct_sc, sct->sct_size * sizeof (SVC_CALLOUT));
kmem_free(sct, sizeof (SVC_CALLOUT_TABLE));
}
}
/*
* Send a reply to an RPC request
*
* PSARC 2003/523 Contract Private Interface
* svc_sendreply
* Changes must be reviewed by Solaris File Sharing
* Changes must be communicated to contract-2003-523@sun.com
*/
bool_t
svc_sendreply(const SVCXPRT *clone_xprt, const xdrproc_t xdr_results,
const caddr_t xdr_location)
{
struct rpc_msg rply;
rply.rm_direction = REPLY;
rply.rm_reply.rp_stat = MSG_ACCEPTED;
rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
rply.acpted_rply.ar_stat = SUCCESS;
rply.acpted_rply.ar_results.where = xdr_location;
rply.acpted_rply.ar_results.proc = xdr_results;
return (SVC_REPLY((SVCXPRT *)clone_xprt, &rply));
}
/*
* No procedure error reply
*
* PSARC 2003/523 Contract Private Interface
* svcerr_noproc
* Changes must be reviewed by Solaris File Sharing
* Changes must be communicated to contract-2003-523@sun.com
*/
void
svcerr_noproc(const SVCXPRT *clone_xprt)
{
struct rpc_msg rply;
rply.rm_direction = REPLY;
rply.rm_reply.rp_stat = MSG_ACCEPTED;
rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
rply.acpted_rply.ar_stat = PROC_UNAVAIL;
SVC_FREERES((SVCXPRT *)clone_xprt);
SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
}
/*
* Can't decode arguments error reply
*
* PSARC 2003/523 Contract Private Interface
* svcerr_decode
* Changes must be reviewed by Solaris File Sharing
* Changes must be communicated to contract-2003-523@sun.com
*/
void
svcerr_decode(const SVCXPRT *clone_xprt)
{
struct rpc_msg rply;
rply.rm_direction = REPLY;
rply.rm_reply.rp_stat = MSG_ACCEPTED;
rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
rply.acpted_rply.ar_stat = GARBAGE_ARGS;
SVC_FREERES((SVCXPRT *)clone_xprt);
SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
}
/*
* Some system error
*/
void
svcerr_systemerr(const SVCXPRT *clone_xprt)
{
struct rpc_msg rply;
rply.rm_direction = REPLY;
rply.rm_reply.rp_stat = MSG_ACCEPTED;
rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
rply.acpted_rply.ar_stat = SYSTEM_ERR;
SVC_FREERES((SVCXPRT *)clone_xprt);
SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
}
/*
* Authentication error reply
*/
void
svcerr_auth(const SVCXPRT *clone_xprt, const enum auth_stat why)
{
struct rpc_msg rply;
rply.rm_direction = REPLY;
rply.rm_reply.rp_stat = MSG_DENIED;
rply.rjcted_rply.rj_stat = AUTH_ERROR;
rply.rjcted_rply.rj_why = why;
SVC_FREERES((SVCXPRT *)clone_xprt);
SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
}
/*
* Authentication too weak error reply
*/
void
svcerr_weakauth(const SVCXPRT *clone_xprt)
{
svcerr_auth((SVCXPRT *)clone_xprt, AUTH_TOOWEAK);
}
/*
* Program unavailable error reply
*
* PSARC 2003/523 Contract Private Interface
* svcerr_noprog
* Changes must be reviewed by Solaris File Sharing
* Changes must be communicated to contract-2003-523@sun.com
*/
void
svcerr_noprog(const SVCXPRT *clone_xprt)
{
struct rpc_msg rply;
rply.rm_direction = REPLY;
rply.rm_reply.rp_stat = MSG_ACCEPTED;
rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
rply.acpted_rply.ar_stat = PROG_UNAVAIL;
SVC_FREERES((SVCXPRT *)clone_xprt);
SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
}
/*
* Program version mismatch error reply
*
* PSARC 2003/523 Contract Private Interface
* svcerr_progvers
* Changes must be reviewed by Solaris File Sharing
* Changes must be communicated to contract-2003-523@sun.com
*/
void
svcerr_progvers(const SVCXPRT *clone_xprt,
const rpcvers_t low_vers, const rpcvers_t high_vers)
{
struct rpc_msg rply;
rply.rm_direction = REPLY;
rply.rm_reply.rp_stat = MSG_ACCEPTED;
rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
rply.acpted_rply.ar_stat = PROG_MISMATCH;
rply.acpted_rply.ar_vers.low = low_vers;
rply.acpted_rply.ar_vers.high = high_vers;
SVC_FREERES((SVCXPRT *)clone_xprt);
SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
}
/*
* Get server side input from some transport.
*
* Statement of authentication parameters management:
* This function owns and manages all authentication parameters, specifically
* the "raw" parameters (msg.rm_call.cb_cred and msg.rm_call.cb_verf) and
* the "cooked" credentials (rqst->rq_clntcred).
* However, this function does not know the structure of the cooked
 * credentials, so it makes the following assumptions:
* a) the structure is contiguous (no pointers), and
* b) the cred structure size does not exceed RQCRED_SIZE bytes.
* In all events, all three parameters are freed upon exit from this routine.
* The storage is trivially managed on the call stack in user land, but
* is malloced in kernel land.
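 *
 * For reference, the single cred_area buffer carved up below is laid
 * out as follows (a sketch of the assignments made after allocation):
 *
 *	[0, MAX_AUTH_BYTES)			raw cb_cred.oa_base
 *	[MAX_AUTH_BYTES, 2 * MAX_AUTH_BYTES)	raw cb_verf.oa_base
 *	[2 * MAX_AUTH_BYTES, + RQCRED_SIZE)	cooked rq_clntcred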
*
* Note: the xprt's xp_svc_lock is not held while the service's dispatch
* routine is running. If we decide to implement svc_unregister(), we'll
* need to decide whether it's okay for a thread to unregister a service
* while a request is being processed. If we decide that this is a
* problem, we can probably use some sort of reference counting scheme to
* keep the callout entry from going away until the request has completed.
*/
static void
svc_getreq(
SVCXPRT *clone_xprt, /* clone transport handle */
mblk_t *mp)
{
struct rpc_msg msg;
struct svc_req r;
char *cred_area; /* too big to allocate on call stack */
TRACE_0(TR_FAC_KRPC, TR_SVC_GETREQ_START,
"svc_getreq_start:");
ASSERT(clone_xprt->xp_master != NULL);
ASSERT(!is_system_labeled() || DB_CRED(mp) != NULL ||
mp->b_datap->db_type != M_DATA);
/*
* Firstly, allocate the authentication parameters' storage
*/
mutex_enter(&rqcred_lock);
if (rqcred_head) {
cred_area = rqcred_head;
/* LINTED pointer alignment */
rqcred_head = *(caddr_t *)rqcred_head;
mutex_exit(&rqcred_lock);
} else {
mutex_exit(&rqcred_lock);
cred_area = kmem_alloc(2 * MAX_AUTH_BYTES + RQCRED_SIZE,
KM_SLEEP);
}
msg.rm_call.cb_cred.oa_base = cred_area;
msg.rm_call.cb_verf.oa_base = &(cred_area[MAX_AUTH_BYTES]);
r.rq_clntcred = &(cred_area[2 * MAX_AUTH_BYTES]);
/*
	 * The underlying transport recv routine may modify the mblk data
	 * and make it difficult to extract the label afterwards, so get
	 * the label from the raw mblk data now.
*/
if (is_system_labeled()) {
mblk_t *lmp;
r.rq_label = kmem_alloc(sizeof (bslabel_t), KM_SLEEP);
if (DB_CRED(mp) != NULL)
lmp = mp;
else {
ASSERT(mp->b_cont != NULL);
lmp = mp->b_cont;
ASSERT(DB_CRED(lmp) != NULL);
}
bcopy(label2bslabel(crgetlabel(DB_CRED(lmp))), r.rq_label,
sizeof (bslabel_t));
} else {
r.rq_label = NULL;
}
/*
* Now receive a message from the transport.
*/
if (SVC_RECV(clone_xprt, mp, &msg)) {
void (*dispatchroutine) (struct svc_req *, SVCXPRT *);
rpcvers_t vers_min;
rpcvers_t vers_max;
bool_t no_dispatch;
enum auth_stat why;
/*
* Find the registered program and call its
* dispatch routine.
*/
r.rq_xprt = clone_xprt;
r.rq_prog = msg.rm_call.cb_prog;
r.rq_vers = msg.rm_call.cb_vers;
r.rq_proc = msg.rm_call.cb_proc;
r.rq_cred = msg.rm_call.cb_cred;
/*
* First authenticate the message.
*/
TRACE_0(TR_FAC_KRPC, TR_SVC_GETREQ_AUTH_START,
"svc_getreq_auth_start:");
if ((why = sec_svc_msg(&r, &msg, &no_dispatch)) != AUTH_OK) {
TRACE_1(TR_FAC_KRPC, TR_SVC_GETREQ_AUTH_END,
"svc_getreq_auth_end:(%S)", "failed");
svcerr_auth(clone_xprt, why);
/*
* Free the arguments.
*/
(void) SVC_FREEARGS(clone_xprt, NULL, NULL);
} else if (no_dispatch) {
/*
* XXX - when bug id 4053736 is done, remove
* the SVC_FREEARGS() call.
*/
(void) SVC_FREEARGS(clone_xprt, NULL, NULL);
} else {
TRACE_1(TR_FAC_KRPC, TR_SVC_GETREQ_AUTH_END,
"svc_getreq_auth_end:(%S)", "good");
dispatchroutine = svc_callout_find(clone_xprt,
r.rq_prog, r.rq_vers, &vers_min, &vers_max);
if (dispatchroutine) {
(*dispatchroutine) (&r, clone_xprt);
} else {
/*
* If we got here, the program or version
* is not served ...
*/
if (vers_max == 0 ||
version_keepquiet(clone_xprt))
svcerr_noprog(clone_xprt);
else
svcerr_progvers(clone_xprt, vers_min,
vers_max);
/*
* Free the arguments. For successful calls
* this is done by the dispatch routine.
*/
(void) SVC_FREEARGS(clone_xprt, NULL, NULL);
/* Fall through to ... */
}
/*
* Call cleanup procedure for RPCSEC_GSS.
* This is a hack since there is currently no
* op, such as SVC_CLEANAUTH. rpc_gss_cleanup
* should only be called for a non null proc.
* Null procs in RPC GSS are overloaded to
* provide context setup and control. The main
* purpose of rpc_gss_cleanup is to decrement the
* reference count associated with the cached
* GSS security context. We should never get here
* for an RPCSEC_GSS null proc since *no_dispatch
* would have been set to true from sec_svc_msg above.
*/
if (r.rq_cred.oa_flavor == RPCSEC_GSS)
rpc_gss_cleanup(clone_xprt);
}
}
if (r.rq_label != NULL)
kmem_free(r.rq_label, sizeof (bslabel_t));
/*
* Free authentication parameters' storage
*/
mutex_enter(&rqcred_lock);
/* LINTED pointer alignment */
*(caddr_t *)cred_area = rqcred_head;
rqcred_head = cred_area;
mutex_exit(&rqcred_lock);
}
/*
* Allocate new clone transport handle.
*/
static SVCXPRT *
svc_clone_init(void)
{
SVCXPRT *clone_xprt;
clone_xprt = kmem_zalloc(sizeof (SVCXPRT), KM_SLEEP);
clone_xprt->xp_cred = crget();
return (clone_xprt);
}
/*
* Free memory allocated by svc_clone_init.
*/
static void
svc_clone_free(SVCXPRT *clone_xprt)
{
	/* Free credentials from crget() */
if (clone_xprt->xp_cred)
crfree(clone_xprt->xp_cred);
kmem_free(clone_xprt, sizeof (SVCXPRT));
}
/*
* Link a per-thread clone transport handle to a master
* - increment a thread reference count on the master
* - copy some of the master's fields to the clone
* - call a transport specific clone routine.
*/
static void
svc_clone_link(SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt)
{
cred_t *cred = clone_xprt->xp_cred;
ASSERT(cred);
/*
* Bump up master's thread count.
* Linking a per-thread clone transport handle to a master
* associates a service thread with the master.
*/
mutex_enter(&xprt->xp_thread_lock);
xprt->xp_threads++;
mutex_exit(&xprt->xp_thread_lock);
/* Clear everything */
bzero(clone_xprt, sizeof (SVCXPRT));
	/* Set pointer to the master transport structure */
clone_xprt->xp_master = xprt;
/* Structure copy of all the common fields */
clone_xprt->xp_xpc = xprt->xp_xpc;
/* Restore per-thread fields (xp_cred) */
clone_xprt->xp_cred = cred;
/*
* NOTICE: There is no transport-type specific code now.
* If you want to add a transport-type specific cloning code
* add one more operation (e.g. xp_clone()) to svc_ops,
* implement it for each transport type, and call it here
* through an appropriate macro (e.g. SVC_CLONE()).
*/
}
/*
* Unlink a non-detached clone transport handle from a master
* - decrement a thread reference count on the master
* - if the transport is closing (xp_wq is NULL) call svc_xprt_cleanup();
 *   if this is the last non-detached thread on this transport then it
 *   will close the transport, and if it is the last thread overall it
 *   will destroy it
* - call transport specific function to destroy the clone handle
* - clear xp_master to avoid recursion.
*/
static void
svc_clone_unlink(SVCXPRT *clone_xprt)
{
SVCMASTERXPRT *xprt = clone_xprt->xp_master;
/* This cannot be a detached thread */
ASSERT(!clone_xprt->xp_detached);
ASSERT(xprt->xp_threads > 0);
/* Decrement a reference count on the transport */
mutex_enter(&xprt->xp_thread_lock);
xprt->xp_threads--;
/* svc_xprt_cleanup() unlocks xp_thread_lock or destroys xprt */
if (xprt->xp_wq)
mutex_exit(&xprt->xp_thread_lock);
else
svc_xprt_cleanup(xprt, FALSE);
/* Call a transport specific clone `destroy' function */
SVC_CLONE_DESTROY(clone_xprt);
/* Clear xp_master */
clone_xprt->xp_master = NULL;
}
/*
* Unlink a detached clone transport handle from a master
* - decrement the thread count on the master
* - if the transport is closing (xp_wq is NULL) call svc_xprt_cleanup();
* if this is the last thread on this transport then it will destroy
* the transport.
* - call a transport specific function to destroy the clone handle
* - clear xp_master to avoid recursion.
*/
static void
svc_clone_unlinkdetached(SVCXPRT *clone_xprt)
{
SVCMASTERXPRT *xprt = clone_xprt->xp_master;
/* This must be a detached thread */
ASSERT(clone_xprt->xp_detached);
ASSERT(xprt->xp_detached_threads > 0);
ASSERT(xprt->xp_threads + xprt->xp_detached_threads > 0);
/* Grab xprt->xp_thread_lock and decrement link counts */
mutex_enter(&xprt->xp_thread_lock);
xprt->xp_detached_threads--;
/* svc_xprt_cleanup() unlocks xp_thread_lock or destroys xprt */
if (xprt->xp_wq)
mutex_exit(&xprt->xp_thread_lock);
else
svc_xprt_cleanup(xprt, TRUE);
/* Call transport specific clone `destroy' function */
SVC_CLONE_DESTROY(clone_xprt);
/* Clear xp_master */
clone_xprt->xp_master = NULL;
}
/*
* Try to exit a non-detached service thread
* - check if there are enough threads left
 * - if this thread (i.e. its clone transport handle) is linked
 *   to a master transport then unlink it
* - free the clone structure
* - return to userland for thread exit
*
* If this is the last non-detached or the last thread on this
* transport then the call to svc_clone_unlink() will, respectively,
* close and/or destroy the transport.
*/
static void
svc_thread_exit(SVCPOOL *pool, SVCXPRT *clone_xprt)
{
if (clone_xprt->xp_master)
svc_clone_unlink(clone_xprt);
svc_clone_free(clone_xprt);
mutex_enter(&pool->p_thread_lock);
pool->p_threads--;
if (pool->p_closing && svc_pool_tryexit(pool))
/* return - thread exit will be handled at user level */
return;
mutex_exit(&pool->p_thread_lock);
/* return - thread exit will be handled at user level */
}
/*
* Exit a detached service thread that returned to svc_run
* - decrement the `detached thread' count for the pool
* - unlink the detached clone transport handle from the master
* - free the clone structure
* - return to userland for thread exit
*
* If this is the last thread on this transport then the call
* to svc_clone_unlinkdetached() will destroy the transport.
*/
static void
svc_thread_exitdetached(SVCPOOL *pool, SVCXPRT *clone_xprt)
{
/* This must be a detached thread */
ASSERT(clone_xprt->xp_master);
ASSERT(clone_xprt->xp_detached);
ASSERT(!MUTEX_HELD(&pool->p_thread_lock));
svc_clone_unlinkdetached(clone_xprt);
svc_clone_free(clone_xprt);
mutex_enter(&pool->p_thread_lock);
ASSERT(pool->p_reserved_threads >= 0);
ASSERT(pool->p_detached_threads > 0);
pool->p_detached_threads--;
if (pool->p_closing && svc_pool_tryexit(pool))
/* return - thread exit will be handled at user level */
return;
mutex_exit(&pool->p_thread_lock);
/* return - thread exit will be handled at user level */
}
/*
* PSARC 2003/523 Contract Private Interface
* svc_wait
* Changes must be reviewed by Solaris File Sharing
* Changes must be communicated to contract-2003-523@sun.com
*/
int
svc_wait(int id)
{
SVCPOOL *pool;
int err = 0;
struct svc_globals *svc;
svc = zone_getspecific(svc_zone_key, curproc->p_zone);
mutex_enter(&svc->svc_plock);
pool = svc_pool_find(svc, id);
mutex_exit(&svc->svc_plock);
if (pool == NULL)
return (ENOENT);
mutex_enter(&pool->p_user_lock);
/* Check if there's already a user thread waiting on this pool */
if (pool->p_user_waiting) {
mutex_exit(&pool->p_user_lock);
return (EBUSY);
}
pool->p_user_waiting = TRUE;
/* Go to sleep, waiting for the signaled flag. */
while (!pool->p_signal_create_thread && !pool->p_user_exit) {
if (cv_wait_sig(&pool->p_user_cv, &pool->p_user_lock) == 0) {
/* Interrupted, return to handle exit or signal */
pool->p_user_waiting = FALSE;
pool->p_signal_create_thread = FALSE;
mutex_exit(&pool->p_user_lock);
/*
* Thread has been interrupted and therefore
* the service daemon is leaving as well so
* let's go ahead and remove the service
* pool at this time.
*/
mutex_enter(&svc->svc_plock);
svc_pool_unregister(svc, pool);
mutex_exit(&svc->svc_plock);
return (EINTR);
}
}
pool->p_signal_create_thread = FALSE;
pool->p_user_waiting = FALSE;
/*
* About to exit the service pool. Set return value
* to let the userland code know our intent. Signal
* svc_thread_creator() so that it can clean up the
* pool structure.
*/
if (pool->p_user_exit) {
err = ECANCELED;
cv_signal(&pool->p_user_cv);
}
mutex_exit(&pool->p_user_lock);
/* Return to userland with error code, for possible thread creation. */
return (err);
}
/*
* `Service threads' creator thread.
 * The creator thread waits for a signal to create a new thread.
*/
static void
svc_thread_creator(SVCPOOL *pool)
{
callb_cpr_t cpr_info; /* CPR info for the creator thread */
CALLB_CPR_INIT(&cpr_info, &pool->p_creator_lock, callb_generic_cpr,
"svc_thread_creator");
for (;;) {
mutex_enter(&pool->p_creator_lock);
/* Check if someone set the exit flag */
if (pool->p_creator_exit)
break;
/* Clear the `signaled' flag and go asleep */
pool->p_creator_signaled = FALSE;
CALLB_CPR_SAFE_BEGIN(&cpr_info);
cv_wait(&pool->p_creator_cv, &pool->p_creator_lock);
CALLB_CPR_SAFE_END(&cpr_info, &pool->p_creator_lock);
/* Check if someone signaled to exit */
if (pool->p_creator_exit)
break;
mutex_exit(&pool->p_creator_lock);
mutex_enter(&pool->p_thread_lock);
/*
* When the pool is in closing state and all the transports
* are gone the creator should not create any new threads.
*/
if (pool->p_closing) {
rw_enter(&pool->p_lrwlock, RW_READER);
if (pool->p_lcount == 0) {
rw_exit(&pool->p_lrwlock);
mutex_exit(&pool->p_thread_lock);
continue;
}
rw_exit(&pool->p_lrwlock);
}
/*
* Create a new service thread now.
*/
ASSERT(pool->p_reserved_threads >= 0);
ASSERT(pool->p_detached_threads >= 0);
if (pool->p_threads + pool->p_detached_threads <
pool->p_maxthreads) {
/*
* Signal the service pool wait thread
* only if it hasn't already been signaled.
*/
mutex_enter(&pool->p_user_lock);
if (pool->p_signal_create_thread == FALSE) {
pool->p_signal_create_thread = TRUE;
cv_signal(&pool->p_user_cv);
}
mutex_exit(&pool->p_user_lock);
}
mutex_exit(&pool->p_thread_lock);
}
/*
* Pool is closed. Cleanup and exit.
*/
/* Signal userland creator thread that it can stop now. */
mutex_enter(&pool->p_user_lock);
pool->p_user_exit = TRUE;
cv_broadcast(&pool->p_user_cv);
mutex_exit(&pool->p_user_lock);
/* Wait for svc_wait() to be done with the pool */
mutex_enter(&pool->p_user_lock);
while (pool->p_user_waiting) {
CALLB_CPR_SAFE_BEGIN(&cpr_info);
cv_wait(&pool->p_user_cv, &pool->p_user_lock);
CALLB_CPR_SAFE_END(&cpr_info, &pool->p_creator_lock);
}
mutex_exit(&pool->p_user_lock);
CALLB_CPR_EXIT(&cpr_info);
svc_pool_cleanup(pool);
zthread_exit();
}
/*
* If the creator thread is idle signal it to create
* a new service thread.
*/
static void
svc_creator_signal(SVCPOOL *pool)
{
mutex_enter(&pool->p_creator_lock);
if (pool->p_creator_signaled == FALSE) {
pool->p_creator_signaled = TRUE;
cv_signal(&pool->p_creator_cv);
}
mutex_exit(&pool->p_creator_lock);
}
/*
* Notify the creator thread to clean up and exit.
*/
static void
svc_creator_signalexit(SVCPOOL *pool)
{
mutex_enter(&pool->p_creator_lock);
pool->p_creator_exit = TRUE;
cv_signal(&pool->p_creator_cv);
mutex_exit(&pool->p_creator_lock);
}
/*
* Polling part of the svc_run().
* - search for a transport with a pending request
* - when one is found then latch the request lock and return to svc_run()
 * - if there is no request go to sleep and wait for a signal
* - handle two exceptions:
* a) current transport is closing
* b) timeout waiting for a new request
* in both cases return to svc_run()
*/
static SVCMASTERXPRT *
svc_poll(SVCPOOL *pool, SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt)
{
/*
	 * The main loop iterates until
	 * a) we find a pending request,
	 * b) we detect that the current transport is closing, or
	 * c) we time out waiting for a new request.
*/
for (;;) {
SVCMASTERXPRT *next;
clock_t timeleft;
/*
* Step 1.
* Check if there is a pending request on the current
* transport handle so that we can avoid cloning.
* If so then decrement the `pending-request' count for
* the pool and return to svc_run().
*
		 * We need to prevent potential starvation. If
		 * a selected transport always has pending requests
		 * coming in then the service threads will never switch to
		 * another transport. With a limited number of service
		 * threads some transports may never be serviced.
* To prevent such a scenario we pick up at most
* pool->p_max_same_xprt requests from the same transport
* and then take a hint from the xprt-ready queue or walk
* the transport list.
*/
if (xprt && xprt->xp_req_head && (!pool->p_qoverflow ||
clone_xprt->xp_same_xprt++ < pool->p_max_same_xprt)) {
mutex_enter(&xprt->xp_req_lock);
if (xprt->xp_req_head) {
mutex_enter(&pool->p_req_lock);
pool->p_reqs--;
mutex_exit(&pool->p_req_lock);
return (xprt);
}
mutex_exit(&xprt->xp_req_lock);
}
clone_xprt->xp_same_xprt = 0;
/*
* Step 2.
* If there is no request on the current transport try to
* find another transport with a pending request.
*/
mutex_enter(&pool->p_req_lock);
pool->p_walkers++;
mutex_exit(&pool->p_req_lock);
/*
		 * Make sure that transports will not be destroyed
		 * while we are checking them.
*/
rw_enter(&pool->p_lrwlock, RW_READER);
for (;;) {
SVCMASTERXPRT *hint;
/*
* Get the next transport from the xprt-ready queue.
* This is a hint. There is no guarantee that the
* transport still has a pending request since it
* could be picked up by another thread in step 1.
*
* If the transport has a pending request then keep
* it locked. Decrement the `pending-requests' for
* the pool and `walking-threads' counts, and return
* to svc_run().
*/
hint = svc_xprt_qget(pool);
if (hint && hint->xp_req_head) {
mutex_enter(&hint->xp_req_lock);
if (hint->xp_req_head) {
rw_exit(&pool->p_lrwlock);
mutex_enter(&pool->p_req_lock);
pool->p_reqs--;
pool->p_walkers--;
mutex_exit(&pool->p_req_lock);
return (hint);
}
mutex_exit(&hint->xp_req_lock);
}
/*
* If there was no hint in the xprt-ready queue then
			 * - if there are fewer pending requests than polling
			 *   threads go to sleep
* - otherwise check if there was an overflow in the
* xprt-ready queue; if so, then we need to break
* the `drain' mode
*/
if (hint == NULL) {
if (pool->p_reqs < pool->p_walkers) {
mutex_enter(&pool->p_req_lock);
if (pool->p_reqs < pool->p_walkers)
goto sleep;
mutex_exit(&pool->p_req_lock);
}
if (pool->p_qoverflow) {
break;
}
}
}
/*
* If there was an overflow in the xprt-ready queue then we
* need to switch to the `drain' mode, i.e. walk through the
* pool's transport list and search for a transport with a
* pending request. If we manage to drain all the pending
* requests then we can clear the overflow flag. This will
* switch svc_poll() back to taking hints from the xprt-ready
* queue (which is generally more efficient).
*
		 * If there are no registered transports simply go to sleep.
*/
if (xprt == NULL && pool->p_lhead == NULL) {
mutex_enter(&pool->p_req_lock);
goto sleep;
}
/*
* `Walk' through the pool's list of master server
		 * transport handles. Continue to loop until there are
		 * fewer pending requests than walking threads.
*/
next = xprt ? xprt->xp_next : pool->p_lhead;
for (;;) {
/*
* Check if there is a request on this transport.
*
* Since blocking on a locked mutex is very expensive
			 * check for a request without a lock first. We may
			 * miss a request that is just being delivered, but
			 * this will cost at most one full walk through the
			 * list.
*/
if (next->xp_req_head) {
/*
* Check again, now with a lock.
*/
mutex_enter(&next->xp_req_lock);
if (next->xp_req_head) {
rw_exit(&pool->p_lrwlock);
mutex_enter(&pool->p_req_lock);
pool->p_reqs--;
pool->p_walkers--;
mutex_exit(&pool->p_req_lock);
return (next);
}
mutex_exit(&next->xp_req_lock);
}
/*
* Continue to `walk' through the pool's
* transport list until there is less requests
* than walkers. Check this condition without
* a lock first to avoid contention on a mutex.
*/
if (pool->p_reqs < pool->p_walkers) {
/*
* Check again, now with the lock.
* If all the pending requests have been
				 * picked up then clear the overflow flag.
*/
mutex_enter(&pool->p_req_lock);
if (pool->p_reqs <= 0)
svc_xprt_qreset(pool);
if (pool->p_reqs < pool->p_walkers)
break; /* goto sleep */
mutex_exit(&pool->p_req_lock);
}
next = next->xp_next;
}
sleep:
/*
		 * No work to do. Stop the `walk' and go to sleep.
* Decrement the `walking-threads' count for the pool.
*/
pool->p_walkers--;
rw_exit(&pool->p_lrwlock);
/*
* Count us as asleep, mark this thread as safe
* for suspend and wait for a request.
*/
pool->p_asleep++;
timeleft = cv_timedwait_sig(&pool->p_req_cv, &pool->p_req_lock,
pool->p_timeout + lbolt);
/*
* If the drowsy flag is on this means that
* someone has signaled a wakeup. In such a case
		 * the `asleep-threads' count has already been updated
* so just clear the flag.
*
* If the drowsy flag is off then we need to update
* the `asleep-threads' count.
*/
if (pool->p_drowsy) {
pool->p_drowsy = FALSE;
/*
			 * If the thread is here because it timed out,
* instead of returning SVC_ETIMEDOUT, it is
* time to do some more work.
*/
if (timeleft == -1)
timeleft = 1;
} else {
pool->p_asleep--;
}
mutex_exit(&pool->p_req_lock);
/*
* If we received a signal while waiting for a
* request, inform svc_run(), so that we can return
* to user level and restart the call.
*/
if (timeleft == 0)
return (SVC_EINTR);
/*
* If the current transport is gone then notify
* svc_run() to unlink from it.
*/
if (xprt && xprt->xp_wq == NULL)
return (SVC_EXPRTGONE);
/*
* If we have timed out waiting for a request inform
* svc_run() that we probably don't need this thread.
*/
if (timeleft == -1)
return (SVC_ETIMEDOUT);
}
}
/*
* Main loop of the kernel RPC server
* - wait for input (find a transport with a pending request).
* - dequeue the request
 * - call a registered server routine to process the request
 *
 * There can be many threads running concurrently in this loop
* on the same or on different transports.
*/
static int
svc_run(SVCPOOL *pool)
{
SVCMASTERXPRT *xprt = NULL; /* master transport handle */
SVCXPRT *clone_xprt; /* clone for this thread */
struct svc_globals *svc;
proc_t *p = ttoproc(curthread);
/* Allocate a clone transport handle for this thread */
clone_xprt = svc_clone_init();
/*
* The loop iterates until the thread becomes
	 * idle for too long or the transport is gone.
*/
for (;;) {
SVCMASTERXPRT *next;
mblk_t *mp;
TRACE_0(TR_FAC_KRPC, TR_SVC_RUN, "svc_run");
/*
* If the process is exiting/killed, return
* immediately without processing any more
* requests.
*/
if (p->p_flag & (SEXITING | SKILLED)) {
svc_thread_exit(pool, clone_xprt);
/*
* The thread has been interrupted and therefore
* the service daemon is leaving as well, so
* let's go ahead and remove the service
* pool at this time.
*/
svc = zone_getspecific(svc_zone_key, curproc->p_zone);
mutex_enter(&svc->svc_plock);
svc_pool_unregister(svc, pool);
mutex_exit(&svc->svc_plock);
return (0);
}
/* Find a transport with a pending request */
next = svc_poll(pool, xprt, clone_xprt);
/*
* If svc_poll() finds a transport with a request
* it latches xp_req_lock on it. Therefore we need
* to dequeue the request and release the lock as
* soon as possible.
*/
ASSERT(next != NULL &&
(next == SVC_EXPRTGONE ||
next == SVC_ETIMEDOUT ||
next == SVC_EINTR ||
MUTEX_HELD(&next->xp_req_lock)));
/* Oops! The current transport is closing. Unlink now */
if (next == SVC_EXPRTGONE) {
svc_clone_unlink(clone_xprt);
xprt = NULL;
continue;
}
/* Oops! Timeout while waiting for a request. Exit */
if (next == SVC_ETIMEDOUT) {
svc_thread_exit(pool, clone_xprt);
return (0);
}
/*
* Interrupted by a signal while waiting for a
* request. Return to userspace and restart.
*/
if (next == SVC_EINTR) {
svc_thread_exit(pool, clone_xprt);
/*
* The thread has been interrupted and therefore
* the service daemon is leaving as well, so
* let's go ahead and remove the service
* pool at this time.
*/
svc = zone_getspecific(svc_zone_key, curproc->p_zone);
mutex_enter(&svc->svc_plock);
svc_pool_unregister(svc, pool);
mutex_exit(&svc->svc_plock);
return (EINTR);
}
/*
* De-queue the request and release the request lock
* on this transport (latched by svc_poll()).
*/
mp = next->xp_req_head;
next->xp_req_head = mp->b_next;
mp->b_next = (mblk_t *)0;
TRACE_2(TR_FAC_KRPC, TR_NFSFP_QUE_REQ_DEQ,
"rpc_que_req_deq:pool %p mp %p", pool, mp);
mutex_exit(&next->xp_req_lock);
/*
* If this is a new request on the current transport then
* the clone structure is already properly initialized.
* Otherwise, if the request is on a different transport,
* unlink from the current master and link to
* the one we got a request on.
*/
if (next != xprt) {
if (xprt)
svc_clone_unlink(clone_xprt);
svc_clone_link(next, clone_xprt);
xprt = next;
}
/*
* If there are more requests and req_cv hasn't
* been signaled yet then wake up one more thread now.
*
* We avoid signaling req_cv until the most recently
* signaled thread wakes up and gets the CPU to clear
* the `drowsy' flag.
*/
if (!(pool->p_drowsy || pool->p_reqs <= pool->p_walkers ||
pool->p_asleep == 0)) {
mutex_enter(&pool->p_req_lock);
if (pool->p_drowsy || pool->p_reqs <= pool->p_walkers ||
pool->p_asleep == 0)
mutex_exit(&pool->p_req_lock);
else {
pool->p_asleep--;
pool->p_drowsy = TRUE;
cv_signal(&pool->p_req_cv);
mutex_exit(&pool->p_req_lock);
}
}
/*
* If there are no asleep/signaled threads, we are
* still below the pool->p_maxthreads limit, and no thread
* is currently being created, then signal the creator
* for one more service thread.
*
* The asleep and drowsy checks are not protected
* by a lock since taking one hurts performance and a
* wrong decision is not critical here.
*/
if (pool->p_asleep == 0 && !pool->p_drowsy &&
pool->p_threads + pool->p_detached_threads <
pool->p_maxthreads)
svc_creator_signal(pool);
/*
* Process the request.
*/
svc_getreq(clone_xprt, mp);
/* If thread had a reservation it should have been canceled */
ASSERT(!clone_xprt->xp_reserved);
/*
* If the clone is marked detached then exit.
* The rpcmod slot has already been released
* when we detached this thread.
*/
if (clone_xprt->xp_detached) {
svc_thread_exitdetached(pool, clone_xprt);
return (0);
}
/*
* Release our reference on the rpcmod
* slot attached to xp_wq->q_ptr.
*/
(*RELE_PROC(xprt)) (clone_xprt->xp_wq, NULL);
}
/* NOTREACHED */
}
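/*
* In outline, each service thread cycles through svc_run() roughly as
* follows; this is only a simplified sketch of the code above, with
* the error and exit legs omitted:
*
*	clone_xprt = svc_clone_init();
*	for (;;) {
*		next = svc_poll(pool, xprt, clone_xprt);
*		(dequeue mp, drop next->xp_req_lock)
*		if (next != xprt) {
*			(unlink clone from xprt, link to next)
*			xprt = next;
*		}
*		(wake one sleeping thread and/or signal the
*		creator thread if the backlog calls for it)
*		svc_getreq(clone_xprt, mp);
*		(release the rpcmod slot unless detached)
*	}
*/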
/*
* Flush any pending requests for the queue
* and free the associated mblks.
*/
void
svc_queueclean(queue_t *q)
{
SVCMASTERXPRT *xprt = ((void **) q->q_ptr)[0];
mblk_t *mp;
/*
* clean up the requests
*/
mutex_enter(&xprt->xp_req_lock);
while ((mp = xprt->xp_req_head) != NULL) {
xprt->xp_req_head = mp->b_next;
mp->b_next = (mblk_t *)0;
(*RELE_PROC(xprt)) (xprt->xp_wq, mp);
}
mutex_exit(&xprt->xp_req_lock);
}
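/*
* Pending requests are kept as a singly-linked chain of mblks threaded
* through b_next, with xp_req_head/xp_req_tail marking the ends. The
* drain pattern used above (and again in rdma_stop() below) is the
* generic one; a sketch, assuming `head' is protected by `lock' and
* dispose() stands in for freemsg() or the RELE_PROC callback:
*
*	mutex_enter(&lock);
*	while ((mp = head) != NULL) {
*		head = mp->b_next;
*		mp->b_next = NULL;
*		dispose(mp);
*	}
*	mutex_exit(&lock);
*/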
/*
* This routine is called by rpcmod to inform kernel RPC that a
* queue is closing. It is called after all the requests have been
* picked up (that is after all the slots on the queue have
* been released by kernel RPC). It is also guaranteed that no more
* requests will be delivered on this transport.
*
* - clear xp_wq to mark the master server transport handle as closing
* - if there are no more threads on this transport close/destroy it
* - otherwise, broadcast threads sleeping in svc_poll(); the last
* thread will close/destroy the transport.
*/
void
svc_queueclose(queue_t *q)
{
SVCMASTERXPRT *xprt = ((void **) q->q_ptr)[0];
if (xprt == NULL) {
/*
* If there is no master xprt associated with this stream,
* then there is nothing to do. This happens regularly
* with connection-oriented listening streams created by
* nfsd.
*/
return;
}
mutex_enter(&xprt->xp_thread_lock);
ASSERT(xprt->xp_req_head == NULL);
ASSERT(xprt->xp_wq != NULL);
xprt->xp_wq = NULL;
if (xprt->xp_threads == 0) {
SVCPOOL *pool = xprt->xp_pool;
/*
* svc_xprt_cleanup() destroys the transport
* or releases the transport thread lock
*/
svc_xprt_cleanup(xprt, FALSE);
mutex_enter(&pool->p_thread_lock);
/*
* If the pool is in closing state and this was
* the last transport in the pool then signal the creator
* thread to clean up and exit.
*/
if (pool->p_closing && svc_pool_tryexit(pool)) {
return;
}
mutex_exit(&pool->p_thread_lock);
} else {
/*
* Wakeup threads sleeping in svc_poll() so that they
* unlink from the transport
*/
mutex_enter(&xprt->xp_pool->p_req_lock);
cv_broadcast(&xprt->xp_pool->p_req_cv);
mutex_exit(&xprt->xp_pool->p_req_lock);
/*
* NOTICE: No references to the master transport structure
* beyond this point!
*/
mutex_exit(&xprt->xp_thread_lock);
}
}
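/*
* The close path above is an instance of the usual reference-counted
* teardown: mark the object dying under its lock, then either destroy
* it immediately (no users) or wake the users and let the last one to
* drop its reference do the destroy. A condensed sketch of the shape,
* with xp_wq == NULL serving as the `dying' mark:
*
*	mutex_enter(&xprt->xp_thread_lock);
*	xprt->xp_wq = NULL;
*	if (xprt->xp_threads == 0)
*		(destroy now; svc_xprt_cleanup() drops the lock)
*	else {
*		(cv_broadcast() the sleepers; they will unlink
*		and the last one will clean up)
*		mutex_exit(&xprt->xp_thread_lock);
*	}
*/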
/*
* Interrupt `request delivery' routine called from rpcmod
* - put a request at the tail of the transport request queue
* - insert a hint for svc_poll() into the xprt-ready queue
* - increment the `pending-requests' count for the pool
* - wake up a thread sleeping in svc_poll() if necessary
* - if all the threads are running ask the creator for a new one.
*/
void
svc_queuereq(queue_t *q, mblk_t *mp)
{
SVCMASTERXPRT *xprt = ((void **) q->q_ptr)[0];
SVCPOOL *pool = xprt->xp_pool;
TRACE_0(TR_FAC_KRPC, TR_SVC_QUEUEREQ_START, "svc_queuereq_start");
ASSERT(!is_system_labeled() || DB_CRED(mp) != NULL ||
mp->b_datap->db_type != M_DATA);
/*
* Step 1.
* Grab the transport's request lock and put
* the request at the tail of the transport's
* request queue.
*/
mutex_enter(&xprt->xp_req_lock);
if (xprt->xp_req_head == NULL)
xprt->xp_req_head = mp;
else
xprt->xp_req_tail->b_next = mp;
xprt->xp_req_tail = mp;
mutex_exit(&xprt->xp_req_lock);
/*
* Step 2.
* Grab the pool request lock, insert a hint into
* the xprt-ready queue, increment `pending-requests'
* count for the pool, and wake up a thread sleeping
* in svc_poll() if necessary.
*/
mutex_enter(&pool->p_req_lock);
/* Insert pointer to this transport into the xprt-ready queue */
svc_xprt_qput(pool, xprt);
/* Increment the `pending-requests' count for the pool */
pool->p_reqs++;
TRACE_2(TR_FAC_KRPC, TR_NFSFP_QUE_REQ_ENQ,
"rpc_que_req_enq:pool %p mp %p", pool, mp);
/*
* If there are more requests and req_cv hasn't
* been signaled yet then wake up one more thread now.
*
* We avoid signaling req_cv until the most recently
* signaled thread wakes up and gets the CPU to clear
* the `drowsy' flag.
*/
if (pool->p_drowsy || pool->p_reqs <= pool->p_walkers ||
pool->p_asleep == 0) {
mutex_exit(&pool->p_req_lock);
} else {
pool->p_drowsy = TRUE;
pool->p_asleep--;
/*
* Signal wakeup and drop the request lock.
*/
cv_signal(&pool->p_req_cv);
mutex_exit(&pool->p_req_lock);
}
/*
* Step 3.
* If there are no asleep/signaled threads, we are
* still below the pool->p_maxthreads limit, and no thread
* is currently being created, then signal the creator
* for one more service thread.
*
* The asleep and drowsy checks are not protected
* by a lock since taking one hurts performance and a
* wrong decision is not critical here.
*/
if (pool->p_asleep == 0 && !pool->p_drowsy &&
pool->p_threads + pool->p_detached_threads < pool->p_maxthreads)
svc_creator_signal(pool);
TRACE_1(TR_FAC_KRPC, TR_SVC_QUEUEREQ_END,
"svc_queuereq_end:(%S)", "end");
}
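/*
* The wakeup throttle above follows a double-checked pattern: in
* svc_run() the condition is tested once without the pool lock on the
* fast path and re-tested under p_req_lock before any state changes
* (here in step 2 the lock is already held, and only step 3 runs
* unlocked). In skeleton form, where want_wakeup() is just a
* shorthand used here for the three-part test
* (!p_drowsy && p_reqs > p_walkers && p_asleep > 0):
*
*	if (want_wakeup()) {			(unlocked; may be stale)
*		mutex_enter(&pool->p_req_lock);
*		if (want_wakeup()) {		(authoritative re-check)
*			pool->p_asleep--;
*			pool->p_drowsy = TRUE;
*			cv_signal(&pool->p_req_cv);
*		}
*		mutex_exit(&pool->p_req_lock);
*	}
*/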
/*
* Reserve a service thread so that it can be detached later.
* This reservation is required to make sure that when it tries to
* detach itself the total number of reserved/detached threads does
* not exceed pool->p_maxthreads - pool->p_redline (i.e. that at
* least pool->p_redline thread slots are always left for
* non-detached threads).
*
* If the thread does not detach itself later, it should cancel the
* reservation before returning to svc_run().
*
* - check if there is room for more reserved/detached threads
* - if so, then increment the `reserved threads' count for the pool
* - mark the thread as reserved (setting the flag in the clone transport
* handle for this thread)
* - return 1 if the reservation succeeded, 0 if it failed.
*/
int
svc_reserve_thread(SVCXPRT *clone_xprt)
{
SVCPOOL *pool = clone_xprt->xp_master->xp_pool;
/* Recursive reservations are not allowed */
ASSERT(!clone_xprt->xp_reserved);
ASSERT(!clone_xprt->xp_detached);
/* Check pool counts if there is room for reservation */
mutex_enter(&pool->p_thread_lock);
if (pool->p_reserved_threads + pool->p_detached_threads >=
pool->p_maxthreads - pool->p_redline) {
mutex_exit(&pool->p_thread_lock);
return (0);
}
pool->p_reserved_threads++;
mutex_exit(&pool->p_thread_lock);
/* Mark the thread (clone handle) as reserved */
clone_xprt->xp_reserved = TRUE;
return (1);
}
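/*
* A worked example of the limit enforced above: with, say,
* p_maxthreads = 16 and p_redline = 4 (values picked purely for
* illustration), at most 16 - 4 = 12 threads may be reserved or
* detached at any one time, so at least 4 thread slots always remain
* for ordinary attached service threads. Typical use:
*
*	if (svc_reserve_thread(clone_xprt) == 0)
*		(no room; handle the request without detaching)
*	else
*		(either svc_detach_thread() or
*		svc_unreserve_thread() must follow)
*/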
/*
* Cancel a reservation for a thread.
* - decrement the `reserved threads' count for the pool
* - clear the flag in the clone transport handle for this thread.
*/
void
svc_unreserve_thread(SVCXPRT *clone_xprt)
{
SVCPOOL *pool = clone_xprt->xp_master->xp_pool;
/* Thread must have a reservation */
ASSERT(clone_xprt->xp_reserved);
ASSERT(!clone_xprt->xp_detached);
/* Decrement global count */
mutex_enter(&pool->p_thread_lock);
pool->p_reserved_threads--;
mutex_exit(&pool->p_thread_lock);
/* Clear reservation flag */
clone_xprt->xp_reserved = FALSE;
}
/*
* Detach a thread from its transport, so that it can block for an
* extended time. Because the transport can be closed after the thread is
* detached, the thread should have already sent off a reply if it was
* going to send one.
*
* - decrement the `non-detached threads' count and increment the
* `detached threads' count for the transport
* - decrement the `non-detached threads' and `reserved threads'
* counts and increment the `detached threads' count for the pool
* - release the rpcmod slot
* - mark the clone (thread) as detached.
*
* No need to return a pointer to the thread's CPR information, since
* the thread has a userland identity.
*
* NOTICE: a thread must not detach itself without making a prior reservation
* through svc_reserve_thread().
*/
callb_cpr_t *
svc_detach_thread(SVCXPRT *clone_xprt)
{
SVCMASTERXPRT *xprt = clone_xprt->xp_master;
SVCPOOL *pool = xprt->xp_pool;
/* Thread must have a reservation */
ASSERT(clone_xprt->xp_reserved);
ASSERT(!clone_xprt->xp_detached);
/* Bookkeeping for this transport */
mutex_enter(&xprt->xp_thread_lock);
xprt->xp_threads--;
xprt->xp_detached_threads++;
mutex_exit(&xprt->xp_thread_lock);
/* Bookkeeping for the pool */
mutex_enter(&pool->p_thread_lock);
pool->p_threads--;
pool->p_reserved_threads--;
pool->p_detached_threads++;
mutex_exit(&pool->p_thread_lock);
/* Release an rpcmod slot for this request */
(*RELE_PROC(xprt)) (clone_xprt->xp_wq, NULL);
/* Mark the clone (thread) as detached */
clone_xprt->xp_reserved = FALSE;
clone_xprt->xp_detached = TRUE;
return (NULL);
}
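/*
* Putting the reservation and detach primitives together, the expected
* caller sequence is sketched below; the reply has to be sent while
* the thread is still attached, since the transport may close at any
* time after the detach:
*
*	if (svc_reserve_thread(clone_xprt)) {
*		(send the reply, while still attached)
*		(void) svc_detach_thread(clone_xprt);
*		(now safe to block for an extended time;
*		the thread exits via svc_thread_exitdetached())
*	}
*/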
/*
* This routine is responsible for extracting RDMA plugin master XPRTs,
* unregistering them from the SVCPOOL, and initiating plugin-specific
* cleanup. It is passed a list/group of rdma transport records which
* are active in a given registered or unregistered kRPC thread pool.
* It shuts down all active rdma transports in that pool. If the thread
* active on the transport happens to be the last thread for that pool,
* it will signal the creator thread to clean up the pool and destroy
* the xprt in svc_queueclose().
*/
void
rdma_stop(rdma_xprt_group_t rdma_xprts)
{
SVCMASTERXPRT *xprt;
rdma_xprt_record_t *curr_rec;
queue_t *q;
mblk_t *mp;
int i;
if (rdma_xprts.rtg_count == 0)
return;
for (i = 0; i < rdma_xprts.rtg_count; i++) {
curr_rec = rdma_xprts.rtg_listhead;
rdma_xprts.rtg_listhead = curr_rec->rtr_next;
curr_rec->rtr_next = NULL;
xprt = curr_rec->rtr_xprt_ptr;
q = xprt->xp_wq;
svc_rdma_kstop(xprt);
mutex_enter(&xprt->xp_req_lock);
while ((mp = xprt->xp_req_head) != NULL) {
xprt->xp_req_head = mp->b_next;
mp->b_next = (mblk_t *)0;
freemsg(mp);
}
mutex_exit(&xprt->xp_req_lock);
svc_queueclose(q);
#ifdef DEBUG
if (rdma_check)
cmn_err(CE_NOTE, "rdma_stop: Exited svc_queueclose\n");
#endif
/*
* Free the rdma transport record for the expunged rdma
* based master transport handle.
*/
kmem_free(curr_rec, sizeof (rdma_xprt_record_t));
if (!rdma_xprts.rtg_listhead)
break;
}
}
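/*
* Each pass of the loop above is the classic destructive pop from the
* head of a singly-linked list; in isolation:
*
*	curr_rec = rdma_xprts.rtg_listhead;
*	rdma_xprts.rtg_listhead = curr_rec->rtr_next;
*	curr_rec->rtr_next = NULL;
*	(shut down and drain curr_rec->rtr_xprt_ptr)
*	kmem_free(curr_rec, sizeof (rdma_xprt_record_t));
*/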