posix_aio.c revision 6e628f2786bf7adece487b606a56068e35e3fcd2
/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License (the "License").
* You may not use this file except in compliance with the License.
*
* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
* or http://www.opensolaris.org/os/licensing.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at usr/src/OPENSOLARIS.LICENSE.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information: Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*/
/*
* Copyright 2006 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
#pragma ident "%Z%%M% %I% %E% SMI"
/*
* posix_aio.c implements the POSIX async. I/O functions.
*
* aio_read
* aio_write
* aio_error
* aio_return
* aio_suspend
* lio_listio
* aio_fsync
* aio_cancel
*/
#include "synonyms.h"
#include "thr_uberdata.h"
#include "asyncio.h"
#include <atomic.h>
#include <sys/file.h>
#include <sys/port.h>
/*
 * __fdsync() is defined outside this file; aio_fsync() below calls it
 * with FDSYNC or FSYNC as the second argument.
 */
extern int __fdsync(int, int);
/*
 * __aio_waitn() sleeps on this condition variable while another
 * aio_waitn call holds AIO_LIB_WAITN; _aio_waitn_cleanup() signals it.
 */
cond_t _aio_waitn_cv = DEFAULTCV; /* wait for end of aio_waitn */
static int _aio_check_timeout(const timespec_t *, timespec_t *, int *);
/*
 * defines for timedwait in __aio_waitn() and __aio_suspend()
 *
 * AIO_TIMEOUT_INDEF : timeout pointer is NULL, wait indefinitely
 * AIO_TIMEOUT_POLL  : timeout value is zero, do not block
 * AIO_TIMEOUT_WAIT  : timeout value is positive, bounded wait
 * AIO_TIMEOUT_UNDEF : initial value in __aio_waitn() before the timeout
 *                     has been evaluated by _aio_check_timeout()
 */
#define AIO_TIMEOUT_INDEF -1
#define AIO_TIMEOUT_POLL 0
#define AIO_TIMEOUT_WAIT 1
#define AIO_TIMEOUT_UNDEF 2
/*
* List I/O stuff
*/
static void _lio_list_decr(aio_lio_t *);
/*
 * Cached result of sysconf(_SC_AIO_LISTIO_MAX); filled in by
 * lio_listio()/lio_listio64() whenever it is still zero.
 */
static long aio_list_max = 0;
/*
 * aio_read() - queue an asynchronous read described by aiocbp.
 *
 * Fails with -1/EINVAL when aiocbp is NULL or requests a non-zero
 * priority, and with -1/EBUSY when the control block's result slot is
 * already present in the request hash.  A failure of _aio_sigev_thread()
 * is passed on as -1.  Otherwise the return value is whatever _aio_rw()
 * returns for an AIOAREAD request.
 */
int
aio_read(aiocb_t *aiocbp)
{
	int rv;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}
	if (aiocbp->aio_reqprio != 0) {
		errno = EINVAL;
		return (-1);
	}

	/* the same control block must not be submitted twice */
	if (_aio_hash_find(&aiocbp->aio_resultp) != NULL) {
		errno = EBUSY;
		return (-1);
	}

	if (_aio_sigev_thread(aiocbp) != 0)
		return (-1);

	aiocbp->aio_lio_opcode = LIO_READ;
	rv = _aio_rw(aiocbp, NULL, &__nextworker_rw, AIOAREAD,
	    AIO_KAIO | AIO_NO_DUPS);
	return (rv);
}
/*
 * aio_write() - queue an asynchronous write described by aiocbp.
 *
 * Fails with -1/EINVAL when aiocbp is NULL or requests a non-zero
 * priority, and with -1/EBUSY when the control block's result slot is
 * already present in the request hash.  A failure of _aio_sigev_thread()
 * is passed on as -1.  Otherwise the return value is whatever _aio_rw()
 * returns for an AIOAWRITE request.
 */
int
aio_write(aiocb_t *aiocbp)
{
	int rv;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}
	if (aiocbp->aio_reqprio != 0) {
		errno = EINVAL;
		return (-1);
	}

	/* the same control block must not be submitted twice */
	if (_aio_hash_find(&aiocbp->aio_resultp) != NULL) {
		errno = EBUSY;
		return (-1);
	}

	if (_aio_sigev_thread(aiocbp) != 0)
		return (-1);

	aiocbp->aio_lio_opcode = LIO_WRITE;
	rv = _aio_rw(aiocbp, NULL, &__nextworker_rw, AIOAWRITE,
	    AIO_KAIO | AIO_NO_DUPS);
	return (rv);
}
/*
 * Cancellation handler installed around the LIO_WAIT sleep in
 * lio_listio()/lio_listio64().
 *
 * Runs with head->lio_mutex held.  It clears the "waiting" mark, drops
 * the mutex and, if no request references the list head any more,
 * frees the head.
 */
static void
_lio_listio_cleanup(aio_lio_t *head)
{
	int unreferenced;

	ASSERT(MUTEX_HELD(&head->lio_mutex));

	/* decide about freeing while the counters are still protected */
	unreferenced = (head->lio_refcnt == 0);
	if (unreferenced) {
		ASSERT(head->lio_nent == 0);
	}

	head->lio_waiting = 0;
	sig_mutex_unlock(&head->lio_mutex);

	if (unreferenced)
		_aio_lio_free(head);
}
/*
 * lio_listio() - submit a list of asynchronous I/O requests.
 *
 * mode   LIO_WAIT (return after the list has completed) or LIO_NOWAIT;
 *        any other value fails with EINVAL.
 * list   array of nent control block pointers; NULL entries and entries
 *        whose opcode is LIO_NOP are skipped.
 * nent   number of entries; must be in 1 .. sysconf(_SC_AIO_LISTIO_MAX).
 * sigevp optional completion notification for the whole list.
 *
 * The list is first offered to the kernel (_kaio(AIOLIO)) if at least
 * one descriptor is KAIO_SUPPORTED().  If the kernel answers ENOTSUP, or
 * no descriptor qualifies, every entry whose aio_errno is ENOTSUP or
 * EBADFD is queued to the user-level workers through _aio_rw().
 *
 * Returns 0 on success.  Returns -1 with errno set to EINVAL (bad
 * nent/mode), EBUSY (an entry is already in the request hash), EIO (at
 * least one entry could not be queued or completed with an error), or
 * the error of an interrupted wait.
 */
int
lio_listio(int mode, aiocb_t *_RESTRICT_KYWD const *_RESTRICT_KYWD list,
    int nent, struct sigevent *_RESTRICT_KYWD sigevp)
{
	int aio_ufs = 0;	/* requests queued to the worker threads */
	int oerrno = 0;		/* errno of the initial kernel attempt */
	aio_lio_t *head = NULL;
	aiocb_t *aiocbp;
	int state = 0;
	int EIOflg = 0;		/* set when any single entry failed */
	int rw;
	int do_kaio = 0;
	int error;
	int i;

	if (!_kaio_ok)
		_kaio_init();

	if (aio_list_max == 0)
		aio_list_max = sysconf(_SC_AIO_LISTIO_MAX);

	if (nent <= 0 || nent > aio_list_max) {
		errno = EINVAL;
		return (-1);
	}

	switch (mode) {
	case LIO_WAIT:
		state = NOCHECK;
		break;
	case LIO_NOWAIT:
		state = CHECK;
		break;
	default:
		errno = EINVAL;
		return (-1);
	}

	/*
	 * First pass: validate every entry and mark the ones that cannot
	 * be handled by kernel AIO with ENOTSUP.
	 */
	for (i = 0; i < nent; i++) {
		if ((aiocbp = list[i]) == NULL)
			continue;
		if (_aio_hash_find(&aiocbp->aio_resultp) != NULL) {
			errno = EBUSY;
			return (-1);
		}
		if (_aio_sigev_thread(aiocbp) != 0)
			return (-1);
		if (aiocbp->aio_lio_opcode == LIO_NOP)
			aiocbp->aio_state = NOCHECK;
		else {
			aiocbp->aio_state = state;
			if (KAIO_SUPPORTED(aiocbp->aio_fildes))
				do_kaio++;
			else
				aiocbp->aio_resultp.aio_errno = ENOTSUP;
		}
	}
	if (_aio_sigev_thread_init(sigevp) != 0)
		return (-1);

	if (do_kaio) {
		error = (int)_kaio(AIOLIO, mode, list, nent, sigevp);
		if (error == 0)
			return (0);
		oerrno = errno;
	} else {
		oerrno = errno = ENOTSUP;
		error = -1;
	}

	if (error == -1 && errno == ENOTSUP) {
		error = errno = 0;
		/*
		 * If LIO_WAIT, or notification required, allocate a list head.
		 */
		if (mode == LIO_WAIT ||
		    (sigevp != NULL &&
		    (sigevp->sigev_notify == SIGEV_SIGNAL ||
		    sigevp->sigev_notify == SIGEV_THREAD ||
		    sigevp->sigev_notify == SIGEV_PORT)))
			head = _aio_lio_alloc();
		if (head) {
			sig_mutex_lock(&head->lio_mutex);
			head->lio_mode = mode;
			head->lio_largefile = 0;
			if (mode == LIO_NOWAIT && sigevp != NULL) {
				if (sigevp->sigev_notify == SIGEV_THREAD) {
					head->lio_port = sigevp->sigev_signo;
					head->lio_event = AIOLIO;
					head->lio_sigevent = sigevp;
					head->lio_sigval.sival_ptr =
					    sigevp->sigev_value.sival_ptr;
				} else if (sigevp->sigev_notify == SIGEV_PORT) {
					port_notify_t *pn =
					    sigevp->sigev_value.sival_ptr;
					head->lio_port = pn->portnfy_port;
					head->lio_event = AIOLIO;
					head->lio_sigevent = sigevp;
					head->lio_sigval.sival_ptr =
					    pn->portnfy_user;
				} else {	/* SIGEV_SIGNAL */
					head->lio_signo = sigevp->sigev_signo;
					head->lio_sigval.sival_ptr =
					    sigevp->sigev_value.sival_ptr;
				}
			}
			/* one reference per list slot; unused slots drop theirs */
			head->lio_nent = head->lio_refcnt = nent;
			sig_mutex_unlock(&head->lio_mutex);
		}
		/*
		 * find UFS requests, errno == ENOTSUP/EBADFD,
		 */
		for (i = 0; i < nent; i++) {
			if ((aiocbp = list[i]) == NULL ||
			    aiocbp->aio_lio_opcode == LIO_NOP ||
			    (aiocbp->aio_resultp.aio_errno != ENOTSUP &&
			    aiocbp->aio_resultp.aio_errno != EBADFD)) {
				if (head)
					_lio_list_decr(head);
				continue;
			}
			if (aiocbp->aio_resultp.aio_errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
			if (aiocbp->aio_reqprio != 0) {
				aiocbp->aio_resultp.aio_errno = EINVAL;
				aiocbp->aio_resultp.aio_return = -1;
				EIOflg = 1;
				if (head)
					_lio_list_decr(head);
				continue;
			}
			/*
			 * submit an AIO request with flags AIO_NO_KAIO
			 * to avoid the kaio() syscall in _aio_rw()
			 */
			switch (aiocbp->aio_lio_opcode) {
			case LIO_READ:
				rw = AIOAREAD;
				break;
			case LIO_WRITE:
				rw = AIOAWRITE;
				break;
			default:
				/*
				 * Unknown opcode: without this case "rw"
				 * would be passed to _aio_rw() uninitialized.
				 * Fail the entry the same way an invalid
				 * aio_reqprio is failed above.
				 */
				aiocbp->aio_resultp.aio_errno = EINVAL;
				aiocbp->aio_resultp.aio_return = -1;
				EIOflg = 1;
				if (head)
					_lio_list_decr(head);
				continue;
			}
			error = _aio_rw(aiocbp, head, &__nextworker_rw, rw,
			    (AIO_NO_KAIO | AIO_NO_DUPS));
			if (error == 0)
				aio_ufs++;
			else {
				if (head)
					_lio_list_decr(head);
				/*
				 * NOTE(review): this stores the return value
				 * of _aio_rw(), not errno; confirm which of
				 * the two _aio_rw() uses to report the cause.
				 */
				aiocbp->aio_resultp.aio_errno = error;
				EIOflg = 1;
			}
		}
	}
	if (EIOflg) {
		errno = EIO;
		return (-1);
	}
	if (mode == LIO_WAIT && oerrno == ENOTSUP) {
		/*
		 * call kaio(AIOLIOWAIT) to get all outstanding
		 * kernel AIO requests
		 */
		if ((nent - aio_ufs) > 0)
			(void) _kaio(AIOLIOWAIT, mode, list, nent, sigevp);
		if (head != NULL && head->lio_nent > 0) {
			sig_mutex_lock(&head->lio_mutex);
			while (head->lio_refcnt > 0) {
				int err;
				head->lio_waiting = 1;
				pthread_cleanup_push(_lio_listio_cleanup, head);
				err = sig_cond_wait(&head->lio_cond_cv,
				    &head->lio_mutex);
				pthread_cleanup_pop(0);
				head->lio_waiting = 0;
				if (err && head->lio_nent > 0) {
					sig_mutex_unlock(&head->lio_mutex);
					errno = err;
					return (-1);
				}
			}
			sig_mutex_unlock(&head->lio_mutex);
			ASSERT(head->lio_nent == 0 && head->lio_refcnt == 0);
			_aio_lio_free(head);
			/* any entry that finished with an error yields EIO */
			for (i = 0; i < nent; i++) {
				if ((aiocbp = list[i]) != NULL &&
				    aiocbp->aio_resultp.aio_errno) {
					errno = EIO;
					return (-1);
				}
			}
		}
		return (0);
	}
	return (error);
}
/*
 * Release one slot of a list I/O head: both the entry count and the
 * reference count are lowered by one under the head's mutex.
 */
static void
_lio_list_decr(aio_lio_t *head)
{
	sig_mutex_lock(&head->lio_mutex);
	head->lio_nent -= 1;
	head->lio_refcnt -= 1;
	sig_mutex_unlock(&head->lio_mutex);
}
/*
 * Cancellation handler used by __aio_suspend().
 *
 * "counter" points at either _aio_kernel_suspend or _aio_suscv_cnt,
 * whichever the caller incremented before blocking.  The handler runs
 * with __aio_mutex held, undoes that increment and releases the mutex.
 */
/* ARGSUSED */
static void
_aio_suspend_cleanup(int *counter)
{
	ASSERT(MUTEX_HELD(&__aio_mutex));
	*counter -= 1;
	sig_mutex_unlock(&__aio_mutex);
}
/*
 * __aio_suspend() - common implementation of aio_suspend() and
 * aio_suspend64().
 *
 * list      array of nent control block pointers (aiocb_t or, when
 *           largefile is set, aiocb64_t); NULL entries are ignored.
 * nent      number of entries, must be > 0.
 * timo      NULL: wait indefinitely; zero value: poll; otherwise the
 *           maximum time to wait.
 * largefile non-zero when called from aio_suspend64().
 *
 * Returns 0 as soon as _kaio(AIOSUSPEND) reports a completed request or
 * a listed request shows an aio_errno other than EINPROGRESS/ECANCELED.
 * Returns -1 with errno EINVAL for bad arguments, EAGAIN when nothing
 * completed (timeout, poll, or nothing outstanding), or the error code
 * delivered by the kernel or the condition wait.
 */
static int
__aio_suspend(void **list, int nent, const timespec_t *timo, int largefile)
{
int cv_err; /* error code from cond_xxx() */
int kerr; /* error code from _kaio(AIOSUSPEND) */
int i;
timespec_t twait; /* copy of timo for internal calculations */
timespec_t *wait = NULL;
int timedwait;
int req_outstanding;
aiocb_t **listp;
aiocb_t *aiocbp;
#if !defined(_LP64)
aiocb64_t **listp64;
aiocb64_t *aiocbp64;
#endif
hrtime_t hrtstart;
hrtime_t hrtend;
hrtime_t hrtres;
#if defined(_LP64)
if (largefile)
aio_panic("__aio_suspend: largefile set when _LP64 defined");
#endif
if (nent <= 0) {
errno = EINVAL;
return (-1);
}
if (timo) {
if (timo->tv_sec < 0 || timo->tv_nsec < 0 ||
timo->tv_nsec >= NANOSEC) {
errno = EINVAL;
return (-1);
}
/* Initialize start time if time monitoring desired */
if (timo->tv_sec > 0 || timo->tv_nsec > 0) {
timedwait = AIO_TIMEOUT_WAIT;
hrtstart = gethrtime();
} else {
/* content of timeout = 0 : polling */
timedwait = AIO_TIMEOUT_POLL;
}
} else {
/* timeout pointer = NULL : wait indefinitely */
timedwait = AIO_TIMEOUT_INDEF;
}
/*
 * Move every listed request that is in state CHECK to CHECKED.
 * aio_error() moves a CHECKED request back to CHECK.
 */
#if !defined(_LP64)
if (largefile) {
listp64 = (aiocb64_t **)list;
for (i = 0; i < nent; i++) {
if ((aiocbp64 = listp64[i]) != NULL &&
aiocbp64->aio_state == CHECK)
aiocbp64->aio_state = CHECKED;
}
} else
#endif /* !_LP64 */
{
listp = (aiocb_t **)list;
for (i = 0; i < nent; i++) {
if ((aiocbp = listp[i]) != NULL &&
aiocbp->aio_state == CHECK)
aiocbp->aio_state = CHECKED;
}
}
sig_mutex_lock(&__aio_mutex);
/*
* The next "if -case" is required to accelerate the
* access to completed RAW-IO requests.
*/
if ((_aio_doneq_cnt + _aio_outstand_cnt) == 0) {
/* Only kernel requests pending */
/*
* _aio_kernel_suspend is used to detect completed non RAW-IO
* requests.
* As long as this thread resides in the kernel (_kaio) further
* asynchronous non RAW-IO requests could be submitted.
*/
_aio_kernel_suspend++;
/*
* Always do the kaio() call without using the KAIO_SUPPORTED()
* checks because it is not mandatory to have a valid fd
* set in the list entries, only the resultp must be set.
*
* _kaio(AIOSUSPEND ...) return values :
* 0: everything ok, completed request found
* -1: error
* 1: no error : _aiodone awaked the _kaio(AIOSUSPEND,,)
* system call using _kaio(AIONOTIFY). It means, that some
* non RAW-IOs completed in between.
*/
/*
 * Two handlers are pushed: if the thread is cancelled inside
 * _kaio(), the inner one re-acquires __aio_mutex and the outer one
 * then decrements _aio_kernel_suspend and drops the mutex again.
 */
pthread_cleanup_push(_aio_suspend_cleanup,
&_aio_kernel_suspend);
pthread_cleanup_push(sig_mutex_lock, &__aio_mutex);
sig_mutex_unlock(&__aio_mutex);
_cancel_prologue();
kerr = (int)_kaio(largefile? AIOSUSPEND64 : AIOSUSPEND,
list, nent, timo, -1);
_cancel_epilogue();
pthread_cleanup_pop(1); /* sig_mutex_lock(&__aio_mutex) */
pthread_cleanup_pop(0);
_aio_kernel_suspend--;
if (!kerr) {
sig_mutex_unlock(&__aio_mutex);
return (0);
}
} else {
kerr = 1; /* simulation: _kaio detected AIONOTIFY */
}
/*
* Return kernel error code if no other IOs are outstanding.
*/
req_outstanding = _aio_doneq_cnt + _aio_outstand_cnt;
sig_mutex_unlock(&__aio_mutex);
if (req_outstanding == 0) {
/* no IOs outstanding in the thread pool */
if (kerr == 1)
/* return "no IOs completed" */
errno = EAGAIN;
/* for kerr == -1 errno is left as set by _kaio() */
return (-1);
}
/*
* IOs using the thread pool are outstanding.
*/
if (timedwait == AIO_TIMEOUT_WAIT) {
/* time monitoring */
hrtend = hrtstart + (hrtime_t)timo->tv_sec * (hrtime_t)NANOSEC +
(hrtime_t)timo->tv_nsec;
hrtres = hrtend - gethrtime();
/* already past the deadline: use the smallest positive wait */
if (hrtres <= 0)
hrtres = 1;
twait.tv_sec = hrtres / (hrtime_t)NANOSEC;
twait.tv_nsec = hrtres % (hrtime_t)NANOSEC;
wait = &twait;
} else if (timedwait == AIO_TIMEOUT_POLL) {
twait = *timo; /* content of timo = 0 : polling */
wait = &twait;
}
/*
 * Main loop: scan the list, optionally re-enter the kernel, then
 * sleep on _aio_iowait_cv until a worker signals a completion.
 * Every "break" leaves the loop with errno set and returns -1.
 */
for (;;) {
int error;
int inprogress;
/* first scan file system requests */
inprogress = 0;
for (i = 0; i < nent; i++) {
#if !defined(_LP64)
if (largefile) {
if ((aiocbp64 = listp64[i]) == NULL)
continue;
error = aiocbp64->aio_resultp.aio_errno;
} else
#endif
{
if ((aiocbp = listp[i]) == NULL)
continue;
error = aiocbp->aio_resultp.aio_errno;
}
/* any state other than in-progress or canceled ends the wait */
if (error == EINPROGRESS)
inprogress = 1;
else if (error != ECANCELED) {
errno = 0;
return (0);
}
}
sig_mutex_lock(&__aio_mutex);
/*
* If there aren't outstanding I/Os in the thread pool then
* we have to return here, provided that all kernel RAW-IOs
* also completed.
* If the kernel was notified to return, then we have to check
* possible pending RAW-IOs.
*/
if (_aio_outstand_cnt == 0 && inprogress == 0 && kerr != 1) {
sig_mutex_unlock(&__aio_mutex);
errno = EAGAIN;
break;
}
/*
* There are outstanding IOs in the thread pool or the kernel
* was notified to return.
* Check pending RAW-IOs first.
*/
if (kerr == 1) {
/*
* _aiodone just notified the kernel about
* completed non RAW-IOs (AIONOTIFY was detected).
*/
if (timedwait == AIO_TIMEOUT_WAIT) {
/* Update remaining timeout for the kernel */
hrtres = hrtend - gethrtime();
if (hrtres <= 0) {
/* timer expired */
sig_mutex_unlock(&__aio_mutex);
errno = EAGAIN;
break;
}
wait->tv_sec = hrtres / (hrtime_t)NANOSEC;
wait->tv_nsec = hrtres % (hrtime_t)NANOSEC;
}
/* same unlock / _kaio() / relock sequence as the first call above */
_aio_kernel_suspend++;
pthread_cleanup_push(_aio_suspend_cleanup,
&_aio_kernel_suspend);
pthread_cleanup_push(sig_mutex_lock, &__aio_mutex);
sig_mutex_unlock(&__aio_mutex);
_cancel_prologue();
kerr = (int)_kaio(largefile? AIOSUSPEND64 : AIOSUSPEND,
list, nent, wait, -1);
_cancel_epilogue();
pthread_cleanup_pop(1);
pthread_cleanup_pop(0);
_aio_kernel_suspend--;
if (!kerr) {
sig_mutex_unlock(&__aio_mutex);
return (0);
}
}
/* polling callers never sleep on the condition variable */
if (timedwait == AIO_TIMEOUT_POLL) {
sig_mutex_unlock(&__aio_mutex);
errno = EAGAIN;
break;
}
if (timedwait == AIO_TIMEOUT_WAIT) {
/* Update remaining timeout */
hrtres = hrtend - gethrtime();
if (hrtres <= 0) {
/* timer expired */
sig_mutex_unlock(&__aio_mutex);
errno = EAGAIN;
break;
}
wait->tv_sec = hrtres / (hrtime_t)NANOSEC;
wait->tv_nsec = hrtres % (hrtime_t)NANOSEC;
}
/* nothing left in the thread pool: rescan instead of sleeping */
if (_aio_outstand_cnt == 0) {
sig_mutex_unlock(&__aio_mutex);
continue;
}
_aio_suscv_cnt++; /* ID for _aiodone (wake up) */
pthread_cleanup_push(_aio_suspend_cleanup, &_aio_suscv_cnt);
if (timedwait == AIO_TIMEOUT_WAIT) {
cv_err = sig_cond_reltimedwait(&_aio_iowait_cv,
&__aio_mutex, wait);
/* a timed-out wait is reported to the caller as EAGAIN */
if (cv_err == ETIME)
cv_err = EAGAIN;
} else {
/* wait indefinitely */
cv_err = sig_cond_wait(&_aio_iowait_cv, &__aio_mutex);
}
/* this decrements _aio_suscv_cnt and drops __aio_mutex */
pthread_cleanup_pop(1);
if (cv_err) {
errno = cv_err;
break;
}
}
return (-1);
}
/*
 * aio_suspend() - public entry point; forwards to __aio_suspend() with
 * the largefile flag cleared.
 */
int
aio_suspend(const aiocb_t * const list[], int nent,
    const timespec_t *timeout)
{
	void **cbs = (void **)list;

	return (__aio_suspend(cbs, nent, timeout, 0));
}
/*
 * aio_error() - report the error status stored in aiocbp's result slot.
 *
 * A status other than EINPROGRESS is returned as is.  For a request
 * that is still in progress:
 *   - state CHECK:   the kernel is asked via _kaio(AIOERROR); an EINVAL
 *                    answer yields -1/EINVAL, otherwise the status is
 *                    re-read and returned.
 *   - state CHECKED: the state is reset to CHECK and EINPROGRESS is
 *                    returned.
 */
int
aio_error(const aiocb_t *aiocbp)
{
	const aio_result_t *res = &aiocbp->aio_resultp;
	int status = res->aio_errno;

	if (status != EINPROGRESS)
		return (status);

	if (aiocbp->aio_state == CHECK) {
		/*
		 * The kaio() call is made without consulting
		 * KAIO_SUPPORTED(): the aiocb need not carry a valid fd,
		 * only its resultp has to be set.
		 */
		if ((int)_kaio(AIOERROR, aiocbp) == EINVAL) {
			errno = EINVAL;
			return (-1);
		}
		return (res->aio_errno);
	}

	if (aiocbp->aio_state == CHECKED)
		((aiocb_t *)aiocbp)->aio_state = CHECK;

	return (status);
}
/*
 * aio_return() - hand out the final return value of a completed request
 * exactly once.
 *
 * Returns -1 with errno EINPROGRESS if the request has not finished,
 * and -1 with errno EINVAL if the result was already collected.
 * Otherwise the stored return value is returned (with errno set to the
 * stored error if that value is -1), the result slot is marked as
 * consumed (aio_return = -1, aio_errno = EINVAL) and any request still
 * hashed under this result is removed and freed.
 */
ssize_t
aio_return(aiocb_t *aiocbp)
{
	aio_result_t *res = &aiocbp->aio_resultp;
	aio_req_t *req;
	int saved_errno;
	ssize_t nbytes;

	/*
	 * _aiodone() writes aio_return first and aio_errno second,
	 * separated by membar_producer().  Reading in the opposite order
	 * with membar_consumer() in between keeps the two values
	 * consistent with each other.
	 */
	saved_errno = res->aio_errno;
	membar_consumer();
	nbytes = res->aio_return;

	/*
	 * Either the request is still running, or the (-1, EINVAL) pair
	 * says the result has been fetched before.
	 */
	if (saved_errno == EINPROGRESS ||
	    (nbytes == -1 && saved_errno == EINVAL)) {
		errno = saved_errno;
		return (-1);
	}

	sig_mutex_lock(&__aio_mutex);

	/* another thread may have collected the result in the meantime */
	if (res->aio_return == -1 && res->aio_errno == EINVAL) {
		sig_mutex_unlock(&__aio_mutex);
		errno = EINVAL;
		return (-1);
	}

	/* mark the result as consumed for all later callers */
	res->aio_return = -1;
	res->aio_errno = EINVAL;

	req = _aio_hash_del(res);
	if (req != NULL) {
		aiocbp->aio_state = NOCHECK;
		ASSERT(req->req_head == NULL);
		(void) _aio_req_remove(req);
		sig_mutex_unlock(&__aio_mutex);
		_aio_req_free(req);
	} else {
		sig_mutex_unlock(&__aio_mutex);
	}

	if (nbytes == -1)
		errno = saved_errno;
	return (nbytes);
}
/*
 * _lio_remove() - detach a request from its list I/O head, if any.
 *
 * The head's entry and reference counts are each lowered by one; when
 * the entry count reaches zero the head is freed.  The request's
 * req_head pointer is cleared afterwards.
 */
void
_lio_remove(aio_req_t *reqp)
{
	aio_lio_t *lio = reqp->req_head;
	int left;

	if (lio == NULL)
		return;

	sig_mutex_lock(&lio->lio_mutex);
	ASSERT(lio->lio_refcnt == lio->lio_nent);
	lio->lio_nent--;
	left = lio->lio_nent;
	lio->lio_refcnt--;
	sig_mutex_unlock(&lio->lio_mutex);

	if (left == 0)
		_aio_lio_free(lio);
	reqp->req_head = NULL;
}
/*
 * __aio_fsync_bar() - queue one AIOFSYNC request per worker.
 *
 * Starting at aiowp and following work_forw, an fsync request is handed
 * to _aio_rw() for each of the workerscnt workers.  If a submission
 * fails, the head is switched to LIO_DESTROY, its counters are reduced
 * by the number of requests that were not queued, and errno is set to
 * EAGAIN.
 *
 * Returns the number of requests actually submitted.
 */
static int
__aio_fsync_bar(aiocb_t *aiocbp, aio_lio_t *head, aio_worker_t *aiowp,
    int workerscnt)
{
	aio_worker_t *wp = aiowp;
	int queued = 0;

	while (queued < workerscnt) {
		if (_aio_rw(aiocbp, head, &wp, AIOFSYNC, AIO_NO_KAIO) != 0) {
			int unsent = workerscnt - queued;

			sig_mutex_lock(&head->lio_mutex);
			head->lio_mode = LIO_DESTROY;	/* ignore fsync */
			head->lio_nent -= unsent;
			head->lio_refcnt -= unsent;
			sig_mutex_unlock(&head->lio_mutex);
			errno = EAGAIN;
			return (queued);
		}
		wp = wp->work_forw;
		queued++;
	}
	return (queued);
}
/*
 * aio_fsync() - asynchronous fsync()/fdatasync() of aiocbp->aio_fildes.
 *
 * op must be O_DSYNC or O_SYNC, otherwise -1/EINVAL.  A NULL aiocbp
 * returns 0 without doing anything.  -1/EBUSY is returned when the
 * control block is already in the request hash; failures of fstat(),
 * _aio_sigev_thread() and __uaio_init() are passed on as -1.
 *
 * With no read/write workers present the sync is performed directly
 * through __fdsync().  Otherwise one fsync request is queued on every
 * worker; 0 is returned only if all of them could be queued.
 */
int
aio_fsync(int op, aiocb_t *aiocbp)
{
	struct stat sb;
	aio_lio_t *lio;
	int nqueued;

	if (aiocbp == NULL)
		return (0);

	if (!(op == O_DSYNC || op == O_SYNC)) {
		errno = EINVAL;
		return (-1);
	}

	if (_aio_hash_find(&aiocbp->aio_resultp) != NULL) {
		errno = EBUSY;
		return (-1);
	}

	/* only the success of fstat() matters, not the data it returns */
	if (fstat(aiocbp->aio_fildes, &sb) < 0)
		return (-1);

	if (_aio_sigev_thread(aiocbp) != 0)
		return (-1);

	/*
	 * There is no kernel aio_fsync(); the user-level path is forced
	 * so that completion notification still happens.
	 */
	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	/*
	 * Workers are created by the first asynchronous I/O request of
	 * the process (via __uaio_init()).  Zero workers therefore means
	 * zero pending requests, and a plain fsync(3C) or fdatasync(3RT)
	 * is all that is needed.
	 */
	if (__rw_workerscnt == 0) {
		return (__fdsync(aiocbp->aio_fildes,
		    (op == O_DSYNC) ? FDSYNC : FSYNC));
	}

	/*
	 * aio_offset doubles as the op field:
	 *	O_DSYNC - fdatasync()
	 *	O_SYNC - fsync()
	 */
	aiocbp->aio_offset = op;
	aiocbp->aio_lio_opcode = AIOFSYNC;

	/*
	 * Build a list of fsync requests; whichever worker receives the
	 * last one carries out the fsync.
	 */
	lio = _aio_lio_alloc();
	if (lio == NULL) {
		errno = EAGAIN;
		return (-1);
	}
	lio->lio_mode = LIO_FSYNC;
	lio->lio_nent = lio->lio_refcnt = __rw_workerscnt;
	lio->lio_largefile = 0;

	/* place an fsync request on the queue of every worker */
	nqueued = __aio_fsync_bar(aiocbp, lio, __workers_rw, __rw_workerscnt);
	if (nqueued == __rw_workerscnt)
		return (0);

	/*
	 * Not every worker got its request.
	 *   - nothing queued at all: the aio_lio_t is ours to free;
	 *   - something queued: the last worker running one of the
	 *     queued requests frees the aio_lio_t.
	 */
	if (nqueued == 0)
		_aio_lio_free(lio);
	return (-1);
}
/*
 * aio_cancel() - cancel one request (aiocbp != NULL) or all requests
 * (aiocbp == NULL) outstanding on fd.
 *
 * Returns -1 if fstat(fd) fails, and -1/EINVAL if aiocbp names a
 * different descriptor.  For a single request the result is
 * AIO_ALLDONE, AIO_CANCELED or AIO_NOTCANCELED for user-level requests,
 * or the value of _kaio(AIOCANCEL) for all others.  With a NULL aiocbp
 * the result of aiocancel_all() is returned.
 */
int
aio_cancel(int fd, aiocb_t *aiocbp)
{
	struct stat sb;
	aio_req_t *req;
	aio_worker_t *wp;
	int canceled = 0;
	int done = 0;

	if (fstat(fd, &sb) < 0)
		return (-1);

	if (aiocbp == NULL)
		return (aiocancel_all(fd));

	if (aiocbp->aio_fildes != fd) {
		errno = EINVAL;
		return (-1);
	}

	if (aiocbp->aio_state == USERAIO) {
		sig_mutex_lock(&__aio_mutex);
		req = _aio_hash_find(&aiocbp->aio_resultp);
		if (req == NULL) {
			/* no longer tracked: nothing left to cancel */
			sig_mutex_unlock(&__aio_mutex);
			return (AIO_ALLDONE);
		}
		wp = req->req_worker;
		sig_mutex_lock(&wp->work_qlock1);
		(void) _aio_cancel_req(wp, req, &canceled, &done);
		sig_mutex_unlock(&wp->work_qlock1);
		sig_mutex_unlock(&__aio_mutex);

		if (done)
			return (AIO_ALLDONE);
		return (canceled ? AIO_CANCELED : AIO_NOTCANCELED);
	}

	if (aiocbp->aio_state == USERAIO_DONE)
		return (AIO_ALLDONE);

	return ((int)_kaio(AIOCANCEL, fd, aiocbp));
}
/*
 * Cancellation/exit handler of __aio_waitn().
 *
 * Runs with __aio_mutex held.  It clears the flags owned by the running
 * aio_waitn call, signals _aio_waitn_cv if another call announced
 * itself through AIO_LIB_WAITN_PENDING, and releases the mutex.  The
 * argument is not used.
 */
/* ARGSUSED */
static void
_aio_waitn_cleanup(void *arg)
{
	ASSERT(MUTEX_HELD(&__aio_mutex));

	_aio_flags &= ~(AIO_LIB_WAITN | AIO_WAIT_INPROGRESS | AIO_IO_WAITING);

	/* hand over to a blocked aio_waitn() call, if there is one */
	if ((_aio_flags & AIO_LIB_WAITN_PENDING) != 0) {
		_aio_flags &= ~AIO_LIB_WAITN_PENDING;
		(void) cond_signal(&_aio_waitn_cv);
	}

	sig_mutex_unlock(&__aio_mutex);
}
/*
* aio_waitn can be used to reap the results of several I/O operations that
* were submitted asynchronously. The submission of I/Os can be done using
* existing POSIX interfaces: lio_listio, aio_write or aio_read.
* aio_waitn waits until "nwait" I/Os (supplied as a parameter) have
* completed and it returns the descriptors for these I/Os in "list". The
* maximum size of this list is given by "nent" and the actual number of I/Os
* completed is returned in "nwait". Otherwise aio_waitn might also
* return if the timeout expires. Additionally, aio_waitn returns 0 if
* successful or -1 if an error occurred.
*
* Arguments:
*   list   output array receiving the control blocks of completed I/Os
*   nent   capacity of list; must be non-zero
*   nwait  in: number of completions wanted (1 .. nent);
*          out: number of entries stored in list
*   utimo  NULL: wait indefinitely; zero value: poll; otherwise the
*          maximum time to wait
*
* NOTE(review): nwait is dereferenced without a NULL check; presumably
* callers always pass a valid pointer - confirm.
*/
static int
__aio_waitn(void **list, uint_t nent, uint_t *nwait, const timespec_t *utimo)
{
int error = 0;
uint_t dnwait = 0; /* amount of requests in the waitn-done list */
uint_t kwaitcnt; /* expected "done" requests from kernel */
uint_t knentcnt; /* max. expected "done" requests from kernel */
int uerrno = 0;
int kerrno = 0; /* save errno from _kaio() call */
int timedwait = AIO_TIMEOUT_UNDEF;
aio_req_t *reqp;
timespec_t end;
timespec_t twait; /* copy of utimo for internal calculations */
timespec_t *wait = NULL;
if (nent == 0 || *nwait == 0 || *nwait > nent) {
errno = EINVAL;
return (-1);
}
/*
* Only one running aio_waitn call per process allowed.
* Further calls will be blocked here until the running
* call finishes.
*/
sig_mutex_lock(&__aio_mutex);
while (_aio_flags & AIO_LIB_WAITN) {
/* a polling caller does not queue up behind the running call */
if (utimo && utimo->tv_sec == 0 && utimo->tv_nsec == 0) {
sig_mutex_unlock(&__aio_mutex);
*nwait = 0;
return (0);
}
_aio_flags |= AIO_LIB_WAITN_PENDING;
pthread_cleanup_push(sig_mutex_unlock, &__aio_mutex);
error = sig_cond_wait(&_aio_waitn_cv, &__aio_mutex);
pthread_cleanup_pop(0);
if (error != 0) {
sig_mutex_unlock(&__aio_mutex);
*nwait = 0;
errno = error;
return (-1);
}
}
/*
 * From here on _aio_waitn_cleanup() is responsible for clearing the
 * waitn flags and dropping __aio_mutex; it runs at the matching
 * pthread_cleanup_pop(1) at "out" or on cancellation.
 */
pthread_cleanup_push(_aio_waitn_cleanup, NULL);
_aio_flags |= AIO_LIB_WAITN;
/*
 * Requests for AIO_WAITN_MAXIOCBS or more completions may need
 * several kernel calls, so the timeout is evaluated up front.
 */
if (*nwait >= AIO_WAITN_MAXIOCBS) {
if (_aio_check_timeout(utimo, &end, &timedwait) != 0) {
error = -1;
dnwait = 0;
goto out;
}
if (timedwait != AIO_TIMEOUT_INDEF) {
twait = *utimo;
wait = &twait;
}
}
/*
* If both counters are still set to zero, then only
* kernel requests are currently outstanding (raw-I/Os).
*/
if ((_aio_doneq_cnt + _aio_outstand_cnt) == 0) {
for (;;) {
/* never ask the kernel for more than AIO_WAITN_MAXIOCBS at once */
kwaitcnt = *nwait - dnwait;
knentcnt = nent - dnwait;
if (knentcnt > AIO_WAITN_MAXIOCBS)
knentcnt = AIO_WAITN_MAXIOCBS;
kwaitcnt = (kwaitcnt > knentcnt) ? knentcnt : kwaitcnt;
/* __aio_mutex is dropped around the kernel call and retaken by the pop */
pthread_cleanup_push(sig_mutex_lock, &__aio_mutex);
sig_mutex_unlock(&__aio_mutex);
_cancel_prologue();
error = (int)_kaio(AIOWAITN, &list[dnwait], knentcnt,
&kwaitcnt, wait);
_cancel_epilogue();
pthread_cleanup_pop(1);
if (error == 0) {
dnwait += kwaitcnt;
if (dnwait >= *nwait ||
*nwait < AIO_WAITN_MAXIOCBS)
break;
if (timedwait == AIO_TIMEOUT_WAIT) {
error = _aio_get_timedelta(&end, wait);
if (error == -1) {
/* timer expired */
errno = ETIME;
break;
}
}
continue;
}
if (errno == EAGAIN) {
/* partial result already collected: report success */
if (dnwait > 0)
error = 0;
break;
}
if (errno == ETIME || errno == EINTR) {
/* keep whatever the kernel delivered before it gave up */
dnwait += kwaitcnt;
break;
}
/* fatal error */
break;
}
goto out;
}
/* File system I/Os outstanding ... */
if (timedwait == AIO_TIMEOUT_UNDEF) {
if (_aio_check_timeout(utimo, &end, &timedwait) != 0) {
error = -1;
dnwait = 0;
goto out;
}
if (timedwait != AIO_TIMEOUT_INDEF) {
twait = *utimo;
wait = &twait;
}
}
for (;;) {
uint_t sum_reqs;
/*
* Calculate sum of active non RAW-IO requests (sum_reqs).
* If the expected amount of completed requests (*nwait) is
* greater than the calculated sum (sum_reqs) then
* use _kaio to check pending RAW-IO requests.
*/
sum_reqs = _aio_doneq_cnt + dnwait + _aio_outstand_cnt;
kwaitcnt = (*nwait > sum_reqs) ? *nwait - sum_reqs : 0;
if (kwaitcnt != 0) {
/* possibly some kernel I/Os outstanding */
knentcnt = nent - dnwait;
if (knentcnt > AIO_WAITN_MAXIOCBS)
knentcnt = AIO_WAITN_MAXIOCBS;
kwaitcnt = (kwaitcnt > knentcnt) ? knentcnt : kwaitcnt;
_aio_flags |= AIO_WAIT_INPROGRESS;
pthread_cleanup_push(sig_mutex_lock, &__aio_mutex);
sig_mutex_unlock(&__aio_mutex);
_cancel_prologue();
error = (int)_kaio(AIOWAITN, &list[dnwait], knentcnt,
&kwaitcnt, wait);
_cancel_epilogue();
pthread_cleanup_pop(1);
_aio_flags &= ~AIO_WAIT_INPROGRESS;
if (error == 0) {
dnwait += kwaitcnt;
} else {
switch (errno) {
case EINVAL:
case EAGAIN:
/* don't wait for kernel I/Os */
kerrno = 0; /* ignore _kaio() errno */
/* lower the target to what the library itself can deliver */
*nwait = _aio_doneq_cnt +
_aio_outstand_cnt + dnwait;
error = 0;
break;
case EINTR:
case ETIME:
/* just scan for completed LIB I/Os */
dnwait += kwaitcnt;
timedwait = AIO_TIMEOUT_POLL;
kerrno = errno; /* save _kaio() errno */
error = 0;
break;
default:
kerrno = errno; /* save _kaio() errno */
break;
}
}
if (error)
break; /* fatal kernel error */
}
/* check completed FS requests in the "done" queue */
while (_aio_doneq_cnt && dnwait < nent) {
/* get done requests */
if ((reqp = _aio_req_remove(NULL)) != NULL) {
(void) _aio_hash_del(reqp->req_resultp);
list[dnwait++] = reqp->req_aiocbp;
_aio_req_mark_done(reqp);
_lio_remove(reqp);
_aio_req_free(reqp);
}
}
if (dnwait >= *nwait) {
/* min. requested amount of completed I/Os satisfied */
break;
}
if (timedwait == AIO_TIMEOUT_WAIT &&
(error = _aio_get_timedelta(&end, wait)) == -1) {
/* timer expired */
uerrno = ETIME;
break;
}
/*
* If some I/Os are outstanding and we have to wait for them,
* then sleep here. _aiodone() will call _aio_waitn_wakeup()
* to wakeup this thread as soon as the required amount of
* completed I/Os is done.
*/
if (_aio_outstand_cnt > 0 && timedwait != AIO_TIMEOUT_POLL) {
/*
* _aio_waitn_wakeup() will wake up this thread when:
* - _aio_waitncnt requests are completed or
* - _aio_outstand_cnt becomes zero.
* sig_cond_reltimedwait() could also return with
* a timeout error (ETIME).
*/
if (*nwait < _aio_outstand_cnt)
_aio_waitncnt = *nwait;
else
_aio_waitncnt = _aio_outstand_cnt;
_aio_flags |= AIO_IO_WAITING;
if (wait)
uerrno = sig_cond_reltimedwait(&_aio_iowait_cv,
&__aio_mutex, wait);
else
uerrno = sig_cond_wait(&_aio_iowait_cv,
&__aio_mutex);
_aio_flags &= ~AIO_IO_WAITING;
/* after a timeout, make one more pass in polling mode */
if (uerrno == ETIME) {
timedwait = AIO_TIMEOUT_POLL;
continue;
}
if (uerrno != 0)
timedwait = AIO_TIMEOUT_POLL;
}
if (timedwait == AIO_TIMEOUT_POLL) {
/* polling or timer expired */
break;
}
}
/* an error from the condition wait takes precedence over the saved _kaio() errno */
errno = uerrno == 0 ? kerrno : uerrno;
if (errno)
error = -1;
else
error = 0;
out:
*nwait = dnwait;
pthread_cleanup_pop(1); /* drops __aio_mutex */
return (error);
}
/*
 * aio_waitn() - public entry point; forwards its arguments unchanged to
 * __aio_waitn().
 */
int
aio_waitn(aiocb_t *list[], uint_t nent, uint_t *nwait,
    const timespec_t *timeout)
{
	void **cbs = (void **)list;

	return (__aio_waitn(cbs, nent, nwait, timeout));
}
/*
 * _aio_waitn_wakeup() - wake threads sleeping on _aio_iowait_cv.
 * Must be called with __aio_mutex held.
 *
 * Two kinds of sleepers share the condition variable:
 *
 * __aio_waitn() sets AIO_IO_WAITING and stores the number of
 * completions it still needs in _aio_waitncnt.  At most one such call
 * waits at a time.  It is woken when that count reaches zero or when
 * no I/Os remain outstanding (_aio_outstand_cnt == 0).
 *
 * __aio_suspend() bumps _aio_suscv_cnt while it sleeps.  Several such
 * calls may wait simultaneously, each for its own set of requests, so
 * all of them are woken on every completion and each one re-examines
 * its own list.
 */
void
_aio_waitn_wakeup(void)
{
	int wake;

	ASSERT(MUTEX_HELD(&__aio_mutex));

	if (_aio_flags & AIO_IO_WAITING) {
		/* account for one more completion seen by aio_waitn */
		if (_aio_waitncnt > 0)
			_aio_waitncnt--;
		wake = (_aio_outstand_cnt == 0 || _aio_waitncnt == 0 ||
		    _aio_suscv_cnt > 0);
	} else {
		/* only aio_suspend callers can be waiting */
		wake = (_aio_suscv_cnt > 0);
	}

	if (wake)
		(void) cond_broadcast(&_aio_iowait_cv);
}
/*
 * _aio_check_timeout() - classify a caller-supplied timeout.
 *
 * On success 0 is returned and *timedwait is set to
 *	AIO_TIMEOUT_INDEF : utimo is NULL, wait indefinitely
 *	AIO_TIMEOUT_POLL  : utimo is zero, polling
 *	AIO_TIMEOUT_WAIT  : utimo is positive; *end then holds the
 *			    current gettimeofday() time plus utimo
 * A negative field or tv_nsec >= NANOSEC yields -1 with errno EINVAL.
 */
static int
_aio_check_timeout(const timespec_t *utimo, timespec_t *end, int *timedwait)
{
	struct timeval now;

	if (utimo == NULL) {
		*timedwait = AIO_TIMEOUT_INDEF;
		return (0);
	}

	if (utimo->tv_sec < 0 || utimo->tv_nsec < 0 ||
	    utimo->tv_nsec >= NANOSEC) {
		errno = EINVAL;
		return (-1);
	}

	if (utimo->tv_sec == 0 && utimo->tv_nsec == 0) {
		*timedwait = AIO_TIMEOUT_POLL;
		return (0);
	}

	/* absolute end time = now + relative timeout, nanoseconds normalized */
	(void) gettimeofday(&now, NULL);
	end->tv_sec = utimo->tv_sec + now.tv_sec;
	end->tv_nsec = utimo->tv_nsec + 1000 * now.tv_usec;
	if (end->tv_nsec >= NANOSEC) {
		end->tv_nsec -= NANOSEC;
		end->tv_sec += 1;
	}
	*timedwait = AIO_TIMEOUT_WAIT;
	return (0);
}
#if !defined(_LP64)
/*
 * aio_read64() - large-file variant of aio_read().
 *
 * Fails with -1/EINVAL when aiocbp is NULL or requests a non-zero
 * priority, and with -1/EBUSY when the control block's result slot is
 * already present in the request hash.  A failure of
 * _aio_sigev_thread64() is passed on as -1.  Otherwise the return value
 * is whatever _aio_rw64() returns for an AIOAREAD64 request.
 */
int
aio_read64(aiocb64_t *aiocbp)
{
	int rv;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}
	if (aiocbp->aio_reqprio != 0) {
		errno = EINVAL;
		return (-1);
	}

	/* the same control block must not be submitted twice */
	if (_aio_hash_find(&aiocbp->aio_resultp) != NULL) {
		errno = EBUSY;
		return (-1);
	}

	if (_aio_sigev_thread64(aiocbp) != 0)
		return (-1);

	aiocbp->aio_lio_opcode = LIO_READ;
	rv = _aio_rw64(aiocbp, NULL, &__nextworker_rw, AIOAREAD64,
	    AIO_KAIO | AIO_NO_DUPS);
	return (rv);
}
/*
 * aio_write64() - large-file variant of aio_write().
 *
 * Fails with -1/EINVAL when aiocbp is NULL or requests a non-zero
 * priority, and with -1/EBUSY when the control block's result slot is
 * already present in the request hash.  A failure of
 * _aio_sigev_thread64() is passed on as -1.  Otherwise the return value
 * is whatever _aio_rw64() returns for an AIOAWRITE64 request.
 */
int
aio_write64(aiocb64_t *aiocbp)
{
	int rv;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}
	if (aiocbp->aio_reqprio != 0) {
		errno = EINVAL;
		return (-1);
	}

	/* the same control block must not be submitted twice */
	if (_aio_hash_find(&aiocbp->aio_resultp) != NULL) {
		errno = EBUSY;
		return (-1);
	}

	if (_aio_sigev_thread64(aiocbp) != 0)
		return (-1);

	aiocbp->aio_lio_opcode = LIO_WRITE;
	rv = _aio_rw64(aiocbp, NULL, &__nextworker_rw, AIOAWRITE64,
	    AIO_KAIO | AIO_NO_DUPS);
	return (rv);
}
/*
 * lio_listio64() - large-file variant of lio_listio(): submit a list of
 * asynchronous I/O requests described by aiocb64_t control blocks.
 *
 * mode   LIO_WAIT (return after the list has completed) or LIO_NOWAIT;
 *        any other value fails with EINVAL.
 * list   array of nent control block pointers; NULL entries and entries
 *        whose opcode is LIO_NOP are skipped.
 * nent   number of entries; must be in 1 .. sysconf(_SC_AIO_LISTIO_MAX).
 * sigevp optional completion notification for the whole list.
 *
 * The list is first offered to the kernel (_kaio(AIOLIO64)) if at least
 * one descriptor is KAIO_SUPPORTED().  If the kernel answers ENOTSUP, or
 * no descriptor qualifies, every entry whose aio_errno is ENOTSUP or
 * EBADFD is queued to the user-level workers through _aio_rw64().
 *
 * Returns 0 on success.  Returns -1 with errno set to EINVAL (bad
 * nent/mode), EBUSY (an entry is already in the request hash), EIO (at
 * least one entry could not be queued or completed with an error), or
 * the error of an interrupted wait.
 */
int
lio_listio64(int mode, aiocb64_t *_RESTRICT_KYWD const *_RESTRICT_KYWD list,
    int nent, struct sigevent *_RESTRICT_KYWD sigevp)
{
	int aio_ufs = 0;	/* requests queued to the worker threads */
	int oerrno = 0;		/* errno of the initial kernel attempt */
	aio_lio_t *head = NULL;
	aiocb64_t *aiocbp;
	int state = 0;
	int EIOflg = 0;		/* set when any single entry failed */
	int rw;
	int do_kaio = 0;
	int error;
	int i;

	if (!_kaio_ok)
		_kaio_init();

	if (aio_list_max == 0)
		aio_list_max = sysconf(_SC_AIO_LISTIO_MAX);

	if (nent <= 0 || nent > aio_list_max) {
		errno = EINVAL;
		return (-1);
	}

	switch (mode) {
	case LIO_WAIT:
		state = NOCHECK;
		break;
	case LIO_NOWAIT:
		state = CHECK;
		break;
	default:
		errno = EINVAL;
		return (-1);
	}

	/*
	 * First pass: validate every entry and mark the ones that cannot
	 * be handled by kernel AIO with ENOTSUP.
	 */
	for (i = 0; i < nent; i++) {
		if ((aiocbp = list[i]) == NULL)
			continue;
		if (_aio_hash_find(&aiocbp->aio_resultp) != NULL) {
			errno = EBUSY;
			return (-1);
		}
		if (_aio_sigev_thread64(aiocbp) != 0)
			return (-1);
		if (aiocbp->aio_lio_opcode == LIO_NOP)
			aiocbp->aio_state = NOCHECK;
		else {
			aiocbp->aio_state = state;
			if (KAIO_SUPPORTED(aiocbp->aio_fildes))
				do_kaio++;
			else
				aiocbp->aio_resultp.aio_errno = ENOTSUP;
		}
	}
	if (_aio_sigev_thread_init(sigevp) != 0)
		return (-1);

	if (do_kaio) {
		error = (int)_kaio(AIOLIO64, mode, list, nent, sigevp);
		if (error == 0)
			return (0);
		oerrno = errno;
	} else {
		oerrno = errno = ENOTSUP;
		error = -1;
	}

	if (error == -1 && errno == ENOTSUP) {
		error = errno = 0;
		/*
		 * If LIO_WAIT, or notification required, allocate a list head.
		 */
		if (mode == LIO_WAIT ||
		    (sigevp != NULL &&
		    (sigevp->sigev_notify == SIGEV_SIGNAL ||
		    sigevp->sigev_notify == SIGEV_THREAD ||
		    sigevp->sigev_notify == SIGEV_PORT)))
			head = _aio_lio_alloc();
		if (head) {
			sig_mutex_lock(&head->lio_mutex);
			head->lio_mode = mode;
			head->lio_largefile = 1;
			if (mode == LIO_NOWAIT && sigevp != NULL) {
				if (sigevp->sigev_notify == SIGEV_THREAD) {
					head->lio_port = sigevp->sigev_signo;
					head->lio_event = AIOLIO64;
					head->lio_sigevent = sigevp;
					head->lio_sigval.sival_ptr =
					    sigevp->sigev_value.sival_ptr;
				} else if (sigevp->sigev_notify == SIGEV_PORT) {
					port_notify_t *pn =
					    sigevp->sigev_value.sival_ptr;
					head->lio_port = pn->portnfy_port;
					head->lio_event = AIOLIO64;
					head->lio_sigevent = sigevp;
					head->lio_sigval.sival_ptr =
					    pn->portnfy_user;
				} else {	/* SIGEV_SIGNAL */
					head->lio_signo = sigevp->sigev_signo;
					head->lio_sigval.sival_ptr =
					    sigevp->sigev_value.sival_ptr;
				}
			}
			/* one reference per list slot; unused slots drop theirs */
			head->lio_nent = head->lio_refcnt = nent;
			sig_mutex_unlock(&head->lio_mutex);
		}
		/*
		 * find UFS requests, errno == ENOTSUP/EBADFD,
		 */
		for (i = 0; i < nent; i++) {
			if ((aiocbp = list[i]) == NULL ||
			    aiocbp->aio_lio_opcode == LIO_NOP ||
			    (aiocbp->aio_resultp.aio_errno != ENOTSUP &&
			    aiocbp->aio_resultp.aio_errno != EBADFD)) {
				if (head)
					_lio_list_decr(head);
				continue;
			}
			if (aiocbp->aio_resultp.aio_errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
			if (aiocbp->aio_reqprio != 0) {
				aiocbp->aio_resultp.aio_errno = EINVAL;
				aiocbp->aio_resultp.aio_return = -1;
				EIOflg = 1;
				if (head)
					_lio_list_decr(head);
				continue;
			}
			/*
			 * submit an AIO request with flags AIO_NO_KAIO
			 * to avoid the kaio() syscall in _aio_rw()
			 */
			switch (aiocbp->aio_lio_opcode) {
			case LIO_READ:
				rw = AIOAREAD64;
				break;
			case LIO_WRITE:
				rw = AIOAWRITE64;
				break;
			default:
				/*
				 * Unknown opcode: without this case "rw"
				 * would be passed to _aio_rw64()
				 * uninitialized.  Fail the entry the same
				 * way an invalid aio_reqprio is failed above.
				 */
				aiocbp->aio_resultp.aio_errno = EINVAL;
				aiocbp->aio_resultp.aio_return = -1;
				EIOflg = 1;
				if (head)
					_lio_list_decr(head);
				continue;
			}
			error = _aio_rw64(aiocbp, head, &__nextworker_rw, rw,
			    (AIO_NO_KAIO | AIO_NO_DUPS));
			if (error == 0)
				aio_ufs++;
			else {
				if (head)
					_lio_list_decr(head);
				/*
				 * NOTE(review): this stores the return value
				 * of _aio_rw64(), not errno; confirm which of
				 * the two _aio_rw64() uses to report the
				 * cause.
				 */
				aiocbp->aio_resultp.aio_errno = error;
				EIOflg = 1;
			}
		}
	}
	if (EIOflg) {
		errno = EIO;
		return (-1);
	}
	if (mode == LIO_WAIT && oerrno == ENOTSUP) {
		/*
		 * call kaio(AIOLIOWAIT) to get all outstanding
		 * kernel AIO requests
		 */
		if ((nent - aio_ufs) > 0)
			(void) _kaio(AIOLIOWAIT, mode, list, nent, sigevp);
		if (head != NULL && head->lio_nent > 0) {
			sig_mutex_lock(&head->lio_mutex);
			while (head->lio_refcnt > 0) {
				int err;
				head->lio_waiting = 1;
				pthread_cleanup_push(_lio_listio_cleanup, head);
				err = sig_cond_wait(&head->lio_cond_cv,
				    &head->lio_mutex);
				pthread_cleanup_pop(0);
				head->lio_waiting = 0;
				if (err && head->lio_nent > 0) {
					sig_mutex_unlock(&head->lio_mutex);
					errno = err;
					return (-1);
				}
			}
			sig_mutex_unlock(&head->lio_mutex);
			ASSERT(head->lio_nent == 0 && head->lio_refcnt == 0);
			_aio_lio_free(head);
			/* any entry that finished with an error yields EIO */
			for (i = 0; i < nent; i++) {
				if ((aiocbp = list[i]) != NULL &&
				    aiocbp->aio_resultp.aio_errno) {
					errno = EIO;
					return (-1);
				}
			}
		}
		return (0);
	}
	return (error);
}
/*
 * aio_suspend64() - large-file entry point for aio_suspend().
 *
 * Forwards the aiocb64_t list, its length and the optional timeout to the
 * common __aio_suspend() implementation and returns whatever it returns.
 */
int
aio_suspend64(const aiocb64_t * const list[], int nent,
    const timespec_t *timeout)
{
	void **cblist = (void **)list;

	/*
	 * NOTE(review): the trailing 1 presumably tells __aio_suspend() that
	 * the entries are aiocb64_t (large-file) control blocks - confirm
	 * against __aio_suspend().
	 */
	return (__aio_suspend(cblist, nent, timeout, 1));
}
/*
 * aio_error64() - report the error status of a large-file AIO request.
 *
 * Returns the aio_errno value stored in the control block's result area.
 * While that value is EINPROGRESS and the control block is in the CHECK
 * state, the kernel is asked (AIOERROR64) first and the value is re-read.
 * Returns -1 with errno set to EINVAL if that kaio call reports EINVAL.
 */
int
aio_error64(const aiocb64_t *aiocbp)
{
	const aio_result_t *resultp = &aiocbp->aio_resultp;
	int status = resultp->aio_errno;

	/* Anything other than "still in progress" is final: report it. */
	if (status != EINPROGRESS)
		return (status);

	if (aiocbp->aio_state == CHECK) {
		/*
		 * Always do the kaio() call without using the
		 * KAIO_SUPPORTED() checks because it is not
		 * mandatory to have a valid fd set in the
		 * aiocb, only the resultp must be set.
		 */
		if ((int)_kaio(AIOERROR64, aiocbp) == EINVAL) {
			errno = EINVAL;
			return (-1);
		}
		/* Pick up whatever the result area holds after the call. */
		return (resultp->aio_errno);
	}

	/*
	 * NOTE(review): CHECKED is flipped back to CHECK here, presumably so
	 * that the next call consults the kernel again - confirm.
	 */
	if (aiocbp->aio_state == CHECKED)
		((aiocb64_t *)aiocbp)->aio_state = CHECK;

	return (status);
}
/*
 * aio_return64() - retrieve the return status of a large-file AIO request.
 *
 * Returns the request's aio_return value; when that value is -1, errno is
 * set to the request's aio_errno.  Returns -1 with errno set to EINPROGRESS
 * if the request has not completed, or EINVAL if the status has already
 * been retrieved.  On success the result area is overwritten with
 * (-1, EINVAL) so that later calls can detect the repeated retrieval, and
 * the associated request, if one is still hashed, is removed and freed.
 */
ssize_t
aio_return64(aiocb64_t *aiocbp)
{
	aio_result_t *resultp = &aiocbp->aio_resultp;
	aio_req_t *reqp;
	int saved_errno;
	ssize_t result;

	/*
	 * The _aiodone() function stores resultp->aio_return before
	 * storing resultp->aio_errno (with a membar_producer() in
	 * between).  Read them in the opposite order, separated by
	 * membar_consumer(), to get proper memory ordering between
	 * _aiodone() and ourself.
	 */
	saved_errno = resultp->aio_errno;
	membar_consumer();
	result = resultp->aio_return;

	/*
	 * Either the request is still running, or the (-1, EINVAL) marker
	 * says that aio_return() has already been called for it.
	 */
	if (saved_errno == EINPROGRESS ||
	    (result == -1 && saved_errno == EINVAL)) {
		errno = saved_errno;
		return (-1);
	}

	/*
	 * Mark the result as returned so that later calls to aio_return()
	 * report that it has already been consumed.
	 */
	sig_mutex_lock(&__aio_mutex);

	/* Retest under the lock: another thread may have beaten us here. */
	if (resultp->aio_return == -1 && resultp->aio_errno == EINVAL) {
		sig_mutex_unlock(&__aio_mutex);
		errno = EINVAL;
		return (-1);
	}
	resultp->aio_return = -1;
	resultp->aio_errno = EINVAL;

	reqp = _aio_hash_del(resultp);
	if (reqp != NULL) {
		aiocbp->aio_state = NOCHECK;
		ASSERT(reqp->req_head == NULL);
		(void) _aio_req_remove(reqp);
	}
	sig_mutex_unlock(&__aio_mutex);

	/* The request structure is released outside of __aio_mutex. */
	if (reqp != NULL)
		_aio_req_free(reqp);

	if (result == -1)
		errno = saved_errno;
	return (result);
}
/*
 * __aio_fsync_bar64() - submit one AIOFSYNC request per worker.
 *
 * Starting at aiowp and following work_forw, submits the fsync request
 * through _aio_rw64() up to workerscnt times.  Returns the number of
 * requests submitted successfully.  On the first failure the list head is
 * switched to LIO_DESTROY, its entry and reference counts are reduced by
 * the number of requests that were not submitted, and errno is set to
 * EAGAIN.
 */
static int
__aio_fsync_bar64(aiocb64_t *aiocbp, aio_lio_t *head, aio_worker_t *aiowp,
    int workerscnt)
{
	aio_worker_t *worker = aiowp;
	int queued = 0;

	while (queued < workerscnt) {
		if (_aio_rw64(aiocbp, head, &worker, AIOFSYNC,
		    AIO_NO_KAIO) != 0) {
			/* requests that will never reach a worker */
			int missing = workerscnt - queued;

			sig_mutex_lock(&head->lio_mutex);
			head->lio_mode = LIO_DESTROY;	/* ignore fsync */
			head->lio_nent -= missing;
			head->lio_refcnt -= missing;
			sig_mutex_unlock(&head->lio_mutex);
			errno = EAGAIN;
			break;
		}
		worker = worker->work_forw;
		queued++;
	}
	return (queued);
}
/*
 * aio_fsync64() - large-file variant of aio_fsync().
 *
 * op must be O_DSYNC (data only) or O_SYNC (data and attributes).
 *
 * Returns 0 on success (or when aiocbp is NULL) and -1 with errno set on
 * failure:
 *	EINVAL	op is neither O_DSYNC nor O_SYNC
 *	EBUSY	the control block's result area is already in the AIO hash
 *	EAGAIN	no list head could be allocated, or not every worker could
 *		be given an fsync request
 * Failures of fstat64(), _aio_sigev_thread64() and __uaio_init() are
 * passed through as -1.
 *
 * Side effect: aiocbp->aio_offset and aiocbp->aio_lio_opcode are
 * overwritten when the request is queued to the workers.
 */
int
aio_fsync64(int op, aiocb64_t *aiocbp)
{
	aio_lio_t *head;
	struct stat64 statb;
	int fret;

	if (aiocbp == NULL)
		return (0);
	if (op != O_DSYNC && op != O_SYNC) {
		errno = EINVAL;
		return (-1);
	}
	if (_aio_hash_find(&aiocbp->aio_resultp) != NULL) {
		errno = EBUSY;
		return (-1);
	}

	/*
	 * Validate the descriptor only; statb is not used otherwise.
	 * This must be fstat64(): in a 32-bit process plain fstat()
	 * fails with EOVERFLOW on a file whose size does not fit in
	 * struct stat, and such files are the reason this large-file
	 * entry point exists.
	 */
	if (fstat64(aiocbp->aio_fildes, &statb) < 0)
		return (-1);
	if (_aio_sigev_thread64(aiocbp) != 0)
		return (-1);

	/*
	 * Kernel aio_fsync() is not supported.
	 * We force user-level aio_fsync() just
	 * for the notification side-effect.
	 */
	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	/*
	 * The first asynchronous I/O request in the current process will
	 * create a bunch of workers (via __uaio_init()).  If the number
	 * of workers is zero then the number of pending asynchronous I/O
	 * requests is zero.  In such a case only execute the standard
	 * fsync(3C) or fdatasync(3RT) as appropriate.
	 */
	if (__rw_workerscnt == 0) {
		if (op == O_DSYNC)
			return (__fdsync(aiocbp->aio_fildes, FDSYNC));
		else
			return (__fdsync(aiocbp->aio_fildes, FSYNC));
	}

	/*
	 * re-use aio_offset as the op field.
	 *	O_DSYNC - fdatasync()
	 *	O_SYNC - fsync()
	 */
	aiocbp->aio_offset = op;
	aiocbp->aio_lio_opcode = AIOFSYNC;

	/*
	 * Create a list of fsync requests.  The worker that
	 * gets the last request will do the fsync request.
	 */
	head = _aio_lio_alloc();
	if (head == NULL) {
		errno = EAGAIN;
		return (-1);
	}
	head->lio_mode = LIO_FSYNC;
	head->lio_nent = head->lio_refcnt = __rw_workerscnt;
	head->lio_largefile = 1;

	/*
	 * Insert an fsync request on every worker's queue.
	 */
	fret = __aio_fsync_bar64(aiocbp, head, __workers_rw, __rw_workerscnt);
	if (fret != __rw_workerscnt) {
		/*
		 * Fewer fsync requests than workers means that it was
		 * not possible to submit fsync requests to all workers.
		 * Actions:
		 * a) number of fsync requests submitted is 0:
		 *    => free allocated memory (aio_lio_t).
		 * b) number of fsync requests submitted is > 0:
		 *    => the last worker executing the fsync request
		 *	 will free the aio_lio_t struct.
		 */
		if (fret == 0)
			_aio_lio_free(head);
		return (-1);
	}
	return (0);
}
/*
 * aio_cancel64() - large-file variant of aio_cancel().
 *
 * With aiocbp != NULL, tries to cancel that single request; with
 * aiocbp == NULL, hands off to aiocancel_all(fd).
 *
 * Returns -1 if fd cannot be stat'ed (errno from fstat64()) or if fd does
 * not match aiocbp->aio_fildes (errno EINVAL).  For a user-level request
 * (aio_state == USERAIO) returns AIO_ALLDONE, AIO_CANCELED or
 * AIO_NOTCANCELED; for aio_state == USERAIO_DONE returns AIO_ALLDONE.
 * Any other request is passed to the kernel and the result of
 * _kaio(AIOCANCEL) is returned.
 */
int
aio_cancel64(int fd, aiocb64_t *aiocbp)
{
	aio_req_t *reqp;
	aio_worker_t *aiowp;
	int done = 0;
	int canceled = 0;
	struct stat64 buf;

	/*
	 * Validate the descriptor only; buf is not used otherwise.
	 * This must be fstat64(): in a 32-bit process plain fstat()
	 * fails with EOVERFLOW on a file whose size does not fit in
	 * struct stat, which would make cancellation fail for the very
	 * files this large-file entry point serves.
	 */
	if (fstat64(fd, &buf) < 0)
		return (-1);

	if (aiocbp == NULL)
		return (aiocancel_all(fd));

	if (fd != aiocbp->aio_fildes) {
		errno = EINVAL;
		return (-1);
	}

	if (aiocbp->aio_state == USERAIO) {
		sig_mutex_lock(&__aio_mutex);
		reqp = _aio_hash_find(&aiocbp->aio_resultp);
		if (reqp == NULL) {
			/* no longer hashed: nothing left to cancel */
			sig_mutex_unlock(&__aio_mutex);
			return (AIO_ALLDONE);
		}
		/*
		 * The worker's queue lock is taken inside __aio_mutex
		 * and released before it.
		 */
		aiowp = reqp->req_worker;
		sig_mutex_lock(&aiowp->work_qlock1);
		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
		sig_mutex_unlock(&aiowp->work_qlock1);
		sig_mutex_unlock(&__aio_mutex);

		/* "done" takes precedence over "canceled" */
		if (done)
			return (AIO_ALLDONE);
		if (canceled)
			return (AIO_CANCELED);
		return (AIO_NOTCANCELED);
	}

	if (aiocbp->aio_state == USERAIO_DONE)
		return (AIO_ALLDONE);

	return ((int)_kaio(AIOCANCEL, fd, aiocbp));
}
/*
 * aio_waitn64() - large-file entry point for aio_waitn().
 *
 * Forwards the aiocb64_t list, its capacity, the nwait pointer and the
 * optional timeout to the common __aio_waitn() implementation and returns
 * whatever it returns.
 */
int
aio_waitn64(aiocb64_t *list[], uint_t nent, uint_t *nwait,
    const timespec_t *timeout)
{
	void **cblist = (void **)list;

	return (__aio_waitn(cblist, nent, nwait, timeout));
}
#endif /* !defined(_LP64) */