/* $Id$ */
/** @file
* IPRT - Async I/O manager.
*/
/*
* Copyright (C) 2013 Oracle Corporation
*
* This file is part of VirtualBox Open Source Edition (OSE), as
* available from http://www.virtualbox.org. This file is free software;
* you can redistribute it and/or modify it under the terms of the GNU
* General Public License (GPL) as published by the Free Software
* Foundation, in version 2 as it comes in the "COPYING" file of the
* VirtualBox OSE distribution. VirtualBox OSE is distributed in the
* hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
*
* The contents of this file may alternatively be used under the terms
* of the Common Development and Distribution License Version 1.0
* (CDDL) only, as it comes in the "COPYING.CDDL" file of the
* VirtualBox OSE distribution, in which case the provisions of the
* CDDL are applicable instead of those of the GPL.
*
* You may elect to license modified versions of this file under the
* terms and conditions of either the GPL or the CDDL or both.
*/
/*******************************************************************************
* Header Files *
*******************************************************************************/
#include <iprt/critsect.h>
#include <iprt/memcache.h>
#include <iprt/semaphore.h>
#include <iprt/queueatomic.h>
/*******************************************************************************
* Structures and Typedefs *
*******************************************************************************/
/** Pointer to an internal async I/O file instance. */
/**
* Blocking event types.
*/
typedef enum RTAIOMGREVENT
{
/** Invalid type. */
/** No event pending. */
/** A file is added to the manager. */
/** A file is about to be closed. */
/** The async I/O manager is shut down. */
/** 32-bit hack. */
/**
* Async I/O manager instance data.
*/
typedef struct RTAIOMGRINT
{
/** Magic value. */
/** Reference count. */
/** Async I/O context handle. */
/** Async I/O thread. */
/** List of files assigned to this manager. */
/** Number of currently active requests. */
unsigned cReqsActive;
/** Maximum number of active requests. */
/** Memory cache for requests. */
/** Critical section protecting the blocking event handling. */
/** Event semaphore for blocking external events.
* The caller waits on it until the async I/O manager
* has finished processing the event. */
/** Blocking event type. */
/** Event type specific data. */
union
{
/** The file to be added. */
/** The file to be closed. */
} RTAIOMGRINT;
/** Pointer to an internal async I/O manager instance. */
/**
* Async I/O manager file instance data.
*/
typedef struct RTAIOMGRFILEINT
{
/** Magic value. */
/** Reference count. */
/** Flags. */
/** Opaque user data passed on creation. */
void *pvUser;
/** File handle. */
/** Async I/O manager this file belongs to. */
/** Work queue for new requests. */
/** Completion callback for this file. */
/** Data for exclusive use by the assigned async I/O manager. */
struct
{
/** List node in the list of files assigned to an async I/O manager. */
/** List of requests waiting for submission. */
/** Number of requests currently being processed for this endpoint
* (excluding flush requests). */
unsigned cReqsActive;
} AioMgr;
/** Flag indicating whether the file is closed. */
/**
* Request type.
*/
typedef enum RTAIOMGRREQTYPE
{
/** Invalid request type. */
/** Read request. */
/** Write request. */
/** Flush request. */
/** Prefetch request. */
/** 32-bit hack. */
/** Pointer to a request type. */
/**
* Async I/O manager request.
*/
typedef struct RTAIOMGRREQ
{
/** Atomic queue work item. */
/** Node for a waiting list. */
/** Request flags. */
/** Transfer type. */
/** Assigned file request. */
/** File the request belongs to. */
/** Opaque user data. */
void *pvUser;
/** Start offset. */
/** Data segment. */
/** When non-zero, the segment uses a bounce buffer because the provided buffer
* doesn't meet the host's alignment requirements. */
/** Pointer to the used bounce buffer, if any. */
void *pvBounceBuffer;
/** Start offset in the bounce buffer to copy from. */
} RTAIOMGRREQ;
/** Pointer to an I/O manager request. */
/** Flag whether the request was prepared already. */
/*******************************************************************************
* Defined Constants And Macros *
*******************************************************************************/
/** Validates a handle and returns VERR_INVALID_HANDLE if not valid. */
do { \
} while (0)
/** Validates a handle and returns VERR_INVALID_HANDLE if not valid. */
/** Validates a handle and returns (void) if not valid. */
do { \
} while (0)
/*******************************************************************************
* Internal Functions *
*******************************************************************************/
/**
* Removes an endpoint from the currently assigned manager.
*
* @returns true if there are still requests pending on the current manager for this endpoint,
* false otherwise.
* @param pEndpointRemove The endpoint to remove.
*/
{
/* Make sure that there is no request pending on this manager for the endpoint. */
{
return false;
}
return true;
}
/**
* Allocate a new I/O request.
*
* @returns Pointer to the allocated request or NULL if out of memory.
* @param pThis The async I/O manager instance.
*/
{
}
/**
* Frees an I/O request.
*
* @returns nothing.
* @param pThis The async I/O manager instance.
* @param pReq The request to free.
*/
{
/* Only requests that went through the bounce-buffer path need cleanup. */
if (pReq->cbBounceBuffer)
{
/* NOTE(review): only the size is reset here; no release of pvBounceBuffer
* is visible in this view. Confirm the bounce buffer allocated for
* non-aligned requests is freed before this point, otherwise it leaks. */
pReq->cbBounceBuffer = 0;
}
}
{
pThis->cReqsActive--;
/*
* It is possible that the request failed on Linux with kernels < 2.6.23
* if the passed buffer was allocated with remap_pfn_range or if the file
* is on an NFS endpoint which does not support async and direct I/O at the same time.
* The endpoint will be migrated to a failsafe manager in case a request fails.
*/
if (RT_FAILURE(rcReq))
{
}
else
{
/*
* Restart an incomplete transfer.
* This usually means that the request will return an error now
* but to get the cause of the error (disk full, file too big, I/O error, ...)
* the transfer needs to be continued.
*/
|| ( pReq->cbBounceBuffer
{
if (pReq->cbBounceBuffer)
{
}
else
{
}
{
}
else
{
("Invalid transfer type\n"));
}
("Unexpected return code rc=%Rrc\n", rc));
}
{
/* Write it now. */
("Unexpected return code rc=%Rrc\n", rc));
}
else
{
{
}
/* Call completion callback */
}
} /* request completed successfully */
}
/**
* Wrapper around rtAioMgrReqCompleteRc().
*/
{
}
/**
* Wrapper around RTFileAioCtxSubmit() which also does error handling.
*/
{
if (RT_FAILURE(rc))
{
{
/* Append any not submitted task to the waiting list. */
{
if (rcReq != VERR_FILE_AIO_IN_PROGRESS)
{
pThis->cReqsActive--;
}
}
rc = VINF_SUCCESS;
}
else /* Another kind of error happened (full disk, ...) */
{
/* An error happened. Find out which one caused the error and resubmit all other tasks. */
{
if (rcReq == VERR_FILE_AIO_NOT_SUBMITTED)
{
/* We call ourself again to do any error handling which might come up now. */
}
else if (rcReq != VERR_FILE_AIO_IN_PROGRESS)
}
}
}
return VINF_SUCCESS;
}
/**
* Adds a list of requests to the waiting list.
*
* @returns nothing.
* @param pFile The file instance to add the requests to.
* @param pReqsHead The head of the request list to add.
*/
{
while (pReqsHead)
{
}
}
/**
* Prepares the native I/O request, ensuring that all alignment prerequisites of
* the host are met.
*
* @returns IPRT status code.
* @param pFile The file instance data.
* @param pReq The request to prepare.
*/
{
/*
* Check if the alignment requirements are met.
* Offset, transfer size and buffer address
* need to be on a 512-byte boundary.
*/
if ( !fAlignedReq
/** @todo: || ((pEpClassFile->uBitmaskAlignment & (RTR3UINTPTR)pvBuf) != (RTR3UINTPTR)pvBuf) */)
{
/* Create a bounce buffer. */
/** @todo: I think we need something like an RTMemAllocAligned method here.
* The current assumption is that the maximum alignment is 4096 bytes
* (GPT disk on Windows),
* so we can use RTMemPageAlloc here.
*/
{
{
{
/* We have to fill the buffer first before we can update the data. */
}
else
}
}
else
rc = VERR_NO_MEMORY;
}
else
/* The request is already aligned, so no bounce buffer is used. */
pReq->cbBounceBuffer = 0;
if (RT_SUCCESS(rc))
{
{
}
else /* Read or prefetch request. */
}
return rc;
}
/**
* Prepare a new request for enqueuing.
*
* @returns IPRT status code.
* @param pReq The request to prepare.
* @param phReqIo Where to store the handle to the native I/O request on success.
*/
{
{
case RTAIOMGRREQTYPE_FLUSH:
{
break;
}
case RTAIOMGRREQTYPE_READ:
case RTAIOMGRREQTYPE_WRITE:
{
break;
}
default:
} /* switch transfer type */
if (RT_SUCCESS(rc))
return rc;
}
/**
* Prepare newly submitted requests for processing.
*
* @returns IPRT status code
* @param pThis The async I/O manager instance data.
* @param pFile The file instance.
* @param pReqsNew The list of new requests to prepare.
*/
{
unsigned cRequests = 0;
/* Go through the list and queue the requests. */
while ( pReqsNew
&& RT_SUCCESS(rc))
{
("Files do not match\n"));
("Request on the new list is already prepared\n"));
if (RT_FAILURE(rc))
else
cRequests++;
/* Queue the requests if the array is full. */
{
cRequests = 0;
("Unexpected return code\n"));
}
}
if (cRequests)
{
("Unexpected return code rc=%Rrc\n", rc));
}
if (pReqsNew)
{
/* Add the rest of the tasks to the pending list */
}
/* Insufficient resources are not fatal. */
rc = VINF_SUCCESS;
return rc;
}
/**
* Queues waiting requests.
*
* @returns IPRT status code.
* @param pThis The async I/O manager instance data.
* @param pFile The file to get the requests from.
*/
{
unsigned cRequests = 0;
/* Go through the list and queue the requests. */
{
("Files do not match\n"));
{
if (RT_FAILURE(rc))
else
cRequests++;
}
else
{
cRequests++;
}
/* Queue the requests if the array is full. */
{
cRequests = 0;
("Unexpected return code\n"));
}
}
if (cRequests)
{
("Unexpected return code rc=%Rrc\n", rc));
}
/* Insufficient resources are not fatal. */
rc = VINF_SUCCESS;
return rc;
}
/**
* Adds all pending requests for the given file.
*
* @returns IPRT status code.
* @param pThis The async I/O manager instance data.
* @param pFile The file to get the requests from.
*/
{
/* Check the pending list first */
if ( RT_SUCCESS(rc)
{
if (pReqsNew)
{
}
}
return rc;
}
/**
* Checks all files for new requests.
*
* @returns IPRT status code.
* @param pThis The I/O manager instance data.
*/
{
{
if (RT_FAILURE(rc))
return rc;
}
return rc;
}
/**
* Process a blocking event from the outside.
*
* @returns IPRT status code.
* @param pThis The async I/O manager instance data.
*/
{
bool fNotifyWaiter = false;
switch (pThis->enmBlockingEvent)
{
case RTAIOMGREVENT_NO_EVENT:
/* Nothing to do. */
break;
case RTAIOMGREVENT_FILE_ADD:
{
fNotifyWaiter = true;
break;
}
case RTAIOMGREVENT_FILE_CLOSE:
{
{
/* Make sure all requests finished. Process the queues a last time first. */
}
fNotifyWaiter = true;
break;
}
case RTAIOMGREVENT_SHUTDOWN:
{
if (!pThis->cReqsActive)
fNotifyWaiter = true;
break;
}
default:
}
if (fNotifyWaiter)
{
/* Release the waiting thread. */
}
return rc;
}
/**
* async I/O manager worker loop.
*
* @returns IPRT status code.
* @param hThreadSelf The thread handle this worker belongs to.
* @param pvUser Opaque user data (Pointer to async I/O manager instance).
*/
{
bool fRunning = true;
do
{
if (rc == VERR_INTERRUPTED)
{
/* Process external event. */
}
else if (RT_FAILURE(rc))
{
/* Something bad happened. */
/** @todo: */
}
else
{
/* Requests completed. */
for (uint32_t i = 0; i < cReqsCompleted; i++)
/* Check files for new requests and queue waiting requests. */
}
} while ( fRunning
&& RT_SUCCESS(rc));
return rc;
}
/**
* Wakes up the async I/O manager.
*
* @returns IPRT status code.
* @param pThis The async I/O manager.
*/
{
}
/**
* Waits until the async I/O manager handled the given event.
*
* @returns IPRT status code.
* @param pThis The async I/O manager.
* @param enmEvent The event to pass to the manager.
*/
{
/* Wakeup the async I/O manager */
if (RT_FAILURE(rc))
return rc;
/* Wait for completion. */
return rc;
}
/**
* Add a given file to the given I/O manager.
*
* @returns IPRT status code.
* @param pThis The async I/O manager.
* @param pFile The file to add.
*/
{
/* Update the assigned I/O manager. */
return rc;
}
/**
* Removes a given file from the given I/O manager.
*
* @returns IPRT status code.
* @param pThis The async I/O manager.
* @param pFile The file to remove.
*/
{
return rc;
}
/**
* Process a shutdown event.
*
* @returns IPRT status code.
* @param pThis The async I/O manager to shut down.
*/
{
return rc;
}
/**
* Destroys an async I/O manager.
*
* @returns nothing.
* @param pThis The async I/O manager instance to destroy.
*/
{
int rc;
}
/**
* Queues a new request for processing.
*/
{
}
/**
* Destroys an async I/O manager file.
*
* @returns nothing.
* @param pThis The async I/O manager file.
*/
{
}
/**
* Queues a new I/O request.
*
* @returns IPRT status code.
* @param hAioMgrFile The I/O manager file handle.
* @param off Start offset of the I/O request.
* @param pSgBuf Data S/G buffer.
* @param cbIo How much to transfer.
* @param pvUser Opaque user data.
*/
{
int rc;
{
{
}
else
{
/** @todo: Real S/G buffer support. */
}
}
else
rc = VERR_NO_MEMORY;
return rc;
}
/**
* Request constructor for the memory cache.
*
* @returns IPRT status code.
* @param hMemCache The cache handle.
* @param pvObj The memory object that should be initialized.
* @param pvUser The user argument.
*/
{
}
/**
* Request destructor for the memory cache.
*
* @param hMemCache The cache handle.
* @param pvObj The memory object that should be destroyed.
* @param pvUser The user argument.
*/
{
}
{
if (pThis)
{
if (RT_SUCCESS(rc))
{
if (RT_SUCCESS(rc))
{
if (RT_SUCCESS(rc))
{
: cReqsMax,
if (RT_SUCCESS(rc))
{
if (RT_FAILURE(rc))
{
}
}
if (RT_FAILURE(rc))
}
if (RT_FAILURE(rc))
}
if (RT_FAILURE(rc))
}
if (RT_FAILURE(rc))
}
else
rc = VERR_NO_MEMORY;
if (RT_SUCCESS(rc))
return rc;
}
{
return cRefs;
}
{
if (pThis == NIL_RTAIOMGR)
return 0;
if (cRefs == 0)
return cRefs;
}
RTDECL(int) RTAioMgrFileCreate(RTAIOMGR hAioMgr, RTFILE hFile, PFNRTAIOMGRREQCOMPLETE pfnReqComplete,
{
int rc = VINF_SUCCESS;
if (pThis)
{
if (RT_FAILURE(rc))
else
}
else
rc = VERR_NO_MEMORY;
if (RT_SUCCESS(rc))
*phAioMgrFile = pThis;
return rc;
}
{
return cRefs;
}
{
if (pThis == NIL_RTAIOMGRFILE)
return 0;
if (cRefs == 0)
return cRefs;
}
{
}
{
}
{
}
{
if (RT_UNLIKELY(!pReq))
return VERR_NO_MEMORY;
return VERR_FILE_AIO_IN_PROGRESS;
}