bcb4e51a409d94ae670de96afb8483a4f7855294Stephan Bosch/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen#define REPLICATOR_HANDSHAKE "VERSION\treplicator-notify\t1\t0\n"
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen buffer_t *queue[REPLICATION_PRIORITY_SYNC + 1];
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstatic void replicator_connection_disconnect(struct replicator_connection *conn);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenreplicator_input_line(struct replicator_connection *conn, const char *line)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen /* <+|-> \t <id> */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if ((line[0] != '+' && line[0] != '-') || line[1] != '\t' ||
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen i_error("Replicator sent invalid input: %s", line);
a75d470c9223a75801418fcdda258885c36317e0Timo Sirainen context = hash_table_lookup(conn->requests, POINTER_CAST(id));
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen i_error("Replicator sent invalid ID: %u", id);
a75d470c9223a75801418fcdda258885c36317e0Timo Sirainen hash_table_remove(conn->requests, POINTER_CAST(id));
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstatic void replicator_input(struct replicator_connection *conn)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen /* buffer full */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen /* disconnected */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen while ((line = i_stream_next_line(conn->input)) != NULL)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenreplicator_send_buf(struct replicator_connection *conn, buffer_t *buf)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen /* try to send about IO_BLOCK_SIZE amount of data,
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen but only full lines */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen for (;; len++) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen i_assert(len < buf->used); /* there is always LF */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (o_stream_send(conn->output, data, len) < 0) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstatic int replicator_output(struct replicator_connection *conn)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (o_stream_get_buffer_used_size(conn->output) > 0) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen o_stream_set_flush_pending(conn->output, TRUE);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen /* output buffer is empty, send more data */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen if (!replicator_send_buf(conn, conn->queue[p]))
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstatic void replicator_connection_connect(struct replicator_connection *conn)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen unsigned int n;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen i_error("net_connect_unix(%s) failed: %m", conn->path);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn->ip_idx = (conn->ip_idx + 1) % conn->ips_count;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen fd = net_connect_ip(&conn->ips[idx], conn->port, NULL);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn->to = timeout_add(REPLICATOR_RECONNECT_MSECS,
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn->io = io_add(fd, IO_READ, replicator_input, conn);
e93184a9055c2530366dfe617e07199603c399ddMartti Rannanjärvi conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE);
e93184a9055c2530366dfe617e07199603c399ddMartti Rannanjärvi conn->output = o_stream_create_fd(fd, (size_t)-1);
e2a88d59c0d47d63ce1ad5b1fd95e487124a3fd4Timo Sirainen o_stream_set_no_error_handling(conn->output, TRUE);
e2a88d59c0d47d63ce1ad5b1fd95e487124a3fd4Timo Sirainen o_stream_nsend_str(conn->output, REPLICATOR_HANDSHAKE);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen o_stream_set_flush_callback(conn->output, replicator_output, conn);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstatic void replicator_abort_all_requests(struct replicator_connection *conn)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen iter = hash_table_iterate_init(conn->requests);
a75d470c9223a75801418fcdda258885c36317e0Timo Sirainen while (hash_table_iterate(iter, conn->requests, &key, &value))
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstatic void replicator_connection_disconnect(struct replicator_connection *conn)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenstatic struct replicator_connection *replicator_connection_create(void)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen unsigned int i;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn = i_new(struct replicator_connection, 1);
678d0463849ba777106eb7875f27db07a5d8e3dfTimo Sirainen hash_table_create_direct(&conn->requests, default_pool, 0);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen for (i = REPLICATION_PRIORITY_LOW; i <= REPLICATION_PRIORITY_SYNC; i++)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen conn->queue[i] = buffer_create_dynamic(default_pool, 1024);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenreplicator_connection_create_unix(const char *path,
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenreplicator_connection_create_inet(const struct ip_addr *ips,
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen memcpy(conn->ips, ips, sizeof(*ips) * ips_count);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenvoid replicator_connection_destroy(struct replicator_connection **_conn)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen unsigned int i;
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen for (i = REPLICATION_PRIORITY_LOW; i <= REPLICATION_PRIORITY_SYNC; i++)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenreplicator_send(struct replicator_connection *conn,
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen enum replication_priority priority, const char *data)
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen o_stream_get_buffer_used_size(conn->output) == 0) {
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen /* we can send data immediately */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen } else if (conn->queue[priority]->used + data_len >=
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen /* FIXME: compress duplicates, start writing to file */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen /* queue internally to separate queues */
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen buffer_append(conn->queue[priority], data, data_len);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen o_stream_set_flush_pending(conn->output, TRUE);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenvoid replicator_connection_notify(struct replicator_connection *conn,
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen replicator_send(conn, priority, t_strdup_printf(
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen "U\t%s\t%s\n", str_tabescape(username), priority_str));
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainenvoid replicator_connection_notify_sync(struct replicator_connection *conn,
a75d470c9223a75801418fcdda258885c36317e0Timo Sirainen hash_table_insert(conn->requests, POINTER_CAST(id), context);
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen replicator_send(conn, REPLICATION_PRIORITY_SYNC, t_strdup_printf(
447e086422f1ab7cc16833583ed70a4af7a84bc5Timo Sirainen "U\t%s\tsync\t%u\n", str_tabescape(username), id));