2N/A/*
2N/A * CDDL HEADER START
2N/A *
2N/A * The contents of this file are subject to the terms of the
2N/A * Common Development and Distribution License (the "License").
2N/A * You may not use this file except in compliance with the License.
2N/A *
2N/A * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
2N/A * or http://www.opensolaris.org/os/licensing.
2N/A * See the License for the specific language governing permissions
2N/A * and limitations under the License.
2N/A *
2N/A * When distributing Covered Code, include this CDDL HEADER in each
2N/A * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
2N/A * If applicable, add the following below this CDDL HEADER, with the
2N/A * fields enclosed by brackets "[]" replaced with your own identifying
2N/A * information: Portions Copyright [yyyy] [name of copyright owner]
2N/A *
2N/A * CDDL HEADER END
2N/A */
2N/A
2N/A/*
2N/A * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
2N/A * Use is subject to license terms.
2N/A */
2N/A
2N/A#pragma ident "%Z%%M% %I% %E% SMI"
2N/A
2N/A#include "lint.h"
2N/A#include "thr_uberdata.h"
2N/A#include <stdlib.h>
2N/A#include <signal.h>
2N/A#include <errno.h>
2N/A#include "thread_pool_impl.h"
2N/A
2N/Astatic mutex_t thread_pool_lock = DEFAULTMUTEX;
2N/Astatic tpool_t *thread_pools = NULL;
2N/A
2N/Astatic void
2N/Adelete_pool(tpool_t *tpool)
2N/A{
2N/A tpool_job_t *job;
2N/A
2N/A ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL);
2N/A
2N/A /*
2N/A * Unlink the pool from the global list of all pools.
2N/A */
2N/A lmutex_lock(&thread_pool_lock);
2N/A if (thread_pools == tpool)
2N/A thread_pools = tpool->tp_forw;
2N/A if (thread_pools == tpool)
2N/A thread_pools = NULL;
2N/A else {
2N/A tpool->tp_back->tp_forw = tpool->tp_forw;
2N/A tpool->tp_forw->tp_back = tpool->tp_back;
2N/A }
2N/A lmutex_unlock(&thread_pool_lock);
2N/A
2N/A /*
2N/A * There should be no pending jobs, but just in case...
2N/A */
2N/A for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
2N/A tpool->tp_head = job->tpj_next;
2N/A lfree(job, sizeof (*job));
2N/A }
2N/A (void) pthread_attr_destroy(&tpool->tp_attr);
2N/A lfree(tpool, sizeof (*tpool));
2N/A}
2N/A
2N/A/*
2N/A * Worker thread is terminating.
2N/A */
2N/Astatic void
2N/Aworker_cleanup(tpool_t *tpool)
2N/A{
2N/A ASSERT(MUTEX_HELD(&tpool->tp_mutex));
2N/A
2N/A if (--tpool->tp_current == 0 &&
2N/A (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
2N/A if (tpool->tp_flags & TP_ABANDON) {
2N/A sig_mutex_unlock(&tpool->tp_mutex);
2N/A delete_pool(tpool);
2N/A return;
2N/A }
2N/A if (tpool->tp_flags & TP_DESTROY)
2N/A (void) cond_broadcast(&tpool->tp_busycv);
2N/A }
2N/A sig_mutex_unlock(&tpool->tp_mutex);
2N/A}
2N/A
2N/Astatic void
2N/Anotify_waiters(tpool_t *tpool)
2N/A{
2N/A if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
2N/A tpool->tp_flags &= ~TP_WAIT;
2N/A (void) cond_broadcast(&tpool->tp_waitcv);
2N/A }
2N/A}
2N/A
2N/A/*
2N/A * Called by a worker thread on return from a tpool_dispatch()d job.
2N/A */
2N/Astatic void
2N/Ajob_cleanup(tpool_t *tpool)
2N/A{
2N/A pthread_t my_tid = pthread_self();
2N/A tpool_active_t *activep;
2N/A tpool_active_t **activepp;
2N/A
2N/A sig_mutex_lock(&tpool->tp_mutex);
2N/A /* CSTYLED */
2N/A for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) {
2N/A activep = *activepp;
2N/A if (activep->tpa_tid == my_tid) {
2N/A *activepp = activep->tpa_next;
2N/A break;
2N/A }
2N/A }
2N/A if (tpool->tp_flags & TP_WAIT)
2N/A notify_waiters(tpool);
2N/A}
2N/A
2N/Astatic void *
2N/Atpool_worker(void *arg)
2N/A{
2N/A tpool_t *tpool = (tpool_t *)arg;
2N/A int elapsed;
2N/A tpool_job_t *job;
2N/A void (*func)(void *);
2N/A tpool_active_t active;
2N/A
2N/A sig_mutex_lock(&tpool->tp_mutex);
2N/A pthread_cleanup_push(worker_cleanup, tpool);
2N/A
2N/A /*
2N/A * This is the worker's main loop.
2N/A * It will only be left if a timeout or an error has occured.
2N/A */
2N/A active.tpa_tid = pthread_self();
2N/A for (;;) {
2N/A elapsed = 0;
2N/A tpool->tp_idle++;
2N/A if (tpool->tp_flags & TP_WAIT)
2N/A notify_waiters(tpool);
2N/A while ((tpool->tp_head == NULL ||
2N/A (tpool->tp_flags & TP_SUSPEND)) &&
2N/A !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
2N/A if (tpool->tp_current <= tpool->tp_minimum ||
2N/A tpool->tp_linger == 0) {
2N/A (void) sig_cond_wait(&tpool->tp_workcv,
2N/A &tpool->tp_mutex);
2N/A } else {
2N/A timestruc_t timeout;
2N/A
2N/A timeout.tv_sec = tpool->tp_linger;
2N/A timeout.tv_nsec = 0;
2N/A if (sig_cond_reltimedwait(&tpool->tp_workcv,
2N/A &tpool->tp_mutex, &timeout) != 0) {
2N/A elapsed = 1;
2N/A break;
2N/A }
2N/A }
2N/A }
2N/A tpool->tp_idle--;
2N/A if (tpool->tp_flags & TP_DESTROY)
2N/A break;
2N/A if (tpool->tp_flags & TP_ABANDON) {
2N/A /* can't abandon a suspended pool */
2N/A if (tpool->tp_flags & TP_SUSPEND) {
2N/A tpool->tp_flags &= ~TP_SUSPEND;
2N/A (void) cond_broadcast(&tpool->tp_workcv);
2N/A }
2N/A if (tpool->tp_head == NULL)
2N/A break;
2N/A }
2N/A if ((job = tpool->tp_head) != NULL &&
2N/A !(tpool->tp_flags & TP_SUSPEND)) {
2N/A elapsed = 0;
2N/A func = job->tpj_func;
2N/A arg = job->tpj_arg;
2N/A tpool->tp_head = job->tpj_next;
2N/A if (job == tpool->tp_tail)
2N/A tpool->tp_tail = NULL;
2N/A tpool->tp_njobs--;
2N/A active.tpa_next = tpool->tp_active;
2N/A tpool->tp_active = &active;
2N/A sig_mutex_unlock(&tpool->tp_mutex);
2N/A pthread_cleanup_push(job_cleanup, tpool);
2N/A lfree(job, sizeof (*job));
2N/A /*
2N/A * Call the specified function.
2N/A */
2N/A func(arg);
2N/A /*
2N/A * We don't know what this thread has been doing,
2N/A * so we reset its signal mask and cancellation
2N/A * state back to the initial values.
2N/A */
2N/A (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
2N/A (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
2N/A NULL);
2N/A (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
2N/A NULL);
2N/A pthread_cleanup_pop(1);
2N/A }
2N/A if (elapsed && tpool->tp_current > tpool->tp_minimum) {
2N/A /*
2N/A * We timed out and there is no work to be done
2N/A * and the number of workers exceeds the minimum.
2N/A * Exit now to reduce the size of the pool.
2N/A */
2N/A break;
2N/A }
2N/A }
2N/A pthread_cleanup_pop(1);
2N/A return (arg);
2N/A}
2N/A
2N/A/*
2N/A * Create a worker thread, with all signals blocked.
2N/A */
2N/Astatic int
2N/Acreate_worker(tpool_t *tpool)
2N/A{
2N/A sigset_t oset;
2N/A int error;
2N/A
2N/A (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
2N/A error = pthread_create(NULL, &tpool->tp_attr, tpool_worker, tpool);
2N/A (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
2N/A return (error);
2N/A}
2N/A
2N/Atpool_t *
2N/Atpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
2N/A pthread_attr_t *attr)
2N/A{
2N/A tpool_t *tpool;
2N/A void *stackaddr;
2N/A size_t stacksize;
2N/A size_t minstack;
2N/A int error;
2N/A
2N/A if (min_threads > max_threads || max_threads < 1) {
2N/A errno = EINVAL;
2N/A return (NULL);
2N/A }
2N/A if (attr != NULL) {
2N/A if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) {
2N/A errno = EINVAL;
2N/A return (NULL);
2N/A }
2N/A /*
2N/A * Allow only one thread in the pool with a specified stack.
2N/A * Require threads to have at least the minimum stack size.
2N/A */
2N/A minstack = thr_min_stack();
2N/A if (stackaddr != NULL) {
2N/A if (stacksize < minstack || max_threads != 1) {
2N/A errno = EINVAL;
2N/A return (NULL);
2N/A }
2N/A } else if (stacksize != 0 && stacksize < minstack) {
2N/A errno = EINVAL;
2N/A return (NULL);
2N/A }
2N/A }
2N/A
2N/A tpool = lmalloc(sizeof (*tpool));
2N/A if (tpool == NULL) {
2N/A errno = ENOMEM;
2N/A return (NULL);
2N/A }
2N/A (void) mutex_init(&tpool->tp_mutex, USYNC_THREAD, NULL);
2N/A (void) cond_init(&tpool->tp_busycv, USYNC_THREAD, NULL);
2N/A (void) cond_init(&tpool->tp_workcv, USYNC_THREAD, NULL);
2N/A (void) cond_init(&tpool->tp_waitcv, USYNC_THREAD, NULL);
2N/A tpool->tp_minimum = min_threads;
2N/A tpool->tp_maximum = max_threads;
2N/A tpool->tp_linger = linger;
2N/A
2N/A /*
2N/A * We cannot just copy the attribute pointer.
2N/A * We need to initialize a new pthread_attr_t structure
2N/A * with the values from the user-supplied pthread_attr_t.
2N/A * If the attribute pointer is NULL, we need to initialize
2N/A * the new pthread_attr_t structure with default values.
2N/A */
2N/A error = pthread_attr_clone(&tpool->tp_attr, attr);
2N/A if (error) {
2N/A lfree(tpool, sizeof (*tpool));
2N/A errno = error;
2N/A return (NULL);
2N/A }
2N/A
2N/A /* make all pool threads be detached daemon threads */
2N/A (void) pthread_attr_setdetachstate(&tpool->tp_attr,
2N/A PTHREAD_CREATE_DETACHED);
2N/A (void) pthread_attr_setdaemonstate_np(&tpool->tp_attr,
2N/A PTHREAD_CREATE_DAEMON_NP);
2N/A
2N/A /* insert into the global list of all thread pools */
2N/A lmutex_lock(&thread_pool_lock);
2N/A if (thread_pools == NULL) {
2N/A tpool->tp_forw = tpool;
2N/A tpool->tp_back = tpool;
2N/A thread_pools = tpool;
2N/A } else {
2N/A thread_pools->tp_back->tp_forw = tpool;
2N/A tpool->tp_forw = thread_pools;
2N/A tpool->tp_back = thread_pools->tp_back;
2N/A thread_pools->tp_back = tpool;
2N/A }
2N/A lmutex_unlock(&thread_pool_lock);
2N/A
2N/A return (tpool);
2N/A}
2N/A
2N/A/*
2N/A * Dispatch a work request to the thread pool.
2N/A * If there are idle workers, awaken one.
2N/A * Else, if the maximum number of workers has
2N/A * not been reached, spawn a new worker thread.
2N/A * Else just return with the job added to the queue.
2N/A */
2N/Aint
2N/Atpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
2N/A{
2N/A tpool_job_t *job;
2N/A
2N/A ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
2N/A
2N/A if ((job = lmalloc(sizeof (*job))) == NULL)
2N/A return (-1);
2N/A job->tpj_next = NULL;
2N/A job->tpj_func = func;
2N/A job->tpj_arg = arg;
2N/A
2N/A sig_mutex_lock(&tpool->tp_mutex);
2N/A
2N/A if (tpool->tp_head == NULL)
2N/A tpool->tp_head = job;
2N/A else
2N/A tpool->tp_tail->tpj_next = job;
2N/A tpool->tp_tail = job;
2N/A tpool->tp_njobs++;
2N/A
2N/A if (!(tpool->tp_flags & TP_SUSPEND)) {
2N/A if (tpool->tp_idle > 0)
2N/A (void) cond_signal(&tpool->tp_workcv);
2N/A else if (tpool->tp_current < tpool->tp_maximum &&
2N/A create_worker(tpool) == 0)
2N/A tpool->tp_current++;
2N/A }
2N/A
2N/A sig_mutex_unlock(&tpool->tp_mutex);
2N/A return (0);
2N/A}
2N/A
2N/A/*
2N/A * Assumes: by the time tpool_destroy() is called no one will use this
2N/A * thread pool in any way and no one will try to dispatch entries to it.
2N/A * Calling tpool_destroy() from a job in the pool will cause deadlock.
2N/A */
2N/Avoid
2N/Atpool_destroy(tpool_t *tpool)
2N/A{
2N/A tpool_active_t *activep;
2N/A
2N/A ASSERT(!tpool_member(tpool));
2N/A ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
2N/A
2N/A sig_mutex_lock(&tpool->tp_mutex);
2N/A pthread_cleanup_push(sig_mutex_unlock, &tpool->tp_mutex);
2N/A
2N/A /* mark the pool as being destroyed; wakeup idle workers */
2N/A tpool->tp_flags |= TP_DESTROY;
2N/A tpool->tp_flags &= ~TP_SUSPEND;
2N/A (void) cond_broadcast(&tpool->tp_workcv);
2N/A
2N/A /* cancel all active workers */
2N/A for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
2N/A (void) pthread_cancel(activep->tpa_tid);
2N/A
2N/A /* wait for all active workers to finish */
2N/A while (tpool->tp_active != NULL) {
2N/A tpool->tp_flags |= TP_WAIT;
2N/A (void) sig_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
2N/A }
2N/A
2N/A /* the last worker to terminate will wake us up */
2N/A while (tpool->tp_current != 0)
2N/A (void) sig_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);
2N/A
2N/A pthread_cleanup_pop(1); /* sig_mutex_unlock(&tpool->tp_mutex); */
2N/A delete_pool(tpool);
2N/A}
2N/A
2N/A/*
2N/A * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
2N/A * The last worker to terminate will delete the pool.
2N/A */
2N/Avoid
2N/Atpool_abandon(tpool_t *tpool)
2N/A{
2N/A ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
2N/A
2N/A sig_mutex_lock(&tpool->tp_mutex);
2N/A if (tpool->tp_current == 0) {
2N/A /* no workers, just delete the pool */
2N/A sig_mutex_unlock(&tpool->tp_mutex);
2N/A delete_pool(tpool);
2N/A } else {
2N/A /* wake up all workers, last one will delete the pool */
2N/A tpool->tp_flags |= TP_ABANDON;
2N/A tpool->tp_flags &= ~TP_SUSPEND;
2N/A (void) cond_broadcast(&tpool->tp_workcv);
2N/A sig_mutex_unlock(&tpool->tp_mutex);
2N/A }
2N/A}
2N/A
2N/A/*
2N/A * Wait for all jobs to complete.
2N/A * Calling tpool_wait() from a job in the pool will cause deadlock.
2N/A */
2N/Avoid
2N/Atpool_wait(tpool_t *tpool)
2N/A{
2N/A ASSERT(!tpool_member(tpool));
2N/A ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
2N/A
2N/A sig_mutex_lock(&tpool->tp_mutex);
2N/A pthread_cleanup_push(sig_mutex_unlock, &tpool->tp_mutex);
2N/A while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
2N/A tpool->tp_flags |= TP_WAIT;
2N/A (void) sig_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
2N/A ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
2N/A }
2N/A pthread_cleanup_pop(1); /* sig_mutex_unlock(&tpool->tp_mutex); */
2N/A}
2N/A
2N/Avoid
2N/Atpool_suspend(tpool_t *tpool)
2N/A{
2N/A ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
2N/A
2N/A sig_mutex_lock(&tpool->tp_mutex);
2N/A tpool->tp_flags |= TP_SUSPEND;
2N/A sig_mutex_unlock(&tpool->tp_mutex);
2N/A}
2N/A
2N/Aint
2N/Atpool_suspended(tpool_t *tpool)
2N/A{
2N/A int suspended;
2N/A
2N/A ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
2N/A
2N/A sig_mutex_lock(&tpool->tp_mutex);
2N/A suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
2N/A sig_mutex_unlock(&tpool->tp_mutex);
2N/A
2N/A return (suspended);
2N/A}
2N/A
2N/Avoid
2N/Atpool_resume(tpool_t *tpool)
2N/A{
2N/A int excess;
2N/A
2N/A ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
2N/A
2N/A sig_mutex_lock(&tpool->tp_mutex);
2N/A if (!(tpool->tp_flags & TP_SUSPEND)) {
2N/A sig_mutex_unlock(&tpool->tp_mutex);
2N/A return;
2N/A }
2N/A tpool->tp_flags &= ~TP_SUSPEND;
2N/A (void) cond_broadcast(&tpool->tp_workcv);
2N/A excess = tpool->tp_njobs - tpool->tp_idle;
2N/A while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
2N/A if (create_worker(tpool) != 0)
2N/A break; /* pthread_create() failed */
2N/A tpool->tp_current++;
2N/A }
2N/A sig_mutex_unlock(&tpool->tp_mutex);
2N/A}
2N/A
2N/Aint
2N/Atpool_member(tpool_t *tpool)
2N/A{
2N/A pthread_t my_tid = pthread_self();
2N/A tpool_active_t *activep;
2N/A
2N/A ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
2N/A
2N/A sig_mutex_lock(&tpool->tp_mutex);
2N/A for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
2N/A if (activep->tpa_tid == my_tid) {
2N/A sig_mutex_unlock(&tpool->tp_mutex);
2N/A return (1);
2N/A }
2N/A }
2N/A sig_mutex_unlock(&tpool->tp_mutex);
2N/A return (0);
2N/A}
2N/A
2N/Avoid
2N/Apostfork1_child_tpool(void)
2N/A{
2N/A pthread_t my_tid = pthread_self();
2N/A tpool_t *tpool;
2N/A tpool_job_t *job;
2N/A
2N/A /*
2N/A * All of the thread pool workers are gone, except possibly
2N/A * for the current thread, if it is a thread pool worker thread.
2N/A * Retain the thread pools, but make them all empty. Whatever
2N/A * jobs were queued or running belong to the parent process.
2N/A */
2N/Atop:
2N/A if ((tpool = thread_pools) == NULL)
2N/A return;
2N/A
2N/A do {
2N/A tpool_active_t *activep;
2N/A
2N/A (void) mutex_init(&tpool->tp_mutex, USYNC_THREAD, NULL);
2N/A (void) cond_init(&tpool->tp_busycv, USYNC_THREAD, NULL);
2N/A (void) cond_init(&tpool->tp_workcv, USYNC_THREAD, NULL);
2N/A (void) cond_init(&tpool->tp_waitcv, USYNC_THREAD, NULL);
2N/A for (job = tpool->tp_head; job; job = tpool->tp_head) {
2N/A tpool->tp_head = job->tpj_next;
2N/A lfree(job, sizeof (*job));
2N/A }
2N/A tpool->tp_tail = NULL;
2N/A tpool->tp_njobs = 0;
2N/A for (activep = tpool->tp_active; activep;
2N/A activep = activep->tpa_next) {
2N/A if (activep->tpa_tid == my_tid) {
2N/A activep->tpa_next = NULL;
2N/A break;
2N/A }
2N/A }
2N/A tpool->tp_idle = 0;
2N/A tpool->tp_current = 0;
2N/A if ((tpool->tp_active = activep) != NULL)
2N/A tpool->tp_current = 1;
2N/A tpool->tp_flags &= ~TP_WAIT;
2N/A if (tpool->tp_flags & (TP_DESTROY | TP_ABANDON)) {
2N/A tpool->tp_flags &= ~TP_DESTROY;
2N/A tpool->tp_flags |= TP_ABANDON;
2N/A if (tpool->tp_current == 0) {
2N/A delete_pool(tpool);
2N/A goto top; /* start over */
2N/A }
2N/A }
2N/A } while ((tpool = tpool->tp_forw) != thread_pools);
2N/A}