bcb4e51a409d94ae670de96afb8483a4f7855294Stephan Bosch/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen#include "lib.h"
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen#include "ioloop.h"
bdd36cfdba3ff66d25570a9ff568d69e1eb543cfTimo Sirainen#include "net.h"
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen#include "istream.h"
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen#include "ostream.h"
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen#include "buffer.h"
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen#include "hash.h"
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen#include "llist.h"
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen#include "strescape.h"
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen#include "replicator-connection.h"
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
/* Maximum size of the input stream buffer, i.e. the longest line accepted
   from the replicator. */
#define MAX_INBUF_SIZE 1024
/* Delay between reconnect attempts after connecting has failed. */
#define REPLICATOR_RECONNECT_MSECS 5000
/* Upper limit for a single in-memory priority queue. Parenthesized so the
   macro expands safely inside any expression (e.g. x / MAX or x % MAX). */
#define REPLICATOR_MEMBUF_MAX_SIZE (1024*1024)
/* Handshake line sent to the replicator right after connecting. */
#define REPLICATOR_HANDSHAKE "VERSION\treplicator-notify\t1\t0\n"
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstruct replicator_connection {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen char *path;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen struct ip_addr *ips;
009217abb57a24a4076092e8e4e165545747839eStephan Bosch unsigned int ips_count, ip_idx;
009217abb57a24a4076092e8e4e165545747839eStephan Bosch in_port_t port;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen int fd;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen struct io *io;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen struct istream *input;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen struct ostream *output;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen struct timeout *to;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen buffer_t *queue[REPLICATION_PRIORITY_SYNC + 1];
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
a75d470c9223a75801418fcdda258885c36317e0Timo Sirainen HASH_TABLE(void *, void *) requests;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen unsigned int request_id_counter;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen replicator_sync_callback_t *callback;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen};
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
/* Forward declaration: the input/output handlers below disconnect on errors,
   but the definition comes later in the file. */
static void
replicator_connection_disconnect(struct replicator_connection *conn);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstatic int
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenreplicator_input_line(struct replicator_connection *conn, const char *line)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen{
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen void *context;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen unsigned int id;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen /* <+|-> \t <id> */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if ((line[0] != '+' && line[0] != '-') || line[1] != '\t' ||
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen str_to_uint(line+2, &id) < 0 || id == 0) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen i_error("Replicator sent invalid input: %s", line);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen return -1;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen }
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
a75d470c9223a75801418fcdda258885c36317e0Timo Sirainen context = hash_table_lookup(conn->requests, POINTER_CAST(id));
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (context == NULL) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen i_error("Replicator sent invalid ID: %u", id);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen return -1;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen }
a75d470c9223a75801418fcdda258885c36317e0Timo Sirainen hash_table_remove(conn->requests, POINTER_CAST(id));
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn->callback(line[0] == '+', context);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen return 0;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen}
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstatic void replicator_input(struct replicator_connection *conn)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen{
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen const char *line;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen switch (i_stream_read(conn->input)) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen case -2:
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen /* buffer full */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen i_error("Replicator sent too long line");
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen replicator_connection_disconnect(conn);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen return;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen case -1:
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen /* disconnected */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen replicator_connection_disconnect(conn);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen return;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen }
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen while ((line = i_stream_next_line(conn->input)) != NULL)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen (void)replicator_input_line(conn, line);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen}
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstatic bool
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenreplicator_send_buf(struct replicator_connection *conn, buffer_t *buf)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen{
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen const unsigned char *data = buf->data;
2ac5f36aa7c2e7a07ba8815d43a6d7483f62e74cTimo Sirainen size_t len = IO_BLOCK_SIZE;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen /* try to send about IO_BLOCK_SIZE amount of data,
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen but only full lines */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (len > buf->used)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen len = buf->used;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen for (;; len++) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen i_assert(len < buf->used); /* there is always LF */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (data[len] == '\n') {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen len++;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen break;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen }
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen }
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (o_stream_send(conn->output, data, len) < 0) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen replicator_connection_disconnect(conn);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen return FALSE;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen }
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen buffer_delete(buf, 0, len);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen return TRUE;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen}
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstatic int replicator_output(struct replicator_connection *conn)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen{
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen enum replication_priority p;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (o_stream_flush(conn->output) < 0) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen replicator_connection_disconnect(conn);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen return 1;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen }
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen for (p = REPLICATION_PRIORITY_SYNC;;) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (o_stream_get_buffer_used_size(conn->output) > 0) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen o_stream_set_flush_pending(conn->output, TRUE);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen break;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen }
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen /* output buffer is empty, send more data */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (conn->queue[p]->used > 0) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (!replicator_send_buf(conn, conn->queue[p]))
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen break;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen } else {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (p == REPLICATION_PRIORITY_LOW)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen break;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen p--;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen }
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen }
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen return 1;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen}
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstatic void replicator_connection_connect(struct replicator_connection *conn)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen{
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen unsigned int n;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen int fd = -1;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
595f6a9754e44233e2db3c3e66e8649201970e78Timo Sirainen if (conn->fd != -1)
595f6a9754e44233e2db3c3e66e8649201970e78Timo Sirainen return;
595f6a9754e44233e2db3c3e66e8649201970e78Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (conn->port == 0) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen fd = net_connect_unix(conn->path);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (fd == -1)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen i_error("net_connect_unix(%s) failed: %m", conn->path);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen } else {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen for (n = 0; n < conn->ips_count; n++) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen unsigned int idx = conn->ip_idx;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn->ip_idx = (conn->ip_idx + 1) % conn->ips_count;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen fd = net_connect_ip(&conn->ips[idx], conn->port, NULL);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (fd != -1)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen break;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen i_error("connect(%s, %u) failed: %m",
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen net_ip2addr(&conn->ips[idx]), conn->port);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen }
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen }
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (fd == -1) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (conn->to == NULL) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn->to = timeout_add(REPLICATOR_RECONNECT_MSECS,
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen replicator_connection_connect,
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen }
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen return;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen }
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
0d1b8b6bec79746c5d89d57dd8c1688946bd9237Josef 'Jeff' Sipek timeout_remove(&conn->to);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn->fd = fd;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn->io = io_add(fd, IO_READ, replicator_input, conn);
e93184a9055c2530366dfe617e07199603c399ddMartti Rannanjärvi conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE);
e93184a9055c2530366dfe617e07199603c399ddMartti Rannanjärvi conn->output = o_stream_create_fd(fd, (size_t)-1);
e2a88d59c0d47d63ce1ad5b1fd95e487124a3fd4Timo Sirainen o_stream_set_no_error_handling(conn->output, TRUE);
e2a88d59c0d47d63ce1ad5b1fd95e487124a3fd4Timo Sirainen o_stream_nsend_str(conn->output, REPLICATOR_HANDSHAKE);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen o_stream_set_flush_callback(conn->output, replicator_output, conn);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen}
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstatic void replicator_abort_all_requests(struct replicator_connection *conn)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen{
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen struct hash_iterate_context *iter;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen void *key, *value;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen iter = hash_table_iterate_init(conn->requests);
a75d470c9223a75801418fcdda258885c36317e0Timo Sirainen while (hash_table_iterate(iter, conn->requests, &key, &value))
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn->callback(FALSE, value);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen hash_table_iterate_deinit(&iter);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen hash_table_clear(conn->requests, TRUE);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen}
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstatic void replicator_connection_disconnect(struct replicator_connection *conn)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen{
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (conn->fd == -1)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen return;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen replicator_abort_all_requests(conn);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen io_remove(&conn->io);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen i_stream_destroy(&conn->input);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen o_stream_destroy(&conn->output);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen net_disconnect(conn->fd);
595f6a9754e44233e2db3c3e66e8649201970e78Timo Sirainen conn->fd = -1;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen}
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstatic struct replicator_connection *replicator_connection_create(void)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen{
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen struct replicator_connection *conn;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen unsigned int i;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn = i_new(struct replicator_connection, 1);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn->fd = -1;
678d0463849ba777106eb7875f27db07a5d8e3dfTimo Sirainen hash_table_create_direct(&conn->requests, default_pool, 0);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen for (i = REPLICATION_PRIORITY_LOW; i <= REPLICATION_PRIORITY_SYNC; i++)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn->queue[i] = buffer_create_dynamic(default_pool, 1024);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen return conn;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen}
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstruct replicator_connection *
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenreplicator_connection_create_unix(const char *path,
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen replicator_sync_callback_t *callback)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen{
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen struct replicator_connection *conn;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn = replicator_connection_create();
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn->callback = callback;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn->path = i_strdup(path);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen return conn;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen}
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstruct replicator_connection *
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenreplicator_connection_create_inet(const struct ip_addr *ips,
009217abb57a24a4076092e8e4e165545747839eStephan Bosch unsigned int ips_count, in_port_t port,
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen replicator_sync_callback_t *callback)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen{
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen struct replicator_connection *conn;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn = replicator_connection_create();
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn->callback = callback;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn->ips = i_new(struct ip_addr, ips_count);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen memcpy(conn->ips, ips, sizeof(*ips) * ips_count);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn->ips_count = ips_count;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn->port = port;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen return conn;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen}
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenvoid replicator_connection_destroy(struct replicator_connection **_conn)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen{
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen struct replicator_connection *conn = *_conn;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen unsigned int i;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen *_conn = NULL;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen replicator_connection_disconnect(conn);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen for (i = REPLICATION_PRIORITY_LOW; i <= REPLICATION_PRIORITY_SYNC; i++)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen buffer_free(&conn->queue[i]);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
0d1b8b6bec79746c5d89d57dd8c1688946bd9237Josef 'Jeff' Sipek timeout_remove(&conn->to);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen hash_table_destroy(&conn->requests);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen i_free(conn);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen}
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstatic void
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenreplicator_send(struct replicator_connection *conn,
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen enum replication_priority priority, const char *data)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen{
2ac5f36aa7c2e7a07ba8815d43a6d7483f62e74cTimo Sirainen size_t data_len = strlen(data);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (conn->fd != -1 &&
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen o_stream_get_buffer_used_size(conn->output) == 0) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen /* we can send data immediately */
e2a88d59c0d47d63ce1ad5b1fd95e487124a3fd4Timo Sirainen o_stream_nsend(conn->output, data, data_len);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen } else if (conn->queue[priority]->used + data_len >=
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen REPLICATOR_MEMBUF_MAX_SIZE) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen /* FIXME: compress duplicates, start writing to file */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen } else {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen /* queue internally to separate queues */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen buffer_append(conn->queue[priority], data, data_len);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (conn->output != NULL)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen o_stream_set_flush_pending(conn->output, TRUE);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen }
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen}
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenvoid replicator_connection_notify(struct replicator_connection *conn,
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen const char *username,
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen enum replication_priority priority)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen{
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen const char *priority_str = "";
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen replicator_connection_connect(conn);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen switch (priority) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen case REPLICATION_PRIORITY_NONE:
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen case REPLICATION_PRIORITY_SYNC:
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen i_unreached();
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen case REPLICATION_PRIORITY_LOW:
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen priority_str = "low";
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen break;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen case REPLICATION_PRIORITY_HIGH:
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen priority_str = "high";
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen break;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen }
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen T_BEGIN {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen replicator_send(conn, priority, t_strdup_printf(
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen "U\t%s\t%s\n", str_tabescape(username), priority_str));
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen } T_END;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen}
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenvoid replicator_connection_notify_sync(struct replicator_connection *conn,
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen const char *username, void *context)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen{
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen unsigned int id;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen replicator_connection_connect(conn);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen id = ++conn->request_id_counter;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (id == 0) id++;
a75d470c9223a75801418fcdda258885c36317e0Timo Sirainen hash_table_insert(conn->requests, POINTER_CAST(id), context);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen T_BEGIN {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen replicator_send(conn, REPLICATION_PRIORITY_SYNC, t_strdup_printf(
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen "U\t%s\tsync\t%u\n", str_tabescape(username), id));
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen } T_END;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen}