1N/A/*
1N/A * Copyright (c) 2003-2004, 2007, 2009-2011 Sendmail, Inc. and its suppliers.
1N/A * All rights reserved.
1N/A *
1N/A * By using this file, you agree to the terms and conditions set
1N/A * forth in the LICENSE file which can be found at the top level of
1N/A * the sendmail distribution.
1N/A *
1N/A * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
1N/A * Jose-Marcio.Martins@ensmp.fr
1N/A */
1N/A
1N/A#include <sm/gen.h>
1N/ASM_RCSID("@(#)$Id: worker.c,v 8.19 2011/02/14 23:33:48 ca Exp $")
1N/A
1N/A#include "libmilter.h"
1N/A
1N/A#if _FFR_WORKERS_POOL
1N/A
1N/Atypedef struct taskmgr_S taskmgr_T;
1N/A
1N/A#define TM_SIGNATURE 0x23021957
1N/A
1N/Astruct taskmgr_S
1N/A{
1N/A long tm_signature; /* has the controller been initialized */
1N/A sthread_t tm_tid; /* thread id of controller */
1N/A smfi_hd_T tm_ctx_head; /* head of the linked list of contexts */
1N/A
1N/A int tm_nb_workers; /* number of workers in the pool */
1N/A int tm_nb_idle; /* number of workers waiting */
1N/A
1N/A int tm_p[2]; /* poll control pipe */
1N/A
1N/A smutex_t tm_w_mutex; /* linked list access mutex */
1N/A scond_t tm_w_cond; /* */
1N/A};
1N/A
1N/Astatic taskmgr_T Tskmgr = {0};
1N/A
1N/A#define WRK_CTX_HEAD Tskmgr.tm_ctx_head
1N/A
1N/A#define RD_PIPE (Tskmgr.tm_p[0])
1N/A#define WR_PIPE (Tskmgr.tm_p[1])
1N/A
1N/A#define PIPE_SEND_SIGNAL() \
1N/A do \
1N/A { \
1N/A char evt = 0x5a; \
1N/A int fd = WR_PIPE; \
1N/A if (write(fd, &evt, sizeof(evt)) != sizeof(evt)) \
1N/A smi_log(SMI_LOG_ERR, \
1N/A "Error writing to event pipe: %s", \
1N/A sm_errstring(errno)); \
1N/A } while (0)
1N/A
1N/A#ifndef USE_PIPE_WAKE_POLL
1N/A# define USE_PIPE_WAKE_POLL 1
1N/A#endif /* USE_PIPE_WAKE_POLL */
1N/A
1N/A/* poll check periodicity (default 10000 - 10 s) */
1N/A#define POLL_TIMEOUT 10000
1N/A
1N/A/* worker conditional wait timeout (default 10 s) */
1N/A#define COND_TIMEOUT 10
1N/A
1N/A/* functions */
1N/Astatic int mi_close_session __P((SMFICTX_PTR));
1N/A
1N/Astatic void *mi_worker __P((void *));
1N/Astatic void *mi_pool_controller __P((void *));
1N/A
1N/Astatic int mi_list_add_ctx __P((SMFICTX_PTR));
1N/Astatic int mi_list_del_ctx __P((SMFICTX_PTR));
1N/A
1N/A/*
1N/A** periodicity of cleaning up old sessions (timedout)
1N/A** sessions list will be checked to find old inactive
1N/A** sessions each DT_CHECK_OLD_SESSIONS sec
1N/A*/
1N/A
1N/A#define DT_CHECK_OLD_SESSIONS 600
1N/A
1N/A#ifndef OLD_SESSION_TIMEOUT
1N/A# define OLD_SESSION_TIMEOUT ctx->ctx_timeout
1N/A#endif /* OLD_SESSION_TIMEOUT */
1N/A
1N/A/* session states - with respect to the pool of workers */
1N/A#define WKST_INIT 0 /* initial state */
1N/A#define WKST_READY_TO_RUN 1 /* command ready do be read */
1N/A#define WKST_RUNNING 2 /* session running on a worker */
1N/A#define WKST_READY_TO_WAIT 3 /* session just finished by a worker */
1N/A#define WKST_WAITING 4 /* waiting for new command */
1N/A#define WKST_CLOSING 5 /* session finished */
1N/A
1N/A#ifndef MIN_WORKERS
1N/A# define MIN_WORKERS 2 /* minimum number of threads to keep around */
1N/A#endif
1N/A
1N/A#define MIN_IDLE 1 /* minimum number of idle threads */
1N/A
1N/A
1N/A/*
1N/A** Macros for threads and mutex management
1N/A*/
1N/A
1N/A#define TASKMGR_LOCK() \
1N/A do \
1N/A { \
1N/A if (!smutex_lock(&Tskmgr.tm_w_mutex)) \
1N/A smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error"); \
1N/A } while (0)
1N/A
1N/A#define TASKMGR_UNLOCK() \
1N/A do \
1N/A { \
1N/A if (!smutex_unlock(&Tskmgr.tm_w_mutex)) \
1N/A smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error"); \
1N/A } while (0)
1N/A
1N/A#define TASKMGR_COND_WAIT() \
1N/A scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)
1N/A
1N/A#define TASKMGR_COND_SIGNAL() \
1N/A do \
1N/A { \
1N/A if (scond_signal(&Tskmgr.tm_w_cond) != 0) \
1N/A smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
1N/A } while (0)
1N/A
1N/A#define LAUNCH_WORKER(ctx) \
1N/A do \
1N/A { \
1N/A int r; \
1N/A sthread_t tid; \
1N/A \
1N/A if ((r = thread_create(&tid, mi_worker, ctx)) != 0) \
1N/A smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
1N/A sm_errstring(r)); \
1N/A } while (0)
1N/A
1N/A#if POOL_DEBUG
1N/A# define POOL_LEV_DPRINTF(lev, x) \
1N/A do { \
1N/A if ((lev) < ctx->ctx_dbg) \
1N/A sm_dprintf x; \
1N/A } while (0)
1N/A#else /* POOL_DEBUG */
1N/A# define POOL_LEV_DPRINTF(lev, x)
1N/A#endif /* POOL_DEBUG */
1N/A
1N/A/*
1N/A** MI_START_SESSION -- Start a session in the pool of workers
1N/A**
1N/A** Parameters:
1N/A** ctx -- context structure
1N/A**
1N/A** Returns:
1N/A** MI_SUCCESS/MI_FAILURE
1N/A*/
1N/A
1N/Aint
1N/Ami_start_session(ctx)
1N/A SMFICTX_PTR ctx;
1N/A{
1N/A static long id = 0;
1N/A
1N/A /* this can happen if the milter is shutting down */
1N/A if (Tskmgr.tm_signature != TM_SIGNATURE)
1N/A return MI_FAILURE;
1N/A SM_ASSERT(ctx != NULL);
1N/A POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
1N/A TASKMGR_LOCK();
1N/A
1N/A if (mi_list_add_ctx(ctx) != MI_SUCCESS)
1N/A {
1N/A TASKMGR_UNLOCK();
1N/A return MI_FAILURE;
1N/A }
1N/A
1N/A ctx->ctx_sid = id++;
1N/A
1N/A /* if there is an idle worker, signal it, otherwise start new worker */
1N/A if (Tskmgr.tm_nb_idle > 0)
1N/A {
1N/A ctx->ctx_wstate = WKST_READY_TO_RUN;
1N/A TASKMGR_COND_SIGNAL();
1N/A }
1N/A else
1N/A {
1N/A ctx->ctx_wstate = WKST_RUNNING;
1N/A LAUNCH_WORKER(ctx);
1N/A }
1N/A TASKMGR_UNLOCK();
1N/A return MI_SUCCESS;
1N/A}
1N/A
1N/A/*
1N/A** MI_CLOSE_SESSION -- Close a session and clean up data structures
1N/A**
1N/A** Parameters:
1N/A** ctx -- context structure
1N/A**
1N/A** Returns:
1N/A** MI_SUCCESS/MI_FAILURE
1N/A*/
1N/A
1N/Astatic int
1N/Ami_close_session(ctx)
1N/A SMFICTX_PTR ctx;
1N/A{
1N/A SM_ASSERT(ctx != NULL);
1N/A
1N/A (void) mi_list_del_ctx(ctx);
1N/A mi_clr_ctx(ctx);
1N/A
1N/A return MI_SUCCESS;
1N/A}
1N/A
1N/A/*
1N/A** NONBLOCKING -- set nonblocking mode for a file descriptor.
1N/A**
1N/A** Parameters:
1N/A** fd -- file descriptor
1N/A** name -- name for (error) logging
1N/A**
1N/A** Returns:
1N/A** MI_SUCCESS/MI_FAILURE
1N/A*/
1N/A
1N/Astatic int
1N/Anonblocking(int fd, const char *name)
1N/A{
1N/A int r;
1N/A
1N/A errno = 0;
1N/A r = fcntl(fd, F_GETFL, 0);
1N/A if (r == -1)
1N/A {
1N/A smi_log(SMI_LOG_ERR, "fcntl(%s, F_GETFL)=%s",
1N/A name, sm_errstring(errno));
1N/A return MI_FAILURE;
1N/A }
1N/A errno = 0;
1N/A r = fcntl(fd, F_SETFL, r | O_NONBLOCK);
1N/A if (r == -1)
1N/A {
1N/A smi_log(SMI_LOG_ERR, "fcntl(%s, F_SETFL, O_NONBLOCK)=%s",
1N/A name, sm_errstring(errno));
1N/A return MI_FAILURE;
1N/A }
1N/A return MI_SUCCESS;
1N/A}
1N/A
1N/A/*
1N/A** MI_POOL_CONTROLER_INIT -- Launch the worker pool controller
1N/A** Must be called before starting sessions.
1N/A**
1N/A** Parameters:
1N/A** none
1N/A**
1N/A** Returns:
1N/A** MI_SUCCESS/MI_FAILURE
1N/A*/
1N/A
1N/Aint
1N/Ami_pool_controller_init()
1N/A{
1N/A sthread_t tid;
1N/A int r, i;
1N/A
1N/A if (Tskmgr.tm_signature == TM_SIGNATURE)
1N/A return MI_SUCCESS;
1N/A
1N/A SM_TAILQ_INIT(&WRK_CTX_HEAD);
1N/A Tskmgr.tm_tid = (sthread_t) -1;
1N/A Tskmgr.tm_nb_workers = 0;
1N/A Tskmgr.tm_nb_idle = 0;
1N/A
1N/A if (pipe(Tskmgr.tm_p) != 0)
1N/A {
1N/A smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
1N/A sm_errstring(errno));
1N/A return MI_FAILURE;
1N/A }
1N/A r = nonblocking(WR_PIPE, "WR_PIPE");
1N/A if (r != MI_SUCCESS)
1N/A return r;
1N/A r = nonblocking(RD_PIPE, "RD_PIPE");
1N/A if (r != MI_SUCCESS)
1N/A return r;
1N/A
1N/A (void) smutex_init(&Tskmgr.tm_w_mutex);
1N/A (void) scond_init(&Tskmgr.tm_w_cond);
1N/A
1N/A /* Launch the pool controller */
1N/A if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
1N/A {
1N/A smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
1N/A sm_errstring(r));
1N/A return MI_FAILURE;
1N/A }
1N/A Tskmgr.tm_tid = tid;
1N/A Tskmgr.tm_signature = TM_SIGNATURE;
1N/A
1N/A /* Create the pool of workers */
1N/A for (i = 0; i < MIN_WORKERS; i++)
1N/A {
1N/A if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
1N/A {
1N/A smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
1N/A sm_errstring(r));
1N/A return MI_FAILURE;
1N/A }
1N/A }
1N/A
1N/A return MI_SUCCESS;
1N/A}
1N/A
1N/A/*
1N/A** MI_POOL_CONTROLLER -- manage the pool of workers
1N/A** This thread must be running when listener begins
1N/A** starting sessions
1N/A**
1N/A** Parameters:
1N/A** arg -- unused
1N/A**
1N/A** Returns:
1N/A** NULL
1N/A**
1N/A** Control flow:
1N/A** for (;;)
1N/A** Look for timed out sessions
1N/A** Select sessions to wait for sendmail command
1N/A** Poll set of file descriptors
1N/A** if timeout
1N/A** continue
1N/A** For each file descriptor ready
1N/A** launch new thread if no worker available
1N/A** else
1N/A** signal waiting worker
1N/A*/
1N/A
1N/A/* Poll structure array (pollfd) size step */
1N/A#define PFD_STEP 256
1N/A
1N/A#define WAIT_FD(i) (pfd[i].fd)
1N/A#define WAITFN "POLL"
1N/A
1N/Astatic void *
1N/Ami_pool_controller(arg)
1N/A void *arg;
1N/A{
1N/A struct pollfd *pfd = NULL;
1N/A int dim_pfd = 0;
1N/A bool rebuild_set = true;
1N/A int pcnt = 0; /* error count for poll() failures */
1N/A time_t lastcheck;
1N/A
1N/A Tskmgr.tm_tid = sthread_get_id();
1N/A if (pthread_detach(Tskmgr.tm_tid) != 0)
1N/A {
1N/A smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
1N/A return NULL;
1N/A }
1N/A
1N/A pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
1N/A if (pfd == NULL)
1N/A {
1N/A smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
1N/A sm_errstring(errno));
1N/A return NULL;
1N/A }
1N/A dim_pfd = PFD_STEP;
1N/A
1N/A lastcheck = time(NULL);
1N/A for (;;)
1N/A {
1N/A SMFICTX_PTR ctx;
1N/A int nfd, rfd, i;
1N/A time_t now;
1N/A
1N/A POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));
1N/A
1N/A if (mi_stop() != MILTER_CONT)
1N/A break;
1N/A
1N/A TASKMGR_LOCK();
1N/A
1N/A now = time(NULL);
1N/A
1N/A /* check for timed out sessions? */
1N/A if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
1N/A {
1N/A ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
1N/A while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD))
1N/A {
1N/A SMFICTX_PTR ctx_nxt;
1N/A
1N/A ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link);
1N/A if (ctx->ctx_wstate == WKST_WAITING)
1N/A {
1N/A if (ctx->ctx_wait == 0)
1N/A ctx->ctx_wait = now;
1N/A else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
1N/A < now)
1N/A {
1N/A /* if session timed out, close it */
1N/A sfsistat (*fi_close) __P((SMFICTX *));
1N/A
1N/A POOL_LEV_DPRINTF(4,
1N/A ("Closing old connection: sd=%d id=%d",
1N/A ctx->ctx_sd,
1N/A ctx->ctx_sid));
1N/A
1N/A if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
1N/A (void) (*fi_close)(ctx);
1N/A
1N/A mi_close_session(ctx);
1N/A }
1N/A }
1N/A ctx = ctx_nxt;
1N/A }
1N/A lastcheck = now;
1N/A }
1N/A
1N/A if (rebuild_set)
1N/A {
1N/A /*
1N/A ** Initialize poll set.
1N/A ** Insert into the poll set the file descriptors of
1N/A ** all sessions waiting for a command from sendmail.
1N/A */
1N/A
1N/A nfd = 0;
1N/A
1N/A /* begin with worker pipe */
1N/A pfd[nfd].fd = RD_PIPE;
1N/A pfd[nfd].events = MI_POLL_RD_FLAGS;
1N/A pfd[nfd].revents = 0;
1N/A nfd++;
1N/A
1N/A SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
1N/A {
1N/A /*
1N/A ** update ctx_wait - start of wait moment -
1N/A ** for timeout
1N/A */
1N/A
1N/A if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
1N/A ctx->ctx_wait = now;
1N/A
1N/A /* add the session to the pollfd array? */
1N/A if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
1N/A (ctx->ctx_wstate == WKST_WAITING))
1N/A {
1N/A /*
1N/A ** Resize the pollfd array if it
1N/A ** isn't large enough.
1N/A */
1N/A
1N/A if (nfd >= dim_pfd)
1N/A {
1N/A struct pollfd *tpfd;
1N/A size_t new;
1N/A
1N/A new = (dim_pfd + PFD_STEP) *
1N/A sizeof(*tpfd);
1N/A tpfd = (struct pollfd *)
1N/A realloc(pfd, new);
1N/A if (tpfd != NULL)
1N/A {
1N/A pfd = tpfd;
1N/A dim_pfd += PFD_STEP;
1N/A }
1N/A else
1N/A {
1N/A smi_log(SMI_LOG_ERR,
1N/A "Failed to realloc pollfd array:%s",
1N/A sm_errstring(errno));
1N/A }
1N/A }
1N/A
1N/A /* add the session to pollfd array */
1N/A if (nfd < dim_pfd)
1N/A {
1N/A ctx->ctx_wstate = WKST_WAITING;
1N/A pfd[nfd].fd = ctx->ctx_sd;
1N/A pfd[nfd].events = MI_POLL_RD_FLAGS;
1N/A pfd[nfd].revents = 0;
1N/A nfd++;
1N/A }
1N/A }
1N/A }
1N/A rebuild_set = false;
1N/A }
1N/A
1N/A TASKMGR_UNLOCK();
1N/A
1N/A /* Everything is ready, let's wait for an event */
1N/A rfd = poll(pfd, nfd, POLL_TIMEOUT);
1N/A
1N/A POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
1N/A WAITFN, now, nfd));
1N/A
1N/A /* timeout */
1N/A if (rfd == 0)
1N/A continue;
1N/A
1N/A rebuild_set = true;
1N/A
1N/A /* error */
1N/A if (rfd < 0)
1N/A {
1N/A if (errno == EINTR)
1N/A continue;
1N/A pcnt++;
1N/A smi_log(SMI_LOG_ERR,
1N/A "%s() failed (%s), %s",
1N/A WAITFN, sm_errstring(errno),
1N/A pcnt >= MAX_FAILS_S ? "abort" : "try again");
1N/A
1N/A if (pcnt >= MAX_FAILS_S)
1N/A goto err;
1N/A }
1N/A pcnt = 0;
1N/A
1N/A /* something happened */
1N/A for (i = 0; i < nfd; i++)
1N/A {
1N/A if (pfd[i].revents == 0)
1N/A continue;
1N/A
1N/A POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
1N/A WAITFN, i, nfd,
1N/A WAIT_FD(i)));
1N/A
1N/A /* has a worker signaled an end of task ? */
1N/A if (WAIT_FD(i) == RD_PIPE)
1N/A {
1N/A char evts[256];
1N/A ssize_t r;
1N/A
1N/A POOL_LEV_DPRINTF(4,
1N/A ("PIPE WILL READ evt = %08X %08X",
1N/A pfd[i].events, pfd[i].revents));
1N/A
1N/A r = 1;
1N/A while ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0
1N/A && r != -1)
1N/A {
1N/A r = read(RD_PIPE, evts, sizeof(evts));
1N/A }
1N/A
1N/A POOL_LEV_DPRINTF(4,
1N/A ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
1N/A i, RD_PIPE, (int) r, evts[0]));
1N/A
1N/A if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
1N/A {
1N/A /* Exception handling */
1N/A }
1N/A continue;
1N/A }
1N/A
1N/A /* no ! sendmail wants to send a command */
1N/A SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
1N/A {
1N/A if (ctx->ctx_wstate != WKST_WAITING)
1N/A continue;
1N/A
1N/A POOL_LEV_DPRINTF(4,
1N/A ("Checking context sd=%d - fd=%d ",
1N/A ctx->ctx_sd , WAIT_FD(i)));
1N/A
1N/A if (ctx->ctx_sd == pfd[i].fd)
1N/A {
1N/A TASKMGR_LOCK();
1N/A
1N/A POOL_LEV_DPRINTF(4,
1N/A ("TASK: found %d for fd[%d]=%d",
1N/A ctx->ctx_sid, i, WAIT_FD(i)));
1N/A
1N/A if (Tskmgr.tm_nb_idle > 0)
1N/A {
1N/A ctx->ctx_wstate = WKST_READY_TO_RUN;
1N/A TASKMGR_COND_SIGNAL();
1N/A }
1N/A else
1N/A {
1N/A ctx->ctx_wstate = WKST_RUNNING;
1N/A LAUNCH_WORKER(ctx);
1N/A }
1N/A TASKMGR_UNLOCK();
1N/A break;
1N/A }
1N/A }
1N/A
1N/A POOL_LEV_DPRINTF(4,
1N/A ("TASK %s FOUND - Checking PIPE for fd[%d]",
1N/A ctx != NULL ? "" : "NOT", WAIT_FD(i)));
1N/A }
1N/A }
1N/A
1N/A err:
1N/A if (pfd != NULL)
1N/A free(pfd);
1N/A
1N/A Tskmgr.tm_signature = 0;
1N/A for (;;)
1N/A {
1N/A SMFICTX_PTR ctx;
1N/A
1N/A ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
1N/A if (ctx == NULL)
1N/A break;
1N/A mi_close_session(ctx);
1N/A }
1N/A
1N/A (void) smutex_destroy(&Tskmgr.tm_w_mutex);
1N/A (void) scond_destroy(&Tskmgr.tm_w_cond);
1N/A
1N/A return NULL;
1N/A}
1N/A
1N/A/*
1N/A** Look for a task ready to run.
1N/A** Value of ctx is NULL or a pointer to a task ready to run.
1N/A*/
1N/A
1N/A#define GET_TASK_READY_TO_RUN() \
1N/A SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) \
1N/A { \
1N/A if (ctx->ctx_wstate == WKST_READY_TO_RUN) \
1N/A { \
1N/A ctx->ctx_wstate = WKST_RUNNING; \
1N/A break; \
1N/A } \
1N/A }
1N/A
1N/A/*
1N/A** MI_WORKER -- worker thread
1N/A** executes tasks distributed by the mi_pool_controller
1N/A** or by mi_start_session
1N/A**
1N/A** Parameters:
1N/A** arg -- pointer to context structure
1N/A**
1N/A** Returns:
1N/A** NULL pointer
1N/A*/
1N/A
1N/Astatic void *
1N/Ami_worker(arg)
1N/A void *arg;
1N/A{
1N/A SMFICTX_PTR ctx;
1N/A bool done;
1N/A sthread_t t_id;
1N/A int r;
1N/A
1N/A ctx = (SMFICTX_PTR) arg;
1N/A done = false;
1N/A if (ctx != NULL)
1N/A ctx->ctx_wstate = WKST_RUNNING;
1N/A
1N/A t_id = sthread_get_id();
1N/A if (pthread_detach(t_id) != 0)
1N/A {
1N/A smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
1N/A if (ctx != NULL)
1N/A ctx->ctx_wstate = WKST_READY_TO_RUN;
1N/A return NULL;
1N/A }
1N/A
1N/A TASKMGR_LOCK();
1N/A Tskmgr.tm_nb_workers++;
1N/A TASKMGR_UNLOCK();
1N/A
1N/A while (!done)
1N/A {
1N/A if (mi_stop() != MILTER_CONT)
1N/A break;
1N/A
1N/A /* let's handle next task... */
1N/A if (ctx != NULL)
1N/A {
1N/A int res;
1N/A
1N/A POOL_LEV_DPRINTF(4,
1N/A ("worker %d: new task -> let's handle it",
1N/A t_id));
1N/A res = mi_engine(ctx);
1N/A POOL_LEV_DPRINTF(4,
1N/A ("worker %d: mi_engine returned %d", t_id, res));
1N/A
1N/A TASKMGR_LOCK();
1N/A if (res != MI_CONTINUE)
1N/A {
1N/A ctx->ctx_wstate = WKST_CLOSING;
1N/A
1N/A /*
1N/A ** Delete context from linked list of
1N/A ** sessions and close session.
1N/A */
1N/A
1N/A mi_close_session(ctx);
1N/A }
1N/A else
1N/A {
1N/A ctx->ctx_wstate = WKST_READY_TO_WAIT;
1N/A
1N/A POOL_LEV_DPRINTF(4,
1N/A ("writing to event pipe..."));
1N/A
1N/A /*
1N/A ** Signal task controller to add new session
1N/A ** to poll set.
1N/A */
1N/A
1N/A PIPE_SEND_SIGNAL();
1N/A }
1N/A TASKMGR_UNLOCK();
1N/A ctx = NULL;
1N/A
1N/A }
1N/A
1N/A /* check if there is any task waiting to be served */
1N/A TASKMGR_LOCK();
1N/A
1N/A GET_TASK_READY_TO_RUN();
1N/A
1N/A /* Got a task? */
1N/A if (ctx != NULL)
1N/A {
1N/A TASKMGR_UNLOCK();
1N/A continue;
1N/A }
1N/A
1N/A /*
1N/A ** if not, let's check if there is enough idle workers
1N/A ** if yes: quit
1N/A */
1N/A
1N/A if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
1N/A Tskmgr.tm_nb_idle > MIN_IDLE)
1N/A done = true;
1N/A
1N/A POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
1N/A Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));
1N/A
1N/A if (done)
1N/A {
1N/A POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
1N/A Tskmgr.tm_nb_workers--;
1N/A TASKMGR_UNLOCK();
1N/A continue;
1N/A }
1N/A
1N/A /*
1N/A ** if no task ready to run, wait for another one
1N/A */
1N/A
1N/A Tskmgr.tm_nb_idle++;
1N/A TASKMGR_COND_WAIT();
1N/A Tskmgr.tm_nb_idle--;
1N/A
1N/A /* look for a task */
1N/A GET_TASK_READY_TO_RUN();
1N/A
1N/A TASKMGR_UNLOCK();
1N/A }
1N/A return NULL;
1N/A}
1N/A
1N/A/*
1N/A** MI_LIST_ADD_CTX -- add new session to linked list
1N/A**
1N/A** Parameters:
1N/A** ctx -- context structure
1N/A**
1N/A** Returns:
1N/A** MI_FAILURE/MI_SUCCESS
1N/A*/
1N/A
1N/Astatic int
1N/Ami_list_add_ctx(ctx)
1N/A SMFICTX_PTR ctx;
1N/A{
1N/A SM_ASSERT(ctx != NULL);
1N/A SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
1N/A return MI_SUCCESS;
1N/A}
1N/A
1N/A/*
1N/A** MI_LIST_DEL_CTX -- remove session from linked list when finished
1N/A**
1N/A** Parameters:
1N/A** ctx -- context structure
1N/A**
1N/A** Returns:
1N/A** MI_FAILURE/MI_SUCCESS
1N/A*/
1N/A
1N/Astatic int
1N/Ami_list_del_ctx(ctx)
1N/A SMFICTX_PTR ctx;
1N/A{
1N/A SM_ASSERT(ctx != NULL);
1N/A if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
1N/A return MI_FAILURE;
1N/A
1N/A SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
1N/A return MI_SUCCESS;
1N/A}
1N/A#endif /* _FFR_WORKERS_POOL */