auth-worker-server.c revision 4426132a36ad4d5ba421f7bae9709f91fef37180
/* Copyright (C) 2005 Timo Sirainen */
#include "common.h"
#include "buffer.h"
#include "ioloop.h"
#include "network.h"
#include "istream.h"
#include "ostream.h"
#include "auth-request.h"
#include "auth-worker-client.h"
#include "auth-worker-server.h"
#include <stdlib.h>
#include <unistd.h>
#define AUTH_WORKER_MAX_OUTBUF_SIZE 10240
#define AUTH_WORKER_MAX_IDLE_TIME (60*30)
struct auth_worker_request {
unsigned int id;
struct auth_request *auth_request;
auth_worker_callback_t *callback;
};
struct auth_worker_connection {
int fd;
struct io *io;
struct istream *input;
struct ostream *output;
unsigned int id_counter;
buffer_t *requests; /* struct auth_worker_request[] */
time_t last_used;
unsigned int request_count;
unsigned int requests_left;
};
static buffer_t *connections = NULL;
static unsigned int idle_count;
static unsigned int auth_workers_max;
static unsigned int auth_workers_max_request_count;
static char *worker_socket_path;
static struct timeout *to;
static void worker_input(struct auth_worker_connection *conn);
static struct auth_worker_connection *auth_worker_create(void)
{
struct auth_worker_connection *conn;
int fd, try;
if (connections->used / sizeof(conn) >= auth_workers_max)
return NULL;
for (try = 0;; try++) {
fd = net_connect_unix(worker_socket_path);
if (fd >= 0)
break;
if (errno == EAGAIN || errno == ECONNREFUSED) {
/* we're busy. */
} else if (errno == ENOENT) {
/* master didn't yet create it? */
} else {
i_fatal("net_connect_unix(%s) failed: %m",
worker_socket_path);
}
if (try == 5) {
i_fatal("net_connect_unix(%s) "
"failed after %d tries: %m",
worker_socket_path, try);
}
/* not created yet? try again */
sleep(1);
}
conn = i_new(struct auth_worker_connection, 1);
conn->fd = fd;
conn->input = i_stream_create_fd(fd, AUTH_WORKER_MAX_LINE_LENGTH,
FALSE);
conn->output = o_stream_create_fd(fd, (size_t)-1, FALSE);
conn->io = io_add(fd, IO_READ, worker_input, conn);
conn->requests = buffer_create_dynamic(default_pool, 128);
conn->requests_left = auth_workers_max_request_count;
idle_count++;
buffer_append(connections, &conn, sizeof(conn));
return conn;
}
static void auth_worker_destroy(struct auth_worker_connection *conn)
{
struct auth_worker_connection **connp;
struct auth_worker_request *request;
size_t i, size;
const char *reply;
connp = buffer_get_modifiable_data(connections, &size);
size /= sizeof(*connp);
for (i = 0; i < size; i++) {
if (connp[i] == conn) {
buffer_delete(connections, i * sizeof(*connp),
sizeof(*connp));
break;
}
}
if (conn->request_count == 0)
idle_count--;
/* abort all pending requests */
request = buffer_get_modifiable_data(conn->requests, &size);
size /= sizeof(*request);
reply = t_strdup_printf("FAIL\t%d", PASSDB_RESULT_INTERNAL_FAILURE);
for (i = 0; i < size; i++) {
if (request[i].id != 0) {
request[i].callback(request[i].auth_request, reply);
auth_request_unref(&request[i].auth_request);
}
}
buffer_free(conn->requests);
io_remove(&conn->io);
i_stream_destroy(&conn->input);
o_stream_destroy(&conn->output);
if (close(conn->fd) < 0)
i_error("close(auth worker) failed: %m");
i_free(conn);
}
static struct auth_worker_request *
auth_worker_request_lookup(struct auth_worker_connection *conn,
unsigned int id)
{
struct auth_worker_request *request;
size_t i, size;
request = buffer_get_modifiable_data(conn->requests, &size);
size /= sizeof(*request);
for (i = 0; i < size; i++) {
if (request[i].id == id)
return &request[i];
}
return NULL;
}
static struct auth_worker_connection *auth_worker_find_free(void)
{
struct auth_worker_connection **conn, *best;
size_t i, size, outbuf_size, best_size;
conn = buffer_get_modifiable_data(connections, &size);
size /= sizeof(*conn);
if (idle_count > 0) {
/* there exists at least one idle connection, use it */
for (i = 0; i < size; i++) {
if (conn[i]->request_count == 0)
return conn[i];
}
i_unreached();
}
/* first the connection with least data in output buffer */
best = NULL;
best_size = (size_t)-1;
for (i = 0; i < size; i++) {
outbuf_size = o_stream_get_buffer_used_size(conn[i]->output);
if (outbuf_size < best_size && conn[i]->requests_left > 0) {
best = conn[i];
best_size = outbuf_size;
}
}
return best;
}
static void auth_worker_handle_request(struct auth_worker_connection *conn,
struct auth_worker_request *request,
const char *line)
{
request->callback(request->auth_request, line);
auth_request_unref(&request->auth_request);
/* mark the record empty so it can be used for future requests */
memset(request, 0, sizeof(*request));
/* update counters */
conn->request_count--;
if (conn->request_count == 0)
idle_count++;
}
static void worker_input(struct auth_worker_connection *conn)
{
struct auth_worker_request *request;
const char *line, *id_str;
unsigned int id;
switch (i_stream_read(conn->input)) {
case 0:
return;
case -1:
/* disconnected */
auth_worker_destroy(conn);
return;
case -2:
/* buffer full */
i_error("BUG: Auth worker sent us more than %d bytes",
(int)AUTH_WORKER_MAX_LINE_LENGTH);
auth_worker_destroy(conn);
return;
}
while ((line = i_stream_next_line(conn->input)) != NULL) {
id_str = line;
line = strchr(line, '\t');
if (line == NULL)
continue;
t_push();
id = (unsigned int)strtoul(t_strcut(id_str, '\t'), NULL, 10);
request = auth_worker_request_lookup(conn, id);
t_pop();
if (request != NULL)
auth_worker_handle_request(conn, request, line + 1);
}
if (conn->requests_left == 0) {
auth_worker_destroy(conn);
if (idle_count == 0)
auth_worker_create();
}
}
static struct auth_worker_request *
auth_worker_request_get(struct auth_worker_connection *conn)
{
struct auth_worker_request *request;
size_t i, size;
request = buffer_get_modifiable_data(conn->requests, &size);
size /= sizeof(*request);
for (i = 0; i < size; i++) {
if (request[i].id == 0)
return &request[i];
}
return buffer_append_space_unsafe(conn->requests, sizeof(*request));
}
void auth_worker_call(struct auth_request *auth_request, const char *data,
auth_worker_callback_t *callback)
{
struct auth_worker_connection *conn;
struct auth_worker_request *request;
const char *reply;
struct const_iovec iov[3];
conn = auth_worker_find_free();
if (conn == NULL) {
/* no connections currently. can happen if all have been
idle for last 10 minutes. create a new one. */
conn = auth_worker_create();
if (conn == NULL) {
auth_request_log_error(auth_request, "worker-server",
"Couldn't create new auth worker");
reply = t_strdup_printf("FAIL\t%d",
PASSDB_RESULT_INTERNAL_FAILURE);
callback(auth_request, reply);
return;
}
}
i_assert(conn->requests_left > 0);
iov[0].iov_base = t_strdup_printf("%d\t", ++conn->id_counter);
iov[0].iov_len = strlen(iov[0].iov_base);
iov[1].iov_base = data;
iov[1].iov_len = strlen(data);
iov[2].iov_base = "\n";
iov[2].iov_len = 1;
if (o_stream_get_buffer_used_size(conn->output) +
iov[0].iov_len + iov[1].iov_len + 1 > AUTH_WORKER_MAX_OUTBUF_SIZE) {
auth_request_log_error(auth_request, "worker-server",
"All auth workers are busy");
reply = t_strdup_printf("FAIL\t%d",
PASSDB_RESULT_INTERNAL_FAILURE);
callback(auth_request, reply);
return;
}
/* find an empty request */
request = auth_worker_request_get(conn);
request->id = conn->id_counter;
request->auth_request = auth_request;
request->callback = callback;
auth_request_ref(auth_request);
o_stream_sendv(conn->output, iov, 3);
if (idle_count == 0) {
/* this request was queued, we need new workers */
auth_worker_create();
}
conn->last_used = ioloop_time;
conn->requests_left--;
if (conn->request_count++ == 0)
idle_count--;
}
static void auth_worker_timeout(void *context __attr_unused__)
{
struct auth_worker_connection **conn;
size_t i, size;
conn = buffer_get_modifiable_data(connections, &size);
size /= sizeof(*conn);
if (idle_count <= 1)
return;
/* remove connections which we haven't used for a long time.
this works because auth_worker_find_free() always returns the
first free connection. */
for (i = 0; i < size; i++) {
if (conn[i]->last_used +
AUTH_WORKER_MAX_IDLE_TIME < ioloop_time) {
/* remove just one. easier.. */
auth_worker_destroy(conn[i]);
break;
}
}
}
void auth_worker_server_init(void)
{
const char *env;
if (connections != NULL) {
/* already initialized */
return;
}
env = getenv("AUTH_WORKER_PATH");
if (env == NULL)
i_fatal("AUTH_WORKER_PATH environment not set");
worker_socket_path = i_strdup(env);
env = getenv("AUTH_WORKER_MAX_COUNT");
if (env == NULL)
i_fatal("AUTH_WORKER_MAX_COUNT environment not set");
auth_workers_max = atoi(env);
env = getenv("AUTH_WORKER_MAX_REQUEST_COUNT");
if (env == NULL)
i_fatal("AUTH_WORKER_MAX_REQUEST_COUNT environment not set");
auth_workers_max_request_count = atoi(env);
if (auth_workers_max_request_count == 0)
auth_workers_max_request_count = (unsigned int)-1;
connections = buffer_create_dynamic(default_pool,
sizeof(struct auth_worker_connection) * 16);
to = timeout_add(1000 * 60, auth_worker_timeout, NULL);
auth_worker_create();
}
void auth_worker_server_deinit(void)
{
struct auth_worker_connection **connp;
if (connections == NULL)
return;
while (connections->used > 0) {
connp = buffer_get_modifiable_data(connections, NULL);
auth_worker_destroy(*connp);
}
buffer_free(connections);
connections = NULL;
timeout_remove(&to);
i_free(worker_socket_path);
}