dsync-ibc-stream.c revision ae949831f1f668b5501b4b125e7f7b1767fb109b
/* Copyright (c) 2013-2016 Dovecot authors, see the included COPYING file */
#include "lib.h"
#include "array.h"
#include "fd-set-nonblock.h"
#include "safe-mkstemp.h"
#include "ioloop.h"
#include "istream.h"
#include "istream-seekable.h"
#include "istream-dot.h"
#include "ostream.h"
#include "str.h"
#include "strescape.h"
#include "master-service.h"
#include "mail-cache.h"
#include "mail-storage-private.h"
#include "dsync-serializer.h"
#include "dsync-deserializer.h"
#include "dsync-mail.h"
#include "dsync-mailbox.h"
#include "dsync-mailbox-state.h"
#include "dsync-mailbox-tree.h"
#include "dsync-ibc-private.h"
#define DSYNC_PROTOCOL_VERSION_MAJOR 3
#define DSYNC_PROTOCOL_VERSION_MINOR 4
#define DSYNC_HANDSHAKE_VERSION "VERSION\tdsync\t3\t4\n"
#define DSYNC_PROTOCOL_MINOR_HAVE_ATTRIBUTES 1
#define DSYNC_PROTOCOL_MINOR_HAVE_SAVE_GUID 2
#define DSYNC_PROTOCOL_MINOR_HAVE_FINISH 3
#define DSYNC_PROTOCOL_MINOR_HAVE_HDR_HASH_V2 4
enum item_type {
};
#define END_OF_LIST_LINE "."
static const struct {
/* full human readable name of the item */
const char *name;
/* unique character identifying the item */
char chr;
const char *required_keys;
const char *optional_keys;
unsigned int min_minor_version;
{ .name = "done",
.chr = 'X',
.optional_keys = ""
},
{ .name = "handshake",
.chr = 'H',
.required_keys = "hostname",
.optional_keys = "sync_ns_prefix sync_box sync_box_guid sync_type "
"debug sync_visible_namespaces exclude_mailboxes "
"send_mail_requests backup_send backup_recv lock_timeout "
"no_mail_sync no_mailbox_renames no_backup_overwrite purge_remote "
"no_notify sync_since_timestamp sync_max_size sync_flags "
"virtual_all_box"
},
{ .name = "mailbox_state",
.chr = 'S',
.required_keys = "mailbox_guid last_uidvalidity last_common_uid "
"last_common_modseq last_common_pvt_modseq",
.optional_keys = "last_messages_count changes_during_sync"
},
{ .name = "mailbox_tree_node",
.chr = 'N',
.required_keys = "name existence",
.optional_keys = "mailbox_guid uid_validity uid_next "
"last_renamed_or_created subscribed last_subscription_change"
},
{ .name = "mailbox_delete",
.chr = 'D',
.required_keys = "hierarchy_sep",
.optional_keys = "mailboxes dirs unsubscribes"
},
{ .name = "mailbox",
.chr = 'B',
.required_keys = "mailbox_guid uid_validity uid_next messages_count "
"first_recent_uid highest_modseq highest_pvt_modseq",
.optional_keys = "mailbox_lost cache_fields have_guids have_save_guids have_only_guid128"
},
{ .name = "mailbox_attribute",
.chr = 'A',
.required_keys = "type key",
.optional_keys = "value stream deleted last_change modseq",
},
{ .name = "mail_change",
.chr = 'C',
.required_keys = "type uid",
.optional_keys = "guid hdr_hash modseq pvt_modseq "
"add_flags remove_flags final_flags "
"keywords_reset keyword_changes"
},
{ .name = "mail_request",
.chr = 'R',
.optional_keys = "guid uid"
},
{ .name = "mail",
.chr = 'M',
.optional_keys = "guid uid pop3_uidl pop3_order received_date saved_date stream"
},
{ .name = "finish",
.chr = 'F',
.optional_keys = "error mail_error require_full_resync",
},
{ .name = "mailbox_cache_field",
.chr = 'c',
.required_keys = "name decision",
.optional_keys = "last_used"
},
};
struct dsync_ibc_stream {
char *name, *temp_path_prefix;
unsigned int timeout_secs;
unsigned int minor_version;
struct dsync_deserializer_decoder *cur_decoder;
struct dsync_mail *cur_mail;
struct dsync_mailbox_attribute *cur_attr;
char value_output_last;
bool last_recv_item_eol:1;
bool last_sent_item_eol:1;
bool version_received:1;
bool handshake_received:1;
bool has_pending_data:1;
bool finish_received:1;
bool done_received:1;
bool stopped:1;
};
{
if (!ibc->version_received)
return "version not received";
else if (!ibc->handshake_received)
return "handshake not received";
return t_strdup_printf("last sent=%s%s, last recv=%s%s",
}
{
}
{
do {
return -1;
}
/* finished reading the mail stream */
return 1;
}
return 0;
}
{
if (dsync_ibc_stream_read_mail_stream(ibc) == 0)
return;
}
}
{
const unsigned char *data;
unsigned char add;
int ret;
add = '\0';
for (i = 0; i < size; i++) {
if (data[i] == '.' &&
/* escape the dot */
add = '.';
break;
}
}
if (i > 0) {
}
return -1;
}
if (ret == 0) {
/* continue later */
return 0;
}
}
if (add != '\0') {
}
}
i_error("dsync(%s): read(%s) failed: %s (%s)",
return -1;
}
/* finished sending the stream. use "CRLF." instead of "LF." just in
case we're sending binary data that ends with CR. */
return 1;
}
{
int ret;
ret = 1;
if (dsync_ibc_stream_send_value_stream(ibc) < 0)
ret = 1;
}
return ret;
}
{
i_error("dsync(%s): I/O has stalled, no activity for %u seconds (%s)",
}
{
unsigned int i;
/* initialize serializers and send their headers to remote */
const char *keys;
ibc->serializers[i] =
}
} T_END;
}
{
unsigned int i;
}
else {
/* If the remote has not told us that they are closing we
notify remote that we're closing. this is mainly to avoid
"read() failed: EOF" errors on failing dsyncs.
Avoid a race condition:
We do not tell the remote we are closing if they have
already told us because they close the
connection after sending ITEM_DONE and will
not be ever receive anything else from us unless
it just happens to get combined into the same packet
as a previous response and is already in the buffer.
*/
}
}
}
const char **line_r)
{
const char *line;
return 1;
}
/* try reading some */
return -1;
} else {
}
return -1;
}
return 0;
}
return 1;
}
struct dsync_deserializer_decoder *decoder,
const char *fmt, ...)
{
const char *error;
else {
}
}
static void
{
}
{
int fd;
if (fd == -1) {
return -1;
}
/* we just want the fd, unlink it */
/* shouldn't happen.. */
i_close_fd(&fd);
return -1;
}
return fd;
}
static struct istream *
{
i_stream_unref(&inputs[0]);
return ibc->value_input;
}
static int
{
unsigned int i;
int ret = 0;
"Remote didn't handshake deserializer for %s",
ret = -1;
}
}
return ret;
}
static bool
{
const char *const *required_keys, *error;
unsigned int i;
if (ibc->handshake_received)
return TRUE;
if (!ibc->version_received) {
&ibc->minor_version)) {
"Remote dsync doesn't use compatible protocol");
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return FALSE;
}
/* finished handshaking */
if (dsync_ibc_check_missing_deserializers(ibc) < 0)
return FALSE;
return FALSE;
}
for (i = 1; i < ITEM_END_OF_LIST; i++) {
item = i;
break;
}
}
/* unknown deserializer, ignore */
return FALSE;
}
"Remote sent invalid handshake for %s: %s",
}
return FALSE;
}
static enum dsync_ibc_recv_ret
struct dsync_deserializer_decoder **decoder_r)
{
unsigned int i;
do {
return DSYNC_IBC_RECV_RET_TRYAGAIN;
/* end of this list */
return DSYNC_IBC_RECV_RET_FINISHED;
}
/* remote cleanly closed the connection, possibly because of
some failure (which it should have logged). we don't want to
log any stream errors anyway after this. */
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
for (i = 1; i < ITEM_END_OF_LIST; i++) {
line_item = i;
break;
}
}
"Received unexpected input %c != %c",
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
&error) < 0) {
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_OK;
}
static struct dsync_serializer_encoder *
{
}
static void
const struct dsync_ibc_settings *set)
{
struct dsync_serializer_encoder *encoder;
char sync_type[2];
}
}
unsigned int i;
if (i != 0)
}
}
}
break;
sync_type[0] = 'f';
break;
sync_type[0] = 'c';
break;
sync_type[0] = 's';
break;
}
if (sync_type[0] != '\0')
if (set->lock_timeout > 0) {
}
if (set->sync_since_timestamp > 0) {
}
if (set->sync_max_size > 0) {
}
set->sync_flags);
}
}
static enum dsync_ibc_recv_ret
const struct dsync_ibc_settings **set_r)
{
struct dsync_deserializer_decoder *decoder;
struct dsync_ibc_settings *set;
const char *value;
enum dsync_ibc_recv_ret ret;
if (ret != DSYNC_IBC_RECV_RET_OK) {
if (ret != DSYNC_IBC_RECV_RET_TRYAGAIN) {
i_error("dsync(%s): Unexpected input in handshake",
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
/* now that we know the remote's hostname, use it for the
stream's name */
"Invalid sync_box_guid: %s", value);
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
*value != '\0') {
}
switch (value[0]) {
case 'f':
break;
case 'c':
break;
case 's':
break;
default:
"Unknown sync_type: %s", value);
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
}
set->lock_timeout == 0) {
"Invalid lock_timeout: %s", value);
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
}
set->sync_since_timestamp == 0) {
"Invalid sync_since_timestamp: %s", value);
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
}
set->sync_max_size == 0) {
"Invalid sync_max_size: %s", value);
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
}
return DSYNC_IBC_RECV_RET_OK;
}
static void
enum dsync_ibc_eol_type type)
{
switch (type) {
return;
break;
default:
break;
}
}
static void
const struct dsync_mailbox_state *state)
{
struct dsync_serializer_encoder *encoder;
if (state->changes_during_sync)
}
static enum dsync_ibc_recv_ret
struct dsync_mailbox_state *state_r)
{
struct dsync_deserializer_decoder *decoder;
const char *value;
enum dsync_ibc_recv_ret ret;
if (ret != DSYNC_IBC_RECV_RET_OK)
return ret;
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_OK;
}
static void
const char *const *name,
const struct dsync_mailbox_node *node)
{
struct dsync_serializer_encoder *encoder;
/* convert all hierarchy separators to tabs. mailbox names really
aren't supposed to have any tabs, but escape them anyway if there
are. */
}
break;
break;
break;
}
}
if (node->uid_validity != 0) {
}
}
if (node->last_renamed_or_created != 0) {
}
if (node->last_subscription_change != 0) {
}
if (node->subscribed)
}
static enum dsync_ibc_recv_ret
const char *const **name_r,
const struct dsync_mailbox_node **node_r)
{
struct dsync_deserializer_decoder *decoder;
struct dsync_mailbox_node *node;
const char *value;
enum dsync_ibc_recv_ret ret;
if (ret != DSYNC_IBC_RECV_RET_OK)
return ret;
if (*value == '\0') {
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
switch (*value) {
case 'n':
break;
case 'y':
break;
case 'd':
break;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_OK;
}
static void
struct dsync_serializer_encoder *encoder,
const struct dsync_mailbox_delete *deletes,
{
unsigned int i;
str_truncate(str, 0);
for (i = 0; i < count; i++) {
}
}
}
}
static void
const struct dsync_mailbox_delete *deletes,
unsigned int count, char hierarchy_sep)
{
struct dsync_serializer_encoder *encoder;
char sep[2];
"mailboxes",
"dirs",
"unsubscribes",
}
static int
{
struct dsync_mailbox_delete *del;
const char *const *tmp;
unsigned int i;
return -1;
return -1;
}
return 0;
}
static enum dsync_ibc_recv_ret
const struct dsync_mailbox_delete **deletes_r,
unsigned int *count_r, char *hierarchy_sep_r)
{
struct dsync_deserializer_decoder *decoder;
const char *value;
enum dsync_ibc_recv_ret ret;
if (ret != DSYNC_IBC_RECV_RET_OK)
return ret;
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
*hierarchy_sep_r = value[0];
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
DSYNC_MAILBOX_DELETE_TYPE_DIR) < 0) {
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_OK;
}
static const char *
const struct dsync_mailbox *dsync_box)
{
struct dsync_serializer_encoder *encoder;
const struct mailbox_cache_field *cache_fields;
unsigned int i, count;
char decision[3];
if (count == 0)
return "";
for (i = 0; i < count; i++) {
case MAIL_CACHE_DECISION_NO:
decision[0] = 'n';
break;
case MAIL_CACHE_DECISION_TEMP:
decision[0] = 't';
break;
case MAIL_CACHE_DECISION_YES:
decision[0] = 'y';
break;
}
}
}
if (i > 0) {
/* remove the trailing LF */
}
}
static void
const struct dsync_mailbox *dsync_box)
{
struct dsync_serializer_encoder *encoder;
const char *value;
if (dsync_box->mailbox_lost)
if (dsync_box->have_guids)
if (dsync_box->have_save_guids)
if (dsync_box->have_only_guid128)
}
static int
const char *value)
{
struct dsync_deserializer_decoder *decoder;
struct mailbox_cache_field field;
const char *error;
int ret = 0;
"cache_field: Invalid input: %s", error);
return -1;
}
switch (*value) {
case 'n':
break;
case 't':
break;
case 'y':
break;
default:
value);
ret = -1;
break;
}
ret = -1;
}
return ret;
}
static enum dsync_ibc_recv_ret
const struct dsync_mailbox **dsync_box_r)
{
struct dsync_deserializer_decoder *decoder;
struct dsync_mailbox *box;
const char *value;
enum dsync_ibc_recv_ret ret;
if (ret != DSYNC_IBC_RECV_RET_OK)
return ret;
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
}
*dsync_box_r = box;
return DSYNC_IBC_RECV_RET_OK;
}
static void
const struct dsync_mailbox_attribute *attr)
{
struct dsync_serializer_encoder *encoder;
char type[2];
return;
type[0] = 'p';
break;
type[0] = 's';
break;
}
if (attr->last_change != 0) {
}
}
}
}
static enum dsync_ibc_recv_ret
const struct dsync_mailbox_attribute **attr_r)
{
struct dsync_deserializer_decoder *decoder;
struct dsync_mailbox_attribute *attr;
const char *value;
enum dsync_ibc_recv_ret ret;
return DSYNC_IBC_RECV_RET_FINISHED;
/* wait until the mail's stream has been read */
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
/* finished reading the stream, return the mail now */
return DSYNC_IBC_RECV_RET_OK;
}
if (ret != DSYNC_IBC_RECV_RET_OK)
return ret;
switch (*value) {
case 'p':
break;
case 's':
break;
default:
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
if (dsync_ibc_stream_read_mail_stream(ibc) <= 0) {
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
/* already finished reading the stream */
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_OK;
}
static void
const struct dsync_mail_change *change)
{
struct dsync_serializer_encoder *encoder;
char type[2];
type[0] = 's';
break;
type[0] = 'e';
break;
type[0] = 'f';
break;
}
}
}
if (change->pvt_modseq != 0) {
}
}
if (change->remove_flags != 0) {
}
if (change->final_flags != 0) {
}
if (change->keywords_reset)
const char *const *changes;
unsigned int i, count;
for (i = 1; i < count; i++) {
}
}
if (change->received_timestamp > 0) {
}
if (change->virtual_size > 0) {
}
}
static enum dsync_ibc_recv_ret
const struct dsync_mail_change **change_r)
{
struct dsync_deserializer_decoder *decoder;
struct dsync_mail_change *change;
const char *value;
unsigned int uintval;
enum dsync_ibc_recv_ret ret;
if (ret != DSYNC_IBC_RECV_RET_OK)
return ret;
switch (*value) {
case 's':
break;
case 'e':
break;
case 'f':
break;
default:
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
"Invalid add_flags: %s", value);
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
}
"Invalid remove_flags: %s", value);
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
}
"Invalid final_flags: %s", value);
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
}
*value != '\0') {
for (i = 0; i < count; i++) {
}
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_OK;
}
static void
const struct dsync_mail_request *request)
{
struct dsync_serializer_encoder *encoder;
}
}
static enum dsync_ibc_recv_ret
const struct dsync_mail_request **request_r)
{
struct dsync_deserializer_decoder *decoder;
struct dsync_mail_request *request;
const char *value;
enum dsync_ibc_recv_ret ret;
if (ret != DSYNC_IBC_RECV_RET_OK)
return ret;
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_OK;
}
static void
const struct dsync_mail *mail)
{
struct dsync_serializer_encoder *encoder;
}
if (mail->pop3_order > 0) {
}
if (mail->received_date > 0) {
}
if (mail->saved_date != 0) {
}
}
}
static enum dsync_ibc_recv_ret
{
struct dsync_deserializer_decoder *decoder;
struct dsync_mail *mail;
const char *value;
enum dsync_ibc_recv_ret ret;
/* wait until the mail's stream has been read */
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
/* finished reading the stream, return the mail now */
return DSYNC_IBC_RECV_RET_OK;
}
if (ret != DSYNC_IBC_RECV_RET_OK)
return ret;
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
if (dsync_ibc_stream_read_mail_stream(ibc) <= 0) {
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
/* already finished reading the stream */
}
return DSYNC_IBC_RECV_RET_OK;
}
static void
enum mail_error mail_error,
bool require_full_resync)
{
struct dsync_serializer_encoder *encoder;
if (mail_error != 0) {
}
if (require_full_resync)
}
static enum dsync_ibc_recv_ret
enum mail_error *mail_error_r,
bool *require_full_resync_r)
{
struct dsync_deserializer_decoder *decoder;
const char *value;
enum dsync_ibc_recv_ret ret;
int i = 0;
*mail_error_r = 0;
return DSYNC_IBC_RECV_RET_OK;
if (ret != DSYNC_IBC_RECV_RET_OK)
return ret;
str_to_int(value, &i) < 0) {
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
*mail_error_r = i;
return DSYNC_IBC_RECV_RET_OK;
}
{
}
}
{
return TRUE;
return FALSE;
return TRUE;
}
{
return ibc->has_pending_data;
}
static const struct dsync_ibc_vfuncs dsync_ibc_stream_vfuncs = {
};
struct dsync_ibc *
const char *name, const char *temp_path_prefix,
unsigned int timeout_secs)
{
struct dsync_ibc_stream *ibc;
}