program-client.c revision 004be038dfe290f71e3d4a4b14d88673e8b55fb2
/* Copyright (c) 2002-2016 Dovecot authors, see the included COPYING file
 */
0N/A
0N/A#include "lib.h"
0N/A#include "ioloop.h"
0N/A#include "array.h"
0N/A#include "str.h"
0N/A#include "safe-mkstemp.h"
0N/A#include "istream-private.h"
0N/A#include "istream-seekable.h"
0N/A#include "ostream.h"
0N/A
0N/A#include "program-client-private.h"
0N/A
0N/A#include <unistd.h>
0N/A
/* Buffer size given to o_stream_create_fd() for the stream that feeds the
   program's stdin. */
#define MAX_OUTPUT_BUFFER_SIZE 16384
/* Size limit given to i_stream_create_seekable() for captured program output
   (presumably the amount kept in memory before a temp file is used). */
#define MAX_OUTPUT_MEMORY_BUFFER (128 * 1024)
0N/A
0N/Astatic
0N/Avoid program_client_callback(struct program_client *pclient, int result, void *context)
0N/A{
0N/A program_client_callback_t *callback = pclient->callback;
0N/A i_assert(pclient->callback != NULL);
0N/A pclient->callback = NULL;
0N/A callback(result, context);
0N/A}
0N/A
0N/Astatic
0N/Aint program_client_seekable_fd_callback(const char **path_r, void *context)
0N/A{
0N/A struct program_client *pclient = (struct program_client *)context;
0N/A string_t *path;
0N/A int fd;
0N/A
0N/A path = t_str_new(128);
0N/A str_append(path, pclient->temp_prefix);
0N/A fd = safe_mkstemp(path, 0600, (uid_t)-1, (gid_t)-1);
0N/A if (fd == -1) {
0N/A i_error("safe_mkstemp(%s) failed: %m", str_c(path));
0N/A return -1;
0N/A }
0N/A
0N/A /* we just want the fd, unlink it */
0N/A if (i_unlink(str_c(path)) < 0) {
0N/A /* shouldn't happen.. */
0N/A i_close_fd(&fd);
0N/A return -1;
0N/A }
0N/A
0N/A *path_r = str_c(path);
0N/A return fd;
0N/A}
0N/A
0N/Astatic
0N/Avoid program_client_timeout(struct program_client *pclient)
0N/A{
0N/A i_error("program `%s' execution timed out (> %d secs)",
0N/A pclient->path, pclient->set.input_idle_timeout_secs);
0N/A program_client_fail(pclient, PROGRAM_CLIENT_ERROR_RUN_TIMEOUT);
0N/A}
0N/A
0N/Astatic
0N/Avoid program_client_connect_timeout(struct program_client *pclient)
0N/A{
0N/A i_error("program `%s' socket connection timed out (> %d msecs)",
0N/A pclient->path, pclient->set.client_connect_timeout_msecs);
0N/A program_client_fail(pclient, PROGRAM_CLIENT_ERROR_CONNECT_TIMEOUT);
0N/A}
0N/A
0N/Astatic
0N/Aint program_client_connect(struct program_client *pclient)
0N/A{
0N/A int ret;
0N/A
0N/A if (pclient->set.client_connect_timeout_msecs != 0) {
0N/A pclient->to = timeout_add(pclient->set.client_connect_timeout_msecs,
0N/A program_client_connect_timeout, pclient);
0N/A }
0N/A
0N/A if ((ret = pclient->connect(pclient)) < 0) {
0N/A program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
0N/A return -1;
0N/A }
0N/A return ret;
0N/A}
0N/A
0N/Astatic
0N/Aint program_client_close_output(struct program_client *pclient)
0N/A{
0N/A int ret;
0N/A
0N/A if (pclient->program_output != NULL)
0N/A o_stream_destroy(&pclient->program_output);
0N/A if ((ret = pclient->close_output(pclient)) < 0)
0N/A return -1;
0N/A pclient->program_output = NULL;
0N/A
0N/A return ret;
0N/A}
0N/A
0N/Astatic
0N/Avoid program_client_disconnect_extra_fds(struct program_client *pclient)
0N/A{
0N/A struct program_client_extra_fd *efds;
0N/A unsigned int i, count;
0N/A
0N/A if (!array_is_created(&pclient->extra_fds))
0N/A return;
0N/A
0N/A efds = array_get_modifiable(&pclient->extra_fds, &count);
0N/A for(i = 0; i < count; i++) {
0N/A if (efds[i].input != NULL)
0N/A i_stream_unref(&efds[i].input);
0N/A if (efds[i].io != NULL)
0N/A io_remove(&efds[i].io);
0N/A if (efds[i].parent_fd != -1 && close(efds[i].parent_fd) < 0)
0N/A i_error("close(fd=%d) failed: %m", efds[i].parent_fd);
0N/A }
0N/A}
0N/A
0N/Astatic
0N/Avoid program_client_disconnect(struct program_client *pclient, bool force)
0N/A{
0N/A bool error = FALSE;
0N/A int ret;
0N/A
0N/A if (pclient->disconnected)
0N/A return;
0N/A
0N/A if ((ret = program_client_close_output(pclient)) < 0)
0N/A error = TRUE;
0N/A
0N/A program_client_disconnect_extra_fds(pclient);
0N/A if ((ret = pclient->disconnect(pclient, force)) < 0)
0N/A error = TRUE;
0N/A
0N/A if (pclient->program_input != NULL) {
0N/A if (pclient->output_seekable)
0N/A i_stream_unref(&pclient->program_input);
0N/A else
0N/A i_stream_destroy(&pclient->program_input);
0N/A }
0N/A if (pclient->program_output != NULL)
0N/A o_stream_destroy(&pclient->program_output);
0N/A
0N/A if (pclient->to != NULL)
0N/A timeout_remove(&pclient->to);
0N/A if (pclient->io != NULL)
0N/A io_remove(&pclient->io);
0N/A
0N/A if (pclient->fd_in != -1 && close(pclient->fd_in) < 0)
0N/A i_error("close(%s) failed: %m", pclient->path);
0N/A if (pclient->fd_out != -1 && pclient->fd_out != pclient->fd_in
0N/A && close(pclient->fd_out) < 0)
0N/A i_error("close(%s/out) failed: %m", pclient->path);
0N/A pclient->fd_in = pclient->fd_out = -1;
0N/A
0N/A pclient->disconnected = TRUE;
0N/A if (error && pclient->error == PROGRAM_CLIENT_ERROR_NONE) {
0N/A pclient->error = PROGRAM_CLIENT_ERROR_OTHER;
0N/A }
0N/A
0N/A program_client_callback(pclient,
0N/A pclient->error != PROGRAM_CLIENT_ERROR_NONE ?
0N/A -1 :
0N/A pclient->exit_code,
0N/A pclient->context);
0N/A}
0N/A
0N/Avoid program_client_fail(struct program_client *pclient, enum program_client_error error)
0N/A{
0N/A if (pclient->error != PROGRAM_CLIENT_ERROR_NONE)
0N/A return;
0N/A
0N/A pclient->error = error;
0N/A program_client_disconnect(pclient, TRUE);
0N/A}
0N/A
static
bool program_client_input_pending(struct program_client *pclient)
{
struct program_client_extra_fd *efds = NULL;
unsigned int count, i;
if (pclient->program_input != NULL &&
!pclient->program_input->closed &&
!i_stream_is_eof(pclient->program_input)) {
return TRUE;
}
if (array_is_created(&pclient->extra_fds)) {
efds = array_get_modifiable(&pclient->extra_fds, &count);
for(i = 0; i < count; i++) {
if (efds[i].input != NULL &&
!efds[i].input->closed &&
!i_stream_is_eof(efds[i].input)) {
return TRUE;
}
}
}
return FALSE;
}
/* Push data from pclient->input to the program. Called directly from
   program_client_connected() and installed as the flush callback of
   pclient->program_output.

   Returns the o_stream_flush() result when it is <= 0 (the client is failed
   first when it is negative), 0 when the send must wait for the output, -1
   after a read/write error (the client is failed first) and 1 otherwise.
   Once there is no input left, the client is disconnected if nothing more is
   expected from the program; otherwise only the output side is closed. */
static
int program_client_program_output(struct program_client *pclient)
{
	struct istream *src = pclient->input;
	struct ostream *dest = pclient->program_output;
	int ret;

	ret = o_stream_flush(dest);
	if (ret <= 0) {
		if (ret < 0) {
			i_error("write(%s) failed: %s",
				o_stream_get_name(dest),
				o_stream_get_error(dest));
			program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
		}
		return ret;
	}

	if (src != NULL && dest != NULL) {
		switch (o_stream_send_istream(dest, src)) {
		case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
			/* everything was sent; drop our input reference */
			i_stream_unref(&pclient->input);
			src = NULL;
			break;
		case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
			return 1;
		case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
			return 0;
		case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
			i_error("read(%s) failed: %s",
				i_stream_get_name(src),
				i_stream_get_error(src));
			program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
			return -1;
		case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
			i_error("write(%s) failed: %s",
				o_stream_get_name(dest),
				o_stream_get_error(dest));
			program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
			return -1;
		}
	}

	if (src != NULL)
		return 1;

	/* nothing (more) to send to the program */
	if (!program_client_input_pending(pclient))
		program_client_disconnect(pclient, FALSE);
	else if (program_client_close_output(pclient) < 0)
		program_client_fail(pclient, PROGRAM_CLIENT_ERROR_OTHER);
	return 1;
}
/* I/O handler for data arriving from the program (registered on fd_in by
   program_client_init_streams()).

   In seekable-output mode the program input is wrapped into a seekable
   stream on first use; that stream replaces pclient->program_input and an
   extra reference is kept in pclient->seekable_output. The data is then
   either forwarded to pclient->output or, when no output stream is set,
   read and skipped. The client is disconnected once no input remains
   pending, and failed with PROGRAM_CLIENT_ERROR_IO on stream errors. */
static
void program_client_program_input(struct program_client *pclient)
{
	struct istream *src = pclient->program_input;
	struct ostream *dest = pclient->output;
	const unsigned char *data;
	size_t size;
	int ret;

	if (pclient->output_seekable && pclient->seekable_output == NULL) {
		struct istream *parents[2] = { src, NULL };

		src = i_stream_create_seekable(parents, MAX_OUTPUT_MEMORY_BUFFER,
					       program_client_seekable_fd_callback,
					       pclient);
		i_stream_unref(&pclient->program_input);
		pclient->program_input = src;
		pclient->seekable_output = src;
		i_stream_ref(pclient->seekable_output);
	}

	if (src == NULL) {
		program_client_disconnect(pclient, FALSE);
		return;
	}

	if (dest != NULL) {
		switch (o_stream_send_istream(dest, src)) {
		case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
			break;
		case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
		case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
			return;
		case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
			i_error("read(%s) failed: %s",
				i_stream_get_name(src),
				i_stream_get_error(src));
			program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
			return;
		case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
			i_error("write(%s) failed: %s",
				o_stream_get_name(dest),
				o_stream_get_error(dest));
			program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
			return;
		}
	} else {
		/* no consumer: read and skip whatever is available */
		for (;;) {
			ret = i_stream_read_more(src, &data, &size);
			if (ret <= 0 && ret != -2)
				break;
			i_stream_skip(src, size);
		}
		if (ret == 0)
			return;
		if (ret < 0 && src->stream_errno != 0) {
			i_error("read(%s) failed: %s",
				i_stream_get_name(src),
				i_stream_get_error(src));
			program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
			return;
		}
	}

	if (!program_client_input_pending(pclient))
		program_client_disconnect(pclient, FALSE);
}
/* I/O handler for an extra fd: passes the fd's input stream to the callback
   registered for it, then disconnects the client if this stream is closed or
   at EOF and no other input remains pending. */
static
void program_client_extra_fd_input(struct program_client_extra_fd *efd)
{
	struct program_client *pclient = efd->pclient;

	i_assert(efd->callback != NULL);
	efd->callback(efd->context, efd->input);

	if (!efd->input->closed && !i_stream_is_eof(efd->input))
		return;
	if (!program_client_input_pending(pclient))
		program_client_disconnect(pclient, FALSE);
}
/* Called once the connection to the program is up: records the start time,
   replaces any pending timeout with the idle timeout (when configured) and
   starts sending input to the program.

   Returns 1 when there is no program output stream, otherwise the result of
   program_client_program_output(). When that result is 0 and the output
   stream still exists, the function is installed as its flush callback. */
int program_client_connected(struct program_client *pclient)
{
	int ret = 1;

	pclient->start_time = ioloop_time;

	if (pclient->to != NULL)
		timeout_remove(&pclient->to);
	if (pclient->set.input_idle_timeout_secs != 0) {
		pclient->to = timeout_add(pclient->set.input_idle_timeout_secs * 1000,
					  program_client_timeout, pclient);
	}

	/* run output */
	if (pclient->program_output == NULL)
		return ret;

	ret = program_client_program_output(pclient);
	if (ret == 0 && pclient->program_output != NULL) {
		o_stream_set_flush_callback(pclient->program_output,
					    program_client_program_output, pclient);
	}
	return ret;
}
/* Fill in the common part of a program client: the pool, pool-allocated
   copies of path and (when given) args, a copy of the settings, and both
   descriptors marked as not open (-1). */
void program_client_init(struct program_client *pclient, pool_t pool, const char *path,
			 const char *const *args, const struct program_client_settings *set)
{
	pclient->pool = pool;
	pclient->fd_in = -1;
	pclient->fd_out = -1;

	pclient->path = p_strdup(pool, path);
	if (args != NULL)
		pclient->args = p_strarray_dup(pool, args);

	pclient->set = *set;
	pclient->debug = set->debug;
}
/* Set (or clear, with NULL) the stream whose contents are sent to the
   program. The previous stream is unreferenced and a reference is taken on
   the new one. */
void program_client_set_input(struct program_client *pclient, struct istream *input)
{
	/* release the stream configured earlier, if any */
	if (pclient->input != NULL)
		i_stream_unref(&pclient->input);

	pclient->input = input;
	if (pclient->input != NULL)
		i_stream_ref(pclient->input);
}
/* Set (or clear, with NULL) the stream that receives the program's output.
   The previous stream is unreferenced, a reference is taken on the new one,
   and seekable-output mode is switched off together with its temp prefix. */
void program_client_set_output(struct program_client *pclient, struct ostream *output)
{
	/* release the stream configured earlier, if any */
	if (pclient->output != NULL)
		o_stream_unref(&pclient->output);

	pclient->output = output;
	if (pclient->output != NULL)
		o_stream_ref(pclient->output);

	/* a plain output stream replaces seekable capture */
	pclient->output_seekable = FALSE;
	i_free(pclient->temp_prefix);
}
/* Switch to seekable-output mode: the program's output is captured into a
   seekable stream instead of being written to an output stream. Any output
   stream set earlier is unreferenced.

   temp_prefix is copied; it is the path prefix used when a temporary file
   has to be created for the captured data. A prefix stored by an earlier
   call is freed rather than leaked. */
void program_client_set_output_seekable(struct program_client *pclient, const char *temp_prefix)
{
	char *new_prefix;

	if (pclient->output != NULL)
		o_stream_unref(&pclient->output);

	/* duplicate before freeing, in case temp_prefix points at the string
	   that is currently stored */
	new_prefix = i_strdup(temp_prefix);
	i_free(pclient->temp_prefix);
	pclient->temp_prefix = new_prefix;
	pclient->output_seekable = TRUE;
}
/* Hand over the captured seekable output stream, rewound to offset 0. The
   client's own pointer to the stream is cleared, so the reference it held
   passes to the caller.

   Returns NULL when no seekable stream exists (none was created, or it was
   already taken by an earlier call). */
struct istream *program_client_get_output_seekable(struct program_client *pclient)
{
	struct istream *input = pclient->seekable_output;

	/* nothing was captured: avoid seeking on a NULL stream */
	if (input == NULL)
		return NULL;

	pclient->seekable_output = NULL;
	i_stream_seek(input, 0);
	return input;
}
/* NOTE(review): the #undef suggests the header wraps this function in a
   macro of the same name - confirm against program-client.h. */
#undef program_client_set_extra_fd
/* Register callback/context for data the program writes to descriptor fd
   (which must be greater than 1). An existing entry for the same child fd is
   updated in place; otherwise a new entry is appended with no parent-side
   descriptor yet (-1). */
void program_client_set_extra_fd(struct program_client *pclient, int fd,
				 program_client_fd_callback_t *callback, void *context)
{
	struct program_client_extra_fd *entries;
	struct program_client_extra_fd *slot = NULL;
	unsigned int idx, n;

	i_assert(fd > 1);

	if (!array_is_created(&pclient->extra_fds))
		p_array_init(&pclient->extra_fds, pclient->pool, 2);

	/* reuse the entry already registered for this child fd, if any */
	entries = array_get_modifiable(&pclient->extra_fds, &n);
	for (idx = 0; idx < n && slot == NULL; idx++) {
		if (entries[idx].child_fd == fd)
			slot = &entries[idx];
	}

	if (slot == NULL) {
		slot = array_append_space(&pclient->extra_fds);
		slot->pclient = pclient;
		slot->child_fd = fd;
		slot->parent_fd = -1;
	}
	slot->callback = callback;
	slot->context = context;
}
/* Append a "name=value" string, allocated from the client's pool, to the
   list of environment entries. */
void program_client_set_env(struct program_client *pclient, const char *name, const char *value)
{
	const char *entry;

	if (!array_is_created(&pclient->envs))
		p_array_init(&pclient->envs, pclient->pool, 16);

	entry = p_strdup_printf(pclient->pool, "%s=%s", name, value);
	array_append(&pclient->envs, &entry, 1);
}
void program_client_init_streams(struct program_client *pclient)
{
/* Create streams for normal program I/O */
if (pclient->fd_out >= 0) {
pclient->program_output =
o_stream_create_fd(pclient->fd_out,
MAX_OUTPUT_BUFFER_SIZE);
o_stream_set_name(pclient->program_output, "program stdin");
}
if (pclient->fd_in >= 0) {
struct istream *input;
input = i_stream_create_fd(pclient->fd_in, (size_t)-1);
pclient->program_input = input;
i_stream_set_name(pclient->program_input, "program stdout");
pclient->io = io_add(pclient->fd_in, IO_READ,
program_client_program_input, pclient);
}
/* Create streams for additional output through side-channel fds */
if (array_is_created(&pclient->extra_fds)) {
struct program_client_extra_fd *efds = NULL;
unsigned int count, i;
efds = array_get_modifiable(&pclient->extra_fds, &count);
for(i = 0; i < count; i++) {
i_assert(efds[i].parent_fd >= 0);
efds[i].input = i_stream_create_fd
(efds[i].parent_fd, (size_t)-1);
i_stream_set_name(efds[i].input,
t_strdup_printf("program output fd=%d",
efds[i].child_fd));
efds[i].io = io_add(efds[i].parent_fd, IO_READ,
program_client_extra_fd_input, &efds[i]);
}
}
}
/* Force a disconnect, drop every stream reference and I/O watcher that is
   still held, free the temp prefix, unreference the client's pool and set
   the caller's pointer to NULL. */
void program_client_destroy(struct program_client **_pclient)
{
	struct program_client *client = *_pclient;

	program_client_disconnect(client, TRUE);

	/* release whatever is still held after the disconnect */
	if (client->input != NULL)
		i_stream_unref(&client->input);
	if (client->program_input != NULL)
		i_stream_unref(&client->program_input);
	if (client->program_output != NULL)
		o_stream_unref(&client->program_output);
	if (client->output != NULL)
		o_stream_unref(&client->output);
	if (client->seekable_output != NULL)
		i_stream_unref(&client->seekable_output);
	if (client->io != NULL)
		io_remove(&client->io);

	i_free(client->temp_prefix);
	pool_unref(&client->pool);
	*_pclient = NULL;
}
/* Move the client's streams, timeout and main I/O watcher over to the
   current ioloop. Members that are not set are skipped. */
void program_client_switch_ioloop(struct program_client *pclient)
{
	/* input-side streams */
	if (pclient->input != NULL)
		i_stream_switch_ioloop(pclient->input);
	if (pclient->program_input != NULL)
		i_stream_switch_ioloop(pclient->program_input);
	if (pclient->seekable_output != NULL)
		i_stream_switch_ioloop(pclient->seekable_output);

	/* output-side streams */
	if (pclient->output != NULL)
		o_stream_switch_ioloop(pclient->output);
	if (pclient->program_output != NULL)
		o_stream_switch_ioloop(pclient->program_output);

	/* timeout and I/O watcher */
	if (pclient->to != NULL)
		pclient->to = io_loop_move_timeout(&pclient->to);
	if (pclient->io != NULL)
		pclient->io = io_loop_move_io(&pclient->io);
}
/* Completion callback used by program_client_run(): stores the result in the
   caller's variable and stops the current ioloop. */
static void program_client_run_callback(int result, int *context)
{
	int *result_r = context;

	*result_r = result;
	io_loop_stop(current_ioloop);
}
int program_client_run(struct program_client *pclient)
{
int ret = -2;
struct ioloop *prev_ioloop = current_ioloop;
struct ioloop *ioloop = io_loop_create();
program_client_switch_ioloop(pclient);
program_client_run_async(pclient, program_client_run_callback, &ret);
if (ret == -2) {
io_loop_run(ioloop);
}
io_loop_set_current(prev_ioloop);
program_client_switch_ioloop(pclient);
io_loop_set_current(ioloop);
io_loop_destroy(&ioloop);
if (pclient->error != PROGRAM_CLIENT_ERROR_NONE)
return -1;
return pclient->exit_code;
}
/* NOTE(review): the #undef suggests the header wraps this function in a
   macro of the same name - confirm against program-client.h. */
#undef program_client_run_async
/* Start running the program asynchronously. callback (must not be NULL) is
   called exactly once with the final result and context: -1 on error,
   otherwise the program's exit code.

   The run state (disconnected flag, exit code, error) is reset first, so the
   callback may already have been called by the time this function
   returns, e.g. when connecting fails. */
void program_client_run_async(struct program_client *pclient, program_client_callback_t *callback, void *context)
{
	int ret;

	i_assert(callback != NULL);

	pclient->disconnected = FALSE;
	pclient->exit_code = 1;
	pclient->error = PROGRAM_CLIENT_ERROR_NONE;

	pclient->callback = callback;
	pclient->context = context;

	ret = program_client_connect(pclient);
	if (ret < 0) {
		/* program_client_connect() fails through
		   program_client_fail(), whose disconnect has normally
		   delivered the callback already. Calling it a second time
		   would trip the assertion in program_client_callback(), so
		   only report here if it is still pending. */
		if (pclient->callback != NULL)
			program_client_callback(pclient, ret, context);
		return;
	}

	/* run output */
	if (ret > 0 && pclient->program_output != NULL &&
	    (ret = o_stream_flush(pclient->program_output)) == 0) {
		o_stream_set_flush_callback(pclient->program_output,
					    program_client_program_output, pclient);
	}
	if (ret < 0) {
		i_error("write(%s) failed: %s",
			o_stream_get_name(pclient->program_output),
			o_stream_get_error(pclient->program_output));
		/* Same handling as a flush failure in
		   program_client_program_output(): record the I/O error and
		   disconnect, which reports -1 through the callback. This
		   avoids leaving a half-connected client behind whose
		   callback has already been consumed. */
		program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
	}
}