reqpool.cpp revision 9de47c4ec7b0fc9a384e4b815153de399da7b8de
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * IPRT - Request Pool.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Copyright (C) 2006-2011 Oracle Corporation
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * This file is part of VirtualBox Open Source Edition (OSE), as
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * available from http://www.virtualbox.org. This file is free software;
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * you can redistribute it and/or modify it under the terms of the GNU
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * General Public License (GPL) as published by the Free Software
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Foundation, in version 2 as it comes in the "COPYING" file of the
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * The contents of this file may alternatively be used under the terms
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * of the Common Development and Distribution License Version 1.0
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * VirtualBox OSE distribution, in which case the provisions of the
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * CDDL are applicable instead of those of the GPL.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * You may elect to license modified versions of this file under the
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * terms and conditions of either the GPL or the CDDL or both.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync/*******************************************************************************
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync* Header Files *
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync*******************************************************************************/
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync/*******************************************************************************
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync* Structures and Typedefs *
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync*******************************************************************************/
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** Node in the RTREQPOOLINT::IdleThreads list. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** Node in the RTREQPOOLINT::WorkerThreads list. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The submit timestamp of the pending request. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The submit timestamp of the request processing. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** When this CPU went idle the last time. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The number of requests processed by this thread. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** Total time the requests processed by this thread took to process. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** Total time the requests processed by this thread had to wait in
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * the queue before being scheduled. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The CPU this was scheduled last time we checked. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The submitter will put an incoming request here when scheduling an idle
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * thread. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The request the thread is currently processing. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The thread handle. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** Nano seconds timestamp representing the birth time of the thread. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** Pointer to the request thread pool instance the thread is associated
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync/** Pointer to a worker thread. */
605036d491298181444650ae12453c9207a7cf01vboxsync * Request thread pool instance data.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsynctypedef struct RTREQPOOLINT
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** Magic value (RTREQPOOL_MAGIC). */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** @name Config
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The worker thread type. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The maximum number of worker threads. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The number of threads which should be spawned before throttling kicks
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The minimum number of worker threads. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The number of milliseconds a thread needs to be idle before it is
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * considered for retirement. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The max number of milliseconds to push back a submitter before creating
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * a new worker thread once the threshold has been reached. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The minimum number of milliseconds to push back a submitter before
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * creating a new worker thread once the threshold has been reached. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The max number of free requests in the recycle LIFO. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** Signaled by terminating worker threads. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** Destruction indicator. The worker threads check this in their loop. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync bool volatile fDestructing;
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The current submitter push back in milliseconds.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * This is recalculated when worker threads come and go. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The current number of worker threads. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** Statistics: The total number of threads created. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** Statistics: The timestamp when the last thread was created. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** Linked list of worker threads. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** Reference counter. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The number of idle threads or threads in the process of becoming
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * idle. This is increased before the to-be-idle thread tries to enter
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * the critical section and add itself to the list. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** Linked list of idle threads. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** Head of the request FIFO. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** Where to insert the next request. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** Head of the request recycling LIFO. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** The number of requests in the recycling LIFO. This is read without
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * entering the critical section, thus volatile. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** Critical section serializing access to members of this structure. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsyncstatic void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool)
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync uint32_t const cSteps = pPool->cMaxThreads - pPool->cThreadsThreshold;
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync uint32_t const iStep = pPool->cCurThreads - pPool->cThreadsThreshold;
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync cMsCurPushBack = (uint32_t)( (uint64_t)cMsRange * RT_NS_1MS / cSteps * iStep / RT_NS_1MS );
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsyncstatic void rtReqPoolThreadProcessRequest(PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Update thread state.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Do the actual processing.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** @todo */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Update thread statistics and state.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync pThread->cNsTotalReqQueued += uNsTsEnd - pThread->uPendingNanoTs;
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsyncstatic DECLCALLBACK(int) rtReqPoolThreadProc(RTTHREAD hThreadSelf, void *pvArg)
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)pvArg;
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync/** @todo rework this... */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * The work loop.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Pending work?
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync PRTREQINT pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /* Recheck the todo request pointer after entering the critsect. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /* Any pending requests in the queue? */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync pPool->ppPendingRequests = &pPool->pPendingRequests;
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Un-idle ourselves and process the request.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Nothing to do, go idle.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode);
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Clean up on the way out.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** @todo .... */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Create a new worker thread.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * @param pPool The pool needing new worker thread.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * @remarks Caller owns the critical section
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsyncstatic void rtReqPoolCreateNewWorker(RTREQPOOL pPool)
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)RTMemAllocZ(sizeof(RTREQPOOLTHREAD));
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync RTListAppend(&pPool->WorkerThreads, &pThread->ListNode);
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync int rc = RTThreadCreateF(&pThread->hThread, rtReqPoolThreadProc, pThread, 0 /*default stack size*/,
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync pPool->enmThreadType, 0 /*fFlags*/, "REQPT%02u", ++s_idThread);
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync pPool->uLastThreadCreateNanoTs = pThread->uBirthNanoTs;
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Repel the submitter, giving the worker threads a chance to process the
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * incoming request.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * @returns Success if a worker picked up the request, failure if not. The
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * critical section has been left on success, while we'll be inside it
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * on failure.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * @param pPool The pool.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * @param pReq The incoming request.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsyncstatic int rtReqPoolPushBack(PRTREQPOOLINT pPool, PRTREQINT pReq)
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Lazily create the push back semaphore that we'll be blocking on.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Prepare the request and semaphore.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsyncDECLHIDDEN(void) rtReqPoolSubmit(PRTREQPOOLINT pPool, PRTREQINT pReq)
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Try to schedule the request to a thread that's currently idle.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync PRTREQPOOLTHREAD pThread = RTListGetFirst(&pPool->IdleThreads, RTREQPOOLTHREAD, IdleNode);
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** @todo CPU affinity... */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Put the request in the pending queue.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync pPool->ppPendingRequests = (PRTREQINT*)&pReq->pNext;
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * If there is an incoming worker thread already or we've reached the
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * maximum number of worker threads, we're done.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Push back before creating a new worker thread.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack )
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Create a new thread for processing the request.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * For simplicity, we don't bother leaving the critical section while doing so.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Frees a request.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * @returns true if recycled, false if not.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * @param pPool The request thread pool.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * @param pReq The request.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsyncDECLHIDDEN(bool) rtReqPoolRecycle(PRTREQPOOLINT pPool, PRTREQINT pReq)
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync && ASMAtomicReadU32(&pPool->cCurFreeRequests) < pPool->cMaxFreeRequests)
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync if (pPool->cCurFreeRequests < pPool->cMaxFreeRequests)
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync return true;
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync return false;
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Ignore NULL and validate the request.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Drop a reference, free it when it reaches zero.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync AssertReturn(ASMAtomicCmpXchgU32(&pPool->u32Magic, RTREQPOOL_MAGIC_DEAD, RTREQPOOL_MAGIC), UINT32_MAX);
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /* Indicate to the worker threads that we're shutting down. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /* Cancel pending requests. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync ASMAtomicWriteS32(&pReq->iStatusX, VERR_CANCELLED);
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /* Wait for the workers to shut down. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /** @todo should we wait forever here? */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /* Free recycled requests. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync /* Finally, free the handle. */
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsyncRTDECL(int) RTReqPoolAlloc(RTREQPOOL hPool, RTREQTYPE enmType, PRTREQ *phReq)
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync * Try to recycle old requests.
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync if (ASMAtomicReadU32(&pPool->cCurFreeRequests) > 0)
7da9e7e719adde3baba3f6fa1d0bcfb170cf9911vboxsync LogFlow(("RTReqPoolAlloc: returns VINF_SUCCESS *phReq=%p recycled\n", pReq));
605036d491298181444650ae12453c9207a7cf01vboxsync * Allocate a new request.
605036d491298181444650ae12453c9207a7cf01vboxsync int rc = rtReqAlloc(enmType, true /*fPoolOrQueue*/, pPool, phReq);
605036d491298181444650ae12453c9207a7cf01vboxsync LogFlow(("RTReqPoolAlloc: returns %Rrc *phReq=%p\n", rc, *phReq));