Lines Matching defs:ibc
22 #include "dsync-ibc-private.h"
144 struct dsync_ibc ibc;
177 static const char *dsync_ibc_stream_get_state(struct dsync_ibc_stream *ibc)
179 if (!ibc->version_received)
181 else if (!ibc->handshake_received)
185 items[ibc->last_sent_item].name,
186 ibc->last_sent_item_eol ? " (EOL)" : "",
187 items[ibc->last_recv_item].name,
188 ibc->last_recv_item_eol ? " (EOL)" : "");
191 static void dsync_ibc_stream_stop(struct dsync_ibc_stream *ibc)
193 ibc->stopped = TRUE;
194 i_stream_close(ibc->input);
195 o_stream_close(ibc->output);
199 static int dsync_ibc_stream_read_mail_stream(struct dsync_ibc_stream *ibc)
202 i_stream_skip(ibc->value_input,
203 i_stream_get_data_size(ibc->value_input));
204 } while (i_stream_read(ibc->value_input) > 0);
205 if (ibc->value_input->eof) {
206 if (ibc->value_input->stream_errno != 0) {
207 i_error("dsync(%s): read(%s) failed: %s (%s)", ibc->name,
208 i_stream_get_name(ibc->value_input),
209 i_stream_get_error(ibc->value_input),
210 dsync_ibc_stream_get_state(ibc));
211 dsync_ibc_stream_stop(ibc);
215 i_assert(ibc->value_input->eof);
216 i_stream_seek(ibc->value_input, 0);
217 ibc->has_pending_data = TRUE;
218 ibc->value_input = NULL;
224 static void dsync_ibc_stream_input(struct dsync_ibc_stream *ibc)
226 timeout_reset(ibc->to);
227 if (ibc->value_input != NULL) {
228 if (dsync_ibc_stream_read_mail_stream(ibc) == 0)
231 o_stream_cork(ibc->output);
232 ibc->ibc.io_callback(ibc->ibc.io_context);
233 o_stream_uncork(ibc->output);
236 static int dsync_ibc_stream_send_value_stream(struct dsync_ibc_stream *ibc)
243 while ((ret = i_stream_read_more(ibc->value_output, &data, &size)) > 0) {
247 ((i == 0 && ibc->value_output_last == '\n') ||
256 o_stream_nsend(ibc->output, data, i);
257 ibc->value_output_last = data[i-1];
258 i_stream_skip(ibc->value_output, i);
261 if (o_stream_get_buffer_used_size(ibc->output) >= 4096) {
262 if ((ret = o_stream_flush(ibc->output)) < 0) {
263 dsync_ibc_stream_stop(ibc);
268 o_stream_set_flush_pending(ibc->output, TRUE);
274 o_stream_nsend(ibc->output, &add, 1);
275 ibc->value_output_last = add;
280 if (ibc->value_output->stream_errno != 0) {
282 ibc->name, i_stream_get_name(ibc->value_output),
283 i_stream_get_error(ibc->value_output),
284 dsync_ibc_stream_get_state(ibc));
285 dsync_ibc_stream_stop(ibc);
291 o_stream_nsend_str(ibc->output, "\r\n.\r\n");
292 i_stream_unref(&ibc->value_output);
296 static int dsync_ibc_stream_output(struct dsync_ibc_stream *ibc)
298 struct ostream *output = ibc->output;
303 else if (ibc->value_output != NULL) {
304 if (dsync_ibc_stream_send_value_stream(ibc) < 0)
307 timeout_reset(ibc->to);
309 if (!dsync_ibc_is_send_queue_full(&ibc->ibc))
310 ibc->ibc.io_callback(ibc->ibc.io_context);
314 static void dsync_ibc_stream_timeout(struct dsync_ibc_stream *ibc)
317 ibc->name, ibc->timeout_secs, dsync_ibc_stream_get_state(ibc));
318 ibc->ibc.timeout = TRUE;
319 dsync_ibc_stream_stop(ibc);
322 static void dsync_ibc_stream_init(struct dsync_ibc_stream *ibc)
326 ibc->io = io_add_istream(ibc->input, dsync_ibc_stream_input, ibc);
327 o_stream_set_no_error_handling(ibc->output, TRUE);
328 o_stream_set_flush_callback(ibc->output, dsync_ibc_stream_output, ibc);
329 ibc->to = timeout_add(ibc->timeout_secs * 1000,
330 dsync_ibc_stream_timeout, ibc);
331 o_stream_cork(ibc->output);
332 o_stream_nsend_str(ibc->output, DSYNC_HANDSHAKE_VERSION);
344 ibc->serializers[i] =
346 o_stream_nsend(ibc->output, &items[i].chr, 1);
347 o_stream_nsend_str(ibc->output,
348 dsync_serializer_encode_header_line(ibc->serializers[i]));
351 o_stream_nsend_str(ibc->output, ".\n");
352 o_stream_uncork(ibc->output);
357 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
361 if (ibc->serializers[i] != NULL)
362 dsync_serializer_deinit(&ibc->serializers[i]);
363 if (ibc->deserializers[i] != NULL)
364 dsync_deserializer_deinit(&ibc->deserializers[i]);
366 if (ibc->cur_decoder != NULL)
367 dsync_deserializer_decode_finish(&ibc->cur_decoder);
368 if (ibc->value_output != NULL)
369 i_stream_unref(&ibc->value_output);
383 if (!ibc->done_received && !ibc->finish_received) {
384 o_stream_nsend_str(ibc->output,
387 (void)o_stream_finish(ibc->output);
390 timeout_remove(&ibc->to);
391 io_remove(&ibc->io);
392 i_stream_destroy(&ibc->input);
393 o_stream_destroy(&ibc->output);
394 pool_unref(&ibc->ret_pool);
395 i_free(ibc->temp_path_prefix);
396 i_free(ibc->name);
397 i_free(ibc);
400 static int dsync_ibc_stream_next_line(struct dsync_ibc_stream *ibc,
407 line = i_stream_next_line(ibc->input);
413 if ((ret = i_stream_read(ibc->input)) == -1) {
414 if (ibc->stopped)
417 if (ibc->input->stream_errno != 0) {
418 str_printfa(error, "read(%s) failed: %s", ibc->name,
419 i_stream_get_error(ibc->input));
421 i_assert(ibc->input->eof);
422 str_printfa(error, "read(%s) failed: EOF", ibc->name);
424 str_printfa(error, " (%s)", dsync_ibc_stream_get_state(ibc));
426 dsync_ibc_stream_stop(ibc);
430 *line_r = i_stream_next_line(ibc->input);
432 ibc->has_pending_data = FALSE;
435 ibc->has_pending_data = TRUE;
440 dsync_ibc_input_error(struct dsync_ibc_stream *ibc,
450 i_error("dsync(%s): %s", ibc->name, error);
452 i_error("dsync(%s): %s: %s", ibc->name,
457 dsync_ibc_stream_stop(ibc);
461 dsync_ibc_stream_send_string(struct dsync_ibc_stream *ibc,
464 i_assert(ibc->value_output == NULL);
465 o_stream_nsend(ibc->output, str_data(str), str_len(str));
470 struct dsync_ibc_stream *ibc = context;
475 str_append(path, ibc->temp_path_prefix);
494 dsync_ibc_stream_input_stream(struct dsync_ibc_stream *ibc)
498 inputs[0] = i_stream_create_dot(ibc->input, FALSE);
500 ibc->value_input = i_stream_create_seekable(inputs, MAIL_READ_FULL_BLOCK_SIZE,
501 seekable_fd_callback, ibc);
504 return ibc->value_input;
508 dsync_ibc_check_missing_deserializers(struct dsync_ibc_stream *ibc)
514 if (ibc->deserializers[i] == NULL &&
515 ibc->minor_version >= items[i].min_minor_version &&
518 dsync_ibc_input_error(ibc, NULL,
528 dsync_ibc_stream_handshake(struct dsync_ibc_stream *ibc, const char *line)
534 if (ibc->handshake_received)
537 if (!ibc->version_received) {
540 &ibc->minor_version)) {
541 dsync_ibc_input_error(ibc, NULL,
545 ibc->version_received = TRUE;
551 if (dsync_ibc_check_missing_deserializers(ibc) < 0)
553 ibc->handshake_received = TRUE;
554 ibc->last_recv_item = ITEM_HANDSHAKE;
573 &ibc->deserializers[item], &error) < 0) {
574 dsync_ibc_input_error(ibc, NULL,
582 dsync_ibc_stream_input_next(struct dsync_ibc_stream *ibc, enum item_type item,
589 i_assert(ibc->value_input == NULL);
591 timeout_reset(ibc->to);
594 if (dsync_ibc_stream_next_line(ibc, &line) <= 0)
596 } while (!dsync_ibc_stream_handshake(ibc, line));
598 ibc->last_recv_item = item;
599 ibc->last_recv_item_eol = FALSE;
603 ibc->last_recv_item_eol = TRUE;
610 ibc->done_received = TRUE;
611 dsync_ibc_stream_stop(ibc);
622 dsync_ibc_input_error(ibc, NULL,
628 if (ibc->cur_decoder != NULL)
629 dsync_deserializer_decode_finish(&ibc->cur_decoder);
630 if (dsync_deserializer_decode_begin(ibc->deserializers[item],
631 line+1, &ibc->cur_decoder,
633 dsync_ibc_input_error(ibc, NULL, "Invalid input to %s: %s",
637 *decoder_r = ibc->cur_decoder;
642 dsync_ibc_send_encode_begin(struct dsync_ibc_stream *ibc, enum item_type item)
644 ibc->last_sent_item = item;
645 ibc->last_sent_item_eol = FALSE;
646 return dsync_serializer_encode_begin(ibc->serializers[item]);
653 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
659 encoder = dsync_ibc_send_encode_begin(ibc, ITEM_HANDSHAKE);
760 dsync_ibc_stream_send_string(ibc, str);
767 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
771 pool_t pool = ibc->ret_pool;
774 ret = dsync_ibc_stream_input_next(ibc, ITEM_HANDSHAKE, &decoder);
778 ibc->name);
779 dsync_ibc_stream_stop(ibc);
791 i_free(ibc->name);
792 ibc->name = i_strdup(set->hostname);
802 dsync_ibc_input_error(ibc, decoder,
823 dsync_ibc_input_error(ibc, decoder,
831 dsync_ibc_input_error(ibc, decoder,
839 dsync_ibc_input_error(ibc, decoder,
847 dsync_ibc_input_error(ibc, decoder,
855 dsync_ibc_input_error(ibc, decoder,
863 dsync_ibc_input_error(ibc, decoder,
894 set->hdr_hash_v2 = ibc->minor_version >= DSYNC_PROTOCOL_MINOR_HAVE_HDR_HASH_V2;
895 set->hdr_hash_v3 = ibc->minor_version >= DSYNC_PROTOCOL_MINOR_HAVE_HDR_HASH_V3;
905 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
907 i_assert(ibc->value_output == NULL);
911 if (ibc->minor_version < DSYNC_PROTOCOL_MINOR_HAVE_ATTRIBUTES)
918 ibc->last_sent_item_eol = TRUE;
919 o_stream_nsend_str(ibc->output, END_OF_LIST_LINE"\n");
926 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
931 encoder = dsync_ibc_send_encode_begin(ibc, ITEM_MAILBOX_STATE);
948 dsync_ibc_stream_send_string(ibc, str);
955 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
962 ret = dsync_ibc_stream_input_next(ibc, ITEM_MAILBOX_STATE, &decoder);
968 dsync_ibc_input_error(ibc, decoder, "Invalid mailbox_guid");
973 dsync_ibc_input_error(ibc, decoder, "Invalid last_uidvalidity");
978 dsync_ibc_input_error(ibc, decoder, "Invalid last_common_uid");
983 dsync_ibc_input_error(ibc, decoder, "Invalid last_common_modseq");
988 dsync_ibc_input_error(ibc, decoder, "Invalid last_common_pvt_modseq");
993 dsync_ibc_input_error(ibc, decoder, "Invalid last_messages_count");
1006 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
1025 encoder = dsync_ibc_send_encode_begin(ibc, ITEM_MAILBOX_TREE_NODE);
1062 dsync_ibc_stream_send_string(ibc, str);
1070 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
1076 ret = dsync_ibc_stream_input_next(ibc, ITEM_MAILBOX_TREE_NODE, &decoder);
1080 p_clear(ibc->ret_pool);
1081 node = p_new(ibc->ret_pool, struct dsync_mailbox_node, 1);
1085 dsync_ibc_input_error(ibc, decoder, "Empty name");
1088 *name_r = (void *)p_strsplit_tabescaped(ibc->ret_pool, value);
1105 dsync_ibc_input_error(ibc, decoder, "Invalid mailbox_guid");
1110 dsync_ibc_input_error(ibc, decoder, "Invalid uid_validity");
1115 dsync_ibc_input_error(ibc, decoder, "Invalid uid_next");
1120 dsync_ibc_input_error(ibc, decoder, "Invalid last_renamed_or_created");
1125 dsync_ibc_input_error(ibc, decoder, "Invalid last_subscription_change");
1162 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
1170 encoder = dsync_ibc_send_encode_begin(ibc, ITEM_MAILBOX_DELETE);
1185 dsync_ibc_stream_send_string(ibc, str);
1215 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
1221 ret = dsync_ibc_stream_input_next(ibc, ITEM_MAILBOX_DELETE, &decoder);
1225 p_clear(ibc->ret_pool);
1226 p_array_init(&deletes, ibc->ret_pool, 16);
1230 dsync_ibc_input_error(ibc, decoder, "Invalid hierarchy_sep");
1238 dsync_ibc_input_error(ibc, decoder, "Invalid mailboxes");
1244 dsync_ibc_input_error(ibc, decoder, "Invalid dirs");
1250 dsync_ibc_input_error(ibc, decoder, "Invalid dirs");
1258 get_cache_fields(struct dsync_ibc_stream *ibc,
1275 encoder = dsync_serializer_encode_begin(ibc->serializers[ITEM_MAILBOX_CACHE_FIELD]);
1311 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
1317 encoder = dsync_ibc_send_encode_begin(ibc, ITEM_MAILBOX);
1344 value = get_cache_fields(ibc, dsync_box);
1349 dsync_ibc_stream_send_string(ibc, str);
1353 parse_cache_field(struct dsync_ibc_stream *ibc, struct dsync_mailbox *box,
1361 if (dsync_deserializer_decode_begin(ibc->deserializers[ITEM_MAILBOX_CACHE_FIELD],
1363 dsync_ibc_input_error(ibc, NULL,
1370 field.name = p_strdup(ibc->ret_pool, value);
1384 dsync_ibc_input_error(ibc, decoder, "Invalid decision: %s",
1394 dsync_ibc_input_error(ibc, decoder, "Invalid last_used");
1407 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
1408 pool_t pool = ibc->ret_pool;
1417 ret = dsync_ibc_stream_input_next(ibc, ITEM_MAILBOX, &decoder);
1423 dsync_ibc_input_error(ibc, decoder, "Invalid mailbox_guid");
1434 (box->have_guids && ibc->minor_version < DSYNC_PROTOCOL_MINOR_HAVE_SAVE_GUID))
1440 dsync_ibc_input_error(ibc, decoder, "Invalid uid_validity");
1445 dsync_ibc_input_error(ibc, decoder, "Invalid uid_next");
1450 dsync_ibc_input_error(ibc, decoder, "Invalid messages_count");
1455 dsync_ibc_input_error(ibc, decoder, "Invalid first_recent_uid");
1460 dsync_ibc_input_error(ibc, decoder, "Invalid highest_modseq");
1465 dsync_ibc_input_error(ibc, decoder, "Invalid highest_pvt_modseq");
1473 if (parse_cache_field(ibc, box, *fields) < 0)
1486 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
1491 if (ibc->minor_version < DSYNC_PROTOCOL_MINOR_HAVE_ATTRIBUTES)
1495 encoder = dsync_ibc_send_encode_begin(ibc, ITEM_MAILBOX_ATTRIBUTE);
1526 dsync_ibc_stream_send_string(ibc, str);
1529 ibc->value_output_last = '\0';
1530 ibc->value_output = attr->value_stream;
1531 i_stream_ref(ibc->value_output);
1532 (void)dsync_ibc_stream_send_value_stream(ibc);
1540 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
1541 pool_t pool = ibc->ret_pool;
1547 if (ibc->minor_version < DSYNC_PROTOCOL_MINOR_HAVE_ATTRIBUTES)
1550 if (ibc->value_input != NULL) {
1555 if (ibc->cur_attr != NULL) {
1557 *attr_r = ibc->cur_attr;
1558 ibc->cur_attr = NULL;
1565 ret = dsync_ibc_stream_input_next(ibc, ITEM_MAILBOX_ATTRIBUTE, &decoder);
1578 dsync_ibc_input_error(ibc, decoder, "Invalid type: %s", value);
1589 dsync_ibc_input_error(ibc, decoder, "Invalid last_change");
1594 dsync_ibc_input_error(ibc, decoder, "Invalid modseq");
1602 attr->value_stream = dsync_ibc_stream_input_stream(ibc);
1603 if (dsync_ibc_stream_read_mail_stream(ibc) <= 0) {
1604 ibc->cur_attr = attr;
1608 i_assert(ibc->value_input == NULL);
1620 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
1626 encoder = dsync_ibc_send_encode_begin(ibc, ITEM_MAIL_CHANGE);
1697 dsync_ibc_stream_send_string(ibc, str);
1704 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
1705 pool_t pool = ibc->ret_pool;
1716 ret = dsync_ibc_stream_input_next(ibc, ITEM_MAIL_CHANGE, &decoder);
1732 dsync_ibc_input_error(ibc, decoder, "Invalid type: %s", value);
1738 dsync_ibc_input_error(ibc, decoder, "Invalid uid");
1748 dsync_ibc_input_error(ibc, decoder, "Invalid modseq");
1753 dsync_ibc_input_error(ibc, decoder, "Invalid pvt_modseq");
1760 dsync_ibc_input_error(ibc, decoder,
1769 dsync_ibc_input_error(ibc, decoder,
1778 dsync_ibc_input_error(ibc, decoder,
1800 dsync_ibc_input_error(ibc, decoder, "Invalid received_timestamp");
1807 dsync_ibc_input_error(ibc, decoder, "Invalid virtual_size");
1821 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
1826 encoder = dsync_ibc_send_encode_begin(ibc, ITEM_MAIL_REQUEST);
1834 dsync_ibc_stream_send_string(ibc, str);
1841 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
1847 p_clear(ibc->ret_pool);
1848 request = p_new(ibc->ret_pool, struct dsync_mail_request, 1);
1850 ret = dsync_ibc_stream_input_next(ibc, ITEM_MAIL_REQUEST, &decoder);
1855 request->guid = p_strdup(ibc->ret_pool, value);
1858 dsync_ibc_input_error(ibc, decoder, "Invalid uid");
1870 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
1875 i_assert(ibc->value_output == NULL);
1878 encoder = dsync_ibc_send_encode_begin(ibc, ITEM_MAIL);
1903 dsync_ibc_stream_send_string(ibc, str);
1906 ibc->value_output_last = '\0';
1907 ibc->value_output = mail->input;
1908 i_stream_ref(ibc->value_output);
1909 (void)dsync_ibc_stream_send_value_stream(ibc);
1916 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
1917 pool_t pool = ibc->ret_pool;
1923 if (ibc->value_input != NULL) {
1927 if (ibc->cur_mail != NULL) {
1929 *mail_r = ibc->cur_mail;
1930 ibc->cur_mail = NULL;
1937 ret = dsync_ibc_stream_input_next(ibc, ITEM_MAIL, &decoder);
1945 dsync_ibc_input_error(ibc, decoder, "Invalid uid");
1952 dsync_ibc_input_error(ibc, decoder, "Invalid pop3_order");
1957 dsync_ibc_input_error(ibc, decoder, "Invalid received_date");
1962 dsync_ibc_input_error(ibc, decoder, "Invalid saved_date");
1966 mail->input = dsync_ibc_stream_input_stream(ibc);
1967 if (dsync_ibc_stream_read_mail_stream(ibc) <= 0) {
1968 ibc->cur_mail = mail;
1972 i_assert(ibc->value_input == NULL);
1984 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
1989 encoder = dsync_ibc_send_encode_begin(ibc, ITEM_FINISH);
1999 dsync_ibc_stream_send_string(ibc, str);
2007 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
2017 p_clear(ibc->ret_pool);
2019 if (ibc->minor_version < DSYNC_PROTOCOL_MINOR_HAVE_FINISH)
2022 ret = dsync_ibc_stream_input_next(ibc, ITEM_FINISH, &decoder);
2027 *error_r = p_strdup(ibc->ret_pool, value);
2030 dsync_ibc_input_error(ibc, decoder, "Invalid mail_error");
2037 ibc->finish_received = TRUE;
2043 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
2045 if (ibc->value_output != NULL) {
2046 i_stream_unref(&ibc->value_output);
2047 dsync_ibc_stream_stop(ibc);
2053 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
2056 if (ibc->value_output != NULL)
2059 bytes = o_stream_get_buffer_used_size(ibc->output);
2063 o_stream_set_flush_pending(ibc->output, TRUE);
2069 struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc;
2071 return ibc->has_pending_data;
2107 struct dsync_ibc_stream *ibc;
2109 ibc = i_new(struct dsync_ibc_stream, 1);
2110 ibc->ibc.v = dsync_ibc_stream_vfuncs;
2111 ibc->input = input;
2112 ibc->output = output;
2113 ibc->name = i_strdup(name);
2114 ibc->temp_path_prefix = i_strdup(temp_path_prefix);
2115 ibc->timeout_secs = timeout_secs;
2116 ibc->ret_pool = pool_alloconly_create("ibc stream data", 2048);
2117 dsync_ibc_stream_init(ibc);
2118 return &ibc->ibc;