/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
* or http://www.opensolaris.org/os/licensing.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at usr/src/OPENSOLARIS.LICENSE.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information: Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*/
/*
* Copyright 2005 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
/*
* Portions of this source code were derived from Berkeley
* 4.3 BSD under license from the Regents of the University of
* California.
*/
#pragma ident "%Z%%M% %I% %E% SMI"
/*
* clnt_vc.c
*
* Implements a connectionful client side RPC.
*
* Connectionful RPC supports 'batched calls'.
* A sequence of calls may be batched up in a send buffer. The rpc call
* returns immediately to the client even though the call has not necessarily
* been sent. The batching occurs if the results' xdr routine is NULL (0) AND
* the rpc timeout value is zero (see clnt.h, rpc).
*
* Clients should NOT casually batch calls that in fact return results; that
* is, the server side should be aware that a call is batched and not produce
* any return message. Batched calls that produce many result messages can
* deadlock (netlock) the client and the server.
*/
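/*
* Illustrative sketch (not part of this module): issuing batched calls
* through a connection-oriented handle. "clnt" is an existing CLIENT
* handle and LOGPROC is a hypothetical procedure that produces no
* results. A NULL results xdr routine together with a zero timeout
* queues the call and returns at once; a final normal call (here
* NULLPROC with a real timeout) pushes the batch and resynchronizes.
*
*	struct timeval zero = { 0, 0 };
*	struct timeval wait = { 25, 0 };
*	char *msg = "log entry";
*
*	(void) clnt_call(clnt, LOGPROC, xdr_wrapstring, (caddr_t)&msg,
*	    (xdrproc_t)0, (caddr_t)0, zero);
*	(void) clnt_call(clnt, NULLPROC, xdr_void, (caddr_t)0,
*	    xdr_void, (caddr_t)0, wait);
*/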
#include "mt.h"
#include "rpc_mt.h"
#include <assert.h>
#include <rpc/rpc.h>
#include <rpc/trace.h>
#include <errno.h>
#include <sys/byteorder.h>
#include <sys/mkdev.h>
#include <sys/poll.h>
#include <syslog.h>
#include <stdlib.h>
#include <unistd.h>
#include <netinet/tcp.h>
#define MCALL_MSG_SIZE 24
#define SECS_TO_MS 1000
/*
* USECS_TO_MS is deliberately left unparenthesized: an expression such as
* "tv_usec * USECS_TO_MS" evaluates left to right as (tv_usec * 1) / 1000.
* Parenthesizing the definition would reduce it to integer zero, so use it
* only as a trailing multiplier.
*/
#define USECS_TO_MS 1/1000
#ifndef MIN
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
#endif
extern int __rpc_timeval_to_msec();
extern int __rpc_compress_pollfd(int, pollfd_t *, pollfd_t *);
extern bool_t xdr_opaque_auth();
extern bool_t __rpc_gss_wrap();
extern bool_t __rpc_gss_unwrap();
CLIENT *_clnt_vc_create_timed(int, struct netbuf *, rpcprog_t,
rpcvers_t, uint_t, uint_t, const struct timeval *);
static struct clnt_ops *clnt_vc_ops();
#ifdef __STDC__
static int read_vc(void *, caddr_t, int);
static int write_vc(void *, caddr_t, int);
#else
static int read_vc();
static int write_vc();
#endif
static int t_rcvall();
static bool_t time_not_ok();
static bool_t set_up_connection();
struct ct_data;
static bool_t set_io_mode(struct ct_data *ct, int ioMode);
/*
* Lock table handle used by various MT sync. routines
*/
static mutex_t vctbl_lock = DEFAULTMUTEX;
static void *vctbl = NULL;
static const char clnt_vc_errstr[] = "%s : %s";
static const char clnt_vc_str[] = "clnt_vc_create";
static const char clnt_read_vc_str[] = "read_vc";
static const char __no_mem_str[] = "out of memory";
static const char no_fcntl_getfl_str[] = "could not get status flags and modes";
static const char no_nonblock_str[] = "could not set transport blocking mode";
/*
* Private data structure
*/
struct ct_data {
int ct_fd; /* connection's fd */
bool_t ct_closeit; /* close it on destroy */
int ct_tsdu; /* size of tsdu */
int ct_wait; /* wait interval in milliseconds */
bool_t ct_waitset; /* wait set by clnt_control? */
struct netbuf ct_addr; /* remote addr */
struct rpc_err ct_error;
char ct_mcall[MCALL_MSG_SIZE]; /* marshalled callmsg */
uint_t ct_mpos; /* pos after marshal */
XDR ct_xdrs; /* XDR stream */
/* NON STANDARD INFO - 00-08-31 */
bool_t ct_is_oneway; /* True if the current call is oneway. */
bool_t ct_is_blocking;
ushort_t ct_io_mode;
ushort_t ct_blocking_mode;
uint_t ct_bufferSize; /* Total size of the buffer. */
uint_t ct_bufferPendingSize; /* Size of unsent data. */
char *ct_buffer; /* Pointer to the buffer. */
char *ct_bufferWritePtr; /* Ptr to the first free byte. */
char *ct_bufferReadPtr; /* Ptr to the first byte of data. */
};
struct nb_reg_node {
struct nb_reg_node *next;
struct ct_data *ct;
};
static struct nb_reg_node *nb_first = (struct nb_reg_node *)&nb_first;
static struct nb_reg_node *nb_free = (struct nb_reg_node *)&nb_free;
static bool_t exit_handler_set = FALSE;
static mutex_t nb_list_mutex = DEFAULTMUTEX;
/* Define some macros to manage the linked list. */
#define LIST_ISEMPTY(l) (l == (struct nb_reg_node *)&l)
#define LIST_CLR(l) (l = (struct nb_reg_node *)&l)
#define LIST_ADD(l, node) (node->next = l, l = node)
#define LIST_EXTRACT(l, node) (node = l, l = l->next)
#define LIST_FOR_EACH(l, node) \
for (node = l; node != (struct nb_reg_node *)&l; node = node->next)
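/*
* Each list head (nb_first, nb_free) is initialized to its own address,
* so the head variable doubles as the end-of-list sentinel: because
* "next" is the first member of struct nb_reg_node, casting the head's
* address to a node pointer makes the sentinel's "next" alias the head
* itself. For example, LIST_ISEMPTY(nb_first) expands to
* (nb_first == (struct nb_reg_node *)&nb_first). The macros therefore
* work only when applied to the head variable directly, never to a copy.
*/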
/* Default size of the IO buffer used in non-blocking mode */
#define DEFAULT_PENDING_ZONE_MAX_SIZE (16*1024)
static int nb_send(struct ct_data *ct, void *buff,
unsigned int nbytes);
static int do_flush(struct ct_data *ct, uint_t flush_mode);
static bool_t set_flush_mode(struct ct_data *ct,
int P_mode);
static bool_t set_blocking_connection(struct ct_data *ct,
bool_t blocking);
static int register_nb(struct ct_data *ct);
static int unregister_nb(struct ct_data *ct);
/*
* Change the mode of the underlying fd.
*/
static bool_t
set_blocking_connection(struct ct_data *ct, bool_t blocking)
{
int flag;
/*
* If the underlying fd is already in the required mode,
* avoid the syscall.
*/
if (ct->ct_is_blocking == blocking)
return (TRUE);
if ((flag = fcntl(ct->ct_fd, F_GETFL, 0)) < 0) {
(void) syslog(LOG_ERR, "set_blocking_connection : %s",
no_fcntl_getfl_str);
return (FALSE);
}
flag = blocking? flag&~O_NONBLOCK : flag|O_NONBLOCK;
if (fcntl(ct->ct_fd, F_SETFL, flag) != 0) {
(void) syslog(LOG_ERR, "set_blocking_connection : %s",
no_nonblock_str);
return (FALSE);
}
ct->ct_is_blocking = blocking;
return (TRUE);
}
/*
* Create a client handle for a connection.
* Default options are set, which the user can change using clnt_control().
* The rpc/vc package does buffering similar to stdio, so the client
* must pick send and receive buffer sizes, 0 => use the default.
* NB: fd is copied into a private area.
* NB: The rpch->cl_auth is set to null authentication. Callers may wish to
* set this to something more useful.
*
* fd should be open and bound.
*/
CLIENT *
clnt_vc_create(fd, svcaddr, prog, vers, sendsz, recvsz)
int fd; /* open file descriptor */
struct netbuf *svcaddr; /* servers address */
rpcprog_t prog; /* program number */
rpcvers_t vers; /* version number */
uint_t sendsz; /* buffer send size */
uint_t recvsz; /* buffer recv size */
{
return (_clnt_vc_create_timed(fd, svcaddr, prog, vers, sendsz,
recvsz, NULL));
}
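/*
* Illustrative sketch (not part of this module): creating a handle over
* an endpoint the caller has already opened, bound and connected (the
* t_open()/t_bind()/t_connect() steps are assumed). RPCPROG and RPCVERS
* are hypothetical program and version numbers; 0 buffer sizes select
* the defaults.
*
*	struct netbuf svcaddr;		(filled in with the server address)
*	int fd;				(connected transport endpoint)
*	CLIENT *clnt;
*
*	clnt = clnt_vc_create(fd, &svcaddr, RPCPROG, RPCVERS, 0, 0);
*	if (clnt == NULL)
*		clnt_pcreateerror("clnt_vc_create");
*/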
/*
* This has the same definition as clnt_vc_create(), except it
* takes an additional parameter - a pointer to a timeval structure.
*
* Not a public interface. This is for clnt_create_timed,
* clnt_create_vers_timed, clnt_tp_create_timed to pass down the timeout
* value to control a tcp connection attempt.
* (for bug 4049792: clnt_create_timed does not time out)
*
* If tp is NULL, use default timeout to set up the connection.
*/
CLIENT *
_clnt_vc_create_timed(fd, svcaddr, prog, vers, sendsz, recvsz, tp)
int fd; /* open file descriptor */
struct netbuf *svcaddr; /* servers address */
rpcprog_t prog; /* program number */
rpcvers_t vers; /* version number */
uint_t sendsz; /* buffer send size */
uint_t recvsz; /* buffer recv size */
const struct timeval *tp; /* connection timeout */
{
CLIENT *cl; /* client handle */
struct ct_data *ct; /* private data */
struct timeval now;
struct rpc_msg call_msg;
struct t_info tinfo;
int flag;
trace5(TR_clnt_vc_create, 0, prog, vers, sendsz, recvsz);
cl = (CLIENT *)mem_alloc(sizeof (*cl));
ct = (struct ct_data *)mem_alloc(sizeof (*ct));
if ((cl == (CLIENT *)NULL) || (ct == (struct ct_data *)NULL)) {
(void) syslog(LOG_ERR, clnt_vc_errstr,
clnt_vc_str, __no_mem_str);
rpc_createerr.cf_stat = RPC_SYSTEMERROR;
rpc_createerr.cf_error.re_errno = errno;
rpc_createerr.cf_error.re_terrno = 0;
goto err;
}
ct->ct_addr.buf = NULL;
sig_mutex_lock(&vctbl_lock);
if ((vctbl == NULL) && ((vctbl = rpc_fd_init()) == NULL)) {
rpc_createerr.cf_stat = RPC_SYSTEMERROR;
rpc_createerr.cf_error.re_errno = errno;
rpc_createerr.cf_error.re_terrno = 0;
sig_mutex_unlock(&vctbl_lock);
goto err;
}
ct->ct_io_mode = RPC_CL_BLOCKING;
ct->ct_blocking_mode = RPC_CL_BLOCKING_FLUSH;
ct->ct_buffer = NULL; /* We allocate the buffer when needed. */
ct->ct_bufferSize = DEFAULT_PENDING_ZONE_MAX_SIZE;
ct->ct_bufferPendingSize = 0;
ct->ct_bufferWritePtr = NULL;
ct->ct_bufferReadPtr = NULL;
/* Check the current state of the fd. */
if ((flag = fcntl(fd, F_GETFL, 0)) < 0) {
(void) syslog(LOG_ERR, "_clnt_vc_create_timed : %s",
no_fcntl_getfl_str);
rpc_createerr.cf_stat = RPC_SYSTEMERROR;
rpc_createerr.cf_error.re_terrno = errno;
rpc_createerr.cf_error.re_errno = 0;
sig_mutex_unlock(&vctbl_lock);
goto err;
}
ct->ct_is_blocking = flag&O_NONBLOCK? FALSE:TRUE;
if (set_up_connection(fd, svcaddr, ct, tp) == FALSE) {
sig_mutex_unlock(&vctbl_lock);
goto err;
}
sig_mutex_unlock(&vctbl_lock);
/*
* Set up other members of private data struct
*/
ct->ct_fd = fd;
/*
* The actual value will be set by clnt_call or clnt_control
*/
ct->ct_wait = 30000;
ct->ct_waitset = FALSE;
/*
* By default, closeit is always FALSE. It is the user's responsibility
* to do a t_close on the fd, or the user may use clnt_control
* to let clnt_destroy close it.
*/
ct->ct_closeit = FALSE;
/*
* Initialize call message
*/
(void) gettimeofday(&now, (struct timezone *)0);
call_msg.rm_xid = getpid() ^ now.tv_sec ^ now.tv_usec;
call_msg.rm_call.cb_prog = prog;
call_msg.rm_call.cb_vers = vers;
/*
* pre-serialize the static part of the call msg and stash it away
*/
xdrmem_create(&(ct->ct_xdrs), ct->ct_mcall, MCALL_MSG_SIZE, XDR_ENCODE);
if (! xdr_callhdr(&(ct->ct_xdrs), &call_msg)) {
goto err;
}
ct->ct_mpos = XDR_GETPOS(&(ct->ct_xdrs));
XDR_DESTROY(&(ct->ct_xdrs));
if (t_getinfo(fd, &tinfo) == -1) {
rpc_createerr.cf_stat = RPC_TLIERROR;
rpc_createerr.cf_error.re_terrno = t_errno;
rpc_createerr.cf_error.re_errno = 0;
goto err;
}
/*
* Find the receive and the send size
*/
sendsz = __rpc_get_t_size((int)sendsz, tinfo.tsdu);
recvsz = __rpc_get_t_size((int)recvsz, tinfo.tsdu);
if ((sendsz == 0) || (recvsz == 0)) {
rpc_createerr.cf_stat = RPC_TLIERROR;
rpc_createerr.cf_error.re_terrno = 0;
rpc_createerr.cf_error.re_errno = 0;
goto err;
}
ct->ct_tsdu = tinfo.tsdu;
/*
* Create a client handle which uses xdrrec for serialization
* and authnone for authentication.
*/
ct->ct_xdrs.x_ops = NULL;
xdrrec_create(&(ct->ct_xdrs), sendsz, recvsz, (caddr_t)ct,
read_vc, write_vc);
if (ct->ct_xdrs.x_ops == NULL) {
rpc_createerr.cf_stat = RPC_SYSTEMERROR;
rpc_createerr.cf_error.re_terrno = 0;
rpc_createerr.cf_error.re_errno = ENOMEM;
goto err;
}
cl->cl_ops = clnt_vc_ops();
cl->cl_private = (caddr_t)ct;
cl->cl_auth = authnone_create();
cl->cl_tp = (char *)NULL;
cl->cl_netid = (char *)NULL;
trace3(TR_clnt_vc_create, 1, prog, vers);
return (cl);
err:
if (cl) {
if (ct) {
if (ct->ct_addr.len)
mem_free(ct->ct_addr.buf, ct->ct_addr.len);
mem_free((caddr_t)ct, sizeof (struct ct_data));
}
mem_free((caddr_t)cl, sizeof (CLIENT));
}
trace3(TR_clnt_vc_create, 1, prog, vers);
return ((CLIENT *)NULL);
}
#define TCPOPT_BUFSIZE 128
/*
* Set tcp connection timeout value.
* Return 0 for success, -1 for failure.
*/
static int
_set_tcp_conntime(int fd, int optval)
{
struct t_optmgmt req, res;
struct opthdr *opt;
int *ip;
char buf[TCPOPT_BUFSIZE];
opt = (struct opthdr *)buf;
opt->level = IPPROTO_TCP;
opt->name = TCP_CONN_ABORT_THRESHOLD;
opt->len = sizeof (int);
req.flags = T_NEGOTIATE;
req.opt.len = sizeof (struct opthdr) + opt->len;
req.opt.buf = (char *)opt;
ip = (int *)((char *)buf + sizeof (struct opthdr));
*ip = optval;
res.flags = 0;
res.opt.buf = (char *)buf;
res.opt.maxlen = sizeof (buf);
if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
return (-1);
}
return (0);
}
/*
* Get current tcp connection timeout value.
* Return the current timeout value on success, -1 on failure.
*/
static int
_get_tcp_conntime(int fd)
{
struct t_optmgmt req, res;
struct opthdr *opt;
int *ip, retval;
char buf[TCPOPT_BUFSIZE];
opt = (struct opthdr *)buf;
opt->level = IPPROTO_TCP;
opt->name = TCP_CONN_ABORT_THRESHOLD;
opt->len = sizeof (int);
req.flags = T_CURRENT;
req.opt.len = sizeof (struct opthdr) + opt->len;
req.opt.buf = (char *)opt;
ip = (int *)((char *)buf + sizeof (struct opthdr));
*ip = 0;
res.flags = 0;
res.opt.buf = (char *)buf;
res.opt.maxlen = sizeof (buf);
if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
return (-1);
}
ip = (int *)((char *)buf + sizeof (struct opthdr));
retval = *ip;
return (retval);
}
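/*
* Both routines above build the t_optmgmt() request the same way: the
* option buffer carries a struct opthdr (level IPPROTO_TCP, name
* TCP_CONN_ABORT_THRESHOLD, len sizeof (int)) immediately followed by
* the int option value, so req.opt.len is
* sizeof (struct opthdr) + sizeof (int). set_up_connection() below uses
* them in a save/set/restore pattern around t_connect().
*/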
static bool_t
set_up_connection(fd, svcaddr, ct, tp)
int fd;
struct netbuf *svcaddr; /* servers address */
struct ct_data *ct;
struct timeval *tp;
{
int state;
struct t_call sndcallstr, *rcvcall;
int nconnect;
bool_t connected, do_rcv_connect;
int curr_time = 0;
ct->ct_addr.len = 0;
state = t_getstate(fd);
if (state == -1) {
rpc_createerr.cf_stat = RPC_TLIERROR;
rpc_createerr.cf_error.re_errno = 0;
rpc_createerr.cf_error.re_terrno = t_errno;
return (FALSE);
}
#ifdef DEBUG
fprintf(stderr, "set_up_connection: state = %d\n", state);
#endif
switch (state) {
case T_IDLE:
if (svcaddr == (struct netbuf *)NULL) {
rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
return (FALSE);
}
/*
* Connect only if state is IDLE and svcaddr known
*/
/* LINTED pointer alignment */
rcvcall = (struct t_call *)t_alloc(fd, T_CALL, T_OPT|T_ADDR);
if (rcvcall == NULL) {
rpc_createerr.cf_stat = RPC_TLIERROR;
rpc_createerr.cf_error.re_terrno = t_errno;
rpc_createerr.cf_error.re_errno = errno;
return (FALSE);
}
rcvcall->udata.maxlen = 0;
sndcallstr.addr = *svcaddr;
sndcallstr.opt.len = 0;
sndcallstr.udata.len = 0;
/*
* A NULL rcvcall would have sufficed, because the address
* returned is the same in all cases except the gateway case;
* rcvcall is required in order to pick up the address there.
*/
connected = FALSE;
do_rcv_connect = FALSE;
/*
* If a timeout value is specified, we try to reset the tcp
* connection timeout. If the transport does not support the
* TCP_CONN_ABORT_THRESHOLD option, or the attempt fails for
* some other reason, the default timeout is used.
*/
if (tp != NULL) {
int ms;
/* TCP_CONN_ABORT_THRESHOLD takes an int value in milliseconds */
ms = tp->tv_sec * SECS_TO_MS + tp->tv_usec * USECS_TO_MS;
if (((curr_time = _get_tcp_conntime(fd)) != -1) &&
(_set_tcp_conntime(fd, ms) == 0)) {
#ifdef DEBUG
fprintf(stderr, "set_up_connection: set tcp ");
fprintf(stderr, "connection timeout to %d ms\n", ms);
#endif
}
}
for (nconnect = 0; nconnect < 3; nconnect++) {
if (t_connect(fd, &sndcallstr, rcvcall) != -1) {
connected = TRUE;
break;
}
if (!(t_errno == TSYSERR && errno == EINTR)) {
break;
}
if ((state = t_getstate(fd)) == T_OUTCON) {
do_rcv_connect = TRUE;
break;
}
if (state != T_IDLE) {
break;
}
}
if (do_rcv_connect) {
do {
if (t_rcvconnect(fd, rcvcall) != -1) {
connected = TRUE;
break;
}
} while (t_errno == TSYSERR && errno == EINTR);
}
/*
* Set the connection timeout back to its old value.
*/
if (curr_time) {
_set_tcp_conntime(fd, curr_time);
}
if (!connected) {
rpc_createerr.cf_stat = RPC_TLIERROR;
rpc_createerr.cf_error.re_terrno = t_errno;
rpc_createerr.cf_error.re_errno = errno;
(void) t_free((char *)rcvcall, T_CALL);
#ifdef DEBUG
fprintf(stderr, "clnt_vc: t_connect error %d\n",
rpc_createerr.cf_error.re_terrno);
#endif
return (FALSE);
}
/* Free old area if allocated */
if (ct->ct_addr.buf)
free(ct->ct_addr.buf);
ct->ct_addr = rcvcall->addr; /* To get the new address */
/* So that address buf does not get freed */
rcvcall->addr.buf = NULL;
(void) t_free((char *)rcvcall, T_CALL);
break;
case T_DATAXFER:
case T_OUTCON:
if (svcaddr == (struct netbuf *)NULL) {
/*
* svcaddr could also be NULL in cases where the
* client is already bound and connected.
*/
ct->ct_addr.len = 0;
} else {
ct->ct_addr.buf = malloc(svcaddr->len);
if (ct->ct_addr.buf == (char *)NULL) {
(void) syslog(LOG_ERR, clnt_vc_errstr,
clnt_vc_str, __no_mem_str);
rpc_createerr.cf_stat = RPC_SYSTEMERROR;
rpc_createerr.cf_error.re_errno = errno;
rpc_createerr.cf_error.re_terrno = 0;
return (FALSE);
}
(void) memcpy(ct->ct_addr.buf, svcaddr->buf,
(int)svcaddr->len);
ct->ct_addr.len = ct->ct_addr.maxlen = svcaddr->len;
}
break;
default:
rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
return (FALSE);
}
return (TRUE);
}
static enum clnt_stat
clnt_vc_call(cl, proc, xdr_args, args_ptr, xdr_results, results_ptr, timeout)
CLIENT *cl;
rpcproc_t proc;
xdrproc_t xdr_args;
caddr_t args_ptr;
xdrproc_t xdr_results;
caddr_t results_ptr;
struct timeval timeout;
{
/* LINTED pointer alignment */
struct ct_data *ct = (struct ct_data *)cl->cl_private;
XDR *xdrs = &(ct->ct_xdrs);
struct rpc_msg reply_msg;
uint32_t x_id;
/* LINTED pointer alignment */
uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall); /* yuk */
bool_t shipnow;
int refreshes = 2;
trace3(TR_clnt_vc_call, 0, cl, proc);
if (rpc_fd_lock(vctbl, ct->ct_fd)) {
rpc_callerr.re_status = RPC_FAILED;
rpc_callerr.re_errno = errno;
rpc_fd_unlock(vctbl, ct->ct_fd);
return (RPC_FAILED);
}
ct->ct_is_oneway = FALSE;
if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
if (do_flush(ct, RPC_CL_BLOCKING_FLUSH) != 0) {
rpc_fd_unlock(vctbl, ct->ct_fd);
return (RPC_FAILED); /* XXX */
}
}
if (!ct->ct_waitset) {
/* If time is not within limits, we ignore it. */
if (time_not_ok(&timeout) == FALSE)
ct->ct_wait = __rpc_timeval_to_msec(&timeout);
} else {
timeout.tv_sec = (ct->ct_wait / 1000);
timeout.tv_usec = (ct->ct_wait % 1000) * 1000;
}
shipnow = ((xdr_results == (xdrproc_t)0) && (timeout.tv_sec == 0) &&
(timeout.tv_usec == 0)) ? FALSE : TRUE;
call_again:
xdrs->x_op = XDR_ENCODE;
rpc_callerr.re_status = RPC_SUCCESS;
/*
* The xid is stored in the pre-serialized call message in network
* byte order; convert it to host order before decrementing.
*/
x_id = ntohl(*msg_x_id) - 1;
*msg_x_id = htonl(x_id);
if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
if ((! XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
(! XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
(! AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
(! xdr_args(xdrs, args_ptr))) {
if (rpc_callerr.re_status == RPC_SUCCESS)
rpc_callerr.re_status = RPC_CANTENCODEARGS;
(void) xdrrec_endofrecord(xdrs, TRUE);
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_call, 1, cl, proc);
return (rpc_callerr.re_status);
}
} else {
/* LINTED pointer alignment */
uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
IXDR_PUT_U_INT32(u, proc);
if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
if (rpc_callerr.re_status == RPC_SUCCESS)
rpc_callerr.re_status = RPC_CANTENCODEARGS;
(void) xdrrec_endofrecord(xdrs, TRUE);
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_call, 1, cl, proc);
return (rpc_callerr.re_status);
}
}
if (! xdrrec_endofrecord(xdrs, shipnow)) {
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_call, 1, cl, proc);
return (rpc_callerr.re_status = RPC_CANTSEND);
}
if (! shipnow) {
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_call, 1, cl, proc);
return (RPC_SUCCESS);
}
/*
* Hack to provide rpc-based message passing
*/
if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_call, 1, cl, proc);
return (rpc_callerr.re_status = RPC_TIMEDOUT);
}
/*
* Keep receiving until we get a valid transaction id
*/
xdrs->x_op = XDR_DECODE;
/*CONSTANTCONDITION*/
while (TRUE) {
reply_msg.acpted_rply.ar_verf = _null_auth;
reply_msg.acpted_rply.ar_results.where = NULL;
reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
if (! xdrrec_skiprecord(xdrs)) {
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_call, 1, cl, proc);
return (rpc_callerr.re_status);
}
/* now decode and validate the response header */
if (! xdr_replymsg(xdrs, &reply_msg)) {
if (rpc_callerr.re_status == RPC_SUCCESS)
continue;
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_call, 1, cl, proc);
return (rpc_callerr.re_status);
}
if (reply_msg.rm_xid == x_id)
break;
}
/*
* process header
*/
if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
(reply_msg.acpted_rply.ar_stat == SUCCESS))
rpc_callerr.re_status = RPC_SUCCESS;
else
__seterr_reply(&reply_msg, &(rpc_callerr));
if (rpc_callerr.re_status == RPC_SUCCESS) {
if (! AUTH_VALIDATE(cl->cl_auth,
&reply_msg.acpted_rply.ar_verf)) {
rpc_callerr.re_status = RPC_AUTHERROR;
rpc_callerr.re_why = AUTH_INVALIDRESP;
} else if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
if (!(*xdr_results)(xdrs, results_ptr)) {
if (rpc_callerr.re_status == RPC_SUCCESS)
rpc_callerr.re_status = RPC_CANTDECODERES;
}
} else if (!__rpc_gss_unwrap(cl->cl_auth, xdrs, xdr_results,
results_ptr)) {
if (rpc_callerr.re_status == RPC_SUCCESS)
rpc_callerr.re_status = RPC_CANTDECODERES;
}
} /* end successful completion */
/*
* If unsuccessful AND the error is an authentication error
* then refresh credentials and try again, else break
*/
else if (rpc_callerr.re_status == RPC_AUTHERROR) {
/* maybe our credentials need to be refreshed ... */
if (refreshes-- && AUTH_REFRESH(cl->cl_auth, &reply_msg))
goto call_again;
else
/*
* Set rpc_callerr here explicitly: libnsl is not reentrant,
* and the refresh may have reinitialized the TSD. If it were
* not set here, success could be returned even though the
* refresh failed.
*/
rpc_callerr.re_status = RPC_AUTHERROR;
} /* end of unsuccessful completion */
/* free verifier ... */
if (reply_msg.rm_reply.rp_stat == MSG_ACCEPTED &&
reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
xdrs->x_op = XDR_FREE;
(void) xdr_opaque_auth(xdrs, &(reply_msg.acpted_rply.ar_verf));
}
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_call, 1, cl, proc);
return (rpc_callerr.re_status);
}
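/*
* Illustrative sketch (not part of this module): a synchronous call that
* dispatches to clnt_vc_call() above through the ops vector. PROC,
* xdr_args_t, xdr_res_t, arg and res are hypothetical.
*
*	struct timeval tv = { 25, 0 };
*	enum clnt_stat stat;
*
*	stat = clnt_call(clnt, PROC, xdr_args_t, (caddr_t)&arg,
*	    xdr_res_t, (caddr_t)&res, tv);
*	if (stat != RPC_SUCCESS)
*		clnt_perror(clnt, "call failed");
*/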
static enum clnt_stat
clnt_vc_send(cl, proc, xdr_args, args_ptr)
CLIENT *cl;
rpcproc_t proc;
xdrproc_t xdr_args;
caddr_t args_ptr;
{
/* LINTED pointer alignment */
struct ct_data *ct = (struct ct_data *)cl->cl_private;
XDR *xdrs = &(ct->ct_xdrs);
uint32_t x_id;
/* LINTED pointer alignment */
uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall); /* yuk */
trace3(TR_clnt_vc_send, 0, cl, proc);
if (rpc_fd_lock(vctbl, ct->ct_fd)) {
rpc_callerr.re_status = RPC_FAILED;
rpc_callerr.re_errno = errno;
rpc_fd_unlock(vctbl, ct->ct_fd);
return (RPC_FAILED);
}
ct->ct_is_oneway = TRUE;
xdrs->x_op = XDR_ENCODE;
rpc_callerr.re_status = RPC_SUCCESS;
/*
* The xid is stored in the pre-serialized call message in network
* byte order; convert it to host order before decrementing.
*/
x_id = ntohl(*msg_x_id) - 1;
*msg_x_id = htonl(x_id);
if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
if ((! XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
(! XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
(! AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
(! xdr_args(xdrs, args_ptr))) {
if (rpc_callerr.re_status == RPC_SUCCESS)
rpc_callerr.re_status = RPC_CANTENCODEARGS;
(void) xdrrec_endofrecord(xdrs, TRUE);
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_send, 1, cl, proc);
return (rpc_callerr.re_status);
}
} else {
/* LINTED pointer alignment */
uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
IXDR_PUT_U_INT32(u, proc);
if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
if (rpc_callerr.re_status == RPC_SUCCESS)
rpc_callerr.re_status = RPC_CANTENCODEARGS;
(void) xdrrec_endofrecord(xdrs, TRUE);
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_send, 1, cl, proc);
return (rpc_callerr.re_status);
}
}
/*
* Do not need to check errors, as the following code does
* not depend on the successful completion of the call.
* An error, if any occurs, is reported through
* rpc_callerr.re_status.
*/
xdrrec_endofrecord(xdrs, TRUE);
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_send, 1, cl, proc);
return (rpc_callerr.re_status);
}
static void
clnt_vc_geterr(cl, errp)
CLIENT *cl;
struct rpc_err *errp;
{
trace2(TR_clnt_vc_geterr, 0, cl);
*errp = rpc_callerr;
trace2(TR_clnt_vc_geterr, 1, cl);
}
static bool_t
clnt_vc_freeres(cl, xdr_res, res_ptr)
CLIENT *cl;
xdrproc_t xdr_res;
caddr_t res_ptr;
{
/* LINTED pointer alignment */
struct ct_data *ct = (struct ct_data *)cl->cl_private;
XDR *xdrs = &(ct->ct_xdrs);
bool_t dummy;
trace2(TR_clnt_vc_freeres, 0, cl);
rpc_fd_lock(vctbl, ct->ct_fd);
xdrs->x_op = XDR_FREE;
dummy = (*xdr_res)(xdrs, res_ptr);
rpc_fd_unlock(vctbl, ct->ct_fd);
trace2(TR_clnt_vc_freeres, 1, cl);
return (dummy);
}
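/*
* Illustrative sketch (not part of this module): once the caller is done
* with decoded results, clnt_freeres() dispatches here to release any
* memory the XDR decode allocated. xdr_res_t and res are hypothetical
* and must match the pair used for the call.
*
*	(void) clnt_freeres(clnt, xdr_res_t, (caddr_t)&res);
*/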
static void
clnt_vc_abort(void)
{
trace1(TR_clnt_vc_abort, 0);
trace1(TR_clnt_vc_abort, 1);
}
/*ARGSUSED*/
static bool_t
clnt_vc_control(cl, request, info)
CLIENT *cl;
int request;
char *info;
{
bool_t ret = FALSE; /* initialized: CLFLUSH in blocking mode returns it unchanged */
/* LINTED pointer alignment */
struct ct_data *ct = (struct ct_data *)cl->cl_private;
trace3(TR_clnt_vc_control, 0, cl, request);
if (rpc_fd_lock(vctbl, ct->ct_fd)) {
rpc_fd_unlock(vctbl, ct->ct_fd);
return (FALSE);
}
switch (request) {
case CLSET_FD_CLOSE:
ct->ct_closeit = TRUE;
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_control, 1, cl, request);
return (TRUE);
case CLSET_FD_NCLOSE:
ct->ct_closeit = FALSE;
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_control, 1, cl, request);
return (TRUE);
case CLFLUSH:
if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
int res;
res = do_flush(ct, (info == NULL ||
*(int *)info == RPC_CL_DEFAULT_FLUSH)?
ct->ct_blocking_mode: *(int *)info);
ret = (0 == res);
}
rpc_fd_unlock(vctbl, ct->ct_fd);
return (ret);
}
/* for other requests which use info */
if (info == NULL) {
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_control, 1, cl, request);
return (FALSE);
}
switch (request) {
case CLSET_TIMEOUT:
/* LINTED pointer alignment */
if (time_not_ok((struct timeval *)info)) {
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_control, 1, cl, request);
return (FALSE);
}
/* LINTED pointer alignment */
ct->ct_wait = __rpc_timeval_to_msec((struct timeval *)info);
ct->ct_waitset = TRUE;
break;
case CLGET_TIMEOUT:
/* LINTED pointer alignment */
((struct timeval *)info)->tv_sec = ct->ct_wait / 1000;
/* LINTED pointer alignment */
((struct timeval *)info)->tv_usec =
(ct->ct_wait % 1000) * 1000;
break;
case CLGET_SERVER_ADDR: /* For compatibility only */
(void) memcpy(info, ct->ct_addr.buf, (int)ct->ct_addr.len);
break;
case CLGET_FD:
/* LINTED pointer alignment */
*(int *)info = ct->ct_fd;
break;
case CLGET_SVC_ADDR:
/* The caller should not free this memory area */
/* LINTED pointer alignment */
*(struct netbuf *)info = ct->ct_addr;
break;
case CLSET_SVC_ADDR: /* set to new address */
#ifdef undef
/*
* XXX: once the t_snddis(), followed by t_connect() starts to
* work, this ifdef should be removed. CLIENT handle reuse
* would then be possible for COTS as well.
*/
if (t_snddis(ct->ct_fd, NULL) == -1) {
rpc_createerr.cf_stat = RPC_TLIERROR;
rpc_createerr.cf_error.re_terrno = t_errno;
rpc_createerr.cf_error.re_errno = errno;
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_control, 1, cl, request);
return (FALSE);
}
ret = set_up_connection(ct->ct_fd, (struct netbuf *)info,
ct, NULL);
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_control, 1, cl, request);
return (ret);
#else
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_control, 1, cl, request);
return (FALSE);
#endif
case CLGET_XID:
/*
* Use the knowledge that the xid is the
* first element in the call structure.
* This will get the xid of the PREVIOUS call.
*/
/* LINTED pointer alignment */
*(uint32_t *)info = ntohl(*(uint32_t *)ct->ct_mcall);
break;
case CLSET_XID:
/* This will set the xid of the NEXT call */
/* LINTED pointer alignment */
*(uint32_t *)ct->ct_mcall = htonl(*(uint32_t *)info + 1);
/* increment by 1 as clnt_vc_call() decrements once */
break;
case CLGET_VERS:
/*
* This RELIES on the information that, in the call body,
* the version number field is the fifth field from the
* beginning of the RPC header. MUST be changed if the
* call_struct is changed
*/
/* LINTED pointer alignment */
*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
4 * BYTES_PER_XDR_UNIT));
break;
case CLSET_VERS:
/* LINTED pointer alignment */
*(uint32_t *)(ct->ct_mcall + 4 * BYTES_PER_XDR_UNIT) =
/* LINTED pointer alignment */
htonl(*(uint32_t *)info);
break;
case CLGET_PROG:
/*
* This RELIES on the information that, in the call body,
* the program number field is the fourth field from the
* beginning of the RPC header. MUST be changed if the
* call_struct is changed
*/
/* LINTED pointer alignment */
*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
3 * BYTES_PER_XDR_UNIT));
break;
case CLSET_PROG:
/* LINTED pointer alignment */
*(uint32_t *)(ct->ct_mcall + 3 * BYTES_PER_XDR_UNIT) =
/* LINTED pointer alignment */
htonl(*(uint32_t *)info);
break;
case CLSET_IO_MODE:
if (!set_io_mode(ct, *(int *)info)) {
rpc_fd_unlock(vctbl, ct->ct_fd);
return (FALSE);
}
break;
case CLSET_FLUSH_MODE:
/* Set a specific FLUSH_MODE */
if (!set_flush_mode(ct, *(int *)info)) {
rpc_fd_unlock(vctbl, ct->ct_fd);
return (FALSE);
}
break;
case CLGET_FLUSH_MODE:
*(rpcflushmode_t *)info = ct->ct_blocking_mode;
break;
case CLGET_IO_MODE:
*(rpciomode_t *)info = ct->ct_io_mode;
break;
case CLGET_CURRENT_REC_SIZE:
/*
* Returns the current amount of memory allocated
* to pending requests
*/
*(int *)info = ct->ct_bufferPendingSize;
break;
case CLSET_CONNMAXREC_SIZE:
/* Cannot resize the buffer while it is in use. */
if (ct->ct_bufferPendingSize != 0) {
rpc_fd_unlock(vctbl, ct->ct_fd);
return (FALSE);
}
/*
* If the new size is equal to the current size,
* there is nothing to do.
*/
if (ct->ct_bufferSize == *(uint_t *)info)
break;
ct->ct_bufferSize = *(uint_t *)info;
if (ct->ct_buffer) {
free(ct->ct_buffer);
ct->ct_buffer = NULL;
ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = NULL;
}
break;
case CLGET_CONNMAXREC_SIZE:
/*
* Returns the size of the buffer allocated
* to pending requests.
*/
*(uint_t *)info = ct->ct_bufferSize;
break;
default:
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_control, 1, cl, request);
return (FALSE);
}
rpc_fd_unlock(vctbl, ct->ct_fd);
trace3(TR_clnt_vc_control, 1, cl, request);
return (TRUE);
}
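/*
* Illustrative sketch (not part of this module): typical clnt_control()
* requests handled above, on a hypothetical handle "clnt".
*
*	struct timeval tv = { 60, 0 };
*	int fd;
*	uint32_t xid;
*
*	(void) clnt_control(clnt, CLSET_TIMEOUT, (char *)&tv);
*	(void) clnt_control(clnt, CLGET_FD, (char *)&fd);
*	(void) clnt_control(clnt, CLGET_XID, (char *)&xid);
*/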
static void
clnt_vc_destroy(cl)
CLIENT *cl;
{
/* LINTED pointer alignment */
struct ct_data *ct = (struct ct_data *)cl->cl_private;
int ct_fd = ct->ct_fd;
trace2(TR_clnt_vc_destroy, 0, cl);
rpc_fd_lock(vctbl, ct_fd);
if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
do_flush(ct, RPC_CL_BLOCKING_FLUSH);
unregister_nb(ct);
}
if (ct->ct_closeit)
(void) t_close(ct_fd);
XDR_DESTROY(&(ct->ct_xdrs));
if (ct->ct_addr.buf)
(void) free(ct->ct_addr.buf);
mem_free((caddr_t)ct, sizeof (struct ct_data));
if (cl->cl_netid && cl->cl_netid[0])
mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
if (cl->cl_tp && cl->cl_tp[0])
mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
mem_free((caddr_t)cl, sizeof (CLIENT));
rpc_fd_unlock(vctbl, ct_fd);
trace2(TR_clnt_vc_destroy, 1, cl);
}
/*
* Interface between xdr serializer and vc connection.
* Behaves like the system calls, read & write, but keeps some error state
* around for the rpc level.
*/
static int
read_vc(void *ct_tmp, caddr_t buf, int len)
{
static pthread_key_t pfdp_key;
struct pollfd *pfdp;
int npfd; /* total number of pfdp allocated */
struct ct_data *ct = ct_tmp;
struct timeval starttime;
struct timeval curtime;
struct timeval time_waited;
struct timeval timeout;
int poll_time;
int delta;
trace2(TR_read_vc, 0, len);
if (len == 0) {
trace2(TR_read_vc, 1, len);
return (0);
}
/*
* Allocate just one pollfd the first time. thr_get_storage() may
* return a larger buffer, left over from the last time we were
* here, but that's OK. realloc() will deal with it properly.
*/
npfd = 1;
pfdp = thr_get_storage(&pfdp_key, sizeof (struct pollfd), free);
if (pfdp == NULL) {
(void) syslog(LOG_ERR, clnt_vc_errstr,
clnt_read_vc_str, __no_mem_str);
rpc_callerr.re_status = RPC_SYSTEMERROR;
rpc_callerr.re_errno = errno;
rpc_callerr.re_terrno = 0;
trace2(TR_read_vc, 1, len);
return (-1);
}
/*
* N.B.: slot 0 in the pollfd array is reserved for the file
* descriptor we're really interested in (as opposed to the
* callback descriptors).
*/
pfdp[0].fd = ct->ct_fd;
pfdp[0].events = MASKVAL;
pfdp[0].revents = 0;
poll_time = ct->ct_wait;
if (gettimeofday(&starttime, (struct timezone *)NULL) == -1) {
syslog(LOG_ERR, "Unable to get time of day: %m");
return (-1);
}
for (;;) {
extern void (*_svc_getreqset_proc)();
extern pollfd_t *svc_pollfd;
extern int svc_max_pollfd;
int fds;
/* VARIABLES PROTECTED BY svc_fd_lock: svc_pollfd */
if (_svc_getreqset_proc) {
sig_rw_rdlock(&svc_fd_lock);
/* reallocate pfdp to svc_max_pollfd +1 */
if (npfd != (svc_max_pollfd + 1)) {
struct pollfd *tmp_pfdp = realloc(pfdp,
sizeof (struct pollfd) *
(svc_max_pollfd + 1));
if (tmp_pfdp == NULL) {
sig_rw_unlock(&svc_fd_lock);
(void) syslog(LOG_ERR, clnt_vc_errstr,
clnt_read_vc_str, __no_mem_str);
rpc_callerr.re_status = RPC_SYSTEMERROR;
rpc_callerr.re_errno = errno;
rpc_callerr.re_terrno = 0;
trace2(TR_read_vc, 1, len);
return (-1);
}
pfdp = tmp_pfdp;
npfd = svc_max_pollfd + 1;
pthread_setspecific(pfdp_key, pfdp);
}
if (npfd > 1)
(void) memcpy(&pfdp[1], svc_pollfd,
sizeof (struct pollfd) * (npfd - 1));
sig_rw_unlock(&svc_fd_lock);
} else {
npfd = 1; /* don't forget about pfdp[0] */
}
switch (fds = poll(pfdp, npfd, poll_time)) {
case 0:
rpc_callerr.re_status = RPC_TIMEDOUT;
trace2(TR_read_vc, 1, len);
return (-1);
case -1:
if (errno != EINTR)
continue;
else {
/*
* interrupted by another signal,
* update time_waited
*/
if (gettimeofday(&curtime,
(struct timezone *)NULL) == -1) {
syslog(LOG_ERR,
"Unable to get time of day: %m");
errno = 0;
continue;
};
delta = (curtime.tv_sec -
starttime.tv_sec) * 1000 +
(curtime.tv_usec -
starttime.tv_usec) / 1000;
poll_time -= delta;
if (poll_time < 0) {
rpc_callerr.re_status =
RPC_TIMEDOUT;
errno = 0;
trace2(TR_read_vc, 1, len);
return (-1);
} else {
errno = 0; /* reset it */
continue;
}
}
}
if (pfdp[0].revents == 0) {
/* must be for server side of the house */
(*_svc_getreqset_proc)(&pfdp[1], fds);
continue; /* do poll again */
}
if (pfdp[0].revents & POLLNVAL) {
rpc_callerr.re_status = RPC_CANTRECV;
/*
* Note: we're faking errno here because we
* previously would have expected select() to
* return -1 with errno EBADF. Poll(BA_OS)
* returns 0 and sets the POLLNVAL revents flag
* instead.
*/
rpc_callerr.re_errno = errno = EBADF;
trace2(TR_read_vc, 1, len);
return (-1);
}
if (pfdp[0].revents & (POLLERR | POLLHUP)) {
rpc_callerr.re_status = RPC_CANTRECV;
rpc_callerr.re_errno = errno = EPIPE;
trace2(TR_read_vc, 1, len);
return (-1);
}
break;
}
switch (len = t_rcvall(ct->ct_fd, buf, len)) {
case 0:
/* premature eof */
rpc_callerr.re_errno = ENOLINK;
rpc_callerr.re_terrno = 0;
rpc_callerr.re_status = RPC_CANTRECV;
len = -1; /* it's really an error */
break;
case -1:
rpc_callerr.re_terrno = t_errno;
rpc_callerr.re_errno = 0;
rpc_callerr.re_status = RPC_CANTRECV;
break;
}
trace2(TR_read_vc, 1, len);
return (len);
}
static int
write_vc(ct_tmp, buf, len)
void *ct_tmp;
caddr_t buf;
int len;
{
int i, cnt;
struct ct_data *ct = ct_tmp;
int flag;
int maxsz;
trace2(TR_write_vc, 0, len);
maxsz = ct->ct_tsdu;
/* Handle the non-blocking mode */
if (ct->ct_is_oneway && ct->ct_io_mode == RPC_CL_NONBLOCKING) {
/*
* Test a special case here. If the length of the current
* write is greater than the transport data unit, and the
* mode is non-blocking, we return RPC_CANTSEND.
* XXX this is not very clean.
*/
if (maxsz > 0 && len > maxsz) {
rpc_callerr.re_terrno = errno;
rpc_callerr.re_errno = 0;
rpc_callerr.re_status = RPC_CANTSEND;
return (-1);
}
len = nb_send(ct, buf, (unsigned)len);
if (len == -1) {
rpc_callerr.re_terrno = errno;
rpc_callerr.re_errno = 0;
rpc_callerr.re_status = RPC_CANTSEND;
} else if (len == -2) {
rpc_callerr.re_terrno = 0;
rpc_callerr.re_errno = 0;
rpc_callerr.re_status = RPC_CANTSTORE;
}
trace2(TR_write_vc, 1, len);
return (len);
}
if ((maxsz == 0) || (maxsz == -1)) {
/*
* write_vc returns -1 for an error on the connection (the
* connection needs to be repaired or closed), and -2 for a
* flow-control error (nothing could be sent or stored; just
* wait and flush later).
*/
if ((len = t_snd(ct->ct_fd, buf, (unsigned)len, 0)) == -1) {
rpc_callerr.re_terrno = t_errno;
rpc_callerr.re_errno = 0;
rpc_callerr.re_status = RPC_CANTSEND;
}
trace2(TR_write_vc, 1, len);
return (len);
}
/*
* This for those transports which have a max size for data.
*/
for (cnt = len, i = 0; cnt > 0; cnt -= i, buf += i) {
flag = cnt > maxsz ? T_MORE : 0;
if ((i = t_snd(ct->ct_fd, buf, (unsigned)MIN(cnt, maxsz),
flag)) == -1) {
rpc_callerr.re_terrno = t_errno;
rpc_callerr.re_errno = 0;
rpc_callerr.re_status = RPC_CANTSEND;
trace2(TR_write_vc, 1, len);
return (-1);
}
}
trace2(TR_write_vc, 1, len);
return (len);
}
/*
* Receive the required bytes of data, even if they arrive fragmented.
*/
static int
t_rcvall(fd, buf, len)
int fd;
char *buf;
int len;
{
int moreflag;
int final = 0;
int res;
trace3(TR_t_rcvall, 0, fd, len);
do {
moreflag = 0;
res = t_rcv(fd, buf, (unsigned)len, &moreflag);
if (res == -1) {
if (t_errno == TLOOK)
switch (t_look(fd)) {
case T_DISCONNECT:
t_rcvdis(fd, NULL);
t_snddis(fd, NULL);
trace3(TR_t_rcvall, 1, fd, len);
return (-1);
case T_ORDREL:
/* Received orderly release indication */
t_rcvrel(fd);
/* Send orderly release indicator */
(void) t_sndrel(fd);
trace3(TR_t_rcvall, 1, fd, len);
return (-1);
default:
trace3(TR_t_rcvall, 1, fd, len);
return (-1);
}
} else if (res == 0) {
trace3(TR_t_rcvall, 1, fd, len);
return (0);
}
final += res;
buf += res;
len -= res;
} while ((len > 0) && (moreflag & T_MORE));
trace3(TR_t_rcvall, 1, fd, len);
return (final);
}
static struct clnt_ops *
clnt_vc_ops(void)
{
static struct clnt_ops ops;
extern mutex_t ops_lock;
/* VARIABLES PROTECTED BY ops_lock: ops */
trace1(TR_clnt_vc_ops, 0);
sig_mutex_lock(&ops_lock);
if (ops.cl_call == NULL) {
ops.cl_call = clnt_vc_call;
ops.cl_send = clnt_vc_send;
ops.cl_abort = clnt_vc_abort;
ops.cl_geterr = clnt_vc_geterr;
ops.cl_freeres = clnt_vc_freeres;
ops.cl_destroy = clnt_vc_destroy;
ops.cl_control = clnt_vc_control;
}
sig_mutex_unlock(&ops_lock);
trace1(TR_clnt_vc_ops, 1);
return (&ops);
}
/*
* Make sure that the time is not garbage. -1 value is disallowed.
* Note this is different from time_not_ok in clnt_dg.c
*/
static bool_t
time_not_ok(t)
struct timeval *t;
{
trace1(TR_time_not_ok, 0);
trace1(TR_time_not_ok, 1);
return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
t->tv_usec <= -1 || t->tv_usec > 1000000);
}
/* Compute the # of bytes that remain until the end of the buffer */
#define REMAIN_BYTES(p) (ct->ct_bufferSize-(ct->ct_##p - ct->ct_buffer))
static int
addInBuffer(struct ct_data *ct, char *dataToAdd, unsigned int nBytes)
{
if (NULL == ct->ct_buffer) {
/* Buffer not allocated yet. */
char *buffer;
buffer = (char *)malloc(ct->ct_bufferSize);
if (NULL == buffer) {
errno = ENOMEM;
return (-1);
}
memcpy(buffer, dataToAdd, nBytes);
ct->ct_buffer = buffer;
ct->ct_bufferReadPtr = buffer;
ct->ct_bufferWritePtr = buffer + nBytes;
ct->ct_bufferPendingSize = nBytes;
} else {
/*
* For an already allocated buffer, two mem copies
* might be needed, depending on the current
* writing position.
*/
/* Compute the length of the first copy. */
int len = MIN(nBytes, REMAIN_BYTES(bufferWritePtr));
ct->ct_bufferPendingSize += nBytes;
memcpy(ct->ct_bufferWritePtr, dataToAdd, len);
ct->ct_bufferWritePtr += len;
nBytes -= len;
if (0 == nBytes) {
/* One memcopy needed. */
/*
* If the write pointer is at the end of the buffer,
* wrap it now.
*/
if (ct->ct_bufferWritePtr ==
(ct->ct_buffer + ct->ct_bufferSize)) {
ct->ct_bufferWritePtr = ct->ct_buffer;
}
} else {
/* Two memcopies needed. */
dataToAdd += len;
/*
* Copy the remaining data to the beginning of the
* buffer
*/
memcpy(ct->ct_buffer, dataToAdd, nBytes);
ct->ct_bufferWritePtr = ct->ct_buffer + nBytes;
}
}
return (0);
}
static void
getFromBuffer(struct ct_data *ct, char **data, unsigned int *nBytes)
{
int len = MIN(ct->ct_bufferPendingSize, REMAIN_BYTES(bufferReadPtr));
*data = ct->ct_bufferReadPtr;
*nBytes = len;
}
static void
consumeFromBuffer(struct ct_data *ct, unsigned int nBytes)
{
ct->ct_bufferPendingSize -= nBytes;
if (ct->ct_bufferPendingSize == 0) {
/*
* If the buffer contains no data, we set the two pointers at
* the beginning of the buffer (to minimize buffer wraps).
*/
ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = ct->ct_buffer;
} else {
ct->ct_bufferReadPtr += nBytes;
if (ct->ct_bufferReadPtr >
ct->ct_buffer + ct->ct_bufferSize) {
ct->ct_bufferReadPtr -= ct->ct_bufferSize;
}
}
}
static int
iovFromBuffer(struct ct_data *ct, struct iovec *iov)
{
int l;
if (ct->ct_bufferPendingSize == 0)
return (0);
l = REMAIN_BYTES(bufferReadPtr);
if (l < ct->ct_bufferPendingSize) {
/* Buffer in two fragments. */
iov[0].iov_base = ct->ct_bufferReadPtr;
iov[0].iov_len = l;
iov[1].iov_base = ct->ct_buffer;
iov[1].iov_len = ct->ct_bufferPendingSize - l;
return (2);
} else {
/* Buffer in one fragment. */
iov[0].iov_base = ct->ct_bufferReadPtr;
iov[0].iov_len = ct->ct_bufferPendingSize;
return (1);
}
}
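/*
* Example of the two buffer states handled above ('#' marks pending
* bytes, B = ct_buffer, R = ct_bufferReadPtr, W = ct_bufferWritePtr):
*
*	one fragment:	B....R####W.....	iov[0] = { R, 4 }
*	two fragments:	B##W......R#####	iov[0] = { R, 5 }, iov[1] = { B, 2 }
*/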
static bool_t
set_flush_mode(struct ct_data *ct, int mode)
{
switch (mode) {
case RPC_CL_BLOCKING_FLUSH:
/* flush the buffer completely (possibly blocking) */
case RPC_CL_BESTEFFORT_FLUSH:
/* flush as much as possible without blocking */
case RPC_CL_DEFAULT_FLUSH:
/* flush according to the currently defined policy */
ct->ct_blocking_mode = mode;
return (TRUE);
default:
return (FALSE);
}
}
static bool_t
set_io_mode(struct ct_data *ct, int ioMode)
{
switch (ioMode) {
case RPC_CL_BLOCKING:
if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
if (NULL != ct->ct_buffer) {
/*
* If a buffer was allocated for this
* connection, flush it now, and free it.
*/
do_flush(ct, RPC_CL_BLOCKING_FLUSH);
free(ct->ct_buffer);
ct->ct_buffer = NULL;
}
unregister_nb(ct);
ct->ct_io_mode = ioMode;
}
break;
case RPC_CL_NONBLOCKING:
if (ct->ct_io_mode == RPC_CL_BLOCKING) {
if (-1 == register_nb(ct)) {
return (FALSE);
}
ct->ct_io_mode = ioMode;
}
break;
default:
return (FALSE);
}
return (TRUE);
}
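/*
* Illustrative sketch (not part of this module): switching a handle to
* non-blocking I/O and flushing queued one-way calls. The handle and
* procedure are hypothetical; clnt_send() issues a one-way call, which
* nb_send() below may buffer rather than transmit immediately.
*
*	int mode = RPC_CL_NONBLOCKING;
*	int flush = RPC_CL_BLOCKING_FLUSH;
*
*	if (!clnt_control(clnt, CLSET_IO_MODE, (char *)&mode))
*		return;
*	(void) clnt_send(clnt, PROC, xdr_args_t, (caddr_t)&arg);
*	(void) clnt_control(clnt, CLFLUSH, (char *)&flush);
*/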
static int
do_flush(struct ct_data *ct, uint_t flush_mode)
{
int result;
if (ct->ct_bufferPendingSize == 0) {
return (0);
}
switch (flush_mode) {
case RPC_CL_BLOCKING_FLUSH:
if (!set_blocking_connection(ct, TRUE)) {
return (-1);
}
while (ct->ct_bufferPendingSize > 0) {
if (REMAIN_BYTES(bufferReadPtr) <
ct->ct_bufferPendingSize) {
struct iovec iov[2];
iovFromBuffer(ct, iov);
result = writev(ct->ct_fd, iov, 2);
} else {
result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
ct->ct_bufferPendingSize, 0);
}
if (result < 0) {
return (-1);
}
consumeFromBuffer(ct, result);
}
break;
case RPC_CL_BESTEFFORT_FLUSH:
set_blocking_connection(ct, FALSE);
if (REMAIN_BYTES(bufferReadPtr) < ct->ct_bufferPendingSize) {
struct iovec iov[2];
iovFromBuffer(ct, iov);
result = writev(ct->ct_fd, iov, 2);
} else {
result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
ct->ct_bufferPendingSize, 0);
}
if (result < 0) {
if (errno != EWOULDBLOCK) {
perror("flush");
return (-1);
}
return (0);
}
if (result > 0)
consumeFromBuffer(ct, result);
break;
}
return (0);
}
/*
* Non-blocking send. Returns nBytes on success (the data was sent or
* buffered), -1 on error, and -2 if the data could be neither sent
* nor stored (reported as RPC_CANTSTORE by write_vc()).
*/
static int
nb_send(struct ct_data *ct, void *buff, unsigned int nBytes)
{
int result;
/*
* The non-blocking path handles only complete records: the XDR
* record-marking word at the front of the message must have its
* last-fragment bit (the high-order bit) set.
*/
if (!(ntohl(*(uint32_t *)buff) & (1U << 31))) {
return (-1);
}
/*
* Check to see if the current message can be stored fully in the
* buffer. We have to check this now because it may be impossible
* to send any data, so the message must be stored in the buffer.
*/
if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize)) {
/* Try to flush (to free some space). */
do_flush(ct, RPC_CL_BESTEFFORT_FLUSH);
/* Can we store the message now ? */
if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize))
return (-2);
}
set_blocking_connection(ct, FALSE);
/*
* If there is no data pending, we can simply try
* to send our data.
*/
if (ct->ct_bufferPendingSize == 0) {
result = t_snd(ct->ct_fd, buff, nBytes, 0);
if (result == -1) {
if (errno == EWOULDBLOCK) {
result = 0;
} else {
perror("send");
return (-1);
}
}
/*
* If we have not sent all of the data, store the
* remainder in the buffer.
*/
if (result != nBytes) {
if (addInBuffer(ct, (char *)buff + result,
nBytes - result) == -1) {
return (-1);
}
}
} else {
/*
* Some data pending in the buffer. We try to send
* both buffer data and current message in one shot.
*/
struct iovec iov[3];
int i = iovFromBuffer(ct, &iov[0]);
iov[i].iov_base = buff;
iov[i].iov_len = nBytes;
result = writev(ct->ct_fd, iov, i+1);
if (result == -1) {
if (errno == EWOULDBLOCK) {
/* No bytes sent */
result = 0;
} else {
return (-1);
}
}
/*
* Add the bytes from the message
* that we have not sent.
*/
if (result <= ct->ct_bufferPendingSize) {
/* No bytes from the message sent */
consumeFromBuffer(ct, result);
if (addInBuffer(ct, buff, nBytes) == -1) {
return (-1);
}
} else {
/*
* Some bytes of the message were sent. Compute
* the length of the part of the message that
* went out.
*/
int len = result - ct->ct_bufferPendingSize;
/* So, empty the buffer. */
ct->ct_bufferReadPtr = ct->ct_buffer;
ct->ct_bufferWritePtr = ct->ct_buffer;
ct->ct_bufferPendingSize = 0;
/* And add the remaining part of the message. */
if (len != nBytes) {
if (addInBuffer(ct, (char *)buff + len,
nBytes-len) == -1) {
return (-1);
}
}
}
}
return (nBytes);
}
static void
flush_registered_clients()
{
struct nb_reg_node *node;
if (LIST_ISEMPTY(nb_first)) {
return;
}
LIST_FOR_EACH(nb_first, node) {
do_flush(node->ct, RPC_CL_BLOCKING_FLUSH);
}
}
static int
allocate_chunk()
{
#define CHUNK_SIZE 16
struct nb_reg_node *chk = (struct nb_reg_node *)
malloc(sizeof (struct nb_reg_node) * CHUNK_SIZE);
struct nb_reg_node *n;
int i;
if (NULL == chk) {
return (-1);
}
n = chk;
for (i = 0; i < CHUNK_SIZE-1; ++i) {
n[i].next = &(n[i+1]);
}
n[CHUNK_SIZE-1].next = (struct nb_reg_node *)&nb_free;
nb_free = chk;
return (0);
}
static int
register_nb(struct ct_data *ct)
{
struct nb_reg_node *node;
mutex_lock(&nb_list_mutex);
if (LIST_ISEMPTY(nb_free) && (allocate_chunk() == -1)) {
mutex_unlock(&nb_list_mutex);
errno = ENOMEM;
return (-1);
}
if (!exit_handler_set) {
atexit(flush_registered_clients);
exit_handler_set = TRUE;
}
/* Get the first free node */
LIST_EXTRACT(nb_free, node);
node->ct = ct;
LIST_ADD(nb_first, node);
mutex_unlock(&nb_list_mutex);
return (0);
}
static int
unregister_nb(struct ct_data *ct)
{
struct nb_reg_node *node;
mutex_lock(&nb_list_mutex);
assert(! LIST_ISEMPTY(nb_first));
/*
* Walk the list starting at the head pointer itself: its address
* doubles as the sentinel, and since "next" is the first member,
* the cast lets the first element be unlinked like any other.
*/
node = (struct nb_reg_node *)&nb_first;
while (node->next != (struct nb_reg_node *)&nb_first) {
if (node->next->ct == ct) {
/* Get the node to unregister. */
struct nb_reg_node *n = node->next;
node->next = n->next;
n->ct = NULL;
LIST_ADD(nb_free, n);
break;
}
node = node->next;
}
mutex_unlock(&nb_list_mutex);
return (0);
}