2N/A/*
2N/A * CDDL HEADER START
2N/A *
2N/A * The contents of this file are subject to the terms of the
2N/A * Common Development and Distribution License, Version 1.0 only
2N/A * (the "License"). You may not use this file except in compliance
2N/A * with the License.
2N/A *
2N/A * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
2N/A * or http://www.opensolaris.org/os/licensing.
2N/A * See the License for the specific language governing permissions
2N/A * and limitations under the License.
2N/A *
2N/A * When distributing Covered Code, include this CDDL HEADER in each
2N/A * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
2N/A * If applicable, add the following below this CDDL HEADER, with the
2N/A * fields enclosed by brackets "[]" replaced with your own identifying
2N/A * information: Portions Copyright [yyyy] [name of copyright owner]
2N/A *
2N/A * CDDL HEADER END
2N/A */
2N/A/*
2N/A * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
2N/A * Use is subject to license terms.
2N/A */
2N/A
2N/A#pragma ident "%Z%%M% %I% %E% SMI"
2N/A
2N/A#include <stdio.h>
2N/A#include <stdlib.h>
2N/A#include <syslog.h>
2N/A#include <string.h>
2N/A#include <thread.h>
2N/A#include <synch.h>
2N/A#include <slp-internal.h>
2N/A
2N/A/* This is used to pass needed params to consumer_thr and slp_call */
2N/Astruct thr_call_args {
2N/A slp_handle_impl_t *hp;
2N/A SLPGenericAppCB *cb;
2N/A void *cookie;
2N/A SLPMsgReplyCB *msg_cb;
2N/A slp_target_list_t *targets;
2N/A};
2N/A
2N/Astatic SLPError consumer(void *);
2N/Astatic void slp_call(void *);
2N/Astatic SLPError check_message_fit(slp_handle_impl_t *, slp_target_list_t *);
2N/A
2N/ASLPError slp_ua_common(SLPHandle hSLP, const char *scopes,
2N/A SLPGenericAppCB cb, void *cookie,
2N/A SLPMsgReplyCB msg_cb) {
2N/A slp_handle_impl_t *hp;
2N/A slp_target_list_t *targets;
2N/A struct thr_call_args *args;
2N/A slp_queue_t *q;
2N/A SLPError err;
2N/A thread_t tid;
2N/A int terr;
2N/A
2N/A hp = (slp_handle_impl_t *)hSLP;
2N/A
2N/A /* select targets */
2N/A if ((err = slp_new_target_list(hp, scopes, &targets)) != SLP_OK)
2N/A return (err);
2N/A if ((err = check_message_fit(hp, targets)) != SLP_OK) {
2N/A slp_destroy_target_list(targets);
2N/A return (err);
2N/A }
2N/A
2N/A /* populate the args structure */
2N/A args = malloc(sizeof (*args));
2N/A if (args == NULL) {
2N/A slp_err(LOG_CRIT, 0, "ua_common", "out of memory");
2N/A return (SLP_MEMORY_ALLOC_FAILED);
2N/A }
2N/A
2N/A args->hp = hp;
2N/A args->cb = cb;
2N/A args->cookie = cookie;
2N/A args->msg_cb = msg_cb;
2N/A args->targets = targets;
2N/A
2N/A /* create the queue that this call will use */
2N/A q = slp_new_queue(&err); /* freed in consumer_thr */
2N/A if (err != SLP_OK)
2N/A goto error;
2N/A hp->q = q;
2N/A
2N/A /* kick off the producer thread */
2N/A if ((terr = thr_create(
2N/A NULL, 0, (void *(*)(void *)) slp_call, args, 0, &tid)) != 0) {
2N/A slp_err(LOG_CRIT, 0, "ua_common", "could not start thread: %s",
2N/A strerror(terr));
2N/A err = SLP_INTERNAL_SYSTEM_ERROR;
2N/A goto error;
2N/A }
2N/A hp->producer_tid = tid;
2N/A
2N/A if (hp->async) {
2N/A /* kick off the consumer thread */
2N/A if ((terr = thr_create(
2N/A NULL, 0, (void *(*)(void *))consumer,
2N/A args, 0, NULL)) != 0) {
2N/A slp_err(LOG_CRIT, 0, "ua_common",
2N/A "could not start thread: %s",
2N/A strerror(terr));
2N/A err = SLP_INTERNAL_SYSTEM_ERROR;
2N/A /* cleanup producer thread, if necessary */
2N/A hp->cancel = 1;
2N/A (void) thr_join(tid, NULL, NULL);
2N/A
2N/A goto error;
2N/A }
2N/A return (SLP_OK);
2N/A }
2N/A /* else sync */
2N/A return (consumer(args));
2N/Aerror:
2N/A free(args);
2N/A return (err);
2N/A}
2N/A
2N/Astatic SLPError consumer(void *ap) {
2N/A slp_handle_impl_t *hp;
2N/A char *reply;
2N/A void *collator;
2N/A int numResults = 0;
2N/A struct thr_call_args *args = (struct thr_call_args *)ap;
2N/A
2N/A hp = args->hp;
2N/A collator = NULL;
2N/A hp->consumer_tid = thr_self();
2N/A /* while cb wants more and there is more to get ... */
2N/A for (;;) {
2N/A SLPBoolean cont;
2N/A
2N/A reply = slp_dequeue(hp->q);
2N/A /* reply == NULL if no more available or SLPClosed */
2N/A cont = args->msg_cb(hp, reply, args->cb, args->cookie,
2N/A &collator, &numResults);
2N/A
2N/A if (reply) {
2N/A free(reply);
2N/A } else {
2N/A break;
2N/A }
2N/A
2N/A if (!cont) {
2N/A /* cb doesn't want any more; invoke last call */
2N/A args->msg_cb(hp, NULL, args->cb, args->cookie,
2N/A &collator, &numResults);
2N/A break;
2N/A }
2N/A }
2N/A /* cleanup */
2N/A /* clean stop producer [thread] */
2N/A hp->cancel = 1;
2N/A (void) thr_join(hp->producer_tid, NULL, NULL);
2N/A
2N/A /* empty and free queue */
2N/A slp_flush_queue(hp->q, free);
2N/A slp_destroy_queue(hp->q);
2N/A
2N/A free(args);
2N/A slp_end_call(hp);
2N/A return (SLP_OK);
2N/A}
2N/A
2N/A/*
2N/A * This is the producer thread
2N/A */
2N/Astatic void slp_call(void *ap) {
2N/A struct thr_call_args *args = (struct thr_call_args *)ap;
2N/A slp_target_t *t;
2N/A const char *uc_scopes, *mc_scopes;
2N/A SLPBoolean use_tcp = SLP_FALSE;
2N/A size_t len;
2N/A
2N/A /* Unicast */
2N/A if (uc_scopes = slp_get_uc_scopes(args->targets)) {
2N/A size_t mtu;
2N/A int i;
2N/A
2N/A /* calculate msg length */
2N/A len = slp_hdrlang_length(args->hp);
2N/A for (i = 0; i < args->hp->msg.iovlen; i++) {
2N/A len += args->hp->msg.iov[i].iov_len;
2N/A }
2N/A len += strlen(uc_scopes);
2N/A
2N/A mtu = slp_get_mtu();
2N/A if (len > mtu)
2N/A use_tcp = SLP_TRUE;
2N/A
2N/A for (
2N/A t = slp_next_uc_target(args->targets);
2N/A t;
2N/A t = slp_next_uc_target(args->targets)) {
2N/A if (args->hp->cancel)
2N/A break;
2N/A
2N/A if (use_tcp)
2N/A slp_uc_tcp_send(args->hp, t, uc_scopes,
2N/A SLP_FALSE, 0);
2N/A else
2N/A slp_uc_udp_send(args->hp, t, uc_scopes);
2N/A }
2N/A }
2N/A
2N/A /* Multicast */
2N/A if ((!args->hp->cancel) &&
2N/A (mc_scopes = slp_get_mc_scopes(args->targets)))
2N/A slp_mc_send(args->hp, mc_scopes);
2N/A
2N/A /* Wait for TCP to complete, if necessary */
2N/A if (args->hp->tcp_lock)
2N/A slp_tcp_wait(args->hp);
2N/A
2N/A slp_destroy_target_list(args->targets);
2N/A
2N/A /* free the message */
2N/A free(args->hp->msg.iov);
2N/A free(args->hp->msg.msg);
2N/A
2N/A /* null terminate message queue */
2N/A (void) slp_enqueue(args->hp->q, NULL);
2N/A
2N/A thr_exit(NULL); /* we're outa here */
2N/A}
2N/A
2N/A/*
2N/A * If the message to be sent needs to be multicast, check that it
2N/A * can fit into a datagram. If not, return BUFFER_OVERFLOW, otherwise
2N/A * return SLP_OK.
2N/A */
2N/Astatic SLPError check_message_fit(slp_handle_impl_t *hp,
2N/A slp_target_list_t *targets) {
2N/A size_t msgSize;
2N/A int i;
2N/A const char *mc_scopes;
2N/A
2N/A if (!(mc_scopes = slp_get_mc_scopes(targets)))
2N/A return (SLP_OK); /* no mc targets to worry about */
2N/A
2N/A msgSize = slp_hdrlang_length(hp);
2N/A for (i = 0; i < hp->msg.iovlen; i++) {
2N/A msgSize += hp->msg.iov[i].iov_len;
2N/A }
2N/A msgSize += strlen(mc_scopes);
2N/A
2N/A if (msgSize > slp_get_mtu())
2N/A return (SLP_BUFFER_OVERFLOW);
2N/A return (SLP_OK);
2N/A}