reqpool.cpp revision dbf778dbca739883cb93737abe3e5095011dbe6a
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * IPRT - Request Pool.
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * Copyright (C) 2006-2011 Oracle Corporation
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * This file is part of VirtualBox Open Source Edition (OSE), as
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * available from http://www.virtualbox.org. This file is free software;
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * you can redistribute it and/or modify it under the terms of the GNU
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * General Public License (GPL) as published by the Free Software
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * Foundation, in version 2 as it comes in the "COPYING" file of the
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * The contents of this file may alternatively be used under the terms
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * of the Common Development and Distribution License Version 1.0
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * VirtualBox OSE distribution, in which case the provisions of the
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * CDDL are applicable instead of those of the GPL.
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * You may elect to license modified versions of this file under the
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync * terms and conditions of either the GPL or the CDDL or both.
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync/*******************************************************************************
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync* Header Files *
17ef1920962b3df57bf6d2704ced1586396d96f0vboxsync*******************************************************************************/
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync/*******************************************************************************
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync* Defined Constants And Macros *
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync*******************************************************************************/
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync/** The max number of worker threads. */
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync/** The max number of milliseconds to push back. */
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync/** The max number of free requests to keep around. */
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync#define RTREQPOOL_MAX_FREE_REQUESTS (RTREQPOOL_MAX_THREADS * 2U)
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync/*******************************************************************************
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync* Structures and Typedefs *
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync*******************************************************************************/
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Node in the RTREQPOOLINT::IdleThreads list. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Node in the RTREQPOOLINT::WorkerThreads list. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** The submit timestamp of the pending request. */
8867771015571c5542d39e393d7fe6304421a928vboxsync /** The submit timestamp of the request processing. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** When this CPU went idle the last time. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** The number of requests processed by this thread. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Total time the requests processed by this thread took to process. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Total time the requests processed by this thread had to wait in
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync * the queue before being scheduled. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** The CPU this was scheduled last time we checked. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** The submitter will put an incoming request here when scheduling an idle
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync * thread. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** The request the thread is currently processing. */
8867771015571c5542d39e393d7fe6304421a928vboxsync /** The thread handle. */
8867771015571c5542d39e393d7fe6304421a928vboxsync /** Nano seconds timestamp representing the birth time of the thread. */
8867771015571c5542d39e393d7fe6304421a928vboxsync /** Pointer to the request thread pool instance the thread is associated
8867771015571c5542d39e393d7fe6304421a928vboxsync/** Pointer to a worker thread. */
8867771015571c5542d39e393d7fe6304421a928vboxsync * Request thread pool instance data.
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsynctypedef struct RTREQPOOLINT
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Magic value (RTREQPOOL_MAGIC). */
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync /** The request pool name. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /** @name Config
8867771015571c5542d39e393d7fe6304421a928vboxsync /** The worker thread type. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** The maximum number of worker threads. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** The minimum number of worker threads. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** The number of milliseconds a thread needs to be idle before it is
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync * considered for retirement. */
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync /** cMsMinIdle in nano seconds. */
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync /** The idle thread sleep interval in milliseconds. */
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync /** The number of threads which should be spawned before throttling kicks
8867771015571c5542d39e393d7fe6304421a928vboxsync /** The max number of milliseconds to push back a submitter before creating
8867771015571c5542d39e393d7fe6304421a928vboxsync * a new worker thread once the threshold has been reached. */
8867771015571c5542d39e393d7fe6304421a928vboxsync /** The minimum number of milliseconds to push back a submitter before
8867771015571c5542d39e393d7fe6304421a928vboxsync * creating a new worker thread once the threshold has been reached. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /** The max number of free requests in the recycle LIFO. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /** Signaled by terminating worker threads. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /** Destruction indicator. The worker threads checks in their loop. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync bool volatile fDestructing;
8867771015571c5542d39e393d7fe6304421a928vboxsync /** The current submitter push back in milliseconds.
8867771015571c5542d39e393d7fe6304421a928vboxsync * This is recalculated when worker threads come and go. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /** The current number of worker threads. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Statistics: The total number of threads created. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Statistics: The timestamp when the last thread was created. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Linked list of worker threads. */
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync /** The number of requests processed and counted in the time totals. */
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync /** Total time the requests processed by this thread took to process. */
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync /** Total time the requests processed by this thread had to wait in
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync * the queue before being scheduled. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Reference counter. */
8867771015571c5542d39e393d7fe6304421a928vboxsync /** The number of idle thread or threads in the process of becoming
8867771015571c5542d39e393d7fe6304421a928vboxsync * idle. This is increased before the to-be-idle thread tries to enter
8867771015571c5542d39e393d7fe6304421a928vboxsync * the critical section and add itself to the list. */
fe14fe6d46ce87a9b25cbdacb3a20b1f87bf34c7vboxsync /** Linked list of idle threads. */
8867771015571c5542d39e393d7fe6304421a928vboxsync /** Head of the request FIFO. */
8867771015571c5542d39e393d7fe6304421a928vboxsync /** Where to insert the next request. */
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync /** The number of requests currently pending. */
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync /** The number of requests currently being executed. */
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync /** The number of requests submitted. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /** Head of the request recycling LIFO. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /** The number of requests in the recycling LIFO. This is read without
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * entering the critical section, thus volatile. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /** Critical section serializing access to members of this structure. */
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync * Used by exiting thread and the pool destruction code to cancel unexpected
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync * requests.
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync * @param pReq The request.
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync pReq->uOwner.hPool = NIL_RTREQPOOL; /* force free */
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync ASMAtomicWriteS32(&pReq->iStatusX, VERR_CANCELLED);
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync * Recalculate the max pushback interval when adding or removing worker threads.
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync * @param pPool The pool. cMsCurPushBack will be changed.
8867771015571c5542d39e393d7fe6304421a928vboxsyncstatic void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool)
8867771015571c5542d39e393d7fe6304421a928vboxsync uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync uint32_t const cSteps = pPool->cMaxThreads - pPool->cThreadsPushBackThreshold;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync uint32_t const iStep = pPool->cCurThreads - pPool->cThreadsPushBackThreshold;
8867771015571c5542d39e393d7fe6304421a928vboxsync cMsCurPushBack = (uint32_t)( (uint64_t)cMsRange * RT_NS_1MS / cSteps * iStep / RT_NS_1MS );
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync * Performs thread exit.
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync * @returns Thread termination status code (VINF_SUCCESS).
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync * @param pPool The pool.
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync * @param pThread The thread.
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync * @param fLocked Whether we are inside the critical section
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsyncstatic int rtReqPoolThreadExit(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, bool fLocked)
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync /* Get out of the idle list. */
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync /* Get out of the thread list. */
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync /* This shouldn't happen... */
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync /* If we're the last thread terminating, ping the destruction thread before
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync we leave the critical section. */
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync * Process one request.
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync * @param pPool The pool.
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync * @param pThread The worker thread.
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync * @param pReq The request to process.
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsyncstatic void rtReqPoolThreadProcessRequest(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
8867771015571c5542d39e393d7fe6304421a928vboxsync * Update thread state.
8867771015571c5542d39e393d7fe6304421a928vboxsync * Do the actual processing.
8867771015571c5542d39e393d7fe6304421a928vboxsync * Update thread statistics and state.
8867771015571c5542d39e393d7fe6304421a928vboxsync pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
dbf778dbca739883cb93737abe3e5095011dbe6avboxsync pThread->cNsTotalReqQueued += pThread->uProcessingNanoTs - pThread->uPendingNanoTs;
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync * The Worker Thread Procedure.
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync * @returns VINF_SUCCESS.
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync * @param hThreadSelf The thread handle (unused).
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync * @param pvArg Pointer to the thread data.
8867771015571c5542d39e393d7fe6304421a928vboxsyncstatic DECLCALLBACK(int) rtReqPoolThreadProc(RTTHREAD hThreadSelf, void *pvArg)
8867771015571c5542d39e393d7fe6304421a928vboxsync PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)pvArg;
8867771015571c5542d39e393d7fe6304421a928vboxsync * The work loop.
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync * Process pending work.
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync /* Check if anything is scheduled directly to us. */
8867771015571c5542d39e393d7fe6304421a928vboxsync PRTREQINT pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync /* Update the global statistics. */
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync if (cReqPrevProcessedStat != pThread->cReqProcessed)
224132d5ac9f727f499ec4507bc1da2a06080c03vboxsync pPool->cReqProcessed += pThread->cReqProcessed - cReqPrevProcessedStat;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync pPool->cNsTotalReqProcessing += pThread->cNsTotalReqProcessing - cNsPrevTotalReqProcessing;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync cNsPrevTotalReqProcessing = pThread->cNsTotalReqProcessing;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync pPool->cNsTotalReqQueued += pThread->cNsTotalReqQueued - cNsPrevTotalReqQueued;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync cNsPrevTotalReqQueued = pThread->cNsTotalReqQueued;
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync /* Recheck the todo request pointer after entering the critsect. */
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync /* Any pending requests in the queue? */
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync pPool->ppPendingRequests = &pPool->pPendingRequests;
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync /* Un-idle ourselves and process the request. */
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync * Nothing to do, go idle.
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync if (cReqPrevProcessedIdle != pThread->cReqProcessed)
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync uint64_t cNsIdle = RTTimeNanoTS() - pThread->uIdleNanoTs;
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync return rtReqPoolThreadExit(pPool, pThread, true /*fLocked*/);
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode);
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync return rtReqPoolThreadExit(pPool, pThread, false /*fLocked*/);
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Create a new worker thread.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * @param pPool The pool needing new worker thread.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * @remarks Caller owns the critical section
c999f225d03074008a0c21cdd5d3594da476e243vboxsyncstatic void rtReqPoolCreateNewWorker(RTREQPOOL pPool)
c999f225d03074008a0c21cdd5d3594da476e243vboxsync PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)RTMemAllocZ(sizeof(RTREQPOOLTHREAD));
c999f225d03074008a0c21cdd5d3594da476e243vboxsync RTListAppend(&pPool->WorkerThreads, &pThread->ListNode);
c999f225d03074008a0c21cdd5d3594da476e243vboxsync int rc = RTThreadCreateF(&pThread->hThread, rtReqPoolThreadProc, pThread, 0 /*default stack size*/,
88f85f14bc6b8d928002d328b2c8c05a8f42179evboxsync pPool->enmThreadType, 0 /*fFlags*/, "%s%02u", pPool->szName, pPool->cThreadsCreated);
c999f225d03074008a0c21cdd5d3594da476e243vboxsync pPool->uLastThreadCreateNanoTs = pThread->uBirthNanoTs;
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Repel the submitter, giving the worker threads a chance to process the
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * incoming request.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * @returns Success if a worker picked up the request, failure if not. The
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * critical section has been left on success, while we'll be inside it
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * on failure.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * @param pPool The pool.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * @param pReq The incoming request.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsyncstatic int rtReqPoolPushBack(PRTREQPOOLINT pPool, PRTREQINT pReq)
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Lazily create the push back semaphore that we'll be blociing on.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Prepare the request and semaphore.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsyncDECLHIDDEN(void) rtReqPoolSubmit(PRTREQPOOLINT pPool, PRTREQINT pReq)
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Try schedule the request to a thread that's currently idle.
8867771015571c5542d39e393d7fe6304421a928vboxsync PRTREQPOOLTHREAD pThread = RTListGetFirst(&pPool->IdleThreads, RTREQPOOLTHREAD, IdleNode);
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync /** @todo CPU affinity??? */
8867771015571c5542d39e393d7fe6304421a928vboxsync * Put the request in the pending queue.
8867771015571c5542d39e393d7fe6304421a928vboxsync pPool->ppPendingRequests = (PRTREQINT*)&pReq->pNext;
8867771015571c5542d39e393d7fe6304421a928vboxsync * If there is an incoming worker thread already or we've reached the
8867771015571c5542d39e393d7fe6304421a928vboxsync * maximum number of worker threads, we're done.
8867771015571c5542d39e393d7fe6304421a928vboxsync * Push back before creating a new worker thread.
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync if ( pPool->cCurThreads > pPool->cThreadsPushBackThreshold
8867771015571c5542d39e393d7fe6304421a928vboxsync && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack )
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Create a new thread for processing the request.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * For simplicity, we don't bother leaving the critical section while doing so.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Frees a requst.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * @returns true if recycled, false if not.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * @param pPool The request thread pool.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * @param pReq The request.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsyncDECLHIDDEN(bool) rtReqPoolRecycle(PRTREQPOOLINT pPool, PRTREQINT pReq)
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync && ASMAtomicReadU32(&pPool->cCurFreeRequests) < pPool->cMaxFreeRequests)
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync if (pPool->cCurFreeRequests < pPool->cMaxFreeRequests)
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync return true;
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync return false;
f5edc444546b57af847ae33f2bd1e10442496e47vboxsyncRTDECL(int) RTReqPoolCreate(uint32_t cMaxThreads, RTMSINTERVAL cMsMinIdle,
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync uint32_t cThreadsPushBackThreshold, uint32_t cMsMaxPushBack,
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync * Validate and massage the config.
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync AssertMsgReturn(cMaxThreads > 0 && cMaxThreads <= RTREQPOOL_MAX_THREADS, ("%u\n", cMaxThreads), VERR_OUT_OF_RANGE);
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync uint32_t const cMinThreads = cMaxThreads > 2 ? 2 : cMaxThreads - 1;
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync AssertMsgReturn(cThreadsPushBackThreshold <= cMaxThreads, ("%u/%u\n", cThreadsPushBackThreshold, cMaxThreads), VERR_OUT_OF_RANGE);
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync AssertMsgReturn(cMsMaxPushBack <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", cMsMaxPushBack), VERR_OUT_OF_RANGE);
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync uint32_t const cMsMinPushBack = cMsMaxPushBack >= 200 ? 100 : cMsMaxPushBack / 2;
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync * Create and initialize the pool.
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync PRTREQPOOLINT pPool = (PRTREQPOOLINT)RTMemAlloc(sizeof(*pPool));
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync RTStrCopy(pPool->szName, sizeof(pPool->szName), pszName);
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync pPool->cMsMinIdle = cMsMinIdle == RT_INDEFINITE_WAIT || cMsMinIdle >= UINT32_MAX ? UINT32_MAX : cMsMinIdle;
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync pPool->cNsMinIdle = pPool->cMsMinIdle == UINT32_MAX ? UINT64_MAX : cMsMinIdle * RT_NS_1MS_64;
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync pPool->cMsIdleSleep = pPool->cMsMinIdle == UINT32_MAX ? RT_INDEFINITE_WAIT : RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync pPool->cThreadsPushBackThreshold = cThreadsPushBackThreshold;
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync pPool->ppPendingRequests = &pPool->pPendingRequests;
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync int rc = RTSemEventMultiCreate(&pPool->hThreadTermEvt);
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsyncRTDECL(int) RTReqPoolSetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t uValue)
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, VERR_INVALID_PARAMETER);
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync AssertMsgBreakStmt(uValue > (uint64_t)RTTHREADTYPE_INVALID && uValue < (uint64_t)RTTHREADTYPE_END,
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync fWakeUpIdleThreads = pPool->cMinThreads > (uint32_t)uValue;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync if ( pPool->cThreadsPushBackThreshold < pPool->cMinThreads
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync || pPool->cThreadsPushBackThreshold > pPool->cMaxThreads)
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS && uValue >= 1, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync if (pPool->cMaxThreads < pPool->cThreadsPushBackThreshold)
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync AssertMsgBreakStmt(uValue < UINT32_MAX || uValue == RT_INDEFINITE_WAIT, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync if (uValue < UINT32_MAX && uValue != RT_INDEFINITE_WAIT)
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync fWakeUpIdleThreads = pPool->cMsMinIdle != (uint32_t)uValue;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync pPool->cNsMinIdle = pPool->cMsMinIdle * RT_NS_1MS_64;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync pPool->cMsIdleSleep = RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync AssertMsgBreakStmt(uValue <= RT_INDEFINITE_WAIT, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync fWakeUpIdleThreads = pPool->cMsMinIdle > (RTMSINTERVAL)uValue;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync pPool->cThreadsPushBackThreshold = pPool->cMaxThreads;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync else if (uValue == 0)
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync pPool->cThreadsPushBackThreshold = pPool->cMinThreads;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync AssertMsgBreakStmt(uValue <= pPool->cMaxThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync AssertMsgBreakStmt(uValue >= pPool->cMinThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync pPool->cThreadsPushBackThreshold = (uint32_t)uValue;
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_FREE_REQUESTS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync while (pPool->cCurFreeRequests > pPool->cMaxFreeRequests)
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync /* Wake up all idle threads if required. */
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
f5edc444546b57af847ae33f2bd1e10442496e47vboxsyncRTDECL(uint64_t) RTReqPoolGetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar)
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, UINT64_MAX);
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsyncRTDECL(uint64_t) RTReqPoolGetStat(RTREQPOOL hPool, RTREQPOOLSTAT enmStat)
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync AssertReturn(enmStat > RTREQPOOLSTAT_INVALID && enmStat < RTREQPOOLSTAT_END, UINT64_MAX);
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync case RTREQPOOLSTAT_THREADS: u64 = pPool->cCurThreads; break;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync case RTREQPOOLSTAT_THREADS_CREATED: u64 = pPool->cThreadsCreated; break;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync case RTREQPOOLSTAT_REQUESTS_PROCESSED: u64 = pPool->cReqProcessed; break;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync case RTREQPOOLSTAT_REQUESTS_SUBMITTED: u64 = pPool->cReqSubmitted; break;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync case RTREQPOOLSTAT_REQUESTS_PENDING: u64 = pPool->cCurPendingRequests; break;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync case RTREQPOOLSTAT_REQUESTS_ACTIVE: u64 = pPool->cCurActiveRequests; break;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync case RTREQPOOLSTAT_REQUESTS_FREE: u64 = pPool->cCurFreeRequests; break;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync case RTREQPOOLSTAT_NS_TOTAL_REQ_PROCESSING: u64 = pPool->cNsTotalReqProcessing; break;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync case RTREQPOOLSTAT_NS_TOTAL_REQ_QUEUED: u64 = pPool->cNsTotalReqQueued; break;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync case RTREQPOOLSTAT_NS_AVERAGE_REQ_PROCESSING: u64 = pPool->cNsTotalReqProcessing / RT_MAX(pPool->cReqProcessed, 1); break;
c1b4529a269091edd0274bd98d35b75663fd66c0vboxsync case RTREQPOOLSTAT_NS_AVERAGE_REQ_QUEUED: u64 = pPool->cNsTotalReqQueued / RT_MAX(pPool->cReqProcessed, 1); break;
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Ignore NULL and validate the request.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Drop a reference, free it when it reaches zero.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync AssertReturn(ASMAtomicCmpXchgU32(&pPool->u32Magic, RTREQPOOL_MAGIC_DEAD, RTREQPOOL_MAGIC), UINT32_MAX);
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /* Indicate to the worker threads that we're shutting down. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /* Cancel pending requests. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /* Wait for the workers to shut down. */
4c464a0800535c29782d7ab5bcd7820bc7115cd7vboxsync RTSemEventMultiWait(pPool->hThreadTermEvt, RT_MS_1MIN);
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /** @todo should we wait forever here? */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync /* Free recycled requests. */
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync /* Finally, free the critical section and pool instance. */
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsyncRTDECL(int) RTReqPoolAlloc(RTREQPOOL hPool, RTREQTYPE enmType, PRTREQ *phReq)
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Try recycle old requests.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync if (ASMAtomicReadU32(&pPool->cCurFreeRequests) > 0)
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync LogFlow(("RTReqPoolAlloc: returns VINF_SUCCESS *phReq=%p recycled\n", pReq));
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync * Allocate a new request.
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync int rc = rtReqAlloc(enmType, true /*fPoolOrQueue*/, pPool, phReq);
9de47c4ec7b0fc9a384e4b815153de399da7b8devboxsync LogFlow(("RTReqPoolAlloc: returns %Rrc *phReq=%p\n", rc, *phReq));
f5edc444546b57af847ae33f2bd1e10442496e47vboxsyncRTDECL(int) RTReqPoolCallEx( RTREQPOOL hPool, RTMSINTERVAL cMillies, PRTREQ *phReq, uint32_t fFlags, PFNRT pfnFunction, unsigned cArgs, ...)
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync int rc = RTReqPoolCallExV(hPool, cMillies, phReq, fFlags, pfnFunction, cArgs, va);
f5edc444546b57af847ae33f2bd1e10442496e47vboxsyncRTDECL(int) RTReqPoolCallExV(RTREQPOOL hPool, RTMSINTERVAL cMillies, PRTREQ *phReq, uint32_t fFlags, PFNRT pfnFunction, unsigned cArgs, va_list va)
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync * Check input.
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync AssertPtrReturn(pfnFunction, VERR_INVALID_POINTER);
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync AssertMsgReturn(!((uint32_t)fFlags & ~(uint32_t)(RTREQFLAGS_NO_WAIT | RTREQFLAGS_RETURN_MASK)), ("%#x\n", (uint32_t)fFlags), VERR_INVALID_PARAMETER);
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync AssertMsgReturn(cArgs * sizeof(uintptr_t) <= sizeof(pReq->u.Internal.aArgs), ("cArgs=%u\n", cArgs), VERR_TOO_MUCH_DATA);
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync * Allocate and initialize the request.
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync int rc = RTReqPoolAlloc(hPool, RTREQTYPE_INTERNAL, &pReq);
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync pReq->u.Internal.aArgs[iArg] = va_arg(va, uintptr_t);
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync * Submit the request.
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync LogFlow(("RTReqPoolCallExV: returns %Rrc *phReq=%p\n", rc, pReq));
f5edc444546b57af847ae33f2bd1e10442496e47vboxsyncRTDECL(int) RTReqPoolCallWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync int rc = RTReqPoolCallExV(hPool, RT_INDEFINITE_WAIT, &pReq, RTREQFLAGS_IPRT_STATUS,
f5edc444546b57af847ae33f2bd1e10442496e47vboxsyncRTDECL(int) RTReqPoolCallNoWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync int rc = RTReqPoolCallExV(hPool, 0, NULL, RTREQFLAGS_IPRT_STATUS | RTREQFLAGS_NO_WAIT,
f5edc444546b57af847ae33f2bd1e10442496e47vboxsyncRTDECL(int) RTReqPoolCallVoidWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
f5edc444546b57af847ae33f2bd1e10442496e47vboxsync int rc = RTReqPoolCallExV(hPool, RT_INDEFINITE_WAIT, &pReq, RTREQFLAGS_VOID,
f5edc444546b57af847ae33f2bd1e10442496e47vboxsyncRTDECL(int) RTReqPoolCallVoidNoWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)