dsync-proxy-server-cmd.c revision 3fe67ec75ccae1230bb9eb9f16affc48377f6441
/* Copyright (c) 2009 Dovecot authors, see the included COPYING file */
#include "lib.h"
#include "array.h"
#include "str.h"
#include "strescape.h"
#include "network.h"
#include "istream.h"
#include "istream-dot.h"
#include "ostream.h"
#include "imap-util.h"
#include "master-service.h"
#include "dsync-worker.h"
#include "dsync-proxy.h"
#include "dsync-proxy-server.h"
#include <stdlib.h>
#define OUTBUF_THROTTLE_SIZE (1024*64)
static bool
proxy_server_is_output_full(struct dsync_proxy_server *server)
{
return o_stream_get_buffer_used_size(server->output) >=
OUTBUF_THROTTLE_SIZE;
}
static int
cmd_box_list(struct dsync_proxy_server *server,
const char *const *args ATTR_UNUSED)
{
struct dsync_mailbox dsync_box;
string_t *str;
int ret;
if (server->mailbox_iter == NULL) {
server->mailbox_iter =
dsync_worker_mailbox_iter_init(server->worker);
}
str = t_str_new(256);
while ((ret = dsync_worker_mailbox_iter_next(server->mailbox_iter,
&dsync_box)) > 0) {
str_truncate(str, 0);
dsync_proxy_mailbox_export(str, &dsync_box);
str_append_c(str, '\n');
o_stream_send(server->output, str_data(str), str_len(str));
if (proxy_server_is_output_full(server))
break;
}
if (ret >= 0) {
/* continue later */
o_stream_set_flush_pending(server->output, TRUE);
return 0;
}
if (dsync_worker_mailbox_iter_deinit(&server->mailbox_iter) < 0) {
o_stream_send(server->output, "\t-1\n", 4);
return -1;
} else {
o_stream_send(server->output, "\t0\n", 3);
return 1;
}
}
static bool cmd_subs_list_subscriptions(struct dsync_proxy_server *server)
{
struct dsync_worker_subscription rec;
string_t *str;
int ret;
str = t_str_new(256);
while ((ret = dsync_worker_subs_iter_next(server->subs_iter,
&rec)) > 0) {
str_truncate(str, 0);
str_tabescape_write(str, rec.vname);
str_append_c(str, '\t');
str_tabescape_write(str, rec.storage_name);
str_append_c(str, '\t');
str_tabescape_write(str, rec.ns_prefix);
str_printfa(str, "\t%ld\n", (long)rec.last_change);
o_stream_send(server->output, str_data(str), str_len(str));
if (proxy_server_is_output_full(server))
break;
}
if (ret >= 0) {
/* continue later */
o_stream_set_flush_pending(server->output, TRUE);
return FALSE;
}
return TRUE;
}
static bool cmd_subs_list_unsubscriptions(struct dsync_proxy_server *server)
{
struct dsync_worker_unsubscription rec;
string_t *str;
int ret;
str = t_str_new(256);
while ((ret = dsync_worker_subs_iter_next_un(server->subs_iter,
&rec)) > 0) {
str_truncate(str, 0);
dsync_proxy_mailbox_guid_export(str, &rec.name_sha1);
str_append_c(str, '\t');
str_tabescape_write(str, rec.ns_prefix);
str_printfa(str, "\t%ld\n", (long)rec.last_change);
o_stream_send(server->output, str_data(str), str_len(str));
if (proxy_server_is_output_full(server))
break;
}
if (ret >= 0) {
/* continue later */
o_stream_set_flush_pending(server->output, TRUE);
return FALSE;
}
return TRUE;
}
static int
cmd_subs_list(struct dsync_proxy_server *server,
const char *const *args ATTR_UNUSED)
{
int ret = 1;
if (server->subs_iter == NULL) {
server->subs_iter =
dsync_worker_subs_iter_init(server->worker);
}
if (!server->subs_sending_unsubscriptions) {
if (!cmd_subs_list_subscriptions(server))
return 0;
o_stream_send(server->output, "\t0\n", 3);
server->subs_sending_unsubscriptions = TRUE;
}
if (ret > 0) {
if (!cmd_subs_list_unsubscriptions(server))
return 0;
}
server->subs_sending_unsubscriptions = FALSE;
if (dsync_worker_subs_iter_deinit(&server->subs_iter) < 0) {
o_stream_send(server->output, "\t-1\n", 4);
return -1;
} else {
o_stream_send(server->output, "\t0\n", 3);
return 1;
}
}
static int
cmd_subs_set(struct dsync_proxy_server *server, const char *const *args)
{
if (str_array_length(args) < 3) {
i_error("subs-set: Missing parameters");
return -1;
}
dsync_worker_set_subscribed(server->worker, args[0],
strtoul(args[1], NULL, 10),
strcmp(args[2], "1") == 0);
return 1;
}
static int
cmd_msg_list_init(struct dsync_proxy_server *server, const char *const *args)
{
mailbox_guid_t *mailboxes;
unsigned int i, count;
int ret;
count = str_array_length(args);
mailboxes = count == 0 ? NULL : t_new(mailbox_guid_t, count);
for (i = 0; i < count; i++) {
T_BEGIN {
ret = dsync_proxy_mailbox_guid_import(args[i],
&mailboxes[i]);
} T_END;
if (ret < 0) {
i_error("msg-list: Invalid mailbox GUID '%s'", args[i]);
return -1;
}
}
server->msg_iter = dsync_worker_msg_iter_init(server->worker,
mailboxes, count);
return 0;
}
static int
cmd_msg_list(struct dsync_proxy_server *server, const char *const *args)
{
unsigned int mailbox_idx;
struct dsync_message msg;
string_t *str;
int ret;
if (server->msg_iter == NULL) {
if (cmd_msg_list_init(server, args) < 0)
return -1;
}
str = t_str_new(256);
while ((ret = dsync_worker_msg_iter_next(server->msg_iter,
&mailbox_idx, &msg)) > 0) {
str_truncate(str, 0);
str_printfa(str, "%u\t", mailbox_idx);
dsync_proxy_msg_export(str, &msg);
str_append_c(str, '\n');
o_stream_send(server->output, str_data(str), str_len(str));
if (proxy_server_is_output_full(server))
break;
}
if (ret >= 0) {
/* continue later */
o_stream_set_flush_pending(server->output, TRUE);
return 0;
}
if (dsync_worker_msg_iter_deinit(&server->msg_iter) < 0) {
o_stream_send(server->output, "\t-1\n", 4);
return -1;
} else {
o_stream_send(server->output, "\t0\n", 3);
return 1;
}
}
static int
cmd_box_create(struct dsync_proxy_server *server, const char *const *args)
{
struct dsync_mailbox dsync_box;
const char *error;
if (dsync_proxy_mailbox_import_unescaped(pool_datastack_create(),
args, &dsync_box,
&error) < 0) {
i_error("Invalid mailbox input: %s", error);
return -1;
}
dsync_worker_create_mailbox(server->worker, &dsync_box);
return 1;
}
static int
cmd_box_delete(struct dsync_proxy_server *server, const char *const *args)
{
mailbox_guid_t guid;
struct dsync_mailbox dsync_box;
if (str_array_length(args) < 2)
return -1;
if (dsync_proxy_mailbox_guid_import(args[0], &guid) < 0) {
i_error("box-delete: Invalid mailbox GUID '%s'", args[0]);
return -1;
}
memset(&dsync_box, 0, sizeof(dsync_box));
dsync_box.mailbox_guid = guid;
dsync_box.last_changed = strtoul(args[1], NULL, 10);
dsync_worker_delete_mailbox(server->worker, &dsync_box);
return 1;
}
static int
cmd_box_rename(struct dsync_proxy_server *server, const char *const *args)
{
mailbox_guid_t guid;
struct dsync_mailbox dsync_box;
if (str_array_length(args) < 3)
return -1;
if (dsync_proxy_mailbox_guid_import(args[0], &guid) < 0) {
i_error("box-delete: Invalid mailbox GUID '%s'", args[0]);
return -1;
}
memset(&dsync_box, 0, sizeof(dsync_box));
dsync_box.name = args[1];
dsync_box.name_sep = args[2][0];
dsync_worker_rename_mailbox(server->worker, &guid, &dsync_box);
return 1;
}
static int
cmd_box_update(struct dsync_proxy_server *server, const char *const *args)
{
struct dsync_mailbox dsync_box;
const char *error;
if (dsync_proxy_mailbox_import_unescaped(pool_datastack_create(),
args, &dsync_box,
&error) < 0) {
i_error("Invalid mailbox input: %s", error);
return -1;
}
dsync_worker_update_mailbox(server->worker, &dsync_box);
return 1;
}
static int
cmd_box_select(struct dsync_proxy_server *server, const char *const *args)
{
struct dsync_mailbox box;
unsigned int i, count;
memset(&box, 0, sizeof(box));
if (args[0] == NULL ||
dsync_proxy_mailbox_guid_import(args[0], &box.mailbox_guid) < 0) {
i_error("box-select: Invalid mailbox GUID '%s'", args[0]);
return -1;
}
args++;
count = str_array_length(args);
t_array_init(&box.cache_fields, count + 1);
for (i = 0; i < count; i++)
array_append(&box.cache_fields, &args[i], 1);
dsync_worker_select_mailbox(server->worker, &box);
return 1;
}
static int
cmd_msg_update(struct dsync_proxy_server *server, const char *const *args)
{
struct dsync_message msg;
/* uid modseq flags */
if (str_array_length(args) < 3)
return -1;
memset(&msg, 0, sizeof(msg));
msg.uid = strtoul(args[0], NULL, 10);
msg.modseq = strtoull(args[1], NULL, 10);
if (dsync_proxy_msg_parse_flags(pool_datastack_create(),
args[2], &msg) < 0)
return -1;
dsync_worker_msg_update_metadata(server->worker, &msg);
return 1;
}
static int
cmd_msg_uid_change(struct dsync_proxy_server *server, const char *const *args)
{
if (args[0] == NULL || args[1] == NULL)
return -1;
dsync_worker_msg_update_uid(server->worker,
strtoul(args[0], NULL, 10),
strtoul(args[1], NULL, 10));
return 1;
}
static int
cmd_msg_expunge(struct dsync_proxy_server *server, const char *const *args)
{
if (args[0] == NULL)
return -1;
dsync_worker_msg_expunge(server->worker, strtoul(args[0], NULL, 10));
return 1;
}
static void copy_callback(bool success, void *context)
{
struct dsync_proxy_server *server = context;
o_stream_send(server->output, success ? "1\n" : "0\n", 2);
}
static int
cmd_msg_copy(struct dsync_proxy_server *server, const char *const *args)
{
mailbox_guid_t src_mailbox_guid;
uint32_t src_uid;
struct dsync_message msg;
const char *error;
/* src_mailbox_guid src_uid <message> */
if (str_array_length(args) < 3)
return -1;
if (dsync_proxy_mailbox_guid_import(args[0], &src_mailbox_guid) < 0) {
i_error("msg-copy: Invalid mailbox GUID '%s'", args[0]);
return -1;
}
src_uid = strtoul(args[1], NULL, 10);
if (dsync_proxy_msg_import_unescaped(pool_datastack_create(),
args + 2, &msg, &error) < 0)
i_error("Invalid message input: %s", error);
dsync_worker_msg_copy(server->worker, &src_mailbox_guid, src_uid, &msg,
copy_callback, server);
return 1;
}
static int
cmd_msg_save(struct dsync_proxy_server *server, const char *const *args)
{
struct dsync_message msg;
struct dsync_msg_static_data data;
const char *error;
int ret;
if (dsync_proxy_msg_static_import_unescaped(pool_datastack_create(),
args, &data, &error) < 0) {
i_error("Invalid message input: %s", error);
return -1;
}
data.input = i_stream_create_dot(server->input, FALSE);
if (dsync_proxy_msg_import_unescaped(pool_datastack_create(),
args + 2, &msg, &error) < 0) {
i_error("Invalid message input: %s", error);
return -1;
}
/* we rely on save reading the entire input */
net_set_nonblock(server->fd_in, FALSE);
dsync_worker_msg_save(server->worker, &msg, &data);
net_set_nonblock(server->fd_in, TRUE);
ret = dsync_worker_has_failed(server->worker) ? -1 : 1;
i_assert(data.input->eof || ret < 0);
i_stream_destroy(&data.input);
return ret;
}
static void cmd_msg_get_send_more(struct dsync_proxy_server *server)
{
const unsigned char *data;
size_t size;
int ret;
while (!proxy_server_is_output_full(server)) {
ret = i_stream_read_data(server->get_input, &data, &size, 0);
if (ret == -1) {
/* done */
o_stream_send(server->output, "\n.\n", 3);
i_stream_unref(&server->get_input);
return;
} else {
/* for now we assume input is blocking */
i_assert(ret != 0);
}
dsync_proxy_send_dot_output(server->output,
&server->get_input_last_lf,
data, size);
i_stream_skip(server->get_input, size);
}
o_stream_set_flush_pending(server->output, TRUE);
}
static void
cmd_msg_get_callback(enum dsync_msg_get_result result,
const struct dsync_msg_static_data *data, void *context)
{
struct dsync_proxy_server *server = context;
string_t *str;
switch (result) {
case DSYNC_MSG_GET_RESULT_SUCCESS:
break;
case DSYNC_MSG_GET_RESULT_EXPUNGED:
o_stream_send(server->output, "0\n", 3);
return;
case DSYNC_MSG_GET_RESULT_FAILED:
o_stream_send(server->output, "-\n", 3);
return;
}
str = t_str_new(128);
str_printfa(str, "1\t%u\t", server->get_uid);
dsync_proxy_msg_static_export(str, data);
str_append_c(str, '\n');
o_stream_send(server->output, str_data(str), str_len(str));
/* then we'll still have to send the message body. */
server->get_input = data->input;
cmd_msg_get_send_more(server);
}
static int
cmd_msg_get(struct dsync_proxy_server *server, const char *const *args)
{
mailbox_guid_t mailbox_guid;
uint32_t uid;
if (str_array_length(args) < 2)
return -1;
if (dsync_proxy_mailbox_guid_import(args[0], &mailbox_guid) < 0) {
i_error("msg-get: Invalid mailbox GUID '%s'", args[0]);
return -1;
}
uid = strtoul(args[1], NULL, 10);
if (uid == 0)
return -1;
if (server->get_input != NULL) {
i_assert(server->get_uid == uid);
cmd_msg_get_send_more(server);
} else {
server->get_uid = uid;
dsync_worker_msg_get(server->worker, &mailbox_guid, uid,
cmd_msg_get_callback, server);
}
return server->get_input == NULL ? 1 : 0;
}
static void cmd_finish_callback(bool success, void *context)
{
struct dsync_proxy_server *server = context;
const char *reply;
if (!success)
reply = "fail\n";
else if (dsync_worker_has_unexpected_changes(server->worker))
reply = "changes\n";
else
reply = "ok\n";
server->finished = TRUE;
o_stream_send_str(server->output, reply);
}
static int
cmd_finish(struct dsync_proxy_server *server,
const char *const *args ATTR_UNUSED)
{
dsync_worker_finish(server->worker, cmd_finish_callback, server);
return 1;
}
static struct dsync_proxy_server_command commands[] = {
{ "BOX-LIST", cmd_box_list },
{ "SUBS-LIST", cmd_subs_list },
{ "SUBS-SET", cmd_subs_set },
{ "MSG-LIST", cmd_msg_list },
{ "BOX-CREATE", cmd_box_create },
{ "BOX-DELETE", cmd_box_delete },
{ "BOX-RENAME", cmd_box_rename },
{ "BOX-UPDATE", cmd_box_update },
{ "BOX-SELECT", cmd_box_select },
{ "MSG-UPDATE", cmd_msg_update },
{ "MSG-UID-CHANGE", cmd_msg_uid_change },
{ "MSG-EXPUNGE", cmd_msg_expunge },
{ "MSG-COPY", cmd_msg_copy },
{ "MSG-SAVE", cmd_msg_save },
{ "MSG-GET", cmd_msg_get },
{ "FINISH", cmd_finish },
{ NULL, NULL }
};
struct dsync_proxy_server_command *
dsync_proxy_server_command_find(const char *name)
{
unsigned int i;
for (i = 0; commands[i].name != NULL; i++) {
if (strcasecmp(commands[i].name, name) == 0)
return &commands[i];
}
return NULL;
}