/*
* Copyright (c) 2003-2004, 2007, 2009-2012 Proofpoint, Inc. and its suppliers.
* All rights reserved.
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*
* Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
* Jose-Marcio.Martins@ensmp.fr
*/
#include "libmilter.h"
struct taskmgr_S
{
};
#define PIPE_SEND_SIGNAL() \
do \
{ \
char evt = 0x5a; \
"Error writing to event pipe: %s", \
sm_errstring(errno)); \
} while (0)
#ifndef USE_PIPE_WAKE_POLL
#endif /* USE_PIPE_WAKE_POLL */
/* poll check periodicity (default 10000 - 10 s) */
/* worker conditional wait timeout (default 10 s) */
/* functions */
static void *mi_pool_controller __P((void *));
/*
** periodicity of cleaning up old sessions (timedout)
** sessions list will be checked to find old inactive
** sessions each DT_CHECK_OLD_SESSIONS sec
*/
#ifndef OLD_SESSION_TIMEOUT
#endif /* OLD_SESSION_TIMEOUT */
/* session states - with respect to the pool of workers */
#ifndef MIN_WORKERS
#endif
/*
** Macros for threads and mutex management
*/
#define TASKMGR_LOCK() \
do \
{ \
} while (0)
#define TASKMGR_UNLOCK() \
do \
{ \
} while (0)
#define TASKMGR_COND_WAIT() \
#define TASKMGR_COND_SIGNAL() \
do \
{ \
} while (0)
do \
{ \
int r; \
\
sm_errstring(r)); \
} while (0)
#if POOL_DEBUG
do \
{ \
sm_dprintf x; \
} while (0)
#else /* POOL_DEBUG */
#endif /* POOL_DEBUG */
/*
** MI_START_SESSION -- Start a session in the pool of workers
**
** Parameters:
** ctx -- context structure
**
** Returns:
*/
int
{
static long id = 0;
/* this can happen if the milter is shutting down */
return MI_FAILURE;
TASKMGR_LOCK();
{
return MI_FAILURE;
}
/* if there is an idle worker, signal it, otherwise start new worker */
if (Tskmgr.tm_nb_idle > 0)
{
}
else
{
}
return MI_SUCCESS;
}
/*
** MI_CLOSE_SESSION -- Close a session and clean up data structures
**
** Parameters:
** ctx -- context structure
**
** Returns:
*/
static int
{
(void) mi_list_del_ctx(ctx);
return MI_SUCCESS;
}
/*
** NONBLOCKING -- set nonblocking mode for a file descriptor.
**
** Parameters:
** fd -- file descriptor
** name -- name for (error) logging
**
** Returns:
*/
static int
{
int r;
errno = 0;
if (r == -1)
{
return MI_FAILURE;
}
errno = 0;
if (r == -1)
{
return MI_FAILURE;
}
return MI_SUCCESS;
}
/*
** MI_POOL_CONTROLER_INIT -- Launch the worker pool controller
** Must be called before starting sessions.
**
** Parameters:
** none
**
** Returns:
*/
int
{
int r, i;
return MI_SUCCESS;
Tskmgr.tm_nb_workers = 0;
Tskmgr.tm_nb_idle = 0;
{
return MI_FAILURE;
}
if (r != MI_SUCCESS)
return r;
if (r != MI_SUCCESS)
return r;
/* Launch the pool controller */
{
sm_errstring(r));
return MI_FAILURE;
}
/* Create the pool of workers */
for (i = 0; i < MIN_WORKERS; i++)
{
{
sm_errstring(r));
return MI_FAILURE;
}
}
return MI_SUCCESS;
}
/*
** MI_POOL_CONTROLLER -- manage the pool of workers
** This thread must be running when listener begins
** starting sessions
**
** Parameters:
** arg -- unused
**
** Returns:
** NULL
**
** Control flow:
** for (;;)
** Look for timed out sessions
** Select sessions to wait for sendmail command
** Poll set of file descriptors
** if timeout
** continue
** For each file descriptor ready
** launch new thread if no worker available
** else
** signal waiting worker
*/
/* Poll structure array (pollfd) size step */
static void *
void *arg;
{
int dim_pfd = 0;
bool rebuild_set = true;
{
return NULL;
}
{
return NULL;
}
for (;;)
{
int nfd, r, i;
if (mi_stop() != MILTER_CONT)
break;
TASKMGR_LOCK();
/* check for timed out sessions? */
{
{
{
< now)
{
/* if session timed out, close it */
("Closing old connection: sd=%d id=%d",
}
}
}
}
if (rebuild_set)
{
/*
** Initialize poll set.
** Insert into the poll set the file descriptors of
** all sessions waiting for a command from sendmail.
*/
nfd = 0;
/* begin with worker pipe */
nfd++;
{
/*
** update ctx_wait - start of wait moment -
** for timeout
*/
/* add the session to the pollfd array? */
{
/*
** Resize the pollfd array if it
** isn't large enough.
*/
{
sizeof(*tpfd);
{
}
else
{
"Failed to realloc pollfd array:%s",
}
}
/* add the session to pollfd array */
{
nfd++;
}
}
}
rebuild_set = false;
}
/* Everything is ready, let's wait for an event */
/* timeout */
if (r == 0)
continue;
rebuild_set = true;
/* error */
if (r < 0)
{
continue;
pcnt++;
"%s() failed (%s), %s",
if (pcnt >= MAX_FAILS_S)
goto err;
continue;
}
pcnt = 0;
/* something happened */
for (i = 0; i < nfd; i++)
{
continue;
WAIT_FD(i)));
/* has a worker signaled an end of task? */
{
ssize_t r;
("PIPE WILL READ evt = %08X %08X",
r = 1;
&& r != -1)
{
}
("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
{
/* Exception handling */
}
continue;
}
/*
** Not the pipe for workers waking us,
** so must be something on an MTA connection.
*/
TASKMGR_LOCK();
{
continue;
("Checking context sd=%d - fd=%d ",
{
("TASK: found %d for fd[%d]=%d",
if (Tskmgr.tm_nb_idle > 0)
{
}
else
{
}
break;
}
}
("TASK %s FOUND - Checking PIPE for fd[%d]",
}
}
err:
Tskmgr.tm_signature = 0;
#if 0
/*
** Do not clean up ctx -- it can cause double-free()s.
** The program is shutting down anyway, so it's not worth the trouble.
** There is a more complex solution that prevents race conditions
** while accessing ctx, but that's maybe for a later version.
*/
for (;;)
{
break;
}
#endif
return NULL;
}
/*
** Look for a task ready to run.
** Value of ctx is NULL or a pointer to a task ready to run.
*/
#define GET_TASK_READY_TO_RUN() \
{ \
{ \
break; \
} \
}
/*
** MI_WORKER -- worker thread
** executes tasks distributed by the mi_pool_controller
** or by mi_start_session
**
** Parameters:
** arg -- pointer to context structure
**
** Returns:
** NULL pointer
*/
static void *
void *arg;
{
bool done;
int r;
done = false;
t_id = sthread_get_id();
if (pthread_detach(t_id) != 0)
{
return NULL;
}
TASKMGR_LOCK();
while (!done)
{
if (mi_stop() != MILTER_CONT)
break;
/* let's handle next task... */
{
int res;
("worker %d: new task -> let's handle it",
t_id));
TASKMGR_LOCK();
if (res != MI_CONTINUE)
{
/*
** Delete context from linked list of
** sessions and close session.
*/
}
else
{
("writing to event pipe..."));
/*
** Signal task controller to add new session
** to poll set.
*/
}
}
/* check if there is any task waiting to be served */
TASKMGR_LOCK();
/* Got a task? */
{
continue;
}
/*
** if not, let's check if there is enough idle workers
** if yes: quit
*/
done = true;
if (done)
{
continue;
}
/*
** if no task ready to run, wait for another one
*/
Tskmgr.tm_nb_idle++;
Tskmgr.tm_nb_idle--;
/* look for a task */
}
return NULL;
}
/*
** MI_LIST_ADD_CTX -- add new session to linked list
**
** Parameters:
** ctx -- context structure
**
** Returns:
*/
static int
{
return MI_SUCCESS;
}
/*
** MI_LIST_DEL_CTX -- remove session from linked list when finished
**
** Parameters:
** ctx -- context structure
**
** Returns:
*/
static int
{
if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
return MI_FAILURE;
return MI_SUCCESS;
}
#endif /* _FFR_WORKERS_POOL */