reqpool.cpp revision 9de47c4ec7b0fc9a384e4b815153de399da7b8de
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * IPRT - Request Pool.
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * Copyright (C) 2006-2011 Oracle Corporation
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * This file is part of VirtualBox Open Source Edition (OSE), as
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * available from http://www.virtualbox.org. This file is free software;
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * you can redistribute it and/or modify it under the terms of the GNU
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * General Public License (GPL) as published by the Free Software
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * Foundation, in version 2 as it comes in the "COPYING" file of the
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * The contents of this file may alternatively be used under the terms
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * of the Common Development and Distribution License Version 1.0
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * VirtualBox OSE distribution, in which case the provisions of the
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * CDDL are applicable instead of those of the GPL.
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * You may elect to license modified versions of this file under the
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * terms and conditions of either the GPL or the CDDL or both.
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync/*******************************************************************************
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync* Header Files *
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync*******************************************************************************/
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync/*******************************************************************************
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync* Structures and Typedefs *
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync*******************************************************************************/
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Node in the RTREQPOOLINT::IdleThreads list. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Node in the RTREQPOOLINT::WorkerThreads list. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** The submit timestamp of the pending request. */
8867771015571c5542d39e393d7fe6304421a928vboxsync /** The submit timestamp of the request processing. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** When this CPU went idle the last time. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** The number of requests processed by this thread. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Total time the requests processed by this thread took to process. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Total time the requests processed by this thread had to wait in
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync * the queue before being scheduled. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** The CPU this was scheduled last time we checked. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** The submitter will put an incoming request here when scheduling an idle
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync * thread. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** The request the thread is currently processing. */
8867771015571c5542d39e393d7fe6304421a928vboxsync /** The thread handle. */
8867771015571c5542d39e393d7fe6304421a928vboxsync /** Nano seconds timestamp representing the birth time of the thread. */
8867771015571c5542d39e393d7fe6304421a928vboxsync /** Pointer to the request thread pool instance the thread is associated
8867771015571c5542d39e393d7fe6304421a928vboxsync/** Pointer to a worker thread. */
8867771015571c5542d39e393d7fe6304421a928vboxsync * Request thread pool instance data.
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsynctypedef struct RTREQPOOLINT
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Magic value (RTREQPOOL_MAGIC). */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /** @name Config
8867771015571c5542d39e393d7fe6304421a928vboxsync /** The worker thread type. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** The maximum number of worker threads. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** The number of threads which should be spawned before throttling kicks
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** The minimum number of worker threads. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** The number of milliseconds a thread needs to be idle before it is
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync * considered for retirement. */
8867771015571c5542d39e393d7fe6304421a928vboxsync /** The max number of milliseconds to push back a submitter before creating
8867771015571c5542d39e393d7fe6304421a928vboxsync * a new worker thread once the threshold has been reached. */
8867771015571c5542d39e393d7fe6304421a928vboxsync /** The minimum number of milliseconds to push back a submitter before
8867771015571c5542d39e393d7fe6304421a928vboxsync * creating a new worker thread once the threshold has been reached. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /** The max number of free requests in the recycle LIFO. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /** Signaled by terminating worker threads. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /** Destruction indicator. The worker threads check this in their loop. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync bool volatile fDestructing;
8867771015571c5542d39e393d7fe6304421a928vboxsync /** The current submitter push back in milliseconds.
8867771015571c5542d39e393d7fe6304421a928vboxsync * This is recalculated when worker threads come and go. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /** The current number of worker threads. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Statistics: The total number of threads created. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Statistics: The timestamp when the last thread was created. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Linked list of worker threads. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Reference counter. */
8867771015571c5542d39e393d7fe6304421a928vboxsync /** The number of idle threads or threads in the process of becoming
8867771015571c5542d39e393d7fe6304421a928vboxsync * idle. This is increased before the to-be-idle thread tries to enter
8867771015571c5542d39e393d7fe6304421a928vboxsync * the critical section and add itself to the list. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Linked list of idle threads. */
8867771015571c5542d39e393d7fe6304421a928vboxsync /** Head of the request FIFO. */
8867771015571c5542d39e393d7fe6304421a928vboxsync /** Where to insert the next request. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /** Head of the request recycling LIFO. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /** The number of requests in the recycling LIFO. This is read without
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * entering the critical section, thus volatile. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /** Critical section serializing access to members of this structure. */
8867771015571c5542d39e393d7fe6304421a928vboxsyncstatic void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool)
8867771015571c5542d39e393d7fe6304421a928vboxsync uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
8867771015571c5542d39e393d7fe6304421a928vboxsync uint32_t const cSteps = pPool->cMaxThreads - pPool->cThreadsThreshold;
8867771015571c5542d39e393d7fe6304421a928vboxsync uint32_t const iStep = pPool->cCurThreads - pPool->cThreadsThreshold;
8867771015571c5542d39e393d7fe6304421a928vboxsync cMsCurPushBack = (uint32_t)( (uint64_t)cMsRange * RT_NS_1MS / cSteps * iStep / RT_NS_1MS );
8867771015571c5542d39e393d7fe6304421a928vboxsyncstatic void rtReqPoolThreadProcessRequest(PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
8867771015571c5542d39e393d7fe6304421a928vboxsync * Update thread state.
8867771015571c5542d39e393d7fe6304421a928vboxsync * Do the actual processing.
8867771015571c5542d39e393d7fe6304421a928vboxsync /** @todo */
8867771015571c5542d39e393d7fe6304421a928vboxsync * Update thread statistics and state.
8867771015571c5542d39e393d7fe6304421a928vboxsync pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
8867771015571c5542d39e393d7fe6304421a928vboxsync pThread->cNsTotalReqQueued += uNsTsEnd - pThread->uPendingNanoTs;
8867771015571c5542d39e393d7fe6304421a928vboxsyncstatic DECLCALLBACK(int) rtReqPoolThreadProc(RTTHREAD hThreadSelf, void *pvArg)
8867771015571c5542d39e393d7fe6304421a928vboxsync PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)pvArg;
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync/** @todo rework this... */
8867771015571c5542d39e393d7fe6304421a928vboxsync * The work loop.
8867771015571c5542d39e393d7fe6304421a928vboxsync * Pending work?
8867771015571c5542d39e393d7fe6304421a928vboxsync PRTREQINT pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
8867771015571c5542d39e393d7fe6304421a928vboxsync /* Recheck the todo request pointer after entering the critsect. */
8867771015571c5542d39e393d7fe6304421a928vboxsync pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
8867771015571c5542d39e393d7fe6304421a928vboxsync /* Any pending requests in the queue? */
8867771015571c5542d39e393d7fe6304421a928vboxsync pPool->ppPendingRequests = &pPool->pPendingRequests;
8867771015571c5542d39e393d7fe6304421a928vboxsync * Un-idle ourselves and process the request.
8867771015571c5542d39e393d7fe6304421a928vboxsync * Nothing to do, go idle.
8867771015571c5542d39e393d7fe6304421a928vboxsync RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode);
8867771015571c5542d39e393d7fe6304421a928vboxsync * Clean up on the way out.
8867771015571c5542d39e393d7fe6304421a928vboxsync /** @todo .... */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Create a new worker thread.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * @param pPool The pool needing new worker thread.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * @remarks Caller owns the critical section
c999f225d03074008a0c21cdd5d3594da476e243vboxsyncstatic void rtReqPoolCreateNewWorker(RTREQPOOL pPool)
c999f225d03074008a0c21cdd5d3594da476e243vboxsync PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)RTMemAllocZ(sizeof(RTREQPOOLTHREAD));
c999f225d03074008a0c21cdd5d3594da476e243vboxsync RTListAppend(&pPool->WorkerThreads, &pThread->ListNode);
c999f225d03074008a0c21cdd5d3594da476e243vboxsync int rc = RTThreadCreateF(&pThread->hThread, rtReqPoolThreadProc, pThread, 0 /*default stack size*/,
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync pPool->enmThreadType, 0 /*fFlags*/, "REQPT%02u", ++s_idThread);
c999f225d03074008a0c21cdd5d3594da476e243vboxsync pPool->uLastThreadCreateNanoTs = pThread->uBirthNanoTs;
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Repel the submitter, giving the worker threads a chance to process the
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * incoming request.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * @returns Success if a worker picked up the request, failure if not. The
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * critical section has been left on success, while we'll be inside it
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * on failure.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * @param pPool The pool.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * @param pReq The incoming request.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsyncstatic int rtReqPoolPushBack(PRTREQPOOLINT pPool, PRTREQINT pReq)
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Lazily create the push back semaphore that we'll be blocking on.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Prepare the request and semaphore.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsyncDECLHIDDEN(void) rtReqPoolSubmit(PRTREQPOOLINT pPool, PRTREQINT pReq)
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Try to schedule the request to a thread that's currently idle.
8867771015571c5542d39e393d7fe6304421a928vboxsync PRTREQPOOLTHREAD pThread = RTListGetFirst(&pPool->IdleThreads, RTREQPOOLTHREAD, IdleNode);
8867771015571c5542d39e393d7fe6304421a928vboxsync /** @todo CPU affinity... */
8867771015571c5542d39e393d7fe6304421a928vboxsync * Put the request in the pending queue.
8867771015571c5542d39e393d7fe6304421a928vboxsync pPool->ppPendingRequests = (PRTREQINT*)&pReq->pNext;
8867771015571c5542d39e393d7fe6304421a928vboxsync * If there is an incoming worker thread already or we've reached the
8867771015571c5542d39e393d7fe6304421a928vboxsync * maximum number of worker threads, we're done.
8867771015571c5542d39e393d7fe6304421a928vboxsync * Push back before creating a new worker thread.
8867771015571c5542d39e393d7fe6304421a928vboxsync && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack )
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Create a new thread for processing the request.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * For simplicity, we don't bother leaving the critical section while doing so.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Frees a request.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * @returns true if recycled, false if not.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * @param pPool The request thread pool.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * @param pReq The request.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsyncDECLHIDDEN(bool) rtReqPoolRecycle(PRTREQPOOLINT pPool, PRTREQINT pReq)
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync && ASMAtomicReadU32(&pPool->cCurFreeRequests) < pPool->cMaxFreeRequests)
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync if (pPool->cCurFreeRequests < pPool->cMaxFreeRequests)
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync return true;
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync return false;
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Ignore NULL and validate the request.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Drop a reference, free it when it reaches zero.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync AssertReturn(ASMAtomicCmpXchgU32(&pPool->u32Magic, RTREQPOOL_MAGIC_DEAD, RTREQPOOL_MAGIC), UINT32_MAX);
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /* Indicate to the worker threads that we're shutting down. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /* Cancel pending requests. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync ASMAtomicWriteS32(&pReq->iStatusX, VERR_CANCELLED);
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /* Wait for the workers to shut down. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /** @todo should we wait forever here? */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /* Free recycled requests. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /* Finally, free the handle. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsyncRTDECL(int) RTReqPoolAlloc(RTREQPOOL hPool, RTREQTYPE enmType, PRTREQ *phReq)
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Try to recycle old requests.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync if (ASMAtomicReadU32(&pPool->cCurFreeRequests) > 0)
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync LogFlow(("RTReqPoolAlloc: returns VINF_SUCCESS *phReq=%p recycled\n", pReq));
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Allocate a new request.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync int rc = rtReqAlloc(enmType, true /*fPoolOrQueue*/, pPool, phReq);
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync LogFlow(("RTReqPoolAlloc: returns %Rrc *phReq=%p\n", rc, *phReq));