reqpool.cpp revision 4c464a0800535c29782d7ab5bcd7820bc7115cd7
/* $Id$ */
/** @file
* IPRT - Request Pool.
*/
/*
* Copyright (C) 2006-2011 Oracle Corporation
*
* This file is part of VirtualBox Open Source Edition (OSE), as
* available from http://www.virtualbox.org. This file is free software;
* you can redistribute it and/or modify it under the terms of the GNU
* General Public License (GPL) as published by the Free Software
* Foundation, in version 2 as it comes in the "COPYING" file of the
* VirtualBox OSE distribution. VirtualBox OSE is distributed in the
* hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
*
* The contents of this file may alternatively be used under the terms
* of the Common Development and Distribution License Version 1.0
* (CDDL) only, as it comes in the "COPYING.CDDL" file of the
* VirtualBox OSE distribution, in which case the provisions of the
* CDDL are applicable instead of those of the GPL.
*
* You may elect to license modified versions of this file under the
* terms and conditions of either the GPL or the CDDL or both.
*/
/*******************************************************************************
* Header Files *
*******************************************************************************/
#include <iprt/critsect.h>
#include <iprt/semaphore.h>
/*******************************************************************************
* Structures and Typedefs *
*******************************************************************************/
/**
* Per-thread data of a request pool worker thread.
*
* NOTE(review): only the pPendingReq and pPool members are visible in this
* copy of the file; the members described by the other comments below, and the
* closing of the structure, are missing here.
*/
typedef struct RTREQPOOLTHREAD
{
/** Node in the RTREQPOOLINT::IdleThreads list. */
/** Node in the RTREQPOOLINT::WorkerThreads list. */
/** The submit timestamp of the pending request. */
/** The submit timestamp of the request processing. */
/** When this CPU went idle the last time. */
/** The number of requests processed by this thread. */
/** Total time the requests processed by this thread took to process. */
/** Total time the requests processed by this thread had to wait in
* the queue before being scheduled. */
/** The CPU this was scheduled last time we checked. */
/** The submitter will put an incoming request here when scheduling an idle
* thread. */
/** The request the thread is currently processing. */
PRTREQINT volatile pPendingReq;
/** The thread handle. */
/** Nanosecond timestamp representing the birth time of the thread. */
/** Pointer to the request thread pool instance the thread is associated
* with. */
struct RTREQPOOLINT *pPool;
/** Pointer to a worker thread. */
typedef RTREQPOOLTHREAD *PRTREQPOOLTHREAD;
/**
* Request thread pool instance data.
*/
typedef struct RTREQPOOLINT
{
/** Magic value (RTREQPOOL_MAGIC). */
/** @name Config
* @{ */
/** The worker thread type. */
/** The maximum number of worker threads. */
/** The number of threads which should be spawned before throttling kicks
* in. */
/** The minimum number of worker threads. */
/** The number of milliseconds a thread needs to be idle before it is
* considered for retirement. */
/** cMsMinIdle in nanoseconds. */
/** The idle thread sleep interval in milliseconds. */
/** The max number of milliseconds to push back a submitter before creating
* a new worker thread once the threshold has been reached. */
/** The minimum number of milliseconds to push back a submitter before
* creating a new worker thread once the threshold has been reached. */
/** The max number of free requests in the recycle LIFO. */
/** @} */
/** Signaled by terminating worker threads. */
/** Destruction indicator. The worker threads check this in their loop. */
bool volatile fDestructing;
/** The current submitter push back in milliseconds.
* This is recalculated when worker threads come and go. */
/** The current number of worker threads. */
/** Statistics: The total number of threads created. */
/** Statistics: The timestamp when the last thread was created. */
/** Linked list of worker threads. */
/** Reference counter. */
/** The number of idle threads, or threads in the process of becoming
* idle. This is increased before the to-be-idle thread tries to enter
* the critical section and add itself to the list. */
uint32_t volatile cIdleThreads;
/** Linked list of idle threads. */
/** Head of the request FIFO. */
/** Where to insert the next request. */
/** Head of the request recycling LIFO. */
/** The number of requests in the recycling LIFO. This is read without
* entering the critical section, thus volatile. */
uint32_t volatile cCurFreeRequests;
/** Critical section serializing access to members of this structure. */
} RTREQPOOLINT;
/**
* Used by an exiting thread and by the pool destruction code to cancel
* unexpected requests.
*
* NOTE(review): the function signature and the statements of the body are
* missing from this copy of the file; only the empty braces remain.
*
* @param pReq The request.
*/
{
}
/**
* Recalculate the max pushback interval when adding or removing worker threads.
*
* NOTE(review): the function signature, the condition and both branch bodies
* are missing from this copy of the file; only the 'else' keyword remains.
*
* @param pPool The pool. cMsCurPushBack will be changed.
*/
{
else
}
/**
* Performs thread exit.
*
* Decrements the pool's current thread count and asserts if a request is
* still attached at this point.
*
* @returns Thread termination status code (VINF_SUCCESS).
* @param pPool The pool.
* @param pThread The thread.
* @param fLocked Whether we are inside the critical section
* already.
*/
{
/* NOTE(review): the statement guarded by this check is missing from this copy;
   presumably it enters the critical section when the caller has not - confirm. */
if (!fLocked)
/* Get out of the idle list. */
{
}
/* Get out of the thread list and drop this thread from the worker count. */
pPool->cCurThreads--;
/* This shouldn't happen: a non-NULL pReq at thread exit trips the assertion. */
if (pReq)
{
AssertFailed();
}
/* If we're the last thread terminating, ping the destruction thread before
   we leave the critical section. */
return VINF_SUCCESS;
}
/*
 * Handles one request on a worker thread: updates the thread state, does the
 * processing (still a todo, see below) and then bumps the thread's
 * processed-request counter.
 *
 * NOTE(review): the function signature and the state/statistics statements
 * other than the counter increment are missing from this copy of the file.
 */
{
/*
* Update thread state.
*/
/*
* Do the actual processing.
*/
/** @todo */
/*
* Update thread statistics and state.
*/
/* One more request handled by this thread. */
pThread->cReqProcessed++;
}
/**
* The Worker Thread Procedure.
*
* Loops until the pool's fDestructing flag is set, looking for work in three
* places per iteration (directly scheduled, rechecked inside the critsect,
* pending queue) before going idle.
*
* NOTE(review): the function signature, the assignments to pReq and the idle
* handling statements are missing from this copy of the file.
*
* @returns VINF_SUCCESS.
* @param hThreadSelf The thread handle (unused).
* @param pvArg Pointer to the thread data.
*/
{
/*
* The work loop. Runs until the pool signals destruction.
*/
while (!pPool->fDestructing)
{
/*
* Process pending work.
*/
/* Check if anything is scheduled directly to us. */
if (pReq)
{
/* Go straight back to the top of the loop to look for more work. */
continue;
}
/* Recheck the todo request pointer after entering the critsect. */
if (pReq)
{
continue;
}
/* Any pending requests in the queue? */
if (pReq)
{
/* Un-idle ourselves and process the request. */
{
}
continue;
}
/*
* Nothing to do, go idle.
*/
{
}
{
}
else
}
}
/**
* Create a new worker thread.
*
* Returns without doing anything if the thread data pointer is NULL. The
* thread is counted before creation is attempted, and the current-thread
* count is rolled back if creation fails.
*
* NOTE(review): the function signature, the allocation of pThread and the
* thread creation call producing rc are missing from this copy of the file.
*
* @param pPool The pool needing new worker thread.
* @remarks Caller owns the critical section
*/
{
/* No thread data, nothing to create; fail silently. */
if (!pThread)
return;
/* Account for the new thread up front. */
pPool->cCurThreads++;
pPool->cThreadsCreated++;
/* Function-local static counter, starts at zero.
   NOTE(review): its use is not visible here. */
static uint32_t s_idThread = 0;
if (RT_SUCCESS(rc))
else
{
/* Creation failed: undo the current-thread count increment from above. */
pPool->cCurThreads--;
}
}
/**
* Repel the submitter, giving the worker threads a chance to process the
* incoming request.
*
* NOTE(review): the function signature, the semaphore creation call and the
* wait call are missing from this copy of the file.
*
* @returns Success if a worker picked up the request, failure if not. The
* critical section has been left on success, while we'll be inside it
* on failure.
* @param pPool The pool.
* @param pReq The incoming request.
*/
{
/*
* Lazily create the push back semaphore that we'll be blocking on.
*/
int rc;
if (hEvt == NIL_RTSEMEVENTMULTI)
{
/* Could not set up the semaphore, pass the failure on to the caller. */
if (RT_FAILURE(rc))
return rc;
}
/*
* Prepare the request and semaphore.
*/
/* Flag the request as having a pushed-back submitter to signal. */
pReq->fSignalPushBack = true;
/*
* Block.
*/
if (RT_FAILURE(rc))
{
}
return rc;
}
/*
 * Submits a request to the pool, going through these steps in order: hand it
 * to an idle thread, queue it as pending, stop if a worker is already on its
 * way, push back the submitter, and finally create a new worker thread.
 *
 * NOTE(review): the function signature and most statements are missing from
 * this copy of the file; the condition at the cIdleThreads check is truncated.
 */
{
/*
* Try schedule the request to a thread that's currently idle.
*/
if (pThread)
{
/** @todo CPU affinity... */
return;
}
/*
* Put the request in the pending queue.
*/
/*
* If there is an incoming worker thread already or we've reached the
* maximum number of worker threads, we're done.
*/
if ( pPool->cIdleThreads > 0
{
return;
}
/*
* Push back before creating a new worker thread.
*/
{
/* Success here ends the submission without creating a new thread. */
if (RT_SUCCESS(rc))
return;
}
/*
* Create a new thread for processing the request.
* For simplicity, we don't bother leaving the critical section while doing so.
*/
return;
}
/**
* Frees a request.
*
* NOTE(review): the function signature, the rest of the condition and the
* recycling statements are missing from this copy of the file.
*
* @returns true if recycled, false if not.
* @param pPool The request thread pool.
* @param pReq The request.
*/
{
if ( pPool
{
{
/* The request went back into the pool. */
return true;
}
}
/* Not recycled. */
return false;
}
/*
 * Request pool configuration variable enum, followed by two function bodies
 * that both return VERR_NOT_SUPPORTED.
 *
 * NOTE(review): the enum's other members and its closing, as well as the
 * signatures of the two functions, are missing from this copy of the file.
 */
typedef enum RTREQPOOLCFGVAR
{
/* Presumably forces the enum type to 32 bits - confirm. */
RTREQPOOLCFGVAR_32BIT_HACK = 0x7fffffff
{
/* Not implemented as far as visible here. */
return VERR_NOT_SUPPORTED;
}
{
/* Not implemented as far as visible here. */
return VERR_NOT_SUPPORTED;
}
/*
 * Request pool statistics enum, followed by a function body returning
 * UINT64_MAX and an empty function body.
 *
 * NOTE(review): the enum's other members and its closing, as well as the
 * signatures of the two functions, are missing from this copy of the file.
 */
typedef enum RTREQPOOLSTAT
{
/* Presumably forces the enum type to 32 bits - confirm. */
RTREQPOOLSTAT_32BIT_HACK = 0x7fffffff
{
/* No statistic is returned as far as visible here. */
return UINT64_MAX;
}
{
}
/*
 * Drops a reference to the pool and tears it down when the count reaches
 * zero: invalidates the magic, signals shutdown to the workers, cancels
 * pending requests, waits for the workers, frees recycled requests and
 * finally frees the handle.
 *
 * Returns 0 for a NULL handle, UINT32_MAX if the magic could not be flipped,
 * otherwise the new reference count.
 *
 * NOTE(review): the function signature and most teardown statements are
 * missing from this copy of the file.
 */
{
/*
* Ignore NULL and validate the handle.
*/
if (!hPool)
return 0;
/*
* Drop a reference, free it when it reaches zero.
*/
if (cRefs == 0)
{
/* Atomically switch the magic from RTREQPOOL_MAGIC to RTREQPOOL_MAGIC_DEAD;
   assert and return UINT32_MAX if it did not hold the live magic. */
AssertReturn(ASMAtomicCmpXchgU32(&pPool->u32Magic, RTREQPOOL_MAGIC_DEAD, RTREQPOOL_MAGIC), UINT32_MAX);
#ifdef RT_STRICT
#endif
/* Indicate to the worker threads that we're shutting down. */
{
}
/* Cancel pending requests. */
while (pPool->pPendingRequests)
{
}
/* Wait for the workers to shut down. */
{
/** @todo should we wait forever here? */
}
/* Free recycled requests. */
for (;;)
{
/* Stop once there are no more requests to free. */
if (!pReq)
break;
}
/* Finally, free the handle. */
}
return cRefs;
}
/*
 * Obtains a request, first trying to recycle an old one and otherwise
 * allocating a new one.
 *
 * NOTE(review): the function signature, the recycle-list handling and the
 * allocation statements are missing from this copy of the file.
 */
{
/*
* Try recycle old requests.
*/
{
if (pReq)
{
/* Done if rc indicates success for the recycled request. */
if (RT_SUCCESS(rc))
{
return rc;
}
}
else
}
/*
* Allocate a new request.
*/
return VINF_SUCCESS;
}