0N/A/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */
0N/A
0N/A#include "lib.h"
0N/A#include "array.h"
0N/A#include "ioloop.h"
0N/A#include "istream.h"
0N/A#include "ostream.h"
0N/A#include "str.h"
0N/A#include "strescape.h"
0N/A#include "hash.h"
0N/A#include "replicator-queue.h"
0N/A
0N/A#include <unistd.h>
0N/A#include <fcntl.h>
0N/A
0N/Astruct replicator_sync_lookup {
0N/A struct replicator_user *user;
0N/A
0N/A replicator_sync_callback_t *callback;
0N/A void *context;
0N/A
0N/A bool wait_for_next_push;
0N/A};
0N/A
0N/Astruct replicator_queue {
0N/A struct priorityq *user_queue;
0N/A /* username => struct replicator_user* */
0N/A HASH_TABLE(char *, struct replicator_user *) user_hash;
0N/A
0N/A ARRAY(struct replicator_sync_lookup) sync_lookups;
0N/A
0N/A unsigned int full_sync_interval;
0N/A unsigned int failure_resync_interval;
0N/A
0N/A void (*change_callback)(void *context);
0N/A void *change_context;
0N/A};
0N/A
0N/Astruct replicator_queue_iter {
0N/A struct replicator_queue *queue;
0N/A struct hash_iterate_context *iter;
0N/A};
0N/A
0N/Astatic int user_priority_cmp(const void *p1, const void *p2)
0N/A{
0N/A const struct replicator_user *user1 = p1, *user2 = p2;
0N/A
0N/A if (user1->priority > user2->priority)
0N/A return -1;
0N/A if (user1->priority < user2->priority)
0N/A return 1;
0N/A
0N/A if (user1->priority != REPLICATION_PRIORITY_NONE) {
0N/A /* there is something to replicate */
0N/A if (user1->last_fast_sync < user2->last_fast_sync)
0N/A return -1;
0N/A if (user1->last_fast_sync > user2->last_fast_sync)
0N/A return 1;
0N/A } else if (user1->last_sync_failed != user2->last_sync_failed) {
0N/A /* resync failures first */
0N/A if (user1->last_sync_failed)
0N/A return -1;
0N/A else
0N/A return 1;
0N/A } else if (user1->last_sync_failed) {
0N/A /* both have failed. resync failures with fast-sync timestamp */
0N/A if (user1->last_fast_sync < user2->last_fast_sync)
0N/A return -1;
0N/A if (user1->last_fast_sync > user2->last_fast_sync)
0N/A return 1;
0N/A } else {
0N/A /* nothing to replicate, but do still periodic full syncs */
0N/A if (user1->last_full_sync < user2->last_full_sync)
0N/A return -1;
0N/A if (user1->last_full_sync > user2->last_full_sync)
0N/A return 1;
0N/A }
0N/A return 0;
0N/A}
0N/A
0N/Astruct replicator_queue *
0N/Areplicator_queue_init(unsigned int full_sync_interval,
0N/A unsigned int failure_resync_interval)
0N/A{
0N/A struct replicator_queue *queue;
0N/A
0N/A queue = i_new(struct replicator_queue, 1);
0N/A queue->full_sync_interval = full_sync_interval;
0N/A queue->failure_resync_interval = failure_resync_interval;
0N/A queue->user_queue = priorityq_init(user_priority_cmp, 1024);
0N/A hash_table_create(&queue->user_hash, default_pool, 1024,
0N/A str_hash, strcmp);
0N/A i_array_init(&queue->sync_lookups, 32);
0N/A return queue;
0N/A}
0N/A
0N/Avoid replicator_queue_deinit(struct replicator_queue **_queue)
0N/A{
0N/A struct replicator_queue *queue = *_queue;
0N/A struct priorityq_item *item;
0N/A
0N/A *_queue = NULL;
0N/A
0N/A queue->change_callback = NULL;
0N/A
0N/A while ((item = priorityq_pop(queue->user_queue)) != NULL) {
0N/A struct replicator_user *user = (struct replicator_user *)item;
0N/A
0N/A user->popped = TRUE;
0N/A replicator_queue_remove(queue, &user);
0N/A }
0N/A
0N/A priorityq_deinit(&queue->user_queue);
0N/A hash_table_destroy(&queue->user_hash);
0N/A i_assert(array_count(&queue->sync_lookups) == 0);
0N/A array_free(&queue->sync_lookups);
0N/A i_free(queue);
0N/A}
0N/A
0N/Avoid replicator_queue_set_change_callback(struct replicator_queue *queue,
0N/A void (*callback)(void *context),
0N/A void *context)
0N/A{
0N/A queue->change_callback = callback;
0N/A queue->change_context = context;
0N/A}
0N/A
0N/Avoid replicator_user_ref(struct replicator_user *user)
0N/A{
0N/A i_assert(user->refcount > 0);
0N/A user->refcount++;
0N/A}
0N/A
0N/Abool replicator_user_unref(struct replicator_user **_user)
0N/A{
0N/A struct replicator_user *user = *_user;
0N/A
0N/A i_assert(user->refcount > 0);
0N/A *_user = NULL;
0N/A if (--user->refcount > 0)
0N/A return TRUE;
0N/A
0N/A i_free(user->state);
0N/A i_free(user->username);
0N/A i_free(user);
0N/A return FALSE;
0N/A}
0N/A
0N/Astruct replicator_user *
0N/Areplicator_queue_lookup(struct replicator_queue *queue, const char *username)
0N/A{
0N/A return hash_table_lookup(queue->user_hash, username);
0N/A}
0N/A
0N/Astatic struct replicator_user *
0N/Areplicator_queue_add_int(struct replicator_queue *queue, const char *username,
0N/A enum replication_priority priority)
0N/A{
0N/A struct replicator_user *user;
0N/A
0N/A user = replicator_queue_lookup(queue, username);
0N/A if (user == NULL) {
0N/A user = i_new(struct replicator_user, 1);
0N/A user->refcount = 1;
0N/A user->username = i_strdup(username);
0N/A hash_table_insert(queue->user_hash, user->username, user);
0N/A } else {
0N/A if (user->priority > priority) {
0N/A /* user already has a higher priority than this */
0N/A return user;
0N/A }
0N/A if (!user->popped)
0N/A priorityq_remove(queue->user_queue, &user->item);
0N/A }
0N/A user->priority = priority;
0N/A user->last_update = ioloop_time;
0N/A
0N/A if (!user->popped)
0N/A priorityq_add(queue->user_queue, &user->item);
0N/A return user;
0N/A}
0N/A
0N/Astruct replicator_user *
0N/Areplicator_queue_add(struct replicator_queue *queue, const char *username,
0N/A enum replication_priority priority)
0N/A{
0N/A struct replicator_user *user;
0N/A
0N/A user = replicator_queue_add_int(queue, username, priority);
0N/A if (queue->change_callback != NULL)
0N/A queue->change_callback(queue->change_context);
0N/A return user;
0N/A}
0N/A
0N/Avoid replicator_queue_add_sync(struct replicator_queue *queue,
0N/A const char *username,
0N/A replicator_sync_callback_t *callback,
0N/A void *context)
0N/A{
0N/A struct replicator_user *user;
0N/A struct replicator_sync_lookup *lookup;
0N/A
0N/A user = replicator_queue_add_int(queue, username,
0N/A REPLICATION_PRIORITY_SYNC);
0N/A
0N/A lookup = array_append_space(&queue->sync_lookups);
0N/A lookup->user = user;
0N/A lookup->callback = callback;
0N/A lookup->context = context;
0N/A lookup->wait_for_next_push = user->popped;
0N/A
0N/A if (queue->change_callback != NULL)
0N/A queue->change_callback(queue->change_context);
0N/A}
0N/A
0N/Avoid replicator_queue_remove(struct replicator_queue *queue,
0N/A struct replicator_user **_user)
0N/A{
0N/A struct replicator_user *user = *_user;
0N/A
0N/A *_user = NULL;
0N/A if (!user->popped)
0N/A priorityq_remove(queue->user_queue, &user->item);
0N/A hash_table_remove(queue->user_hash, user->username);
0N/A replicator_user_unref(&user);
0N/A
0N/A if (queue->change_callback != NULL)
0N/A queue->change_callback(queue->change_context);
0N/A}
0N/A
0N/Abool replicator_queue_want_sync_now(struct replicator_queue *queue,
0N/A struct replicator_user *user,
0N/A unsigned int *next_secs_r)
0N/A{
0N/A time_t next_sync;
0N/A
0N/A if (user->priority != REPLICATION_PRIORITY_NONE)
0N/A return TRUE;
0N/A
0N/A if (user->last_sync_failed) {
0N/A next_sync = user->last_fast_sync +
0N/A queue->failure_resync_interval;
0N/A } else {
0N/A next_sync = user->last_full_sync + queue->full_sync_interval;
0N/A }
0N/A if (next_sync <= ioloop_time)
0N/A return TRUE;
0N/A
0N/A *next_secs_r = next_sync - ioloop_time;
0N/A return FALSE;
0N/A}
0N/A
0N/Astruct replicator_user *
0N/Areplicator_queue_pop(struct replicator_queue *queue,
0N/A unsigned int *next_secs_r)
0N/A{
0N/A struct priorityq_item *item;
0N/A struct replicator_user *user;
0N/A
0N/A item = priorityq_peek(queue->user_queue);
0N/A if (item == NULL) {
0N/A /* no users defined. we shouldn't normally get here */
0N/A *next_secs_r = 3600;
0N/A return NULL;
0N/A }
0N/A user = (struct replicator_user *)item;
0N/A if (!replicator_queue_want_sync_now(queue, user, next_secs_r)) {
0N/A /* we don't want to sync the user yet */
0N/A return NULL;
0N/A }
0N/A priorityq_remove(queue->user_queue, &user->item);
0N/A user->popped = TRUE;
0N/A return user;
0N/A}
0N/A
0N/Astatic void
0N/Areplicator_queue_handle_sync_lookups(struct replicator_queue *queue,
0N/A struct replicator_user *user)
0N/A{
0N/A struct replicator_sync_lookup *lookups;
0N/A ARRAY(struct replicator_sync_lookup) callbacks;
0N/A unsigned int i, count;
0N/A bool success = !user->last_sync_failed;
0N/A
0N/A t_array_init(&callbacks, 8);
0N/A lookups = array_get_modifiable(&queue->sync_lookups, &count);
0N/A for (i = 0; i < count; ) {
0N/A if (lookups[i].user != user)
0N/A i++;
0N/A else if (lookups[i].wait_for_next_push) {
0N/A /* another sync request came while user was being
0N/A replicated */
0N/A i_assert(user->priority == REPLICATION_PRIORITY_SYNC);
0N/A lookups[i].wait_for_next_push = FALSE;
0N/A i++;
0N/A } else {
0N/A array_append(&callbacks, &lookups[i], 1);
0N/A array_delete(&queue->sync_lookups, i, 1);
0N/A }
0N/A }
0N/A
0N/A array_foreach_modifiable(&callbacks, lookups)
0N/A lookups->callback(success, lookups->context);
0N/A}
0N/A
0N/Avoid replicator_queue_push(struct replicator_queue *queue,
0N/A struct replicator_user *user)
0N/A{
0N/A i_assert(user->popped);
0N/A
0N/A priorityq_add(queue->user_queue, &user->item);
0N/A user->popped = FALSE;
0N/A
0N/A T_BEGIN {
0N/A replicator_queue_handle_sync_lookups(queue, user);
0N/A } T_END;
0N/A}
0N/A
0N/Astatic int
0N/Areplicator_queue_import_line(struct replicator_queue *queue, const char *line)
0N/A{
0N/A const char *const *args, *username, *state;
0N/A unsigned int priority;
0N/A struct replicator_user *user, tmp_user;
0N/A
0N/A /* <user> <priority> <last update> <last fast sync> <last full sync>
0N/A <last failed> <state> <last successful sync>*/
0N/A args = t_strsplit_tabescaped(line);
0N/A if (str_array_length(args) < 7)
0N/A return -1;
0N/A
0N/A i_zero(&tmp_user);
0N/A username = args[0];
0N/A state = t_strdup_noconst(args[6]);
0N/A if (username[0] == '\0' ||
0N/A str_to_uint(args[1], &priority) < 0 ||
0N/A str_to_time(args[2], &tmp_user.last_update) < 0 ||
0N/A str_to_time(args[3], &tmp_user.last_fast_sync) < 0 ||
0N/A str_to_time(args[4], &tmp_user.last_full_sync) < 0)
0N/A return -1;
0N/A tmp_user.priority = priority;
0N/A tmp_user.last_sync_failed = args[5][0] != '0';
0N/A
0N/A if (str_array_length(args) >= 8) {
0N/A if (str_to_time(args[7], &tmp_user.last_successful_sync) < 0)
0N/A return -1;
0N/A } else {
0N/A tmp_user.last_successful_sync = 0;
0N/A /* On-disk format didn't have this yet */
0N/A }
0N/A
0N/A user = hash_table_lookup(queue->user_hash, username);
0N/A if (user != NULL) {
0N/A if (user->last_update > tmp_user.last_update) {
0N/A /* we already have a newer state */
0N/A return 0;
0N/A }
0N/A if (user->last_update == tmp_user.last_update) {
0N/A /* either one of these could be newer. use the one
0N/A with higher priority. */
0N/A if (user->priority > tmp_user.priority)
0N/A return 0;
0N/A }
0N/A }
0N/A user = replicator_queue_add(queue, username,
0N/A tmp_user.priority);
0N/A user->last_update = tmp_user.last_update;
0N/A user->last_fast_sync = tmp_user.last_fast_sync;
0N/A user->last_full_sync = tmp_user.last_full_sync;
0N/A user->last_successful_sync = tmp_user.last_successful_sync;
0N/A user->last_sync_failed = tmp_user.last_sync_failed;
0N/A i_free(user->state);
0N/A user->state = i_strdup(state);
0N/A return 0;
0N/A}
0N/A
0N/Aint replicator_queue_import(struct replicator_queue *queue, const char *path)
0N/A{
0N/A struct istream *input;
0N/A const char *line;
0N/A int fd, ret = 0;
0N/A
0N/A fd = open(path, O_RDONLY);
0N/A if (fd == -1) {
0N/A if (errno == ENOENT)
return 0;
i_error("open(%s) failed: %m", path);
return -1;
}
input = i_stream_create_fd_autoclose(&fd, (size_t)-1);
while ((line = i_stream_read_next_line(input)) != NULL) {
T_BEGIN {
ret = replicator_queue_import_line(queue, line);
} T_END;
if (ret < 0) {
i_error("Corrupted replicator record in %s: %s",
path, line);
break;
}
}
if (input->stream_errno != 0) {
i_error("read(%s) failed: %s", path, i_stream_get_error(input));
ret = -1;
}
i_stream_destroy(&input);
return ret;
}
static void
replicator_queue_export_user(struct replicator_user *user, string_t *str)
{
str_append_tabescaped(str, user->username);
str_printfa(str, "\t%d\t%lld\t%lld\t%lld\t%d\t", (int)user->priority,
(long long)user->last_update,
(long long)user->last_fast_sync,
(long long)user->last_full_sync,
user->last_sync_failed ? 1 : 0);
if (user->state != NULL)
str_append_tabescaped(str, user->state);
str_printfa(str, "\t%lld\n", (long long)user->last_successful_sync);
}
int replicator_queue_export(struct replicator_queue *queue, const char *path)
{
struct replicator_queue_iter *iter;
struct replicator_user *user;
struct ostream *output;
string_t *str;
int fd, ret = 0;
fd = creat(path, 0600);
if (fd == -1) {
i_error("creat(%s) failed: %m", path);
return -1;
}
output = o_stream_create_fd_file_autoclose(&fd, 0);
o_stream_cork(output);
str = t_str_new(128);
iter = replicator_queue_iter_init(queue);
while ((user = replicator_queue_iter_next(iter)) != NULL) {
str_truncate(str, 0);
replicator_queue_export_user(user, str);
if (o_stream_send(output, str_data(str), str_len(str)) < 0)
break;
}
replicator_queue_iter_deinit(&iter);
if (o_stream_finish(output) < 0) {
i_error("write(%s) failed: %s", path, o_stream_get_error(output));
ret = -1;
}
o_stream_destroy(&output);
return ret;
}
struct replicator_queue_iter *
replicator_queue_iter_init(struct replicator_queue *queue)
{
struct replicator_queue_iter *iter;
iter = i_new(struct replicator_queue_iter, 1);
iter->queue = queue;
iter->iter = hash_table_iterate_init(queue->user_hash);
return iter;
}
struct replicator_user *
replicator_queue_iter_next(struct replicator_queue_iter *iter)
{
struct replicator_user *user;
char *username;
if (!hash_table_iterate(iter->iter, iter->queue->user_hash,
&username, &user))
return NULL;
return user;
}
void replicator_queue_iter_deinit(struct replicator_queue_iter **_iter)
{
struct replicator_queue_iter *iter = *_iter;
*_iter = NULL;
hash_table_iterate_deinit(&iter->iter);
i_free(iter);
}