Lines Matching refs:pPool

98     struct RTREQPOOLINT    *pPool;
221 * @param pPool The pool. cMsCurPushBack will be changed.
223 static void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool)
225 uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
226 uint32_t const cSteps = pPool->cMaxThreads - pPool->cThreadsPushBackThreshold;
227 uint32_t const iStep = pPool->cCurThreads - pPool->cThreadsPushBackThreshold;
234 cMsCurPushBack += pPool->cMsMinPushBack;
236 pPool->cMsCurPushBack = cMsCurPushBack;
245 * @param pPool The pool.
250 static int rtReqPoolThreadExit(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, bool fLocked)
253 RTCritSectEnter(&pPool->CritSect);
259 Assert(pPool->cIdleThreads > 0);
260 ASMAtomicDecU32(&pPool->cIdleThreads);
265 Assert(pPool->cCurThreads > 0);
266 pPool->cCurThreads--;
267 rtReqPoolRecalcPushBack(pPool);
280 if ( RTListIsEmpty(&pPool->WorkerThreads)
281 && pPool->hThreadTermEvt != NIL_RTSEMEVENT)
282 RTSemEventMultiSignal(pPool->hThreadTermEvt);
284 RTCritSectLeave(&pPool->CritSect);
294 * @param pPool The pool.
298 static void rtReqPoolThreadProcessRequest(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
306 ASMAtomicIncU32(&pPool->cCurActiveRequests);
317 ASMAtomicDecU32(&pPool->cCurActiveRequests);
337 PRTREQPOOLINT pPool = pThread->pPool;
346 while (!pPool->fDestructing)
357 rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
361 ASMAtomicIncU32(&pPool->cIdleThreads);
362 RTCritSectEnter(&pPool->CritSect);
367 pPool->cReqProcessed += pThread->cReqProcessed - cReqPrevProcessedStat;
369 pPool->cNsTotalReqProcessing += pThread->cNsTotalReqProcessing - cNsPrevTotalReqProcessing;
371 pPool->cNsTotalReqQueued += pThread->cNsTotalReqQueued - cNsPrevTotalReqQueued;
380 RTCritSectLeave(&pPool->CritSect);
382 rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
387 pReq = pPool->pPendingRequests;
390 pPool->pPendingRequests = pReq->pNext;
392 pPool->ppPendingRequests = &pPool->pPendingRequests;
393 Assert(pPool->cCurPendingRequests > 0);
394 pPool->cCurPendingRequests--;
401 ASMAtomicDecU32(&pPool->cIdleThreads);
403 ASMAtomicDecU32(&pPool->cIdleThreads);
404 RTCritSectLeave(&pPool->CritSect);
406 rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
418 else if (pPool->cCurThreads > pPool->cMinThreads)
421 if (cNsIdle >= pPool->cNsMinIdle)
422 return rtReqPoolThreadExit(pPool, pThread, true /*fLocked*/);
426 RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode);
428 ASMAtomicDecU32(&pPool->cIdleThreads);
430 uint32_t const cMsSleep = pPool->cMsIdleSleep;
432 RTCritSectLeave(&pPool->CritSect);
437 return rtReqPoolThreadExit(pPool, pThread, false /*fLocked*/);
444 * @param pPool The pool needing new worker thread.
447 static void rtReqPoolCreateNewWorker(RTREQPOOL pPool)
454 pThread->pPool = pPool;
458 RTListAppend(&pPool->WorkerThreads, &pThread->ListNode);
459 pPool->cCurThreads++;
460 pPool->cThreadsCreated++;
463 pPool->enmThreadType, 0 /*fFlags*/, "%s%02u", pPool->szName, pPool->cThreadsCreated);
465 pPool->uLastThreadCreateNanoTs = pThread->uBirthNanoTs;
468 pPool->cCurThreads--;
482 * @param pPool The pool.
485 static int rtReqPoolPushBack(PRTREQPOOLINT pPool, PRTREQINT pReq)
503 uint32_t const cMsTimeout = pPool->cMsCurPushBack;
508 RTCritSectLeave(&pPool->CritSect);
517 RTCritSectEnter(&pPool->CritSect);
525 DECLHIDDEN(void) rtReqPoolSubmit(PRTREQPOOLINT pPool, PRTREQINT pReq)
527 RTCritSectEnter(&pPool->CritSect);
529 pPool->cReqSubmitted++;
534 PRTREQPOOLTHREAD pThread = RTListGetFirst(&pPool->IdleThreads, RTREQPOOLTHREAD, IdleNode);
542 ASMAtomicDecU32(&pPool->cIdleThreads);
546 RTCritSectLeave(&pPool->CritSect);
549 Assert(RTListIsEmpty(&pPool->IdleThreads));
555 *pPool->ppPendingRequests = pReq;
556 pPool->ppPendingRequests = (PRTREQINT*)&pReq->pNext;
557 pPool->cCurPendingRequests++;
563 if ( pPool->cIdleThreads > 0
564 || pPool->cCurThreads >= pPool->cMaxThreads)
566 RTCritSectLeave(&pPool->CritSect);
573 if ( pPool->cCurThreads > pPool->cThreadsPushBackThreshold
574 && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack )
576 int rc = rtReqPoolPushBack(pPool, pReq);
585 rtReqPoolCreateNewWorker(pPool);
587 RTCritSectLeave(&pPool->CritSect);
596 * @param pPool The request thread pool.
599 DECLHIDDEN(bool) rtReqPoolRecycle(PRTREQPOOLINT pPool, PRTREQINT pReq)
601 if ( pPool
602 && ASMAtomicReadU32(&pPool->cCurFreeRequests) < pPool->cMaxFreeRequests)
604 RTCritSectEnter(&pPool->CritSect);
605 if (pPool->cCurFreeRequests < pPool->cMaxFreeRequests)
607 pReq->pNext = pPool->pFreeRequests;
608 pPool->pFreeRequests = pReq;
609 ASMAtomicIncU32(&pPool->cCurFreeRequests);
611 RTCritSectLeave(&pPool->CritSect);
615 RTCritSectLeave(&pPool->CritSect);
654 PRTREQPOOLINT pPool = (PRTREQPOOLINT)RTMemAlloc(sizeof(*pPool));
655 if (!pPool)
658 pPool->u32Magic = RTREQPOOL_MAGIC;
659 RTStrCopy(pPool->szName, sizeof(pPool->szName), pszName);
661 pPool->enmThreadType = RTTHREADTYPE_DEFAULT;
662 pPool->cMaxThreads = cMaxThreads;
663 pPool->cMinThreads = cMinThreads;
664 pPool->cMsMinIdle = cMsMinIdle == RT_INDEFINITE_WAIT || cMsMinIdle >= UINT32_MAX ? UINT32_MAX : cMsMinIdle;
665 pPool->cNsMinIdle = pPool->cMsMinIdle == UINT32_MAX ? UINT64_MAX : cMsMinIdle * RT_NS_1MS_64;
666 pPool->cMsIdleSleep = pPool->cMsMinIdle == UINT32_MAX ? RT_INDEFINITE_WAIT : RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
667 pPool->cThreadsPushBackThreshold = cThreadsPushBackThreshold;
668 pPool->cMsMaxPushBack = cMsMaxPushBack;
669 pPool->cMsMinPushBack = cMsMinPushBack;
670 pPool->cMaxFreeRequests = cMaxThreads * 2;
671 pPool->hThreadTermEvt = NIL_RTSEMEVENTMULTI;
672 pPool->fDestructing = false;
673 pPool->cMsCurPushBack = 0;
674 pPool->cCurThreads = 0;
675 pPool->cThreadsCreated = 0;
676 pPool->uLastThreadCreateNanoTs = 0;
677 RTListInit(&pPool->WorkerThreads);
678 pPool->cReqProcessed = 0;
679 pPool->cNsTotalReqProcessing= 0;
680 pPool->cNsTotalReqQueued = 0;
681 pPool->cRefs = 1;
682 pPool->cIdleThreads = 0;
683 RTListInit(&pPool->IdleThreads);
684 pPool->pPendingRequests = NULL;
685 pPool->ppPendingRequests = &pPool->pPendingRequests;
686 pPool->cCurPendingRequests = 0;
687 pPool->cCurActiveRequests = 0;
688 pPool->cReqSubmitted = 0;
689 pPool->pFreeRequests = NULL;
690 pPool->cCurFreeRequests = 0;
692 int rc = RTSemEventMultiCreate(&pPool->hThreadTermEvt);
695 rc = RTCritSectInit(&pPool->CritSect);
698 *phPool = pPool;
702 RTSemEventMultiDestroy(pPool->hThreadTermEvt);
704 pPool->u32Magic = RTREQPOOL_MAGIC_DEAD;
705 RTMemFree(pPool);
713 PRTREQPOOLINT pPool = hPool;
714 AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
715 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
718 RTCritSectEnter(&pPool->CritSect);
728 pPool->enmThreadType = (RTTHREADTYPE)uValue;
733 fWakeUpIdleThreads = pPool->cMinThreads > (uint32_t)uValue;
734 pPool->cMinThreads = (uint32_t)uValue;
735 if (pPool->cMinThreads > pPool->cMaxThreads)
736 pPool->cMaxThreads = pPool->cMinThreads;
737 if ( pPool->cThreadsPushBackThreshold < pPool->cMinThreads
738 || pPool->cThreadsPushBackThreshold > pPool->cMaxThreads)
739 pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
740 rtReqPoolRecalcPushBack(pPool);
745 pPool->cMaxThreads = (uint32_t)uValue;
746 if (pPool->cMaxThreads < pPool->cMinThreads)
748 pPool->cMinThreads = pPool->cMaxThreads;
751 if (pPool->cMaxThreads < pPool->cThreadsPushBackThreshold)
752 pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
753 rtReqPoolRecalcPushBack(pPool);
760 fWakeUpIdleThreads = pPool->cMsMinIdle != (uint32_t)uValue;
761 pPool->cMsMinIdle = (uint32_t)uValue;
762 pPool->cNsMinIdle = pPool->cMsMinIdle * RT_NS_1MS_64;
763 if (pPool->cMsIdleSleep > pPool->cMsMinIdle)
764 pPool->cMsIdleSleep = RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
768 pPool->cMsMinIdle = UINT32_MAX;
769 pPool->cNsMinIdle = UINT64_MAX;
770 pPool->cMsIdleSleep = RT_INDEFINITE_WAIT;
776 fWakeUpIdleThreads = pPool->cMsMinIdle > (RTMSINTERVAL)uValue;
777 pPool->cMsIdleSleep = (RTMSINTERVAL)uValue;
778 if (pPool->cMsIdleSleep == RT_INDEFINITE_WAIT)
780 pPool->cMsMinIdle = UINT32_MAX;
781 pPool->cNsMinIdle = UINT64_MAX;
787 pPool->cThreadsPushBackThreshold = pPool->cMaxThreads;
789 pPool->cThreadsPushBackThreshold = pPool->cMinThreads;
792 AssertMsgBreakStmt(uValue <= pPool->cMaxThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
793 AssertMsgBreakStmt(uValue >= pPool->cMinThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
794 pPool->cThreadsPushBackThreshold = (uint32_t)uValue;
803 pPool->cMsMinPushBack = (uint32_t)uValue;
804 if (pPool->cMsMaxPushBack < pPool->cMsMinPushBack)
805 pPool->cMsMaxPushBack = pPool->cMsMinPushBack;
806 rtReqPoolRecalcPushBack(pPool);
814 pPool->cMsMaxPushBack = (uint32_t)uValue;
815 if (pPool->cMsMinPushBack < pPool->cMsMaxPushBack)
816 pPool->cMsMinPushBack = pPool->cMsMaxPushBack;
817 rtReqPoolRecalcPushBack(pPool);
823 pPool->cMaxFreeRequests = pPool->cMaxThreads * 2;
824 if (pPool->cMaxFreeRequests < 16)
825 pPool->cMaxFreeRequests = 16;
830 pPool->cMaxFreeRequests = (uint32_t)uValue;
833 while (pPool->cCurFreeRequests > pPool->cMaxFreeRequests)
835 PRTREQINT pReq = pPool->pFreeRequests;
836 pPool->pFreeRequests = pReq->pNext;
837 ASMAtomicDecU32(&pPool->cCurFreeRequests);
852 RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
858 RTCritSectLeave(&pPool->CritSect);
867 PRTREQPOOLINT pPool = hPool;
868 AssertPtrReturn(pPool, UINT64_MAX);
869 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
872 RTCritSectEnter(&pPool->CritSect);
878 u64 = pPool->enmThreadType;
882 u64 = pPool->cMinThreads;
886 u64 = pPool->cMaxThreads;
890 u64 = pPool->cMsMinIdle;
894 u64 = pPool->cMsIdleSleep;
898 u64 = pPool->cThreadsPushBackThreshold;
902 u64 = pPool->cMsMinPushBack;
906 u64 = pPool->cMsMaxPushBack;
910 u64 = pPool->cMaxFreeRequests;
919 RTCritSectLeave(&pPool->CritSect);
928 PRTREQPOOLINT pPool = hPool;
929 AssertPtrReturn(pPool, UINT64_MAX);
930 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
933 RTCritSectEnter(&pPool->CritSect);
938 case RTREQPOOLSTAT_THREADS: u64 = pPool->cCurThreads; break;
939 case RTREQPOOLSTAT_THREADS_CREATED: u64 = pPool->cThreadsCreated; break;
940 case RTREQPOOLSTAT_REQUESTS_PROCESSED: u64 = pPool->cReqProcessed; break;
941 case RTREQPOOLSTAT_REQUESTS_SUBMITTED: u64 = pPool->cReqSubmitted; break;
942 case RTREQPOOLSTAT_REQUESTS_PENDING: u64 = pPool->cCurPendingRequests; break;
943 case RTREQPOOLSTAT_REQUESTS_ACTIVE: u64 = pPool->cCurActiveRequests; break;
944 case RTREQPOOLSTAT_REQUESTS_FREE: u64 = pPool->cCurFreeRequests; break;
945 case RTREQPOOLSTAT_NS_TOTAL_REQ_PROCESSING: u64 = pPool->cNsTotalReqProcessing; break;
946 case RTREQPOOLSTAT_NS_TOTAL_REQ_QUEUED: u64 = pPool->cNsTotalReqQueued; break;
947 case RTREQPOOLSTAT_NS_AVERAGE_REQ_PROCESSING: u64 = pPool->cNsTotalReqProcessing / RT_MAX(pPool->cReqProcessed, 1); break;
948 case RTREQPOOLSTAT_NS_AVERAGE_REQ_QUEUED: u64 = pPool->cNsTotalReqQueued / RT_MAX(pPool->cReqProcessed, 1); break;
955 RTCritSectLeave(&pPool->CritSect);
964 PRTREQPOOLINT pPool = hPool;
965 AssertPtrReturn(pPool, UINT32_MAX);
966 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
968 return ASMAtomicIncU32(&pPool->cRefs);
980 PRTREQPOOLINT pPool = hPool;
981 AssertPtrReturn(pPool, UINT32_MAX);
982 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
987 uint32_t cRefs = ASMAtomicDecU32(&pPool->cRefs);
990 AssertReturn(ASMAtomicCmpXchgU32(&pPool->u32Magic, RTREQPOOL_MAGIC_DEAD, RTREQPOOL_MAGIC), UINT32_MAX);
992 RTCritSectEnter(&pPool->CritSect);
998 ASMAtomicWriteBool(&pPool->fDestructing, true);
1000 RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
1007 Assert(!pPool->pPendingRequests);
1008 while (pPool->pPendingRequests)
1010 PRTREQINT pReq = pPool->pPendingRequests;
1011 pPool->pPendingRequests = pReq->pNext;
1014 pPool->ppPendingRequests = NULL;
1015 pPool->cCurPendingRequests = 0;
1018 while (!RTListIsEmpty(&pPool->WorkerThreads))
1020 RTCritSectLeave(&pPool->CritSect);
1021 RTSemEventMultiWait(pPool->hThreadTermEvt, RT_MS_1MIN);
1022 RTCritSectEnter(&pPool->CritSect);
1029 PRTREQINT pReq = pPool->pFreeRequests;
1032 pPool->pFreeRequests = pReq->pNext;
1033 pPool->cCurFreeRequests--;
1038 RTCritSectLeave(&pPool->CritSect);
1039 RTCritSectDelete(&pPool->CritSect);
1040 RTMemFree(pPool);
1050 PRTREQPOOLINT pPool = hPool;
1051 AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
1052 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
1057 if (ASMAtomicReadU32(&pPool->cCurFreeRequests) > 0)
1059 RTCritSectEnter(&pPool->CritSect);
1060 PRTREQINT pReq = pPool->pFreeRequests;
1063 ASMAtomicDecU32(&pPool->cCurFreeRequests);
1064 pPool->pFreeRequests = pReq->pNext;
1066 RTCritSectLeave(&pPool->CritSect);
1069 Assert(pReq->uOwner.hPool == pPool);
1080 RTCritSectLeave(&pPool->CritSect);
1086 int rc = rtReqAlloc(enmType, true /*fPoolOrQueue*/, pPool, phReq);