dsync-proxy-client.c revision 3fe67ec75ccae1230bb9eb9f16affc48377f6441
/* Copyright (c) 2009 Dovecot authors, see the included COPYING file */
c25356d5978632df6203437e1953bcb29e0c736fTimo Sirainen
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen#include "lib.h"
49e358eebea107aad9919dcc4bd88cee8519ba2eTimo Sirainen#include "array.h"
49e358eebea107aad9919dcc4bd88cee8519ba2eTimo Sirainen#include "aqueue.h"
49e358eebea107aad9919dcc4bd88cee8519ba2eTimo Sirainen#include "fd-set-nonblock.h"
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen#include "istream.h"
dd62b77c932d1b518f2a3e4bf80e36542becc256Timo Sirainen#include "istream-dot.h"
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen#include "ostream.h"
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen#include "str.h"
03f5c621d06d6b6d77a145196c9633a7aa64dc78Timo Sirainen#include "strescape.h"
c06f4017027263cf3a08becc551f5126409e2a83Timo Sirainen#include "master-service.h"
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen#include "imap-util.h"
411d6baa37f31d90730e90c4a28c43e1974bbe58Timo Sirainen#include "dsync-proxy.h"
7e1f68ad71d3485f1882142837b01f7a98ca8467Timo Sirainen#include "dsync-worker-private.h"
7e1f68ad71d3485f1882142837b01f7a98ca8467Timo Sirainen
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen#include <stdlib.h>
252db51b6c0a605163326b3ea5d09e9936ca3b29Timo Sirainen#include <unistd.h>
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen
/* Buffered-output threshold in bytes: worker_is_output_stream_full() reports
   the output stream as full once at least this much data is queued in it. */
#define OUTBUF_THROTTLE_SIZE (1024*64)

/* Kind of reply a queued request is waiting for. It selects which handler
   proxy_client_worker_next_reply() dispatches the reply line to. */
enum proxy_client_request_type {
	PROXY_CLIENT_REQUEST_TYPE_COPY,
	PROXY_CLIENT_REQUEST_TYPE_GET,
	PROXY_CLIENT_REQUEST_TYPE_FINISH
};

fe363b433b8038a69b55169da9dca27892ad7d18Timo Sirainenstruct proxy_client_request {
10c96a244935de4add8213ba0b894178dfb889a5Timo Sirainen enum proxy_client_request_type type;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen union {
cd56a23e21f1df3f79648cf07e2f4385e2fadebbTimo Sirainen dsync_worker_msg_callback_t *get;
cd56a23e21f1df3f79648cf07e2f4385e2fadebbTimo Sirainen dsync_worker_copy_callback_t *copy;
cd56a23e21f1df3f79648cf07e2f4385e2fadebbTimo Sirainen dsync_worker_finish_callback_t *finish;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen } callback;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen void *context;
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen};
d5cebe7f98e63d4e2822863ef2faa4971e8b3a5dTimo Sirainen
d5cebe7f98e63d4e2822863ef2faa4971e8b3a5dTimo Sirainenstruct proxy_client_dsync_worker_mailbox_iter {
5ac0b0bf32898c63da086ae169674ecac151a31eTimo Sirainen struct dsync_worker_mailbox_iter iter;
5ac0b0bf32898c63da086ae169674ecac151a31eTimo Sirainen pool_t pool;
5ac0b0bf32898c63da086ae169674ecac151a31eTimo Sirainen};
5ac0b0bf32898c63da086ae169674ecac151a31eTimo Sirainen
5ac0b0bf32898c63da086ae169674ecac151a31eTimo Sirainenstruct proxy_client_dsync_worker_subs_iter {
5ac0b0bf32898c63da086ae169674ecac151a31eTimo Sirainen struct dsync_worker_subs_iter iter;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen pool_t pool;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen};
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainenstruct proxy_client_dsync_worker {
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen struct dsync_worker worker;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen int fd_in, fd_out;
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen struct io *io;
07e4875d250e7a7157cd99132aafc773cf3cdf83Timo Sirainen struct istream *input;
07e4875d250e7a7157cd99132aafc773cf3cdf83Timo Sirainen struct ostream *output;
07e4875d250e7a7157cd99132aafc773cf3cdf83Timo Sirainen struct timeout *to;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen mailbox_guid_t selected_box_guid;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen struct istream *save_input;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen struct io *save_io;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen bool save_input_last_lf;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen pool_t msg_get_pool;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen struct dsync_msg_static_data msg_get_data;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen ARRAY_DEFINE(request_array, struct proxy_client_request);
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen struct aqueue *request_queue;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen unsigned int handshake_received:1;
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen unsigned int finished:1;
602a0434db30d8e3292d1c161a803d96a879a74fTimo Sirainen};
602a0434db30d8e3292d1c161a803d96a879a74fTimo Sirainen
/* vfuncs table, defined further down in this file */
extern struct dsync_worker_vfuncs proxy_client_dsync_worker;

/* forward declarations for functions used before their definitions */
static void proxy_client_worker_input(struct proxy_client_dsync_worker *worker);
static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker);

7d207b1e77a7b5e3fda640e353acfc86d261fedfTimo Sirainenstatic void proxy_client_fail(struct proxy_client_dsync_worker *worker)
7d207b1e77a7b5e3fda640e353acfc86d261fedfTimo Sirainen{
7d207b1e77a7b5e3fda640e353acfc86d261fedfTimo Sirainen i_stream_close(worker->input);
7d207b1e77a7b5e3fda640e353acfc86d261fedfTimo Sirainen dsync_worker_set_failure(&worker->worker);
7d207b1e77a7b5e3fda640e353acfc86d261fedfTimo Sirainen master_service_stop(master_service);
6ef7e31619edfaa17ed044b45861d106a86191efTimo Sirainen}
7e1f68ad71d3485f1882142837b01f7a98ca8467Timo Sirainen
68a4946b12583b88fa802e52ebee45cd96056772Timo Sirainenstatic int
7e1f68ad71d3485f1882142837b01f7a98ca8467Timo Sirainenproxy_client_worker_read_line(struct proxy_client_dsync_worker *worker,
89e195dfb5c4b0efd9b9f459771a4467674e5b1fTimo Sirainen const char **line_r)
08fb191d6148feb3ed14e2d6c625cd248dd1c1d4Timo Sirainen{
08fb191d6148feb3ed14e2d6c625cd248dd1c1d4Timo Sirainen if (worker->worker.failed)
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen return -1;
89e195dfb5c4b0efd9b9f459771a4467674e5b1fTimo Sirainen
137ea7ca34005345aa2304a940149b7f3774d727Timo Sirainen *line_r = i_stream_read_next_line(worker->input);
89e195dfb5c4b0efd9b9f459771a4467674e5b1fTimo Sirainen if (*line_r == NULL) {
7e1f68ad71d3485f1882142837b01f7a98ca8467Timo Sirainen if (worker->input->stream_errno != 0) {
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen errno = worker->input->stream_errno;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen i_error("read() from worker server failed: %m");
68a4946b12583b88fa802e52ebee45cd96056772Timo Sirainen dsync_worker_set_failure(&worker->worker);
68a4946b12583b88fa802e52ebee45cd96056772Timo Sirainen return -1;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen }
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen if (worker->input->eof) {
68a4946b12583b88fa802e52ebee45cd96056772Timo Sirainen if (!worker->finished)
c06f4017027263cf3a08becc551f5126409e2a83Timo Sirainen i_error("read() from worker server failed: EOF");
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen dsync_worker_set_failure(&worker->worker);
699fdc186f982f70d990820796eaa0f12133e27cTimo Sirainen return -1;
699fdc186f982f70d990820796eaa0f12133e27cTimo Sirainen }
699fdc186f982f70d990820796eaa0f12133e27cTimo Sirainen }
c06f4017027263cf3a08becc551f5126409e2a83Timo Sirainen if (*line_r == NULL)
c06f4017027263cf3a08becc551f5126409e2a83Timo Sirainen return 0;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen
282a436a74d8835edb45cc019b1c916013013fd3Timo Sirainen if (!worker->handshake_received) {
282a436a74d8835edb45cc019b1c916013013fd3Timo Sirainen if (strcmp(*line_r, DSYNC_PROXY_SERVER_GREETING_LINE) != 0) {
282a436a74d8835edb45cc019b1c916013013fd3Timo Sirainen i_error("Invalid server handshake: %s", *line_r);
282a436a74d8835edb45cc019b1c916013013fd3Timo Sirainen dsync_worker_set_failure(&worker->worker);
282a436a74d8835edb45cc019b1c916013013fd3Timo Sirainen return -1;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen }
worker->handshake_received = TRUE;
return proxy_client_worker_read_line(worker, line_r);
}
return 1;
}
/* Destroy callback of the MSG-GET body stream: start watching fd_in for
   reply lines again and consume whatever is left of the body. */
static void
proxy_client_worker_msg_get_done(struct proxy_client_dsync_worker *worker)
{
	struct istream *body = worker->msg_get_data.input;
	const unsigned char *chunk;
	size_t chunk_len;

	i_assert(worker->io == NULL);

	worker->msg_get_data.input = NULL;
	worker->io = io_add(worker->fd_in, IO_READ,
			    proxy_client_worker_input, worker);

	/* the rest of the body has to be skipped, otherwise it would be
	   parsed as reply lines. */
	for (;;) {
		if (i_stream_read_data(body, &chunk, &chunk_len, 0) <= 0)
			break;
		i_stream_skip(body, chunk_len);
	}
}
/* Handle the reply to a MSG-COPY request: the copy callback gets TRUE if
   the reply line begins with '1'. Always returns TRUE. */
static bool
proxy_client_worker_next_copy(const struct proxy_client_request *request,
			      const char *line)
{
	bool copied = (line[0] == '1');

	request->callback.copy(copied, request->context);
	return TRUE;
}
/* Handle the reply line to a queued MSG-GET request.

   A line of the form "1\t<uid>\t<static data>" is followed by the message
   body, which is given to the callback as a stream created with
   i_stream_create_dot() on top of the worker's input. While that stream
   exists the worker's read watcher is removed; it is added back by
   proxy_client_worker_msg_get_done() when the stream is destroyed.
   A line starting with '0' reports the message as expunged, anything else
   is reported as a failure.

   Returns TRUE if the worker's read watcher is active when this function
   returns, FALSE otherwise (invalid static data, or the body stream hasn't
   been destroyed yet). */
static bool
proxy_client_worker_next_msg_get(struct proxy_client_dsync_worker *worker,
				 const struct proxy_client_request *request,
				 const char *line)
{
	enum dsync_msg_get_result result = DSYNC_MSG_GET_RESULT_FAILED;
	const char *p, *error;

	i_assert(worker->msg_get_data.input == NULL);
	p_clear(worker->msg_get_pool);
	switch (line[0]) {
	case '1':
		/* ok */
		if (line[1] != '\t')
			break;
		line += 2;

		/* the UID value isn't used here, just skip over the field */
		if ((p = strchr(line, '\t')) == NULL)
			break;
		line = p + 1;

		if (dsync_proxy_msg_static_import(worker->msg_get_pool,
						  line, &worker->msg_get_data,
						  &error) < 0) {
			i_error("Invalid msg-get static input: %s", error);
			proxy_client_fail(worker);
			return FALSE;
		}
		worker->msg_get_data.input =
			i_stream_create_dot(worker->input, FALSE);
		i_stream_set_destroy_callback(worker->msg_get_data.input,
					      proxy_client_worker_msg_get_done,
					      worker);
		/* the body stream reads from the input until it's destroyed */
		io_remove(&worker->io);
		result = DSYNC_MSG_GET_RESULT_SUCCESS;
		break;
	case '0':
		/* expunged */
		result = DSYNC_MSG_GET_RESULT_EXPUNGED;
		break;
	default:
		/* failure */
		break;
	}

	request->callback.get(result, &worker->msg_get_data, request->context);
	return worker->io != NULL;
}
/* Handle the reply to FINISH. "ok" and "changes" both count as success,
   and "changes" additionally flags unexpected changes on the worker.
   Any other line is reported to the callback as a failure. */
static void
proxy_client_worker_next_finish(struct proxy_client_dsync_worker *worker,
				const struct proxy_client_request *request,
				const char *line)
{
	bool success;

	if (strcmp(line, "changes") == 0) {
		worker->worker.unexpected_changes = TRUE;
		success = TRUE;
	} else {
		success = strcmp(line, "ok") == 0;
	}
	request->callback.finish(success, request->context);
}
static bool
proxy_client_worker_next_reply(struct proxy_client_dsync_worker *worker,
const char *line)
{
const struct proxy_client_request *requests;
struct proxy_client_request request;
bool ret = TRUE;
if (aqueue_count(worker->request_queue) == 0) {
i_error("Unexpected reply from server: %s", line);
proxy_client_fail(worker);
return FALSE;
}
requests = array_idx(&worker->request_array, 0);
request = requests[aqueue_idx(worker->request_queue, 0)];
aqueue_delete_tail(worker->request_queue);
switch (request.type) {
case PROXY_CLIENT_REQUEST_TYPE_COPY:
ret = proxy_client_worker_next_copy(&request, line);
break;
case PROXY_CLIENT_REQUEST_TYPE_GET:
ret = proxy_client_worker_next_msg_get(worker, &request, line);
break;
case PROXY_CLIENT_REQUEST_TYPE_FINISH:
worker->finished = TRUE;
proxy_client_worker_next_finish(worker, &request, line);
break;
}
return ret;
}
/* Read watcher callback for fd_in. If an input callback has been set on
   the worker, input handling is delegated to it. Otherwise reply lines are
   read and dispatched until there are no more lines or a handler asks to
   stop. */
static void proxy_client_worker_input(struct proxy_client_dsync_worker *worker)
{
	const char *reply;
	int ret;

	timeout_reset(worker->to);

	if (worker->worker.input_callback != NULL) {
		worker->worker.input_callback(worker->worker.input_context);
		return;
	}

	for (;;) {
		ret = proxy_client_worker_read_line(worker, &reply);
		if (ret <= 0)
			break;
		if (!proxy_client_worker_next_reply(worker, reply))
			return;
	}
	if (ret < 0) {
		/* try to continue */
		proxy_client_worker_next_reply(worker, "");
	}
}
/* Flush callback of the output stream. Continues an unfinished message
   save first, and calls the worker's output callback only once no save is
   pending.

   Returns 1 if flushing failed or a save is still pending, otherwise the
   return value of o_stream_flush(). */
static int proxy_client_worker_output(struct proxy_client_dsync_worker *worker)
{
	int flush_ret;

	timeout_reset(worker->to);

	flush_ret = o_stream_flush(worker->output);
	if (flush_ret < 0)
		return 1;

	if (worker->save_input != NULL) {
		/* proxy_client_worker_msg_save() hasn't finished yet. */
		o_stream_cork(worker->output);
		proxy_client_send_stream(worker);
		if (worker->save_input != NULL)
			return 1;
	}

	if (worker->worker.output_callback != NULL)
		worker->worker.output_callback(worker->worker.output_context);
	return flush_ret;
}
/* Timeout callback: nothing reset the worker's timeout in time, so log the
   error and ask the master service to stop. The context isn't used. */
static void
proxy_client_worker_timeout(void *context ATTR_UNUSED)
{
	i_error("proxy client timed out");
	master_service_stop(master_service);
}
/* Create a proxy client worker that reads replies from fd_in and writes
   commands to fd_out. The client greeting line is sent right away, and
   both fds are switched to non-blocking mode.

   Returns the generic worker embedded in the newly allocated proxy
   worker. */
struct dsync_worker *dsync_worker_init_proxy_client(int fd_in, int fd_out)
{
	struct proxy_client_dsync_worker *client;

	client = i_new(struct proxy_client_dsync_worker, 1);
	client->worker.v = proxy_client_dsync_worker;
	client->fd_in = fd_in;
	client->fd_out = fd_out;

	/* reply bookkeeping */
	client->msg_get_pool = pool_alloconly_create("dsync proxy msg", 128);
	i_array_init(&client->request_array, 64);
	client->request_queue = aqueue_init(&client->request_array.arr);

	client->to = timeout_add(DSYNC_PROXY_TIMEOUT_MSECS,
				 proxy_client_worker_timeout, NULL);
	client->io = io_add(fd_in, IO_READ, proxy_client_worker_input, client);
	client->input = i_stream_create_fd(fd_in, (size_t)-1, FALSE);
	client->output = o_stream_create_fd(fd_out, (size_t)-1, FALSE);

	o_stream_send_str(client->output, DSYNC_PROXY_CLIENT_GREETING_LINE"\n");
	/* we'll keep the output corked until flush is needed */
	o_stream_cork(client->output);
	o_stream_set_flush_callback(client->output, proxy_client_worker_output,
				    client);

	fd_set_nonblock(fd_in, TRUE);
	fd_set_nonblock(fd_out, TRUE);
	return &client->worker;
}
/* Free the worker: remove its timeout and read watcher, destroy the
   streams, close the fds (only once if both are the same fd) and release
   the request queue and the MSG-GET pool. */
static void proxy_client_worker_deinit(struct dsync_worker *_worker)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;

	timeout_remove(&client->to);
	if (client->io != NULL)
		io_remove(&client->io);

	i_stream_destroy(&client->input);
	o_stream_destroy(&client->output);

	if (close(client->fd_in) < 0)
		i_error("close(worker input) failed: %m");
	if (client->fd_in != client->fd_out && close(client->fd_out) < 0)
		i_error("close(worker output) failed: %m");

	aqueue_deinit(&client->request_queue);
	array_free(&client->request_array);
	pool_unref(&client->msg_get_pool);
	i_free(client);
}
/* Returns TRUE if the output stream has at least OUTBUF_THROTTLE_SIZE
   bytes buffered. */
static bool
worker_is_output_stream_full(struct proxy_client_dsync_worker *worker)
{
	size_t buffered = o_stream_get_buffer_used_size(worker->output);

	return buffered >= OUTBUF_THROTTLE_SIZE;
}
/* Returns TRUE if no more commands should be queued right now: either a
   message save is still waiting for more input (save_io is set) or the
   output buffer has reached the throttle size. */
static bool proxy_client_worker_is_output_full(struct dsync_worker *_worker)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;

	/* an unfinished message save counts as full */
	return client->save_io != NULL ||
		worker_is_output_stream_full(client);
}
/* Try to flush all buffered output.

   Returns -1 if flushing failed, 0 if data is still buffered (the stream
   is left uncorked) and 1 if everything was sent (the stream is corked
   again). */
static int proxy_client_worker_output_flush(struct dsync_worker *_worker)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;
	struct ostream *out = client->output;

	if (o_stream_flush(out) < 0)
		return -1;

	o_stream_uncork(out);
	if (o_stream_get_buffer_used_size(out) == 0) {
		o_stream_cork(out);
		return 1;
	}
	return 0;
}
/* Start listing mailboxes: allocate the iterator, send BOX-LIST and try to
   flush the output. */
static struct dsync_worker_mailbox_iter *
proxy_client_worker_mailbox_iter_init(struct dsync_worker *_worker)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;
	struct proxy_client_dsync_worker_mailbox_iter *box_iter;

	box_iter = i_new(struct proxy_client_dsync_worker_mailbox_iter, 1);
	box_iter->pool = pool_alloconly_create("proxy mailbox iter", 1024);
	box_iter->iter.worker = _worker;

	o_stream_send_str(client->output, "BOX-LIST\n");
	proxy_client_worker_output_flush(_worker);
	return &box_iter->iter;
}
/* Read the next mailbox of a BOX-LIST reply into dsync_box_r.

   Returns 1 if a mailbox was imported, 0 if no line is available yet and
   -1 at the end of the list or on error. On error the iterator's failed
   flag is set. The list ends with a line starting with a tab, which
   indicates success only if the tab is followed by '0'. */
static int
proxy_client_worker_mailbox_iter_next(struct dsync_worker_mailbox_iter *_iter,
				      struct dsync_mailbox *dsync_box_r)
{
	struct proxy_client_dsync_worker_mailbox_iter *box_iter =
		(struct proxy_client_dsync_worker_mailbox_iter *)_iter;
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_iter->worker;
	const char *reply, *error;
	int ret;

	ret = proxy_client_worker_read_line(client, &reply);
	if (ret < 0) {
		_iter->failed = TRUE;
		return ret;
	}
	if (ret == 0)
		return 0;

	if (reply[0] == '\t') {
		/* end of mailboxes */
		if (reply[1] != '0')
			_iter->failed = TRUE;
		return -1;
	}

	p_clear(box_iter->pool);
	if (dsync_proxy_mailbox_import(box_iter->pool, reply,
				       dsync_box_r, &error) >= 0)
		return 1;

	i_error("Invalid mailbox input from worker server: %s", error);
	_iter->failed = TRUE;
	return -1;
}
/* Free the mailbox iterator. Returns -1 if the iteration had failed,
   0 otherwise. */
static int
proxy_client_worker_mailbox_iter_deinit(struct dsync_worker_mailbox_iter *_iter)
{
	struct proxy_client_dsync_worker_mailbox_iter *box_iter =
		(struct proxy_client_dsync_worker_mailbox_iter *)_iter;
	bool failed = _iter->failed;

	pool_unref(&box_iter->pool);
	i_free(box_iter);
	return failed ? -1 : 0;
}
/* Start listing subscriptions: allocate the iterator, send SUBS-LIST and
   try to flush the output. */
static struct dsync_worker_subs_iter *
proxy_client_worker_subs_iter_init(struct dsync_worker *_worker)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;
	struct proxy_client_dsync_worker_subs_iter *subs_iter;

	subs_iter = i_new(struct proxy_client_dsync_worker_subs_iter, 1);
	subs_iter->pool =
		pool_alloconly_create("proxy subscription iter", 1024);
	subs_iter->iter.worker = _worker;

	o_stream_send_str(client->output, "SUBS-LIST\n");
	proxy_client_worker_output_flush(_worker);
	return &subs_iter->iter;
}
/* Read the next line of a SUBS-LIST reply and split it at tabs into
   *args_r, which is allocated from the iterator's pool.

   Returns 1 if a line with at least wanted_arg_count fields was read, 0 if
   no line is available yet and -1 at the end of the list or on error. On
   error the iterator's failed flag is set. */
static int
proxy_client_worker_subs_iter_next_line(struct proxy_client_dsync_worker_subs_iter *iter,
					unsigned int wanted_arg_count,
					char ***args_r)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)iter->iter.worker;
	const char *reply;
	char **fields;
	int ret;

	ret = proxy_client_worker_read_line(client, &reply);
	if (ret < 0) {
		iter->iter.failed = TRUE;
		return ret;
	}
	if (ret == 0)
		return 0;

	if (reply[0] == '\t') {
		/* end of subscribed subscriptions */
		if (reply[1] != '0')
			iter->iter.failed = TRUE;
		return -1;
	}

	p_clear(iter->pool);
	fields = p_strsplit(iter->pool, reply, "\t");
	if (str_array_length((const char *const *)fields) < wanted_arg_count) {
		i_error("Invalid subscription input from worker server");
		iter->iter.failed = TRUE;
		return -1;
	}

	*args_r = fields;
	return 1;
}
/* Read the next subscription into rec_r. The line must have at least four
   fields: vname, storage name, namespace prefix and last change. The
   string fields are tab-unescaped.

   Returns the value of proxy_client_worker_subs_iter_next_line(). */
static int
proxy_client_worker_subs_iter_next(struct dsync_worker_subs_iter *_iter,
				   struct dsync_worker_subscription *rec_r)
{
	struct proxy_client_dsync_worker_subs_iter *subs_iter =
		(struct proxy_client_dsync_worker_subs_iter *)_iter;
	char **fields;
	int ret;

	ret = proxy_client_worker_subs_iter_next_line(subs_iter, 4, &fields);
	if (ret > 0) {
		rec_r->vname = str_tabunescape(fields[0]);
		rec_r->storage_name = str_tabunescape(fields[1]);
		rec_r->ns_prefix = str_tabunescape(fields[2]);
		rec_r->last_change = strtoul(fields[3], NULL, 10);
		return 1;
	}
	return ret;
}
/* Read the next unsubscription into rec_r. The line must have at least
   three fields: mailbox GUID, namespace prefix and last change. rec_r is
   zeroed before it is filled.

   Returns 1 on success, 0 if no line is available yet and -1 at the end of
   the list or on error. An invalid GUID sets the iterator's failed
   flag. */
static int
proxy_client_worker_subs_iter_next_un(struct dsync_worker_subs_iter *_iter,
				      struct dsync_worker_unsubscription *rec_r)
{
	struct proxy_client_dsync_worker_subs_iter *subs_iter =
		(struct proxy_client_dsync_worker_subs_iter *)_iter;
	char **fields;
	int ret;

	ret = proxy_client_worker_subs_iter_next_line(subs_iter, 3, &fields);
	if (ret <= 0)
		return ret;

	memset(rec_r, 0, sizeof(*rec_r));
	if (dsync_proxy_mailbox_guid_import(fields[0],
					    &rec_r->name_sha1) >= 0) {
		rec_r->ns_prefix = str_tabunescape(fields[1]);
		rec_r->last_change = strtoul(fields[2], NULL, 10);
		return 1;
	}

	i_error("Invalid subscription input from worker server: "
		"Invalid unsubscription mailbox GUID");
	subs_iter->iter.failed = TRUE;
	return -1;
}
/* Free the subscription iterator. Returns -1 if the iteration had failed,
   0 otherwise. */
static int
proxy_client_worker_subs_iter_deinit(struct dsync_worker_subs_iter *_iter)
{
	struct proxy_client_dsync_worker_subs_iter *subs_iter =
		(struct proxy_client_dsync_worker_subs_iter *)_iter;
	bool failed = _iter->failed;

	pool_unref(&subs_iter->pool);
	i_free(subs_iter);
	return failed ? -1 : 0;
}
/* Queue a SUBS-SET command: tab-escaped name, last change timestamp and
   1/0 for subscribed/unsubscribed. */
static void
proxy_client_worker_set_subscribed(struct dsync_worker *_worker,
				   const char *name, time_t last_change,
				   bool set)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;

	T_BEGIN {
		string_t *cmd = t_str_new(128);

		str_append(cmd, "SUBS-SET\t");
		str_tabescape_write(cmd, name);
		str_printfa(cmd, "\t%s\t%d\n", dec2str(last_change),
			    set ? 1 : 0);
		o_stream_send(client->output, str_data(cmd), str_len(cmd));
	} T_END;
}
/* Message list iterator for a MSG-LIST reply. */
struct proxy_client_dsync_worker_msg_iter {
	struct dsync_worker_msg_iter iter;
	/* cleared before each message line is imported, then used for
	   that import */
	pool_t pool;
	/* set once the end-of-messages line has been read; later calls to
	   the next function return -1 without reading more input */
	bool done;
};
/* Start listing the messages of the given mailboxes: allocate the
   iterator, send MSG-LIST followed by one tab-separated GUID for each
   mailbox and try to flush the output. */
static struct dsync_worker_msg_iter *
proxy_client_worker_msg_iter_init(struct dsync_worker *_worker,
				  const mailbox_guid_t mailboxes[],
				  unsigned int mailbox_count)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;
	struct proxy_client_dsync_worker_msg_iter *msg_iter;
	string_t *cmd;
	unsigned int n;

	msg_iter = i_new(struct proxy_client_dsync_worker_msg_iter, 1);
	msg_iter->pool = pool_alloconly_create("proxy message iter", 10240);
	msg_iter->iter.worker = _worker;

	/* the command is built in the iterator's pool, which is cleared
	   again after the command has been given to the output stream */
	cmd = str_new(msg_iter->pool, 512);
	str_append(cmd, "MSG-LIST");
	for (n = 0; n < mailbox_count; n++) T_BEGIN {
		str_append_c(cmd, '\t');
		dsync_proxy_mailbox_guid_export(cmd, &mailboxes[n]);
	} T_END;
	str_append_c(cmd, '\n');
	o_stream_send(client->output, str_data(cmd), str_len(cmd));
	p_clear(msg_iter->pool);

	proxy_client_worker_output_flush(_worker);
	return &msg_iter->iter;
}
static int
proxy_client_worker_msg_iter_next(struct dsync_worker_msg_iter *_iter,
unsigned int *mailbox_idx_r,
struct dsync_message *msg_r)
{
struct proxy_client_dsync_worker_msg_iter *iter =
(struct proxy_client_dsync_worker_msg_iter *)_iter;
struct proxy_client_dsync_worker *worker =
(struct proxy_client_dsync_worker *)_iter->worker;
const char *line, *error;
int ret;
if (iter->done)
return -1;
if ((ret = proxy_client_worker_read_line(worker, &line)) <= 0) {
if (ret < 0)
_iter->failed = TRUE;
return ret;
}
if (*line == '\t') {
/* end of messages */
if (line[1] != '0')
_iter->failed = TRUE;
iter->done = TRUE;
return -1;
}
*mailbox_idx_r = 0;
while (*line >= '0' && *line <= '9') {
*mailbox_idx_r = *mailbox_idx_r * 10 + (*line - '0');
line++;
}
if (*line != '\t') {
i_error("Invalid mailbox idx from worker server");
_iter->failed = TRUE;
return -1;
}
line++;
p_clear(iter->pool);
if (dsync_proxy_msg_import(iter->pool, line, msg_r, &error) < 0) {
i_error("Invalid message input from worker server: %s", error);
_iter->failed = TRUE;
return -1;
}
return 1;
}
/* Free the message iterator. Returns -1 if the iteration had failed,
   0 otherwise. */
static int
proxy_client_worker_msg_iter_deinit(struct dsync_worker_msg_iter *_iter)
{
	struct proxy_client_dsync_worker_msg_iter *msg_iter =
		(struct proxy_client_dsync_worker_msg_iter *)_iter;
	bool failed = _iter->failed;

	pool_unref(&msg_iter->pool);
	i_free(msg_iter);
	return failed ? -1 : 0;
}
/* Queue a BOX-CREATE command with the exported mailbox. No message save
   may be in progress. */
static void
proxy_client_worker_create_mailbox(struct dsync_worker *_worker,
				   const struct dsync_mailbox *dsync_box)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;

	i_assert(client->save_input == NULL);

	T_BEGIN {
		string_t *cmd = t_str_new(128);

		str_append(cmd, "BOX-CREATE\t");
		dsync_proxy_mailbox_export(cmd, dsync_box);
		str_append_c(cmd, '\n');
		o_stream_send(client->output, str_data(cmd), str_len(cmd));
	} T_END;
}
/* Queue a BOX-DELETE command with the mailbox GUID and its last_changed
   timestamp. No message save may be in progress. */
static void
proxy_client_worker_delete_mailbox(struct dsync_worker *_worker,
				   const struct dsync_mailbox *dsync_box)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;

	i_assert(client->save_input == NULL);

	T_BEGIN {
		string_t *cmd = t_str_new(128);

		str_append(cmd, "BOX-DELETE\t");
		dsync_proxy_mailbox_guid_export(cmd, &dsync_box->mailbox_guid);
		str_printfa(cmd, "\t%s\n", dec2str(dsync_box->last_changed));
		o_stream_send(client->output, str_data(cmd), str_len(cmd));
	} T_END;
}
/* Queue a BOX-RENAME command: the mailbox GUID, the tab-escaped new name
   and the tab-escaped name separator. No message save may be in
   progress. */
static void
proxy_client_worker_rename_mailbox(struct dsync_worker *_worker,
				   const mailbox_guid_t *mailbox,
				   const struct dsync_mailbox *dsync_box)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;
	char sep_str[2];

	i_assert(client->save_input == NULL);

	T_BEGIN {
		string_t *cmd = t_str_new(128);

		/* the separator is written as a one character string */
		sep_str[0] = dsync_box->name_sep;
		sep_str[1] = '\0';

		str_append(cmd, "BOX-RENAME\t");
		dsync_proxy_mailbox_guid_export(cmd, mailbox);
		str_append_c(cmd, '\t');
		str_tabescape_write(cmd, dsync_box->name);
		str_append_c(cmd, '\t');
		str_tabescape_write(cmd, sep_str);
		str_append_c(cmd, '\n');
		o_stream_send(client->output, str_data(cmd), str_len(cmd));
	} T_END;
}
/* Queue a BOX-UPDATE command with the exported mailbox. No message save
   may be in progress. */
static void
proxy_client_worker_update_mailbox(struct dsync_worker *_worker,
				   const struct dsync_mailbox *dsync_box)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;

	i_assert(client->save_input == NULL);

	T_BEGIN {
		string_t *cmd = t_str_new(128);

		str_append(cmd, "BOX-UPDATE\t");
		dsync_proxy_mailbox_export(cmd, dsync_box);
		str_append_c(cmd, '\n');
		o_stream_send(client->output, str_data(cmd), str_len(cmd));
	} T_END;
}
/* Queue a BOX-SELECT command for the mailbox, followed by the exported
   cache fields if any were given. Nothing is sent if the GUID equals the
   previously selected one. No message save may be in progress. */
static void
proxy_client_worker_select_mailbox(struct dsync_worker *_worker,
				   const mailbox_guid_t *mailbox,
				   const ARRAY_TYPE(const_string) *cache_fields)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;

	i_assert(client->save_input == NULL);

	if (dsync_guid_equals(&client->selected_box_guid, mailbox)) {
		/* already selected */
		return;
	}
	client->selected_box_guid = *mailbox;

	T_BEGIN {
		string_t *cmd = t_str_new(128);

		str_append(cmd, "BOX-SELECT\t");
		dsync_proxy_mailbox_guid_export(cmd, mailbox);
		if (cache_fields != NULL)
			dsync_proxy_strings_export(cmd, cache_fields);
		str_append_c(cmd, '\n');
		o_stream_send(client->output, str_data(cmd), str_len(cmd));
	} T_END;
}
/* Queue a MSG-UPDATE command with the message's UID, modseq and its flags
   and keywords. No message save may be in progress. */
static void
proxy_client_worker_msg_update_metadata(struct dsync_worker *_worker,
					const struct dsync_message *msg)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;

	i_assert(client->save_input == NULL);

	T_BEGIN {
		string_t *cmd = t_str_new(128);

		str_printfa(cmd, "MSG-UPDATE\t%u\t%llu\t", msg->uid,
			    (unsigned long long)msg->modseq);
		imap_write_flags(cmd, msg->flags, msg->keywords);
		str_append_c(cmd, '\n');
		o_stream_send(client->output, str_data(cmd), str_len(cmd));
	} T_END;
}
/* Queue a MSG-UID-CHANGE command with the old and the new UID. No message
   save may be in progress. */
static void
proxy_client_worker_msg_update_uid(struct dsync_worker *_worker,
				   uint32_t old_uid, uint32_t new_uid)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;

	i_assert(client->save_input == NULL);

	T_BEGIN {
		const char *cmd =
			t_strdup_printf("MSG-UID-CHANGE\t%u\t%u\n",
					old_uid, new_uid);

		o_stream_send_str(client->output, cmd);
	} T_END;
}
/* Queue a MSG-EXPUNGE command for the UID. No message save may be in
   progress. */
static void
proxy_client_worker_msg_expunge(struct dsync_worker *_worker, uint32_t uid)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;

	i_assert(client->save_input == NULL);

	T_BEGIN {
		const char *cmd = t_strdup_printf("MSG-EXPUNGE\t%u\n", uid);

		o_stream_send_str(client->output, cmd);
	} T_END;
}
/* Queue a MSG-COPY command (source mailbox GUID, source UID, exported
   destination message) and remember the callback, which is called when
   the reply line arrives. No message save may be in progress. */
static void
proxy_client_worker_msg_copy(struct dsync_worker *_worker,
			     const mailbox_guid_t *src_mailbox,
			     uint32_t src_uid,
			     const struct dsync_message *dest_msg,
			     dsync_worker_copy_callback_t *callback,
			     void *context)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;
	struct proxy_client_request pending;

	i_assert(client->save_input == NULL);

	memset(&pending, 0, sizeof(pending));
	pending.type = PROXY_CLIENT_REQUEST_TYPE_COPY;
	pending.callback.copy = callback;
	pending.context = context;

	T_BEGIN {
		string_t *cmd = t_str_new(128);

		str_append(cmd, "MSG-COPY\t");
		dsync_proxy_mailbox_guid_export(cmd, src_mailbox);
		str_printfa(cmd, "\t%u\t", src_uid);
		dsync_proxy_msg_export(cmd, dest_msg);
		str_append_c(cmd, '\n');
		o_stream_send(client->output, str_data(cmd), str_len(cmd));
	} T_END;

	aqueue_append(client->request_queue, &pending);
}
/* Send as much of worker->save_input (the body of a MSG-SAVE) as possible
   to the output through dsync_proxy_send_dot_output().

   This function returns early, leaving save_input set, if the output
   buffer stays full or if the input has no more data available right now.
   It is called again from the output flush callback and from the save_io
   read watcher. Once the input is finished (EOF or error), save_io is
   removed and the reference to save_input is dropped. */
static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker)
{
	const unsigned char *data;
	size_t size;
	int ret;
	while ((ret = i_stream_read_data(worker->save_input,
					 &data, &size, 0)) > 0) {
		dsync_proxy_send_dot_output(worker->output,
					    &worker->save_input_last_lf,
					    data, size);
		i_stream_skip(worker->save_input, size);
		if (worker_is_output_stream_full(worker)) {
			/* uncorking lets the stream try to send the buffered
			   data; stop only if it is still full after that */
			o_stream_uncork(worker->output);
			if (worker_is_output_stream_full(worker))
				return;
			o_stream_cork(worker->output);
		}
	}
	if (ret == 0) {
		/* waiting for more input */
		o_stream_uncork(worker->output);
		if (worker->save_io == NULL) {
			/* call this function again when the input's fd
			   becomes readable */
			int fd = i_stream_get_fd(worker->save_input);
			worker->save_io =
				io_add(fd, IO_READ,
				       proxy_client_send_stream, worker);
		}
		return;
	}
	/* ret < 0: the input is finished, successfully or not */
	if (worker->save_io != NULL)
		io_remove(&worker->save_io);
	if (worker->save_input->stream_errno != 0) {
		errno = worker->save_input->stream_errno;
		i_error("proxy: reading message input failed: %m");
		/* the body can't be completed, so close the output */
		o_stream_close(worker->output);
	} else {
		i_assert(!i_stream_have_bytes_left(worker->save_input));
		/* terminate the body with a line containing only a dot */
		o_stream_send(worker->output, "\n.\n", 3);
	}
	i_stream_unref(&worker->save_input);
}
/* Queue a MSG-SAVE command (exported static data and message) and start
   streaming the message body from data->input. The worker takes its own
   reference to the input stream. No other message save may be in
   progress. */
static void
proxy_client_worker_msg_save(struct dsync_worker *_worker,
			     const struct dsync_message *msg,
			     const struct dsync_msg_static_data *data)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;

	i_assert(client->save_input == NULL);

	T_BEGIN {
		string_t *cmd = t_str_new(128);

		str_append(cmd, "MSG-SAVE\t");
		dsync_proxy_msg_static_export(cmd, data);
		str_append_c(cmd, '\t');
		dsync_proxy_msg_export(cmd, msg);
		str_append_c(cmd, '\n');
		o_stream_send(client->output, str_data(cmd), str_len(cmd));
	} T_END;

	i_assert(client->save_io == NULL);

	/* the body follows the command line */
	client->save_input_last_lf = TRUE;
	client->save_input = data->input;
	i_stream_ref(client->save_input);
	proxy_client_send_stream(client);
}
/* Drop an unfinished message save: remove the save_io read watcher and
   release the worker's reference to save_input, if they are set. */
static void
proxy_client_worker_msg_save_cancel(struct dsync_worker *_worker)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;

	if (client->save_io != NULL) {
		io_remove(&client->save_io);
	}
	if (client->save_input != NULL) {
		i_stream_unref(&client->save_input);
	}
}
/* Queue a MSG-GET command (mailbox GUID and UID) and remember the
   callback, which is called when the reply line arrives. No message save
   may be in progress. */
static void
proxy_client_worker_msg_get(struct dsync_worker *_worker,
			    const mailbox_guid_t *mailbox, uint32_t uid,
			    dsync_worker_msg_callback_t *callback,
			    void *context)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;
	struct proxy_client_request pending;

	i_assert(client->save_input == NULL);

	memset(&pending, 0, sizeof(pending));
	pending.type = PROXY_CLIENT_REQUEST_TYPE_GET;
	pending.callback.get = callback;
	pending.context = context;

	T_BEGIN {
		string_t *cmd = t_str_new(128);

		str_append(cmd, "MSG-GET\t");
		dsync_proxy_mailbox_guid_export(cmd, mailbox);
		str_printfa(cmd, "\t%u\n", uid);
		o_stream_send(client->output, str_data(cmd), str_len(cmd));
	} T_END;

	aqueue_append(client->request_queue, &pending);
}
/* Send FINISH, uncork the output and remember the callback, which is
   called when the reply line arrives. No message save may be in
   progress. */
static void
proxy_client_worker_finish(struct dsync_worker *_worker,
			   dsync_worker_finish_callback_t *callback,
			   void *context)
{
	struct proxy_client_dsync_worker *client =
		(struct proxy_client_dsync_worker *)_worker;
	struct proxy_client_request pending;

	i_assert(client->save_input == NULL);

	memset(&pending, 0, sizeof(pending));
	pending.type = PROXY_CLIENT_REQUEST_TYPE_FINISH;
	pending.callback.finish = callback;
	pending.context = context;

	o_stream_send_str(client->output, "FINISH\n");
	o_stream_uncork(client->output);

	aqueue_append(client->request_queue, &pending);
}
/* Virtual function table of the proxy client worker. It is copied into
   worker.v by dsync_worker_init_proxy_client(). The initializer is
   positional, so the order of the entries has to match the member order of
   struct dsync_worker_vfuncs, which is declared outside this file. */
struct dsync_worker_vfuncs proxy_client_dsync_worker = {
	proxy_client_worker_deinit,
	proxy_client_worker_is_output_full,
	proxy_client_worker_output_flush,
	proxy_client_worker_mailbox_iter_init,
	proxy_client_worker_mailbox_iter_next,
	proxy_client_worker_mailbox_iter_deinit,
	proxy_client_worker_subs_iter_init,
	proxy_client_worker_subs_iter_next,
	proxy_client_worker_subs_iter_next_un,
	proxy_client_worker_subs_iter_deinit,
	proxy_client_worker_set_subscribed,
	proxy_client_worker_msg_iter_init,
	proxy_client_worker_msg_iter_next,
	proxy_client_worker_msg_iter_deinit,
	proxy_client_worker_create_mailbox,
	proxy_client_worker_delete_mailbox,
	proxy_client_worker_rename_mailbox,
	proxy_client_worker_update_mailbox,
	proxy_client_worker_select_mailbox,
	proxy_client_worker_msg_update_metadata,
	proxy_client_worker_msg_update_uid,
	proxy_client_worker_msg_expunge,
	proxy_client_worker_msg_copy,
	proxy_client_worker_msg_save,
	proxy_client_worker_msg_save_cancel,
	proxy_client_worker_msg_get,
	proxy_client_worker_finish
};