2N/A/*
2N/A * CDDL HEADER START
2N/A *
2N/A * The contents of this file are subject to the terms of the
2N/A * Common Development and Distribution License (the "License").
2N/A * You may not use this file except in compliance with the License.
2N/A *
2N/A * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
2N/A * or http://www.opensolaris.org/os/licensing.
2N/A * See the License for the specific language governing permissions
2N/A * and limitations under the License.
2N/A *
2N/A * When distributing Covered Code, include this CDDL HEADER in each
2N/A * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
2N/A * If applicable, add the following below this CDDL HEADER, with the
2N/A * fields enclosed by brackets "[]" replaced with your own identifying
2N/A * information: Portions Copyright [yyyy] [name of copyright owner]
2N/A *
2N/A * CDDL HEADER END
2N/A */
2N/A
2N/A/*
2N/A * Copyright (c) 1993, 2012, Oracle and/or its affiliates. All rights reserved.
2N/A */
2N/A
2N/A#include "lint.h"
2N/A#include "thr_uberdata.h"
2N/A#include "asyncio.h"
2N/A#include <atomic.h>
2N/A#include <sys/param.h>
2N/A#include <sys/file.h>
2N/A#include <sys/port.h>
2N/A
/*
 * Forward declarations of helpers that are private to this file.
 */
static int _aio_hash_insert(aio_result_t *, aio_req_t *);
static aio_req_t *_aio_req_get(aio_worker_t *);
static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
static void _aio_work_done(aio_worker_t *);
static void _aio_enq_doneq(aio_req_t *);

/* External linkage; declared here rather than taken from a header. */
extern void _aio_lio_free(aio_lio_t *);

/* External entry points used by this file; declared here directly. */
extern int __fdsync(int, int);
extern int __fcntl(int, int, ...);
extern int _port_dispatch(int, int, int, int, uintptr_t, void *);

/* More file-private helpers. */
static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
static void _aiodone(aio_req_t *, ssize_t, int);
static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
static void _aio_finish_request(aio_worker_t *, ssize_t, int);
2N/A
/*
 * switch for kernel async I/O
 * A value of 0 makes _aiorw() call _kaio_init(), which leaves the
 * switch at 1 (success) or -1 (failure).
 */
int _kaio_ok = 0;		/* 0 = not yet initialized, 1 = on, -1 = error */

/*
 * Key for thread-specific data
 * (each worker stores its own aio_worker_t pointer under this key)
 */
pthread_key_t _aio_key;

/*
 * Array for determining whether or not a file supports kaio.
 * Allocated by _kaio_supported_init(), which is called from both
 * _kaio_init() and __uaio_init().
 */
uint32_t *_kaio_supported = NULL;

/*
 * workers for read/write requests
 * (__aio_mutex lock protects circular linked list of workers)
 */
aio_worker_t *__workers_rw;	/* circular list of AIO workers */
aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
int __rw_workerscnt;		/* number of read/write workers */

/*
 * worker for notification requests.
 */
aio_worker_t *__workers_no;	/* circular list of AIO workers */
aio_worker_t *__nextworker_no;	/* next worker in list of workers */
int __no_workerscnt;		/* number of notification workers */

aio_req_t *_aio_done_tail;	/* list of done requests */
aio_req_t *_aio_done_head;

mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
cond_t __aio_initcv = DEFAULTCV;	/* signalled when an init finishes */
int __aio_initbusy = 0;			/* non-zero while an init is running */

mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */

pid_t __pid = (pid_t)-1;	/* initialize as invalid pid */
int _sigio_enabled = 0;		/* when set, send SIGIO signal */

aio_hash_t *_aio_hash;		/* HASHSZ buckets, mapped by __uaio_init() */

aio_req_t *_aio_doneq;		/* double linked done queue list */

int _aio_donecnt = 0;		/* # of requests on the _aio_done_tail list */
int _aio_waitncnt = 0;		/* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;	/* # of outstanding requests */
int _kaio_outstand_cnt = 0;	/* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;	/* req. done but not in "done queue" */
int _aio_kernel_suspend = 0;	/* active kernel kaio calls */
int _aio_suscv_cnt = 0;		/* aio_suspend calls waiting on cv's */

int _max_workers = 256;		/* max number of workers permitted */
int _min_workers = 4;		/* min number of workers */
int _minworkload = 2;		/* min number of request in q */
int _aio_worker_cnt = 0;	/* number of workers to do requests */
int __uaio_ok = 0;		/* AIO has been enabled */
sigset_t _worker_set;		/* worker's signal mask */

int _aiowait_flag = 0;		/* when set, aiowait() is inprogress */
int _aio_flags = 0;		/* flag bits; presumably defined in asyncio.h */

aio_worker_t *_kaiowp = NULL;	/* points to kaio cleanup thread */

int hz;				/* clock ticks per second */
2N/A
2N/Astatic int
2N/A_kaio_supported_init(void)
2N/A{
2N/A void *ptr;
2N/A size_t size;
2N/A
2N/A if (_kaio_supported != NULL) /* already initialized */
2N/A return (0);
2N/A
2N/A size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
2N/A ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
2N/A MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
2N/A if (ptr == MAP_FAILED)
2N/A return (-1);
2N/A _kaio_supported = ptr;
2N/A return (0);
2N/A}
2N/A
/*
 * The aio subsystem is initialized when an AIO request is made.
 * Constants are initialized like the max number of workers that
 * the subsystem can create, and the minimum number of workers
 * permitted before imposing some restrictions.  Also, some
 * workers are created.
 *
 * Returns 0 when the subsystem is usable (including when it was already
 * initialized) and -1 on failure.  errno is set to EAGAIN when the
 * notification worker or all read/write workers could not be created.
 *
 * Initialization is serialized with _kaio_init() through __aio_initlock,
 * __aio_initcv and __aio_initbusy: only one thread runs the body at a
 * time while the others wait on the condition variable.
 */
int
__uaio_init(void)
{
	int ret = -1;
	int i;
	int cancel_state;

	/*
	 * Wait until no other initialization is in progress.  Cancellation
	 * is disabled for the duration of the wait and restored afterwards.
	 */
	lmutex_lock(&__aio_initlock);
	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
	while (__aio_initbusy)
		(void) cond_wait(&__aio_initcv, &__aio_initlock);
	(void) pthread_setcancelstate(cancel_state, NULL);
	if (__uaio_ok) {	/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return (0);
	}
	/* Claim the initialization; the lock is not held while we work. */
	__aio_initbusy = 1;
	lmutex_unlock(&__aio_initlock);

	hz = (int)sysconf(_SC_CLK_TCK);
	__pid = getpid();

	setup_cancelsig(SIGAIOCANCEL);

	if (_kaio_supported_init() != 0)
		goto out;

	/*
	 * Allocate and initialize the hash table.
	 * Do this only once, even if __uaio_init() is called twice.
	 */
	if (_aio_hash == NULL) {
		_aio_hash = (aio_hash_t *)mmap(NULL,
		    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
		    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
		if ((void *)_aio_hash == MAP_FAILED) {
			/* reset so that a later call retries the mapping */
			_aio_hash = NULL;
			goto out;
		}
		for (i = 0; i < HASHSZ; i++)
			(void) mutex_init(&_aio_hash[i].hash_lock,
			    USYNC_THREAD, NULL);
	}

	/*
	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
	 */
	(void) sigfillset(&_worker_set);
	(void) sigdelset(&_worker_set, SIGAIOCANCEL);

	/*
	 * Create one worker to send asynchronous notifications.
	 * Do this only once, even if __uaio_init() is called twice.
	 */
	if (__no_workerscnt == 0 &&
	    (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
		errno = EAGAIN;
		goto out;
	}

	/*
	 * Create the minimum number of read/write workers.
	 * And later check whether at least one worker is created;
	 * lwp_create() calls could fail because of segkp exhaustion.
	 */
	for (i = 0; i < _min_workers; i++)
		(void) _aio_create_worker(NULL, AIOREAD);
	if (__rw_workerscnt == 0) {
		errno = EAGAIN;
		goto out;
	}

	ret = 0;
out:
	/*
	 * Publish the outcome and wake every thread waiting for the
	 * initialization to finish, whether it succeeded or not.
	 */
	lmutex_lock(&__aio_initlock);
	if (ret == 0)
		__uaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
	return (ret);
}
2N/A
2N/A/*
2N/A * Called from close() before actually performing the real _close().
2N/A */
2N/Avoid
2N/A_aio_close(int fd)
2N/A{
2N/A if (fd < 0) /* avoid cancelling everything */
2N/A return;
2N/A /*
2N/A * Cancel all outstanding aio requests for this file descriptor.
2N/A */
2N/A if (__uaio_ok)
2N/A (void) aiocancel_all(fd);
2N/A /*
2N/A * If we have allocated the bit array, clear the bit for this file.
2N/A * The next open may re-use this file descriptor and the new file
2N/A * may have different kaio() behaviour.
2N/A */
2N/A if (_kaio_supported != NULL)
2N/A CLEAR_KAIO_SUPPORTED(fd);
2N/A}
2N/A
2N/A/*
2N/A * special kaio cleanup thread sits in a loop in the
2N/A * kernel waiting for pending kaio requests to complete.
2N/A */
2N/Avoid *
2N/A_kaio_cleanup_thread(void *arg)
2N/A{
2N/A if (pthread_setspecific(_aio_key, arg) != 0)
2N/A aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
2N/A (void) _kaio(AIOSTART);
2N/A return (arg);
2N/A}
2N/A
2N/A/*
2N/A * initialize kaio.
2N/A */
2N/Avoid
2N/A_kaio_init()
2N/A{
2N/A int error;
2N/A sigset_t oset;
2N/A int cancel_state;
2N/A
2N/A lmutex_lock(&__aio_initlock);
2N/A (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
2N/A while (__aio_initbusy)
2N/A (void) cond_wait(&__aio_initcv, &__aio_initlock);
2N/A (void) pthread_setcancelstate(cancel_state, NULL);
2N/A if (_kaio_ok) { /* already initialized */
2N/A lmutex_unlock(&__aio_initlock);
2N/A return;
2N/A }
2N/A __aio_initbusy = 1;
2N/A lmutex_unlock(&__aio_initlock);
2N/A
2N/A if (_kaio_supported_init() != 0)
2N/A error = ENOMEM;
2N/A else if ((_kaiowp = _aio_worker_alloc()) == NULL)
2N/A error = ENOMEM;
2N/A else if ((error = (int)_kaio(AIOINIT)) == 0) {
2N/A (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
2N/A error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
2N/A _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
2N/A (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
2N/A }
2N/A if (error && _kaiowp != NULL) {
2N/A _aio_worker_free(_kaiowp);
2N/A _kaiowp = NULL;
2N/A }
2N/A
2N/A lmutex_lock(&__aio_initlock);
2N/A if (error)
2N/A _kaio_ok = -1;
2N/A else
2N/A _kaio_ok = 1;
2N/A __aio_initbusy = 0;
2N/A (void) cond_broadcast(&__aio_initcv);
2N/A lmutex_unlock(&__aio_initlock);
2N/A}
2N/A
/*
 * Start an asynchronous read of bufsz bytes from fd into buf.
 * Thin wrapper that hands its arguments to _aiorw() with mode AIOREAD;
 * the return value is whatever _aiorw() returns (0 once the request has
 * been started, -1 on failure).
 */
int
aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
}
2N/A
/*
 * Start an asynchronous write of bufsz bytes from buf to fd.
 * Thin wrapper that hands its arguments to _aiorw() with mode AIOWRITE;
 * the return value is whatever _aiorw() returns (0 once the request has
 * been started, -1 on failure).
 */
int
aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
}
2N/A
2N/A#if !defined(_LP64)
/*
 * Variant of aioread() taking an off64_t offset; only compiled when
 * _LP64 is not defined.  Passes mode AIOAREAD64 to _aiorw() and returns
 * its result.
 */
int
aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
}
2N/A
/*
 * Variant of aiowrite() taking an off64_t offset; only compiled when
 * _LP64 is not defined.  Passes mode AIOAWRITE64 to _aiorw() and returns
 * its result.
 */
int
aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
}
2N/A#endif /* !defined(_LP64) */
2N/A
2N/Aint
2N/A_aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
2N/A aio_result_t *resultp, int mode)
2N/A{
2N/A aio_req_t *reqp;
2N/A aio_args_t *ap;
2N/A offset_t loffset;
2N/A struct stat64 stat64;
2N/A int error = 0;
2N/A int kerr;
2N/A int umode;
2N/A
2N/A switch (whence) {
2N/A
2N/A case SEEK_SET:
2N/A loffset = offset;
2N/A break;
2N/A case SEEK_CUR:
2N/A if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
2N/A error = -1;
2N/A else
2N/A loffset += offset;
2N/A break;
2N/A case SEEK_END:
2N/A if (fstat64(fd, &stat64) == -1)
2N/A error = -1;
2N/A else
2N/A loffset = offset + stat64.st_size;
2N/A break;
2N/A default:
2N/A errno = EINVAL;
2N/A error = -1;
2N/A }
2N/A
2N/A if (error)
2N/A return (error);
2N/A
2N/A /* initialize kaio */
2N/A if (!_kaio_ok)
2N/A _kaio_init();
2N/A
2N/A /*
2N/A * _aio_do_request() needs the original request code (mode) to be able
2N/A * to choose the appropiate 32/64 bit function. All other functions
2N/A * only require the difference between READ and WRITE (umode).
2N/A */
2N/A if (mode == AIOAREAD64 || mode == AIOAWRITE64)
2N/A umode = mode - AIOAREAD64;
2N/A else
2N/A umode = mode;
2N/A
2N/A /*
2N/A * Try kernel aio first.
2N/A * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
2N/A */
2N/A if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
2N/A resultp->aio_errno = 0;
2N/A sig_mutex_lock(&__aio_mutex);
2N/A _kaio_outstand_cnt++;
2N/A sig_mutex_unlock(&__aio_mutex);
2N/A kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
2N/A (umode | AIO_POLL_BIT) : umode),
2N/A fd, buf, bufsz, loffset, resultp);
2N/A if (kerr == 0) {
2N/A return (0);
2N/A }
2N/A sig_mutex_lock(&__aio_mutex);
2N/A _kaio_outstand_cnt--;
2N/A sig_mutex_unlock(&__aio_mutex);
2N/A if (errno != ENOTSUP && errno != EBADFD)
2N/A return (-1);
2N/A if (errno == EBADFD)
2N/A SET_KAIO_NOT_SUPPORTED(fd);
2N/A }
2N/A
2N/A if (!__uaio_ok && __uaio_init() == -1)
2N/A return (-1);
2N/A
2N/A if ((reqp = _aio_req_alloc()) == NULL) {
2N/A errno = EAGAIN;
2N/A return (-1);
2N/A }
2N/A
2N/A /*
2N/A * _aio_do_request() checks reqp->req_op to differentiate
2N/A * between 32 and 64 bit access.
2N/A */
2N/A reqp->req_op = mode;
2N/A reqp->req_resultp = resultp;
2N/A ap = &reqp->req_args;
2N/A ap->fd = fd;
2N/A ap->buf = buf;
2N/A ap->bufsz = bufsz;
2N/A ap->offset = loffset;
2N/A
2N/A if (_aio_hash_insert(resultp, reqp) != 0) {
2N/A _aio_req_free(reqp);
2N/A errno = EINVAL;
2N/A return (-1);
2N/A }
2N/A /*
2N/A * _aio_req_add() only needs the difference between READ and
2N/A * WRITE to choose the right worker queue.
2N/A */
2N/A _aio_req_add(reqp, &__nextworker_rw, umode);
2N/A return (0);
2N/A}
2N/A
2N/Aint
2N/Aaiocancel(aio_result_t *resultp)
2N/A{
2N/A aio_req_t *reqp;
2N/A aio_worker_t *aiowp;
2N/A int ret;
2N/A int done = 0;
2N/A int canceled = 0;
2N/A
2N/A if (!__uaio_ok) {
2N/A errno = EINVAL;
2N/A return (-1);
2N/A }
2N/A
2N/A sig_mutex_lock(&__aio_mutex);
2N/A reqp = _aio_hash_find(resultp);
2N/A if (reqp == NULL) {
2N/A if (_aio_outstand_cnt == _aio_req_done_cnt)
2N/A errno = EINVAL;
2N/A else
2N/A errno = EACCES;
2N/A ret = -1;
2N/A } else {
2N/A aiowp = reqp->req_worker;
2N/A sig_mutex_lock(&aiowp->work_qlock1);
2N/A (void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
2N/A sig_mutex_unlock(&aiowp->work_qlock1);
2N/A
2N/A if (canceled) {
2N/A ret = 0;
2N/A } else {
2N/A if (_aio_outstand_cnt == 0 ||
2N/A _aio_outstand_cnt == _aio_req_done_cnt)
2N/A errno = EINVAL;
2N/A else
2N/A errno = EACCES;
2N/A ret = -1;
2N/A }
2N/A }
2N/A sig_mutex_unlock(&__aio_mutex);
2N/A return (ret);
2N/A}
2N/A
2N/A/* ARGSUSED */
2N/Astatic void
2N/A_aiowait_cleanup(void *arg)
2N/A{
2N/A sig_mutex_lock(&__aio_mutex);
2N/A _aiowait_flag--;
2N/A sig_mutex_unlock(&__aio_mutex);
2N/A}
2N/A
/*
 * Wait for an asynchronous I/O request (kernel or userland) to complete.
 *
 *	uwait == NULL		block until a request completes
 *	*uwait == {0, 0}	poll: never block
 *	otherwise		block for at most the given interval
 *
 * Returns the aio_result_t pointer of a completed request, NULL when
 * nothing completed within the allowed time (or on a poll), or
 * (aio_result_t *)-1 with errno set: EINVAL for an invalid timeout or
 * when neither the kernel nor the userland side has anything to wait
 * for, EINTR when the kernel wait was interrupted.
 *
 * This must be asynch safe and cancel safe
 */
aio_result_t *
aiowait(struct timeval *uwait)
{
	aio_result_t *uresultp;		/* result from the userland done list */
	aio_result_t *kresultp;		/* result from _kaio(AIOWAIT) */
	aio_result_t *resultp;		/* what we finally return */
	int dontblock;
	int timedwait = 0;
	int kaio_errno = 0;
	struct timeval twait;		/* private, updatable copy of *uwait */
	struct timeval *wait = NULL;	/* NULL means wait without a limit */
	hrtime_t hrtend;		/* absolute deadline for a timed wait */
	hrtime_t hres;			/* time left until hrtend */

	if (uwait) {
		/*
		 * Check for a valid specified wait time.
		 * If it is invalid, fail the call right away.
		 */
		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
		    uwait->tv_usec >= MICROSEC) {
			errno = EINVAL;
			return ((aio_result_t *)-1);
		}

		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
			/* timed wait: remember the absolute deadline */
			hrtend = gethrtime() +
			    (hrtime_t)uwait->tv_sec * NANOSEC +
			    (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
			twait = *uwait;
			wait = &twait;
			timedwait++;
		} else {
			/* polling */
			sig_mutex_lock(&__aio_mutex);
			if (_kaio_outstand_cnt == 0) {
				kresultp = (aio_result_t *)-1;
			} else {
				kresultp = (aio_result_t *)_kaio(AIOWAIT,
				    (struct timeval *)-1, 1);
				/*
				 * Anything other than -1, NULL or 1 is
				 * the result of a completed kaio request.
				 */
				if (kresultp != (aio_result_t *)-1 &&
				    kresultp != NULL &&
				    kresultp != (aio_result_t *)1) {
					_kaio_outstand_cnt--;
					sig_mutex_unlock(&__aio_mutex);
					return (kresultp);
				}
			}
			uresultp = _aio_req_done();
			sig_mutex_unlock(&__aio_mutex);
			if (uresultp != NULL &&
			    uresultp != (aio_result_t *)-1) {
				return (uresultp);
			}
			/* EINVAL only if both sides reported -1 */
			if (uresultp == (aio_result_t *)-1 &&
			    kresultp == (aio_result_t *)-1) {
				errno = EINVAL;
				return ((aio_result_t *)-1);
			} else {
				return (NULL);
			}
		}
	}

	for (;;) {
		sig_mutex_lock(&__aio_mutex);
		uresultp = _aio_req_done();
		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
			/* a userland request has completed */
			sig_mutex_unlock(&__aio_mutex);
			resultp = uresultp;
			break;
		}
		_aiowait_flag++;
		/*
		 * A -1 from _aio_req_done() apparently means there is no
		 * userland request left to wait for; in that case the
		 * kernel must not block on our behalf.
		 */
		dontblock = (uresultp == (aio_result_t *)-1);
		if (dontblock && _kaio_outstand_cnt == 0) {
			/* nothing outstanding on either side */
			kresultp = (aio_result_t *)-1;
			kaio_errno = EINVAL;
		} else {
			/*
			 * Wait in the kernel with __aio_mutex dropped.
			 * The cleanup handler takes back the
			 * _aiowait_flag increment if we are cancelled
			 * while waiting.
			 */
			sig_mutex_unlock(&__aio_mutex);
			pthread_cleanup_push(_aiowait_cleanup, NULL);
			_cancel_prologue();
			kresultp = (aio_result_t *)_kaio(AIOWAIT,
			    wait, dontblock);
			_cancel_epilogue();
			pthread_cleanup_pop(0);
			sig_mutex_lock(&__aio_mutex);
			kaio_errno = errno;
		}
		_aiowait_flag--;
		sig_mutex_unlock(&__aio_mutex);
		if (kresultp == (aio_result_t *)1) {
			/* aiowait() awakened by an aionotify() */
			continue;
		} else if (kresultp != NULL &&
		    kresultp != (aio_result_t *)-1) {
			/* a kaio request has completed */
			resultp = kresultp;
			sig_mutex_lock(&__aio_mutex);
			_kaio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINVAL &&
		    uresultp == (aio_result_t *)-1) {
			/* neither side has anything to wait for */
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINTR) {
			/* the kernel wait was interrupted */
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (timedwait) {
			hres = hrtend - gethrtime();
			if (hres <= 0) {
				/* time is up; return */
				resultp = NULL;
				break;
			} else {
				/*
				 * Some time left.  Round up the remaining time
				 * in nanoseconds to microsec.  Retry the call.
				 */
				hres += (NANOSEC / MICROSEC) - 1;
				wait->tv_sec = hres / NANOSEC;
				wait->tv_usec =
				    (hres % NANOSEC) / (NANOSEC / MICROSEC);
			}
		} else {
			/* untimed wait that produced nothing: try again */
			ASSERT(kresultp == NULL && uresultp == NULL);
			resultp = NULL;
			continue;
		}
	}
	return (resultp);
}
2N/A
2N/A/*
2N/A * _aio_get_timedelta calculates the remaining time and stores the result
2N/A * into timespec_t *wait.
2N/A */
2N/A
2N/Aint
2N/A_aio_get_timedelta(timespec_t *end, timespec_t *wait)
2N/A{
2N/A int ret = 0;
2N/A struct timeval cur;
2N/A timespec_t curtime;
2N/A
2N/A (void) gettimeofday(&cur, NULL);
2N/A curtime.tv_sec = cur.tv_sec;
2N/A curtime.tv_nsec = cur.tv_usec * 1000; /* convert us to ns */
2N/A
2N/A if (end->tv_sec >= curtime.tv_sec) {
2N/A wait->tv_sec = end->tv_sec - curtime.tv_sec;
2N/A if (end->tv_nsec >= curtime.tv_nsec) {
2N/A wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
2N/A if (wait->tv_sec == 0 && wait->tv_nsec == 0)
2N/A ret = -1; /* timer expired */
2N/A } else {
2N/A if (end->tv_sec > curtime.tv_sec) {
2N/A wait->tv_sec -= 1;
2N/A wait->tv_nsec = NANOSEC -
2N/A (curtime.tv_nsec - end->tv_nsec);
2N/A } else {
2N/A ret = -1; /* timer expired */
2N/A }
2N/A }
2N/A } else {
2N/A ret = -1;
2N/A }
2N/A return (ret);
2N/A}
2N/A
/*
 * If closing by file descriptor: we will simply cancel all the outstanding
 * aio`s and return.  Those aio's in question will have either noticed the
 * cancellation notice before, during, or after initiating io.
 *
 * A negative fd cancels the requests of every file descriptor.
 *
 * Returns AIO_ALLDONE when nothing is outstanding or every matching
 * request had already completed, AIO_CANCELED when requests were
 * canceled and none had completed, the result of _kaio(AIOCANCEL) when
 * no userland request matched and KAIO_SUPPORTED(fd) holds, and
 * AIO_NOTCANCELED otherwise.
 */
int
aiocancel_all(int fd)
{
	aio_req_t *reqp;
	aio_req_t **reqpp, *last;
	aio_worker_t *first;
	aio_worker_t *next;
	int canceled = 0;
	int done = 0;
	int cancelall = 0;

	sig_mutex_lock(&__aio_mutex);

	if (_aio_outstand_cnt == 0) {
		sig_mutex_unlock(&__aio_mutex);
		return (AIO_ALLDONE);
	}

	/*
	 * Cancel requests from the read/write workers' queues.
	 * The workers form a circular list, so walk it until we are
	 * back at the worker we started with.
	 */
	first = __nextworker_rw;
	next = first;
	do {
		_aio_cancel_work(next, fd, &canceled, &done);
	} while ((next = next->work_forw) != first);

	/*
	 * finally, check if there are requests on the done queue that
	 * should be canceled.
	 */
	if (fd < 0)
		cancelall = 1;
	/*
	 * Walk the done list starting at _aio_done_tail.  reqpp points at
	 * the link that refers to the current request, so a matching
	 * request can be unlinked in place.  last tracks the request that
	 * precedes the current one among those kept so far.
	 */
	reqpp = &_aio_done_tail;
	last = _aio_done_tail;
	while ((reqp = *reqpp) != NULL) {
		if (cancelall || reqp->req_args.fd == fd) {
			*reqpp = reqp->req_next;
			if (last == reqp) {
				last = reqp->req_next;
			}
			if (_aio_done_head == reqp) {
				/* this should be the last req in list */
				_aio_done_head = last;
			}
			_aio_donecnt--;
			/* report the request as canceled and discard it */
			_aio_set_result(reqp, -1, ECANCELED);
			(void) _aio_hash_del(reqp->req_resultp);
			_aio_req_free(reqp);
		} else {
			reqpp = &reqp->req_next;
			last = reqp;
		}
	}

	if (cancelall) {
		ASSERT(_aio_donecnt == 0);
		_aio_done_head = NULL;
	}
	sig_mutex_unlock(&__aio_mutex);

	if (canceled && done == 0)
		return (AIO_CANCELED);
	else if (done && canceled == 0)
		return (AIO_ALLDONE);
	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
		/* nothing matched in userland; let the kernel have a go */
		return ((int)_kaio(AIOCANCEL, fd, NULL));
	return (AIO_NOTCANCELED);
}
2N/A
2N/A/*
2N/A * Cancel requests from a given work queue. If the file descriptor
2N/A * parameter, fd, is non-negative, then only cancel those requests
2N/A * in this queue that are to this file descriptor. If the fd
2N/A * parameter is -1, then cancel all requests.
2N/A */
2N/Astatic void
2N/A_aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
2N/A{
2N/A aio_req_t *reqp;
2N/A
2N/A sig_mutex_lock(&aiowp->work_qlock1);
2N/A /*
2N/A * cancel queued requests first.
2N/A */
2N/A reqp = aiowp->work_tail1;
2N/A while (reqp != NULL) {
2N/A if (fd < 0 || reqp->req_args.fd == fd) {
2N/A if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
2N/A /*
2N/A * Callers locks were dropped.
2N/A * reqp is invalid; start traversing
2N/A * the list from the beginning again.
2N/A */
2N/A reqp = aiowp->work_tail1;
2N/A continue;
2N/A }
2N/A }
2N/A reqp = reqp->req_next;
2N/A }
2N/A /*
2N/A * Since the queued requests have been canceled, there can
2N/A * only be one inprogress request that should be canceled.
2N/A */
2N/A if ((reqp = aiowp->work_req) != NULL &&
2N/A (fd < 0 || reqp->req_args.fd == fd))
2N/A (void) _aio_cancel_req(aiowp, reqp, canceled, done);
2N/A sig_mutex_unlock(&aiowp->work_qlock1);
2N/A}
2N/A
/*
 * Cancel a request.  Return 1 if the callers locks were temporarily
 * dropped, otherwise return 0.
 *
 * The caller must hold both __aio_mutex and aiowp->work_qlock1 (both are
 * asserted).  *canceled is incremented when the request is canceled
 * here, *done when it turns out to have completed already.  After a
 * return of 1 the caller must treat reqp, and any position it held in
 * the worker's queue, as invalid.
 */
int
_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
{
	int ostate = reqp->req_state;

	ASSERT(MUTEX_HELD(&__aio_mutex));
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	/* Already canceled: nothing to do and nothing to count. */
	if (ostate == AIO_REQ_CANCELED)
		return (0);
	if (ostate == AIO_REQ_DONE && !POSIX_AIO(reqp) &&
	    aiowp->work_prev1 == reqp) {
		ASSERT(aiowp->work_done1 != 0);
		/*
		 * If not on the done queue yet, just mark it CANCELED,
		 * _aio_work_done() will do the necessary clean up.
		 * This is required to ensure that aiocancel_all() cancels
		 * all the outstanding requests, including this one which
		 * is not yet on done queue but has been marked done.
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		(void) _aio_hash_del(reqp->req_resultp);
		reqp->req_state = AIO_REQ_CANCELED;
		(*canceled)++;
		return (0);
	}

	/* Completed requests cannot be canceled any more; count them. */
	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
		(*done)++;
		return (0);
	}
	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
		ASSERT(POSIX_AIO(reqp));
		/* Cancel the queued aio_fsync() request */
		if (!reqp->req_head->lio_canned) {
			/* count the cancellation only once per list head */
			reqp->req_head->lio_canned = 1;
			_aio_outstand_cnt--;
			(*canceled)++;
		}
		return (0);
	}
	/* Queued or in progress: mark it canceled and dequeue it. */
	reqp->req_state = AIO_REQ_CANCELED;
	_aio_req_del(aiowp, reqp, ostate);
	(void) _aio_hash_del(reqp->req_resultp);
	(*canceled)++;
	if (reqp == aiowp->work_req) {
		ASSERT(ostate == AIO_REQ_INPROGRESS);
		/*
		 * Set the result values now, before _aiodone() is called.
		 * We do this because the application can expect aio_return
		 * and aio_errno to be set to -1 and ECANCELED, respectively,
		 * immediately after a successful return from aiocancel()
		 * or aio_cancel().
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		/* interrupt the worker that is executing the request */
		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
		return (0);
	}
	if (!POSIX_AIO(reqp)) {
		/* Queued non-POSIX request: finish it off right here. */
		_aio_outstand_cnt--;
		_aio_set_result(reqp, -1, ECANCELED);
		_aio_req_free(reqp);
		return (0);
	}
	/*
	 * Queued POSIX request: _aiodone() is called with both locks
	 * dropped; they are re-taken in the original order afterwards.
	 */
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_unlock(&__aio_mutex);
	_aiodone(reqp, -1, ECANCELED);
	sig_mutex_lock(&__aio_mutex);
	sig_mutex_lock(&aiowp->work_qlock1);
	return (1);
}
2N/A
2N/Aint
2N/A_aio_create_worker(aio_req_t *reqp, int mode)
2N/A{
2N/A aio_worker_t *aiowp, **workers, **nextworker;
2N/A int *aio_workerscnt;
2N/A void *(*func)(void *);
2N/A sigset_t oset;
2N/A int error;
2N/A
2N/A /*
2N/A * Put the new worker thread in the right queue.
2N/A */
2N/A switch (mode) {
2N/A case AIOREAD:
2N/A case AIOWRITE:
2N/A case AIOAREAD:
2N/A case AIOAWRITE:
2N/A#if !defined(_LP64)
2N/A case AIOAREAD64:
2N/A case AIOAWRITE64:
2N/A#endif
2N/A workers = &__workers_rw;
2N/A nextworker = &__nextworker_rw;
2N/A aio_workerscnt = &__rw_workerscnt;
2N/A func = _aio_do_request;
2N/A break;
2N/A case AIONOTIFY:
2N/A workers = &__workers_no;
2N/A nextworker = &__nextworker_no;
2N/A func = _aio_do_notify;
2N/A aio_workerscnt = &__no_workerscnt;
2N/A break;
2N/A default:
2N/A aio_panic("_aio_create_worker: invalid mode");
2N/A break;
2N/A }
2N/A
2N/A if ((aiowp = _aio_worker_alloc()) == NULL)
2N/A return (-1);
2N/A
2N/A if (reqp) {
2N/A reqp->req_state = AIO_REQ_QUEUED;
2N/A reqp->req_worker = aiowp;
2N/A aiowp->work_head1 = reqp;
2N/A aiowp->work_tail1 = reqp;
2N/A aiowp->work_next1 = reqp;
2N/A aiowp->work_count1 = 1;
2N/A aiowp->work_minload1 = 1;
2N/A }
2N/A
2N/A (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
2N/A error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
2N/A THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
2N/A (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
2N/A if (error) {
2N/A if (reqp) {
2N/A reqp->req_state = 0;
2N/A reqp->req_worker = NULL;
2N/A }
2N/A _aio_worker_free(aiowp);
2N/A return (-1);
2N/A }
2N/A
2N/A lmutex_lock(&__aio_mutex);
2N/A (*aio_workerscnt)++;
2N/A if (*workers == NULL) {
2N/A aiowp->work_forw = aiowp;
2N/A aiowp->work_backw = aiowp;
2N/A *nextworker = aiowp;
2N/A *workers = aiowp;
2N/A } else {
2N/A aiowp->work_backw = (*workers)->work_backw;
2N/A aiowp->work_forw = (*workers);
2N/A (*workers)->work_backw->work_forw = aiowp;
2N/A (*workers)->work_backw = aiowp;
2N/A }
2N/A _aio_worker_cnt++;
2N/A lmutex_unlock(&__aio_mutex);
2N/A
2N/A (void) thr_continue(aiowp->work_tid);
2N/A
2N/A return (0);
2N/A}
2N/A
2N/A/*
2N/A * This is the worker's main routine.
2N/A * The task of this function is to execute all queued requests;
2N/A * once the last pending request is executed this function will block
2N/A * in _aio_idle(). A new incoming request must wakeup this thread to
2N/A * restart the work.
2N/A * Every worker has an own work queue. The queue lock is required
2N/A * to synchronize the addition of new requests for this worker or
2N/A * cancellation of pending/running requests.
2N/A *
2N/A * Cancellation scenarios:
2N/A * The cancellation of a request is being done asynchronously using
2N/A * _aio_cancel_req() from another thread context.
2N/A * A queued request can be cancelled in different manners :
2N/A * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
2N/A * - lock the queue -> remove the request -> unlock the queue
2N/A * - this function/thread does not detect this cancellation process
2N/A * b) request is in progress (AIO_REQ_INPROGRESS) :
 *		- this function first allows the cancellation of the running
2N/A * request with the flag "work_cancel_flg=1"
2N/A * see _aio_req_get() -> _aio_cancel_on()
2N/A * During this phase, it is allowed to interrupt the worker
2N/A * thread running the request (this thread) using the SIGAIOCANCEL
2N/A * signal.
2N/A * Once this thread returns from the kernel (because the request
2N/A * is just done), then it must disable a possible cancellation
2N/A * and proceed to finish the request. To disable the cancellation
2N/A * this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
2N/A * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
2N/A * same procedure as in a)
2N/A *
2N/A * To b)
 *	This thread uses sigsetjmp() to define the position in the code, where
 *	it wishes to continue working in the case that a SIGAIOCANCEL signal
 *	is detected.
2N/A * Normally this thread should get the cancellation signal during the
2N/A * kernel phase (reading or writing). In that case the signal handler
2N/A * aiosigcancelhndlr() is activated using the worker thread context,
2N/A * which again will use the siglongjmp() function to break the standard
2N/A * code flow and jump to the "sigsetjmp" position, provided that
2N/A * "work_cancel_flg" is set to "1".
2N/A * Because the "work_cancel_flg" is only manipulated by this worker
2N/A * thread and it can only run on one CPU at a given time, it is not
2N/A * necessary to protect that flag with the queue lock.
2N/A * Returning from the kernel (read or write system call) we must
2N/A * first disable the use of the SIGAIOCANCEL signal and accordingly
2N/A * the use of the siglongjmp() function to prevent a possible deadlock:
 *	- It can happen that this worker thread returns from the kernel and
 *	  blocks in "work_qlock1",
 *	- then a second thread cancels the apparently "in progress" request
 *	  and sends the SIGAIOCANCEL signal to the worker thread,
 *	- the worker thread gets assigned the "work_qlock1" and will return
 *	  from the kernel,
2N/A * - the kernel detects the pending signal and activates the signal
2N/A * handler instead,
2N/A * - if the "work_cancel_flg" is still set then the signal handler
2N/A * should use siglongjmp() to cancel the "in progress" request and
2N/A * it would try to acquire the same work_qlock1 in _aio_req_get()
2N/A * for a second time => deadlock.
2N/A * To avoid that situation we disable the cancellation of the request
2N/A * in progress BEFORE we try to acquire the work_qlock1.
2N/A * In that case the signal handler will not call siglongjmp() and the
2N/A * worker thread will continue running the standard code flow.
2N/A * Then this thread must check the AIO_REQ_CANCELED flag to emulate
2N/A * an eventually required siglongjmp() freeing the work_qlock1 and
2N/A * avoiding a deadlock.
2N/A */
2N/Avoid *
2N/A_aio_do_request(void *arglist)
2N/A{
2N/A aio_worker_t *aiowp = (aio_worker_t *)arglist;
2N/A ulwp_t *self = curthread;
2N/A struct aio_args *arg;
2N/A aio_req_t *reqp; /* current AIO request */
2N/A ssize_t retval;
2N/A int append;
2N/A int error;
2N/A
2N/A if (pthread_setspecific(_aio_key, aiowp) != 0)
2N/A aio_panic("_aio_do_request, pthread_setspecific()");
2N/A (void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
2N/A ASSERT(aiowp->work_req == NULL);
2N/A
2N/A /*
2N/A * We resume here when an operation is cancelled.
2N/A * On first entry, aiowp->work_req == NULL, so all
2N/A * we do is block SIGAIOCANCEL.
2N/A */
2N/A (void) sigsetjmp(aiowp->work_jmp_buf, 0);
2N/A ASSERT(self->ul_sigdefer == 0);
2N/A
2N/A sigoff(self); /* block SIGAIOCANCEL */
2N/A if (aiowp->work_req != NULL)
2N/A _aio_finish_request(aiowp, -1, ECANCELED);
2N/A
2N/A for (;;) {
2N/A /*
2N/A * Put completed requests on aio_done_list. This has
2N/A * to be done as part of the main loop to ensure that
2N/A * we don't artificially starve any aiowait'ers.
2N/A */
2N/A if (aiowp->work_done1)
2N/A _aio_work_done(aiowp);
2N/A
2N/Atop:
2N/A /* consume any deferred SIGAIOCANCEL signal here */
2N/A sigon(self);
2N/A sigoff(self);
2N/A
2N/A while ((reqp = _aio_req_get(aiowp)) == NULL) {
2N/A if (_aio_idle(aiowp) != 0)
2N/A goto top;
2N/A }
2N/A arg = &reqp->req_args;
2N/A ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
2N/A reqp->req_state == AIO_REQ_CANCELED);
2N/A error = 0;
2N/A
2N/A switch (reqp->req_op) {
2N/A case AIOREAD:
2N/A case AIOAREAD:
2N/A sigon(self); /* unblock SIGAIOCANCEL */
2N/A retval = pread(arg->fd, arg->buf,
2N/A arg->bufsz, arg->offset);
2N/A if (retval == -1) {
2N/A if (errno == ESPIPE) {
2N/A retval = read(arg->fd,
2N/A arg->buf, arg->bufsz);
2N/A if (retval == -1)
2N/A error = errno;
2N/A } else {
2N/A error = errno;
2N/A }
2N/A }
2N/A sigoff(self); /* block SIGAIOCANCEL */
2N/A break;
2N/A case AIOWRITE:
2N/A case AIOAWRITE:
2N/A /*
2N/A * The SUSv3 POSIX spec for aio_write() states:
2N/A * If O_APPEND is set for the file descriptor,
2N/A * write operations append to the file in the
2N/A * same order as the calls were made.
2N/A * but, somewhat inconsistently, it requires pwrite()
2N/A * to ignore the O_APPEND setting. So we have to use
2N/A * fcntl() to get the open modes and call write() for
2N/A * the O_APPEND case.
2N/A */
2N/A append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
2N/A sigon(self); /* unblock SIGAIOCANCEL */
2N/A retval = append?
2N/A write(arg->fd, arg->buf, arg->bufsz) :
2N/A pwrite(arg->fd, arg->buf, arg->bufsz,
2N/A arg->offset);
2N/A if (retval == -1) {
2N/A if (errno == ESPIPE) {
2N/A retval = write(arg->fd,
2N/A arg->buf, arg->bufsz);
2N/A if (retval == -1)
2N/A error = errno;
2N/A } else {
2N/A error = errno;
2N/A }
2N/A }
2N/A sigoff(self); /* block SIGAIOCANCEL */
2N/A break;
2N/A#if !defined(_LP64)
2N/A case AIOAREAD64:
2N/A sigon(self); /* unblock SIGAIOCANCEL */
2N/A retval = pread64(arg->fd, arg->buf,
2N/A arg->bufsz, arg->offset);
2N/A if (retval == -1) {
2N/A if (errno == ESPIPE) {
2N/A retval = read(arg->fd,
2N/A arg->buf, arg->bufsz);
2N/A if (retval == -1)
2N/A error = errno;
2N/A } else {
2N/A error = errno;
2N/A }
2N/A }
2N/A sigoff(self); /* block SIGAIOCANCEL */
2N/A break;
2N/A case AIOAWRITE64:
2N/A /*
2N/A * The SUSv3 POSIX spec for aio_write() states:
2N/A * If O_APPEND is set for the file descriptor,
2N/A * write operations append to the file in the
2N/A * same order as the calls were made.
2N/A * but, somewhat inconsistently, it requires pwrite()
2N/A * to ignore the O_APPEND setting. So we have to use
2N/A * fcntl() to get the open modes and call write() for
2N/A * the O_APPEND case.
2N/A */
2N/A append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
2N/A sigon(self); /* unblock SIGAIOCANCEL */
2N/A retval = append?
2N/A write(arg->fd, arg->buf, arg->bufsz) :
2N/A pwrite64(arg->fd, arg->buf, arg->bufsz,
2N/A arg->offset);
2N/A if (retval == -1) {
2N/A if (errno == ESPIPE) {
2N/A retval = write(arg->fd,
2N/A arg->buf, arg->bufsz);
2N/A if (retval == -1)
2N/A error = errno;
2N/A } else {
2N/A error = errno;
2N/A }
2N/A }
2N/A sigoff(self); /* block SIGAIOCANCEL */
2N/A break;
2N/A#endif /* !defined(_LP64) */
2N/A case AIOFSYNC:
2N/A if (_aio_fsync_del(aiowp, reqp))
2N/A goto top;
2N/A ASSERT(reqp->req_head == NULL);
2N/A /*
2N/A * All writes for this fsync request are now
2N/A * acknowledged. Now make these writes visible
2N/A * and put the final request into the hash table.
2N/A */
2N/A if (reqp->req_state == AIO_REQ_CANCELED) {
2N/A /* EMPTY */;
2N/A } else if (arg->offset == O_SYNC) {
2N/A if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
2N/A error = errno;
2N/A } else {
2N/A if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
2N/A error = errno;
2N/A }
2N/A if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
2N/A aio_panic("_aio_do_request(): AIOFSYNC: "
2N/A "request already in hash table");
2N/A break;
2N/A default:
2N/A aio_panic("_aio_do_request, bad op");
2N/A }
2N/A
2N/A _aio_finish_request(aiowp, retval, error);
2N/A }
2N/A /* NOTREACHED */
2N/A return (NULL);
2N/A}
2N/A
2N/A/*
2N/A * Perform the tail processing for _aio_do_request().
2N/A * The in-progress request may or may not have been cancelled.
2N/A */
2N/Astatic void
2N/A_aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
2N/A{
2N/A aio_req_t *reqp;
2N/A
2N/A sig_mutex_lock(&aiowp->work_qlock1);
2N/A if ((reqp = aiowp->work_req) == NULL)
2N/A sig_mutex_unlock(&aiowp->work_qlock1);
2N/A else {
2N/A aiowp->work_req = NULL;
2N/A if (reqp->req_state == AIO_REQ_CANCELED) {
2N/A retval = -1;
2N/A error = ECANCELED;
2N/A }
2N/A if (!POSIX_AIO(reqp)) {
2N/A int notify;
2N/A if (reqp->req_state == AIO_REQ_INPROGRESS) {
2N/A reqp->req_state = AIO_REQ_DONE;
2N/A _aio_set_result(reqp, retval, error);
2N/A }
2N/A sig_mutex_unlock(&aiowp->work_qlock1);
2N/A sig_mutex_lock(&__aio_mutex);
2N/A /*
2N/A * If it was canceled, this request will not be
2N/A * added to done list. Just free it.
2N/A */
2N/A if (error == ECANCELED) {
2N/A _aio_outstand_cnt--;
2N/A _aio_req_free(reqp);
2N/A } else {
2N/A _aio_req_done_cnt++;
2N/A }
2N/A /*
2N/A * Notify any thread that may have blocked
2N/A * because it saw an outstanding request.
2N/A */
2N/A notify = 0;
2N/A if (_aio_outstand_cnt == 0 && _aiowait_flag) {
2N/A notify = 1;
2N/A }
2N/A sig_mutex_unlock(&__aio_mutex);
2N/A if (notify) {
2N/A (void) _kaio(AIONOTIFY);
2N/A }
2N/A } else {
2N/A if (reqp->req_state == AIO_REQ_INPROGRESS)
2N/A reqp->req_state = AIO_REQ_DONE;
2N/A sig_mutex_unlock(&aiowp->work_qlock1);
2N/A _aiodone(reqp, retval, error);
2N/A }
2N/A }
2N/A}
2N/A
2N/Avoid
2N/A_aio_req_mark_done(aio_req_t *reqp)
2N/A{
2N/A#if !defined(_LP64)
2N/A if (reqp->req_largefile)
2N/A ((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
2N/A else
2N/A#endif
2N/A ((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
2N/A}
2N/A
2N/A/*
2N/A * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
2N/A * hopefully to consume one of our queued signals.
2N/A */
2N/Astatic void
2N/A_aio_delay(int ticks)
2N/A{
2N/A (void) usleep(ticks * (MICROSEC / hz));
2N/A}
2N/A
2N/A/*
2N/A * Actually send the notifications.
2N/A * We could block indefinitely here if the application
2N/A * is not listening for the signal or port notifications.
2N/A */
2N/Astatic void
2N/Asend_notification(notif_param_t *npp)
2N/A{
2N/A if (npp->np_signo)
2N/A (void) __sigqueue(__pid, npp->np_signo, npp->np_user,
2N/A SI_ASYNCIO, NULL);
2N/A else if (npp->np_port >= 0)
2N/A (void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
2N/A npp->np_event, npp->np_object, npp->np_user);
2N/A
2N/A if (npp->np_lio_signo)
2N/A (void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
2N/A SI_ASYNCIO, NULL);
2N/A else if (npp->np_lio_port >= 0)
2N/A (void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
2N/A npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
2N/A}
2N/A
2N/A/*
2N/A * Asynchronous notification worker.
2N/A */
2N/Avoid *
2N/A_aio_do_notify(void *arg)
2N/A{
2N/A aio_worker_t *aiowp = (aio_worker_t *)arg;
2N/A aio_req_t *reqp;
2N/A
2N/A /*
2N/A * This isn't really necessary. All signals are blocked.
2N/A */
2N/A if (pthread_setspecific(_aio_key, aiowp) != 0)
2N/A aio_panic("_aio_do_notify, pthread_setspecific()");
2N/A
2N/A /*
2N/A * Notifications are never cancelled.
2N/A * All signals remain blocked, forever.
2N/A */
2N/A for (;;) {
2N/A while ((reqp = _aio_req_get(aiowp)) == NULL) {
2N/A if (_aio_idle(aiowp) != 0)
2N/A aio_panic("_aio_do_notify: _aio_idle() failed");
2N/A }
2N/A send_notification(&reqp->req_notify);
2N/A _aio_req_free(reqp);
2N/A }
2N/A
2N/A /* NOTREACHED */
2N/A return (NULL);
2N/A}
2N/A
2N/A/*
2N/A * Do the completion semantics for a request that was either canceled
2N/A * by _aio_cancel_req() or was completed by _aio_do_request().
2N/A */
2N/Astatic void
2N/A_aiodone(aio_req_t *reqp, ssize_t retval, int error)
2N/A{
2N/A aio_result_t *resultp = reqp->req_resultp;
2N/A int notify = 0;
2N/A aio_lio_t *head;
2N/A int sigev_none;
2N/A int sigev_signal;
2N/A int sigev_thread;
2N/A int sigev_port;
2N/A notif_param_t np;
2N/A
2N/A /*
2N/A * We call _aiodone() only for Posix I/O.
2N/A */
2N/A ASSERT(POSIX_AIO(reqp));
2N/A
2N/A sigev_none = 0;
2N/A sigev_signal = 0;
2N/A sigev_thread = 0;
2N/A sigev_port = 0;
2N/A np.np_signo = 0;
2N/A np.np_port = -1;
2N/A np.np_lio_signo = 0;
2N/A np.np_lio_port = -1;
2N/A
2N/A switch (reqp->req_sigevent.sigev_notify) {
2N/A case SIGEV_NONE:
2N/A sigev_none = 1;
2N/A break;
2N/A case SIGEV_SIGNAL:
2N/A sigev_signal = 1;
2N/A break;
2N/A case SIGEV_THREAD:
2N/A sigev_thread = 1;
2N/A break;
2N/A case SIGEV_PORT:
2N/A sigev_port = 1;
2N/A break;
2N/A default:
2N/A aio_panic("_aiodone: improper sigev_notify");
2N/A break;
2N/A }
2N/A
2N/A /*
2N/A * Figure out the notification parameters while holding __aio_mutex.
2N/A * Actually perform the notifications after dropping __aio_mutex.
2N/A * This allows us to sleep for a long time (if the notifications
2N/A * incur delays) without impeding other async I/O operations.
2N/A */
2N/A
2N/A sig_mutex_lock(&__aio_mutex);
2N/A
2N/A if (sigev_signal) {
2N/A if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
2N/A notify = 1;
2N/A np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
2N/A } else if (sigev_thread | sigev_port) {
2N/A if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
2N/A notify = 1;
2N/A np.np_event = reqp->req_op;
2N/A if (np.np_event == AIOFSYNC && reqp->req_largefile)
2N/A np.np_event = AIOFSYNC64;
2N/A np.np_object = (uintptr_t)reqp->req_aiocbp;
2N/A np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
2N/A }
2N/A
2N/A if (resultp->aio_errno == EINPROGRESS)
2N/A _aio_set_result(reqp, retval, error);
2N/A
2N/A _aio_outstand_cnt--;
2N/A
2N/A head = reqp->req_head;
2N/A reqp->req_head = NULL;
2N/A
2N/A if (sigev_none) {
2N/A _aio_enq_doneq(reqp);
2N/A reqp = NULL;
2N/A } else {
2N/A (void) _aio_hash_del(resultp);
2N/A _aio_req_mark_done(reqp);
2N/A }
2N/A
2N/A _aio_waitn_wakeup();
2N/A
2N/A /*
2N/A * __aio_waitn() sets AIO_WAIT_INPROGRESS and
2N/A * __aio_suspend() increments "_aio_kernel_suspend"
2N/A * when they are waiting in the kernel for completed I/Os.
2N/A *
2N/A * _kaio(AIONOTIFY) awakes the corresponding function
2N/A * in the kernel; then the corresponding __aio_waitn() or
2N/A * __aio_suspend() function could reap the recently
2N/A * completed I/Os (_aiodone()).
2N/A */
2N/A if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
2N/A (void) _kaio(AIONOTIFY);
2N/A
2N/A sig_mutex_unlock(&__aio_mutex);
2N/A
2N/A if (head != NULL) {
2N/A /*
2N/A * If all the lio requests have completed,
2N/A * prepare to notify the waiting thread.
2N/A */
2N/A sig_mutex_lock(&head->lio_mutex);
2N/A ASSERT(head->lio_refcnt == head->lio_nent);
2N/A if (head->lio_refcnt == 1) {
2N/A int waiting = 0;
2N/A if (head->lio_mode == LIO_WAIT) {
2N/A if ((waiting = head->lio_waiting) != 0)
2N/A (void) cond_signal(&head->lio_cond_cv);
2N/A } else if (head->lio_port < 0) { /* none or signal */
2N/A if ((np.np_lio_signo = head->lio_signo) != 0)
2N/A notify = 1;
2N/A np.np_lio_user = head->lio_sigval.sival_ptr;
2N/A } else { /* thread or port */
2N/A notify = 1;
2N/A np.np_lio_port = head->lio_port;
2N/A np.np_lio_event = head->lio_event;
2N/A np.np_lio_object =
2N/A (uintptr_t)head->lio_sigevent;
2N/A np.np_lio_user = head->lio_sigval.sival_ptr;
2N/A }
2N/A head->lio_nent = head->lio_refcnt = 0;
2N/A sig_mutex_unlock(&head->lio_mutex);
2N/A if (waiting == 0)
2N/A _aio_lio_free(head);
2N/A } else {
2N/A head->lio_nent--;
2N/A head->lio_refcnt--;
2N/A sig_mutex_unlock(&head->lio_mutex);
2N/A }
2N/A }
2N/A
2N/A /*
2N/A * The request is completed; now perform the notifications.
2N/A */
2N/A if (notify) {
2N/A if (reqp != NULL) {
2N/A /*
2N/A * We usually put the request on the notification
2N/A * queue because we don't want to block and delay
2N/A * other operations behind us in the work queue.
2N/A * Also we must never block on a cancel notification
2N/A * because we are being called from an application
2N/A * thread in this case and that could lead to deadlock
2N/A * if no other thread is receiving notificatins.
2N/A */
2N/A reqp->req_notify = np;
2N/A reqp->req_op = AIONOTIFY;
2N/A _aio_req_add(reqp, &__workers_no, AIONOTIFY);
2N/A reqp = NULL;
2N/A } else {
2N/A /*
2N/A * We already put the request on the done queue,
2N/A * so we can't queue it to the notification queue.
2N/A * Just do the notification directly.
2N/A */
2N/A send_notification(&np);
2N/A }
2N/A }
2N/A
2N/A if (reqp != NULL)
2N/A _aio_req_free(reqp);
2N/A}
2N/A
2N/A/*
2N/A * Delete fsync requests from list head until there is
2N/A * only one left. Return 0 when there is only one,
2N/A * otherwise return a non-zero value.
2N/A */
2N/Astatic int
2N/A_aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
2N/A{
2N/A aio_lio_t *head = reqp->req_head;
2N/A int rval = 0;
2N/A
2N/A ASSERT(reqp == aiowp->work_req);
2N/A sig_mutex_lock(&aiowp->work_qlock1);
2N/A sig_mutex_lock(&head->lio_mutex);
2N/A if (head->lio_refcnt > 1) {
2N/A head->lio_refcnt--;
2N/A head->lio_nent--;
2N/A aiowp->work_req = NULL;
2N/A sig_mutex_unlock(&head->lio_mutex);
2N/A sig_mutex_unlock(&aiowp->work_qlock1);
2N/A sig_mutex_lock(&__aio_mutex);
2N/A _aio_outstand_cnt--;
2N/A _aio_waitn_wakeup();
2N/A sig_mutex_unlock(&__aio_mutex);
2N/A _aio_req_free(reqp);
2N/A return (1);
2N/A }
2N/A ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
2N/A reqp->req_head = NULL;
2N/A if (head->lio_canned)
2N/A reqp->req_state = AIO_REQ_CANCELED;
2N/A if (head->lio_mode == LIO_DESTROY) {
2N/A aiowp->work_req = NULL;
2N/A rval = 1;
2N/A }
2N/A sig_mutex_unlock(&head->lio_mutex);
2N/A sig_mutex_unlock(&aiowp->work_qlock1);
2N/A head->lio_refcnt--;
2N/A head->lio_nent--;
2N/A _aio_lio_free(head);
2N/A if (rval != 0)
2N/A _aio_req_free(reqp);
2N/A return (rval);
2N/A}
2N/A
2N/A/*
2N/A * A worker is set idle when its work queue is empty.
2N/A * The worker checks again that it has no more work
2N/A * and then goes to sleep waiting for more work.
2N/A */
2N/Aint
2N/A_aio_idle(aio_worker_t *aiowp)
2N/A{
2N/A int error = 0;
2N/A
2N/A sig_mutex_lock(&aiowp->work_qlock1);
2N/A if (aiowp->work_count1 == 0) {
2N/A ASSERT(aiowp->work_minload1 == 0);
2N/A aiowp->work_idleflg = 1;
2N/A /*
2N/A * A cancellation handler is not needed here.
2N/A * aio worker threads are never cancelled via pthread_cancel().
2N/A */
2N/A error = sig_cond_wait(&aiowp->work_idle_cv,
2N/A &aiowp->work_qlock1);
2N/A /*
2N/A * The idle flag is normally cleared before worker is awakened
2N/A * by aio_req_add(). On error (EINTR), we clear it ourself.
2N/A */
2N/A if (error)
2N/A aiowp->work_idleflg = 0;
2N/A }
2N/A sig_mutex_unlock(&aiowp->work_qlock1);
2N/A return (error);
2N/A}
2N/A
2N/A/*
2N/A * A worker's completed AIO requests are placed onto a global
2N/A * done queue. The application is only sent a SIGIO signal if
2N/A * the process has a handler enabled and it is not waiting via
2N/A * aiowait().
2N/A */
2N/Astatic void
2N/A_aio_work_done(aio_worker_t *aiowp)
2N/A{
2N/A aio_req_t *reqp;
2N/A
2N/A sig_mutex_lock(&__aio_mutex);
2N/A sig_mutex_lock(&aiowp->work_qlock1);
2N/A reqp = aiowp->work_prev1;
2N/A reqp->req_next = NULL;
2N/A aiowp->work_done1 = 0;
2N/A aiowp->work_tail1 = aiowp->work_next1;
2N/A if (aiowp->work_tail1 == NULL)
2N/A aiowp->work_head1 = NULL;
2N/A aiowp->work_prev1 = NULL;
2N/A _aio_outstand_cnt--;
2N/A _aio_req_done_cnt--;
2N/A if (reqp->req_state == AIO_REQ_CANCELED) {
2N/A /*
2N/A * Request got cancelled after it was marked done. This can
2N/A * happen because _aio_finish_request() marks it AIO_REQ_DONE
2N/A * and drops all locks. Don't add the request to the done
2N/A * queue and just discard it.
2N/A */
2N/A sig_mutex_unlock(&aiowp->work_qlock1);
2N/A _aio_req_free(reqp);
2N/A if (_aio_outstand_cnt == 0 && _aiowait_flag) {
2N/A sig_mutex_unlock(&__aio_mutex);
2N/A (void) _kaio(AIONOTIFY);
2N/A } else {
2N/A sig_mutex_unlock(&__aio_mutex);
2N/A }
2N/A return;
2N/A }
2N/A sig_mutex_unlock(&aiowp->work_qlock1);
2N/A _aio_donecnt++;
2N/A ASSERT(_aio_donecnt > 0 &&
2N/A _aio_outstand_cnt >= 0 &&
2N/A _aio_req_done_cnt >= 0);
2N/A ASSERT(reqp != NULL);
2N/A
2N/A if (_aio_done_tail == NULL) {
2N/A _aio_done_head = _aio_done_tail = reqp;
2N/A } else {
2N/A _aio_done_head->req_next = reqp;
2N/A _aio_done_head = reqp;
2N/A }
2N/A
2N/A if (_aiowait_flag) {
2N/A sig_mutex_unlock(&__aio_mutex);
2N/A (void) _kaio(AIONOTIFY);
2N/A } else {
2N/A sig_mutex_unlock(&__aio_mutex);
2N/A if (_sigio_enabled)
2N/A (void) kill(__pid, SIGIO);
2N/A }
2N/A}
2N/A
2N/A/*
2N/A * The done queue consists of AIO requests that are in either the
2N/A * AIO_REQ_DONE or AIO_REQ_CANCELED state. Requests that were cancelled
2N/A * are discarded. If the done queue is empty then NULL is returned.
2N/A * Otherwise the address of a done aio_result_t is returned.
2N/A */
2N/Aaio_result_t *
2N/A_aio_req_done(void)
2N/A{
2N/A aio_req_t *reqp;
2N/A aio_result_t *resultp;
2N/A
2N/A ASSERT(MUTEX_HELD(&__aio_mutex));
2N/A
2N/A if ((reqp = _aio_done_tail) != NULL) {
2N/A if ((_aio_done_tail = reqp->req_next) == NULL)
2N/A _aio_done_head = NULL;
2N/A ASSERT(_aio_donecnt > 0);
2N/A _aio_donecnt--;
2N/A (void) _aio_hash_del(reqp->req_resultp);
2N/A resultp = reqp->req_resultp;
2N/A ASSERT(reqp->req_state == AIO_REQ_DONE);
2N/A _aio_req_free(reqp);
2N/A return (resultp);
2N/A }
2N/A /* is queue empty? */
2N/A if (reqp == NULL && _aio_outstand_cnt == 0) {
2N/A return ((aio_result_t *)-1);
2N/A }
2N/A return (NULL);
2N/A}
2N/A
2N/A/*
2N/A * Set the return and errno values for the application's use.
2N/A *
2N/A * For the Posix interfaces, we must set the return value first followed
2N/A * by the errno value because the Posix interfaces allow for a change
2N/A * in the errno value from EINPROGRESS to something else to signal
2N/A * the completion of the asynchronous request.
2N/A *
2N/A * The opposite is true for the Solaris interfaces. These allow for
2N/A * a change in the return value from AIO_INPROGRESS to something else
2N/A * to signal the completion of the asynchronous request.
2N/A */
2N/Avoid
2N/A_aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
2N/A{
2N/A aio_result_t *resultp = reqp->req_resultp;
2N/A
2N/A if (POSIX_AIO(reqp)) {
2N/A resultp->aio_return = retval;
2N/A membar_producer();
2N/A resultp->aio_errno = error;
2N/A } else {
2N/A resultp->aio_errno = error;
2N/A membar_producer();
2N/A resultp->aio_return = retval;
2N/A }
2N/A}
2N/A
2N/A/*
2N/A * Add an AIO request onto the next work queue.
2N/A * A circular list of workers is used to choose the next worker.
2N/A */
2N/Avoid
2N/A_aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
2N/A{
2N/A ulwp_t *self = curthread;
2N/A aio_worker_t *aiowp;
2N/A aio_worker_t *first;
2N/A int load_bal_flg = 1;
2N/A int found;
2N/A
2N/A ASSERT(reqp->req_state != AIO_REQ_DONEQ);
2N/A reqp->req_next = NULL;
2N/A /*
2N/A * Try to acquire the next worker's work queue. If it is locked,
2N/A * then search the list of workers until a queue is found unlocked,
2N/A * or until the list is completely traversed at which point another
2N/A * worker will be created.
2N/A */
2N/A sigoff(self); /* defer SIGIO */
2N/A sig_mutex_lock(&__aio_mutex);
2N/A first = aiowp = *nextworker;
2N/A if (mode != AIONOTIFY)
2N/A _aio_outstand_cnt++;
2N/A sig_mutex_unlock(&__aio_mutex);
2N/A
2N/A switch (mode) {
2N/A case AIOREAD:
2N/A case AIOWRITE:
2N/A case AIOAREAD:
2N/A case AIOAWRITE:
2N/A#if !defined(_LP64)
2N/A case AIOAREAD64:
2N/A case AIOAWRITE64:
2N/A#endif
2N/A /* try to find an idle worker */
2N/A found = 0;
2N/A do {
2N/A if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
2N/A if (aiowp->work_idleflg) {
2N/A found = 1;
2N/A break;
2N/A }
2N/A sig_mutex_unlock(&aiowp->work_qlock1);
2N/A }
2N/A } while ((aiowp = aiowp->work_forw) != first);
2N/A
2N/A if (found) {
2N/A aiowp->work_minload1++;
2N/A break;
2N/A }
2N/A
2N/A /* try to acquire some worker's queue lock */
2N/A do {
2N/A if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
2N/A found = 1;
2N/A break;
2N/A }
2N/A } while ((aiowp = aiowp->work_forw) != first);
2N/A
2N/A /*
2N/A * Create more workers when the workers appear overloaded.
2N/A * Either all the workers are busy draining their queues
2N/A * or no worker's queue lock could be acquired.
2N/A */
2N/A if (!found) {
2N/A if (_aio_worker_cnt < _max_workers) {
2N/A if (_aio_create_worker(reqp, mode))
2N/A aio_panic("_aio_req_add: add worker");
2N/A sigon(self); /* reenable SIGIO */
2N/A return;
2N/A }
2N/A
2N/A /*
2N/A * No worker available and we have created
2N/A * _max_workers, keep going through the
2N/A * list slowly until we get a lock
2N/A */
2N/A while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
2N/A /*
2N/A * give someone else a chance
2N/A */
2N/A _aio_delay(1);
2N/A aiowp = aiowp->work_forw;
2N/A }
2N/A }
2N/A
2N/A ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
2N/A if (_aio_worker_cnt < _max_workers &&
2N/A aiowp->work_minload1 >= _minworkload) {
2N/A sig_mutex_unlock(&aiowp->work_qlock1);
2N/A sig_mutex_lock(&__aio_mutex);
2N/A *nextworker = aiowp->work_forw;
2N/A sig_mutex_unlock(&__aio_mutex);
2N/A if (_aio_create_worker(reqp, mode))
2N/A aio_panic("aio_req_add: add worker");
2N/A sigon(self); /* reenable SIGIO */
2N/A return;
2N/A }
2N/A aiowp->work_minload1++;
2N/A break;
2N/A case AIOFSYNC:
2N/A case AIONOTIFY:
2N/A load_bal_flg = 0;
2N/A sig_mutex_lock(&aiowp->work_qlock1);
2N/A break;
2N/A default:
2N/A aio_panic("_aio_req_add: invalid mode");
2N/A break;
2N/A }
2N/A /*
2N/A * Put request onto worker's work queue.
2N/A */
2N/A if (aiowp->work_tail1 == NULL) {
2N/A ASSERT(aiowp->work_count1 == 0);
2N/A aiowp->work_tail1 = reqp;
2N/A aiowp->work_next1 = reqp;
2N/A } else {
2N/A aiowp->work_head1->req_next = reqp;
2N/A if (aiowp->work_next1 == NULL)
2N/A aiowp->work_next1 = reqp;
2N/A }
2N/A reqp->req_state = AIO_REQ_QUEUED;
2N/A reqp->req_worker = aiowp;
2N/A aiowp->work_head1 = reqp;
2N/A /*
2N/A * Awaken worker if it is not currently active.
2N/A */
2N/A if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
2N/A aiowp->work_idleflg = 0;
2N/A (void) cond_signal(&aiowp->work_idle_cv);
2N/A }
2N/A sig_mutex_unlock(&aiowp->work_qlock1);
2N/A
2N/A if (load_bal_flg) {
2N/A sig_mutex_lock(&__aio_mutex);
2N/A *nextworker = aiowp->work_forw;
2N/A sig_mutex_unlock(&__aio_mutex);
2N/A }
2N/A sigon(self); /* reenable SIGIO */
2N/A}
2N/A
2N/A/*
2N/A * Get an AIO request for a specified worker.
2N/A * If the work queue is empty, return NULL.
2N/A */
2N/Aaio_req_t *
2N/A_aio_req_get(aio_worker_t *aiowp)
2N/A{
2N/A aio_req_t *reqp;
2N/A
2N/A sig_mutex_lock(&aiowp->work_qlock1);
2N/A if ((reqp = aiowp->work_next1) != NULL) {
2N/A /*
2N/A * Remove a POSIX request from the queue; the
2N/A * request queue is a singularly linked list
2N/A * with a previous pointer. The request is
2N/A * removed by updating the previous pointer.
2N/A *
2N/A * Non-posix requests are left on the queue
2N/A * to eventually be placed on the done queue.
2N/A */
2N/A
2N/A if (POSIX_AIO(reqp)) {
2N/A if (aiowp->work_prev1 == NULL) {
2N/A aiowp->work_tail1 = reqp->req_next;
2N/A if (aiowp->work_tail1 == NULL)
2N/A aiowp->work_head1 = NULL;
2N/A } else {
2N/A aiowp->work_prev1->req_next = reqp->req_next;
2N/A if (aiowp->work_head1 == reqp)
2N/A aiowp->work_head1 = reqp->req_next;
2N/A }
2N/A
2N/A } else {
2N/A aiowp->work_prev1 = reqp;
2N/A ASSERT(aiowp->work_done1 >= 0);
2N/A aiowp->work_done1++;
2N/A }
2N/A ASSERT(reqp != reqp->req_next);
2N/A aiowp->work_next1 = reqp->req_next;
2N/A ASSERT(aiowp->work_count1 >= 1);
2N/A aiowp->work_count1--;
2N/A switch (reqp->req_op) {
2N/A case AIOREAD:
2N/A case AIOWRITE:
2N/A case AIOAREAD:
2N/A case AIOAWRITE:
2N/A#if !defined(_LP64)
2N/A case AIOAREAD64:
2N/A case AIOAWRITE64:
2N/A#endif
2N/A ASSERT(aiowp->work_minload1 > 0);
2N/A aiowp->work_minload1--;
2N/A break;
2N/A }
2N/A reqp->req_state = AIO_REQ_INPROGRESS;
2N/A }
2N/A aiowp->work_req = reqp;
2N/A ASSERT(reqp != NULL || aiowp->work_count1 == 0);
2N/A sig_mutex_unlock(&aiowp->work_qlock1);
2N/A return (reqp);
2N/A}
2N/A
2N/Astatic void
2N/A_aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
2N/A{
2N/A aio_req_t **last;
2N/A aio_req_t *lastrp;
2N/A aio_req_t *next;
2N/A
2N/A ASSERT(aiowp != NULL);
2N/A ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
2N/A if (POSIX_AIO(reqp)) {
2N/A if (ostate != AIO_REQ_QUEUED)
2N/A return;
2N/A }
2N/A last = &aiowp->work_tail1;
2N/A lastrp = aiowp->work_tail1;
2N/A ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
2N/A while ((next = *last) != NULL) {
2N/A if (next == reqp) {
2N/A *last = next->req_next;
2N/A if (aiowp->work_next1 == next)
2N/A aiowp->work_next1 = next->req_next;
2N/A
2N/A /*
2N/A * if this is the first request on the queue, move
2N/A * the lastrp pointer forward.
2N/A */
2N/A if (lastrp == next)
2N/A lastrp = next->req_next;
2N/A
2N/A /*
2N/A * if this request is pointed by work_head1, then
2N/A * make work_head1 point to the last request that is
2N/A * present on the queue.
2N/A */
2N/A if (aiowp->work_head1 == next)
2N/A aiowp->work_head1 = lastrp;
2N/A
2N/A /*
2N/A * work_prev1 is used only in non posix case and it
2N/A * points to the current AIO_REQ_INPROGRESS request.
2N/A * If work_prev1 points to this request which is being
2N/A * deleted, make work_prev1 NULL and set work_done1
2N/A * to 0.
2N/A *
2N/A * A worker thread can be processing only one request
2N/A * at a time.
2N/A */
2N/A if (aiowp->work_prev1 == next) {
2N/A ASSERT(ostate == AIO_REQ_INPROGRESS &&
2N/A !POSIX_AIO(reqp) && aiowp->work_done1 > 0);
2N/A aiowp->work_prev1 = NULL;
2N/A aiowp->work_done1--;
2N/A }
2N/A
2N/A if (ostate == AIO_REQ_QUEUED) {
2N/A ASSERT(aiowp->work_count1 >= 1);
2N/A aiowp->work_count1--;
2N/A ASSERT(aiowp->work_minload1 >= 1);
2N/A aiowp->work_minload1--;
2N/A }
2N/A return;
2N/A }
2N/A last = &next->req_next;
2N/A lastrp = next;
2N/A }
2N/A /* NOTREACHED */
2N/A}
2N/A
2N/Astatic void
2N/A_aio_enq_doneq(aio_req_t *reqp)
2N/A{
2N/A if (_aio_doneq == NULL) {
2N/A _aio_doneq = reqp;
2N/A reqp->req_next = reqp->req_prev = reqp;
2N/A } else {
2N/A reqp->req_next = _aio_doneq;
2N/A reqp->req_prev = _aio_doneq->req_prev;
2N/A _aio_doneq->req_prev->req_next = reqp;
2N/A _aio_doneq->req_prev = reqp;
2N/A }
2N/A reqp->req_state = AIO_REQ_DONEQ;
2N/A _aio_doneq_cnt++;
2N/A}
2N/A
2N/A/*
2N/A * caller owns the _aio_mutex
2N/A */
2N/Aaio_req_t *
2N/A_aio_req_remove(aio_req_t *reqp)
2N/A{
2N/A if (reqp && reqp->req_state != AIO_REQ_DONEQ)
2N/A return (NULL);
2N/A
2N/A if (reqp) {
2N/A /* request in done queue */
2N/A if (_aio_doneq == reqp)
2N/A _aio_doneq = reqp->req_next;
2N/A if (_aio_doneq == reqp) {
2N/A /* only one request on queue */
2N/A _aio_doneq = NULL;
2N/A } else {
2N/A aio_req_t *tmp = reqp->req_next;
2N/A reqp->req_prev->req_next = tmp;
2N/A tmp->req_prev = reqp->req_prev;
2N/A }
2N/A } else if ((reqp = _aio_doneq) != NULL) {
2N/A if (reqp == reqp->req_next) {
2N/A /* only one request on queue */
2N/A _aio_doneq = NULL;
2N/A } else {
2N/A reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
2N/A _aio_doneq->req_prev = reqp->req_prev;
2N/A }
2N/A }
2N/A if (reqp) {
2N/A _aio_doneq_cnt--;
2N/A reqp->req_next = reqp->req_prev = reqp;
2N/A reqp->req_state = AIO_REQ_DONE;
2N/A }
2N/A return (reqp);
2N/A}
2N/A
2N/A/*
2N/A * An AIO request is identified by an aio_result_t pointer. The library
2N/A * maps this aio_result_t pointer to its internal representation using a
2N/A * hash table. This function adds an aio_result_t pointer to the hash table.
2N/A */
2N/Astatic int
2N/A_aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
2N/A{
2N/A aio_hash_t *hashp;
2N/A aio_req_t **prev;
2N/A aio_req_t *next;
2N/A
2N/A hashp = _aio_hash + AIOHASH(resultp);
2N/A lmutex_lock(&hashp->hash_lock);
2N/A prev = &hashp->hash_ptr;
2N/A while ((next = *prev) != NULL) {
2N/A if (resultp == next->req_resultp) {
2N/A lmutex_unlock(&hashp->hash_lock);
2N/A return (-1);
2N/A }
2N/A prev = &next->req_link;
2N/A }
2N/A *prev = reqp;
2N/A ASSERT(reqp->req_link == NULL);
2N/A lmutex_unlock(&hashp->hash_lock);
2N/A return (0);
2N/A}
2N/A
2N/A/*
2N/A * Remove an entry from the hash table.
2N/A */
2N/Aaio_req_t *
2N/A_aio_hash_del(aio_result_t *resultp)
2N/A{
2N/A aio_hash_t *hashp;
2N/A aio_req_t **prev;
2N/A aio_req_t *next = NULL;
2N/A
2N/A if (_aio_hash != NULL) {
2N/A hashp = _aio_hash + AIOHASH(resultp);
2N/A lmutex_lock(&hashp->hash_lock);
2N/A prev = &hashp->hash_ptr;
2N/A while ((next = *prev) != NULL) {
2N/A if (resultp == next->req_resultp) {
2N/A *prev = next->req_link;
2N/A next->req_link = NULL;
2N/A break;
2N/A }
2N/A prev = &next->req_link;
2N/A }
2N/A lmutex_unlock(&hashp->hash_lock);
2N/A }
2N/A return (next);
2N/A}
2N/A
2N/A/*
2N/A * find an entry in the hash table
2N/A */
2N/Aaio_req_t *
2N/A_aio_hash_find(aio_result_t *resultp)
2N/A{
2N/A aio_hash_t *hashp;
2N/A aio_req_t **prev;
2N/A aio_req_t *next = NULL;
2N/A
2N/A if (_aio_hash != NULL) {
2N/A hashp = _aio_hash + AIOHASH(resultp);
2N/A lmutex_lock(&hashp->hash_lock);
2N/A prev = &hashp->hash_ptr;
2N/A while ((next = *prev) != NULL) {
2N/A if (resultp == next->req_resultp)
2N/A break;
2N/A prev = &next->req_link;
2N/A }
2N/A lmutex_unlock(&hashp->hash_lock);
2N/A }
2N/A return (next);
2N/A}
2N/A
2N/A/*
2N/A * AIO interface for POSIX
2N/A */
2N/Aint
2N/A_aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2N/A int mode, int flg)
2N/A{
2N/A aio_req_t *reqp;
2N/A aio_args_t *ap;
2N/A int kerr;
2N/A
2N/A if (aiocbp == NULL) {
2N/A errno = EINVAL;
2N/A return (-1);
2N/A }
2N/A
2N/A /* initialize kaio */
2N/A if (!_kaio_ok)
2N/A _kaio_init();
2N/A
2N/A aiocbp->aio_state = NOCHECK;
2N/A
2N/A /*
2N/A * If we have been called because a list I/O
2N/A * kaio() failed, we dont want to repeat the
2N/A * system call
2N/A */
2N/A
2N/A if (flg & AIO_KAIO) {
2N/A /*
2N/A * Try kernel aio first.
2N/A * If errno is ENOTSUP/EBADFD,
2N/A * fall back to the thread implementation.
2N/A */
2N/A if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2N/A aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2N/A aiocbp->aio_state = CHECK;
2N/A kerr = (int)_kaio(mode, aiocbp);
2N/A if (kerr == 0)
2N/A return (0);
2N/A if (errno != ENOTSUP && errno != EBADFD) {
2N/A aiocbp->aio_resultp.aio_errno = errno;
2N/A aiocbp->aio_resultp.aio_return = -1;
2N/A aiocbp->aio_state = NOCHECK;
2N/A return (-1);
2N/A }
2N/A if (errno == EBADFD)
2N/A SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2N/A }
2N/A }
2N/A
2N/A aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2N/A aiocbp->aio_state = USERAIO;
2N/A
2N/A if (!__uaio_ok && __uaio_init() == -1)
2N/A return (-1);
2N/A
2N/A if ((reqp = _aio_req_alloc()) == NULL) {
2N/A errno = EAGAIN;
2N/A return (-1);
2N/A }
2N/A
2N/A /*
2N/A * If an LIO request, add the list head to the aio request
2N/A */
2N/A reqp->req_head = lio_head;
2N/A reqp->req_type = AIO_POSIX_REQ;
2N/A reqp->req_op = mode;
2N/A reqp->req_largefile = 0;
2N/A
2N/A if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2N/A reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2N/A } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2N/A reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2N/A reqp->req_sigevent.sigev_signo =
2N/A aiocbp->aio_sigevent.sigev_signo;
2N/A reqp->req_sigevent.sigev_value.sival_ptr =
2N/A aiocbp->aio_sigevent.sigev_value.sival_ptr;
2N/A } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2N/A port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2N/A reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2N/A /*
2N/A * Reuse the sigevent structure to contain the port number
2N/A * and the user value. Same for SIGEV_THREAD, below.
2N/A */
2N/A reqp->req_sigevent.sigev_signo =
2N/A pn->portnfy_port;
2N/A reqp->req_sigevent.sigev_value.sival_ptr =
2N/A pn->portnfy_user;
2N/A } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2N/A reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2N/A /*
2N/A * The sigevent structure contains the port number
2N/A * and the user value. Same for SIGEV_PORT, above.
2N/A */
2N/A reqp->req_sigevent.sigev_signo =
2N/A aiocbp->aio_sigevent.sigev_signo;
2N/A reqp->req_sigevent.sigev_value.sival_ptr =
2N/A aiocbp->aio_sigevent.sigev_value.sival_ptr;
2N/A }
2N/A
2N/A reqp->req_resultp = &aiocbp->aio_resultp;
2N/A reqp->req_aiocbp = aiocbp;
2N/A ap = &reqp->req_args;
2N/A ap->fd = aiocbp->aio_fildes;
2N/A ap->buf = (caddr_t)aiocbp->aio_buf;
2N/A ap->bufsz = aiocbp->aio_nbytes;
2N/A ap->offset = aiocbp->aio_offset;
2N/A
2N/A if ((flg & AIO_NO_DUPS) &&
2N/A _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2N/A aio_panic("_aio_rw(): request already in hash table");
2N/A _aio_req_free(reqp);
2N/A errno = EINVAL;
2N/A return (-1);
2N/A }
2N/A _aio_req_add(reqp, nextworker, mode);
2N/A return (0);
2N/A}
2N/A
#if !defined(_LP64)
/*
 * _aio_rw64() - queue one POSIX large-file (aiocb64_t) read or write.
 *
 * Compiled only when _LP64 is not defined.
 *
 *	aiocbp		control block describing the transfer; NULL is
 *			rejected with EINVAL
 *	lio_head	list head recorded in the request (req_head) when
 *			the request belongs to a list I/O
 *	nextworker	handed unchanged to _aio_req_add()
 *	mode		operation code; passed to _kaio() and stored in
 *			the request as req_op
 *	flg		AIO_KAIO asks for kernel aio to be tried first;
 *			AIO_NO_DUPS asks for the request to be entered
 *			into the hash table keyed by its result structure
 *
 * Returns 0 once _kaio() has accepted the request or the request has
 * been passed to _aio_req_add().  Returns -1 when aiocbp is NULL
 * (errno = EINVAL), when _kaio() fails with an errno other than
 * ENOTSUP or EBADFD, when __uaio_init() fails, or when no request
 * structure can be allocated (errno = EAGAIN).
 */
int
_aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *req;
	aio_args_t *args;
	port_notify_t *pnfy;
	int try_kernel;
	int notify;
	int rv;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* Initialize kernel aio if that has not happened yet. */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * Kernel aio is attempted only when the caller sets AIO_KAIO.
	 * A caller that arrives here because a list I/O kaio() already
	 * failed does not want the system call repeated.
	 */
	try_kernel = 0;
	if (flg & AIO_KAIO) {
		try_kernel =
		    (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes));
	}

	if (try_kernel) {
		aiocbp->aio_resultp.aio_errno = EINPROGRESS;
		aiocbp->aio_state = CHECK;
		rv = (int)_kaio(mode, aiocbp);
		if (rv == 0)
			return (0);

		/*
		 * ENOTSUP and EBADFD fall through to the thread-based
		 * implementation below; any other errno is reported to
		 * the caller through the control block.
		 */
		if (errno == EBADFD) {
			SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		} else if (errno != ENOTSUP) {
			aiocbp->aio_resultp.aio_errno = errno;
			aiocbp->aio_resultp.aio_return = -1;
			aiocbp->aio_state = NOCHECK;
			return (-1);
		}
	}

	/* User-level path: the request is carried out by a worker. */
	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	req = _aio_req_alloc();
	if (req == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/* For an LIO request this ties the request to its list head. */
	req->req_head = lio_head;
	req->req_type = AIO_POSIX_REQ;
	req->req_op = mode;
	req->req_largefile = 1;

	/*
	 * Copy the notification settings into the request.  Any
	 * sigev_notify value not listed here leaves req_sigevent as
	 * _aio_req_alloc() returned it.
	 */
	notify = aiocbp->aio_sigevent.sigev_notify;
	switch (notify) {
	case SIGEV_NONE:
		req->req_sigevent.sigev_notify = SIGEV_NONE;
		break;
	case SIGEV_SIGNAL:
	case SIGEV_THREAD:
		/* Both kinds take signo and value straight from the aiocb. */
		req->req_sigevent.sigev_notify = notify;
		req->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		req->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
		break;
	case SIGEV_PORT:
		/*
		 * sival_ptr refers to a port_notify_t; the request's
		 * sigevent is reused to hold the port number (in
		 * sigev_signo) and the user value.
		 */
		pnfy = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		req->req_sigevent.sigev_notify = SIGEV_PORT;
		req->req_sigevent.sigev_signo = pnfy->portnfy_port;
		req->req_sigevent.sigev_value.sival_ptr = pnfy->portnfy_user;
		break;
	default:
		break;
	}

	req->req_resultp = &aiocbp->aio_resultp;
	req->req_aiocbp = aiocbp;

	args = &req->req_args;
	args->fd = aiocbp->aio_fildes;
	args->buf = (caddr_t)aiocbp->aio_buf;
	args->bufsz = aiocbp->aio_nbytes;
	args->offset = aiocbp->aio_offset;

	if (flg & AIO_NO_DUPS) {
		if (_aio_hash_insert(&aiocbp->aio_resultp, req) != 0) {
			aio_panic("_aio_rw64(): request already in hash table");
			_aio_req_free(req);
			errno = EINVAL;
			return (-1);
		}
	}

	_aio_req_add(req, nextworker, mode);
	return (0);
}
#endif	/* !defined(_LP64) */