director.c revision b0421c7397be2146988ee3afb5dcc491c01206cc
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch/* Copyright (c) 2010-2017 Dovecot authors, see the included COPYING file */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "lib.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "ioloop.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "array.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "str.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "strescape.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "log-throttle.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "ipc-client.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "program-client.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "var-expand.h"
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen#include "istream.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "ostream.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "iostream-temp.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "mail-user-hash.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "user-directory.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "mail-host.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "director-host.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "director-connection.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "director.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
/* Socket paths */
#define DIRECTOR_IPC_PROXY_PATH "ipc"
#define DIRECTOR_DNS_SOCKET_PATH "dns-client"

/* director_connect() skips a host whose last network failure happened
   less than this many seconds ago */
#define DIRECTOR_RECONNECT_RETRY_SECS 60
/* interval of the timeout that tries to reconnect to the preferred
   right-side director */
#define DIRECTOR_RECONNECT_TIMEOUT_MSECS (30 * 1000)
#define DIRECTOR_USER_MOVE_TIMEOUT_MSECS (30 * 1000)
/* how long to wait for our SYNC to come back before resending it */
#define DIRECTOR_SYNC_TIMEOUT_MSECS (5 * 1000)
/* how long to keep retrying other directors before deciding we're alone */
#define DIRECTOR_RING_MIN_WAIT_SECS 20
/* retry interval used while we're waiting for others to appear */
#define DIRECTOR_QUICK_RECONNECT_TIMEOUT_MSECS 1000
#define DIRECTOR_DELAYED_DIR_REMOVE_MSECS (1000 * 30)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschbool director_debug;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschconst char *user_kill_state_names[USER_KILL_STATE_DELAY+1] = {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch "none",
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch "killing",
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch "notify-received",
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch "waiting-for-notify",
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch "waiting-for-everyone",
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch "flushing",
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch "delay",
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch};
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic struct log_throttle *user_move_throttle;
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic struct log_throttle *user_kill_fail_throttle;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic const struct log_throttle_settings director_log_throttle_settings = {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch .throttle_at_max_per_interval = 100,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch .unthrottle_at_max_per_interval = 2,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch};
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic void
7384b4e78eaab44693c985192276e31322155e32Stephan Boschdirector_user_kill_finish_delayed(struct director *dir, struct user *user,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch bool skip_delay);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic bool director_is_self_ip_set(struct director *dir)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (net_ip_compare(&dir->self_ip, &net_ip4_any))
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return FALSE;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (net_ip_compare(&dir->self_ip, &net_ip6_any))
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return FALSE;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return TRUE;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic void director_find_self_ip(struct director *dir)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct director_host *const *hosts;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch unsigned int i, count;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch hosts = array_get(&dir->dir_hosts, &count);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch for (i = 0; i < count; i++) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (net_try_bind(&hosts[i]->ip) == 0) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch dir->self_ip = hosts[i]->ip;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch i_fatal("director_servers doesn't list ourself");
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschvoid director_find_self(struct director *dir)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (dir->self_host != NULL)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (!director_is_self_ip_set(dir))
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_find_self_ip(dir);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen dir->self_host = director_host_lookup(dir, &dir->self_ip,
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen dir->self_port);
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen if (dir->self_host == NULL) {
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen i_fatal("director_servers doesn't list ourself (%s:%u)",
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch net_ip2addr(&dir->self_ip), dir->self_port);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch dir->self_host->self = TRUE;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic unsigned int director_find_self_idx(struct director *dir)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct director_host *const *hosts;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch unsigned int i, count;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch i_assert(dir->self_host != NULL);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch hosts = array_get(&dir->dir_hosts, &count);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch for (i = 0; i < count; i++) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (hosts[i] == dir->self_host)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return i;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch i_unreached();
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic bool
7384b4e78eaab44693c985192276e31322155e32Stephan Boschdirector_has_outgoing_connection(struct director *dir,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct director_host *host)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct director_connection *const *connp;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch array_foreach(&dir->connections, connp) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (director_connection_get_host(*connp) == host &&
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch !director_connection_is_incoming(*connp))
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return TRUE;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return FALSE;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic void
7384b4e78eaab44693c985192276e31322155e32Stephan Boschdirector_log_connect(struct director *dir, struct director_host *host,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch const char *reason)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch string_t *str = t_str_new(128);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (host->last_network_failure > 0) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch str_printfa(str, ", last network failure %ds ago",
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch (int)(ioloop_time - host->last_network_failure));
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (host->last_protocol_failure > 0) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch str_printfa(str, ", last protocol failure %ds ago",
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen (int)(ioloop_time - host->last_protocol_failure));
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen }
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen i_info("Connecting to %s:%u (as %s%s): %s",
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen net_ip2addr(&host->ip), host->port,
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen net_ip2addr(&dir->self_ip), str_c(str), reason);
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschint director_connect_host(struct director *dir, struct director_host *host,
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen const char *reason)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch in_port_t port;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch int fd;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (director_has_outgoing_connection(dir, host))
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return 0;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_log_connect(dir, host, reason);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch port = dir->test_port != 0 ? dir->test_port : host->port;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch fd = net_connect_ip(&host->ip, port, &dir->self_ip);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (fd == -1) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch host->last_network_failure = ioloop_time;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch i_error("connect(%s) failed: %m", host->name);
57962a937b214be3a131f78005509afaa26fe4bfTimo Sirainen return -1;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* Reset timestamp so that director_connect() won't skip this host
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch while we're still trying to connect to it */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch host->last_network_failure = 0;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen (void)director_connection_init_out(dir, fd, host);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return 0;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic struct director_host *
7384b4e78eaab44693c985192276e31322155e32Stephan Boschdirector_get_preferred_right_host(struct director *dir)
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct director_host *const *hosts, *host;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch unsigned int i, count, self_idx;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen hosts = array_get(&dir->dir_hosts, &count);
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen if (count == 1) {
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen /* self */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return NULL;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen self_idx = director_find_self_idx(dir);
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen for (i = 0; i < count; i++) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch host = hosts[(self_idx + i + 1) % count];
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (!host->removed)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return host;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* self, with some removed hosts */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return NULL;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
/* Timeout callback armed by director_wait_for_others(): try connecting
   to the ring again. */
static void director_quick_reconnect_retry(struct director *dir)
{
	const char *reason =
		"Alone in director ring - trying to connect to others";

	director_connect(dir, reason);
}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic bool director_wait_for_others(struct director *dir)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct director_host *const *hostp;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* don't assume we're alone until we've attempted to connect
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch to others for a while */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (dir->ring_first_alone != 0 &&
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch ioloop_time - dir->ring_first_alone > DIRECTOR_RING_MIN_WAIT_SECS)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return FALSE;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (dir->ring_first_alone == 0)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch dir->ring_first_alone = ioloop_time;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* reset all failures and try again */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch array_foreach(&dir->dir_hosts, hostp) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch (*hostp)->last_network_failure = 0;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch (*hostp)->last_protocol_failure = 0;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch timeout_remove(&dir->to_reconnect);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch dir->to_reconnect = timeout_add(DIRECTOR_QUICK_RECONNECT_TIMEOUT_MSECS,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_quick_reconnect_retry, dir);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return TRUE;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainenvoid director_connect(struct director *dir, const char *reason)
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen{
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen struct director_host *const *hosts;
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen unsigned int i, count, self_idx;
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch self_idx = director_find_self_idx(dir);
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen /* try to connect to first working server on our right side.
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen the left side is supposed to connect to us. */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch hosts = array_get(&dir->dir_hosts, &count);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch for (i = 1; i < count; i++) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch unsigned int idx = (self_idx + i) % count;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (hosts[idx]->removed)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch continue;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (hosts[idx]->last_network_failure +
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch DIRECTOR_RECONNECT_RETRY_SECS > ioloop_time) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* connection failed recently, don't try retrying here */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch continue;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (hosts[idx]->last_protocol_failure +
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch DIRECTOR_PROTOCOL_FAILURE_RETRY_SECS > ioloop_time) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* the director recently sent invalid protocol data,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch don't try retrying yet */
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen continue;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (director_connect_host(dir, hosts[idx], reason) == 0) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* success */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (count > 1 && director_wait_for_others(dir))
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* we're the only one */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (count > 1) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch i_warning("director: Couldn't connect to right side, "
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch "we must be the only director left");
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (dir->left != NULL) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* since we couldn't connect to it,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch it must have failed recently */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch i_warning("director: Assuming %s is dead, disconnecting",
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_connection_get_name(dir->left));
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_connection_deinit(&dir->left,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch "This connection is dead?");
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch dir->ring_min_version = DIRECTOR_VERSION_MINOR;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (!dir->ring_handshaked)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_set_ring_handshaked(dir);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch else
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_set_ring_synced(dir);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschvoid director_set_ring_handshaked(struct director *dir)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch i_assert(!dir->ring_handshaked);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch timeout_remove(&dir->to_handshake_warning);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (dir->ring_handshake_warning_sent) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch i_warning("Directors have been connected, "
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch "continuing delayed requests");
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch dir->ring_handshake_warning_sent = FALSE;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch dir_debug("Director ring handshaked");
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch dir->ring_handshaked = TRUE;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_set_ring_synced(dir);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic void director_reconnect_timeout(struct director *dir)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct director_host *cur_host, *preferred_host =
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_get_preferred_right_host(dir);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch cur_host = dir->right == NULL ? NULL :
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_connection_get_host(dir->right);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (preferred_host == NULL) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* all directors have been removed, try again later */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch } else if (cur_host != preferred_host) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch (void)director_connect_host(dir, preferred_host,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch "Reconnect attempt to preferred director");
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch } else {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* the connection hasn't finished sync yet.
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch keep this timeout for now. */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschvoid director_set_ring_synced(struct director *dir)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct director_host *host;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch i_assert(!dir->ring_synced);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch i_assert((dir->left != NULL && dir->right != NULL) ||
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch (dir->left == NULL && dir->right == NULL));
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch timeout_remove(&dir->to_handshake_warning);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (dir->ring_handshake_warning_sent) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch i_warning("Ring is synced, continuing delayed requests "
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch "(syncing took %d secs, hosts_hash=%u)",
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch (int)(ioloop_time - dir->ring_last_sync_time),
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch mail_hosts_hash(dir->mail_hosts));
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch dir->ring_handshake_warning_sent = FALSE;
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch host = dir->right == NULL ? NULL :
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_connection_get_host(dir->right);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch timeout_remove(&dir->to_reconnect);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (host != director_get_preferred_right_host(dir)) {
d45ab3fff7c47f1719b9cd310228c0dac2bdd1b2Timo Sirainen /* try to reconnect to preferred host later */
d45ab3fff7c47f1719b9cd310228c0dac2bdd1b2Timo Sirainen dir->to_reconnect =
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch timeout_add(DIRECTOR_RECONNECT_TIMEOUT_MSECS,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_reconnect_timeout, dir);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (dir->left != NULL)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_connection_set_synced(dir->left, TRUE);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (dir->right != NULL)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_connection_set_synced(dir->right, TRUE);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch timeout_remove(&dir->to_sync);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch dir->ring_synced = TRUE;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch dir->ring_last_sync_time = ioloop_time;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch mail_hosts_set_synced(dir->mail_hosts);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_set_state_changed(dir);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschvoid director_sync_send(struct director *dir, struct director_host *host,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch uint32_t seq, unsigned int minor_version,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch unsigned int timestamp, unsigned int hosts_hash)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch string_t *str;
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen if (host == dir->self_host)
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen dir->last_sync_sent_ring_change_counter = dir->ring_change_counter;
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch str = t_str_new(128);
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch str_printfa(str, "SYNC\t%s\t%u\t%u",
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch net_ip2addr(&host->ip), host->port, seq);
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch if (minor_version > 0 &&
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen director_connection_get_minor_version(dir->right) > 0) {
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen /* only minor_version>0 supports extra parameters */
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen str_printfa(str, "\t%u\t%u\t%u", minor_version,
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen timestamp, hosts_hash);
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen }
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen str_append_c(str, '\n');
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen director_connection_send(dir->right, str_c(str));
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen /* ping our connections in case either of them are hanging.
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen if they are, we want to know it fast. */
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen if (dir->left != NULL)
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen director_connection_ping(dir->left);
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen director_connection_ping(dir->right);
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen}
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Boschbool director_resend_sync(struct director *dir)
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (!dir->ring_synced && dir->left != NULL && dir->right != NULL) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* send a new SYNC in case the previous one got dropped */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch dir->self_host->last_sync_timestamp = ioloop_time;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_sync_send(dir, dir->self_host, dir->sync_seq,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch DIRECTOR_VERSION_MINOR, ioloop_time,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch mail_hosts_hash(dir->mail_hosts));
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (dir->to_sync != NULL)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch timeout_reset(dir->to_sync);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return TRUE;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return FALSE;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Boschstatic void director_sync_timeout(struct director *dir)
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch{
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch i_assert(!dir->ring_synced);
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch if (director_resend_sync(dir))
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch i_error("Ring SYNC seq=%u appears to have got lost, resending", dir->sync_seq);
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch}
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Boschvoid director_set_ring_unsynced(struct director *dir)
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch{
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch if (dir->ring_synced) {
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch dir->ring_synced = FALSE;
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch dir->ring_last_sync_time = ioloop_time;
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch }
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen if (dir->to_sync == NULL) {
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen dir->to_sync = timeout_add(DIRECTOR_SYNC_TIMEOUT_MSECS,
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen director_sync_timeout, dir);
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen } else {
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen timeout_reset(dir->to_sync);
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen }
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen}
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainenstatic void director_sync(struct director *dir)
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen{
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen /* we're synced again when we receive this SYNC back */
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen dir->sync_seq++;
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen if (dir->right == NULL && dir->left == NULL) {
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen /* we're alone. if we're already synced,
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen don't become unsynced. */
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen return;
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen }
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen director_set_ring_unsynced(dir);
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen if (dir->sync_frozen) {
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen dir->sync_pending = TRUE;
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen return;
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen }
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch if (dir->right == NULL) {
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch i_assert(!dir->ring_synced ||
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch (dir->left == NULL && dir->right == NULL));
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch dir_debug("Ring is desynced (seq=%u, no right connection)",
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch dir->sync_seq);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch dir_debug("Ring is desynced (seq=%u, sending SYNC to %s)",
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch dir->sync_seq, dir->right == NULL ? "(nowhere)" :
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_connection_get_name(dir->right));
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch /* send PINGs to our connections more rapidly until we've synced again.
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen if the connection has actually died, we don't need to wait (and
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen delay requests) for as long to detect it */
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen if (dir->left != NULL)
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen director_connection_set_synced(dir->left, FALSE);
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen director_connection_set_synced(dir->right, FALSE);
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen director_sync_send(dir, dir->self_host, dir->sync_seq,
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen DIRECTOR_VERSION_MINOR, ioloop_time,
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen mail_hosts_hash(dir->mail_hosts));
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen}
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainenvoid director_sync_freeze(struct director *dir)
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen{
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen struct director_connection *const *connp;
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen i_assert(!dir->sync_frozen);
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen i_assert(!dir->sync_pending);
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch array_foreach(&dir->connections, connp)
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch director_connection_cork(*connp);
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch dir->sync_frozen = TRUE;
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch}
65c0e43da8cfc730eeb4634f8aa384081bbfa4e7Timo Sirainen
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Boschvoid director_sync_thaw(struct director *dir)
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch{
50a6d26bd9041f44b4cad0c0357c0c604c132cc8Stephan Bosch struct director_connection *const *connp;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch i_assert(dir->sync_frozen);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch dir->sync_frozen = FALSE;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (dir->sync_pending) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch dir->sync_pending = FALSE;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_sync(dir);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch array_foreach(&dir->connections, connp)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch director_connection_uncork(*connp);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschvoid director_notify_ring_added(struct director_host *added_host,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct director_host *src, bool log)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch const char *cmd;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (log) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch i_info("Adding director %s to ring (requested by %s)",
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch added_host->name, src->name);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
added_host->dir->ring_change_counter++;
cmd = t_strdup_printf("DIRECTOR\t%s\t%u\n",
net_ip2addr(&added_host->ip), added_host->port);
director_update_send(added_host->dir, src, cmd);
}
/* One-shot timeout: free every director host that director_ring_remove()
   flagged as removed. */
static void director_delayed_dir_remove_timeout(struct director *dir)
{
	struct director_host *const *hosts, *host;
	unsigned int idx = 0, count;

	timeout_remove(&dir->to_remove_dirs);

	hosts = array_get(&dir->dir_hosts, &count);
	while (idx < count) {
		if (!hosts[idx]->removed) {
			idx++;
			continue;
		}
		/* the loop relies on director_host_free() dropping the host
		   from dir_hosts: re-read the array and look at the same
		   index again */
		host = hosts[idx];
		director_host_free(&host);
		hosts = array_get(&dir->dir_hosts, &count);
	}
}
/* Remove a director from the ring as requested by src.

   A removed remote host is only flagged here; it is freed later by
   director_delayed_dir_remove_timeout(). DIRECTOR-REMOVE is then forwarded
   to peers supporting it, our connections to the removed host are closed,
   we reconnect if the right side is gone, and a ring sync starts.

   If we ourselves are removed by another director, nothing happens here:
   the others will just disconnect us. */
void director_ring_remove(struct director_host *removed_host,
			  struct director_host *src)
{
	struct director *dir = removed_host->dir;
	struct director_connection *const *conns, *conn;
	unsigned int i = 0, count;

	i_info("Removing director %s from ring (requested by %s)",
	       removed_host->name, src->name);

	if (removed_host->self) {
		if (!src->self) {
			/* others will just disconnect us */
			return;
		}
	} else {
		/* mark the host as removed and fully remove it later. this
		   delay is needed, because the removal may trigger director
		   reconnections, which may send the director back and we don't
		   want to re-add it */
		removed_host->removed = TRUE;
		if (dir->to_remove_dirs == NULL) {
			dir->to_remove_dirs =
				timeout_add(DIRECTOR_DELAYED_DIR_REMOVE_MSECS,
					    director_delayed_dir_remove_timeout, dir);
		}
	}

	/* if our left or right side gets removed, notify them first
	   before disconnecting. */
	director_update_send_version(dir, src, DIRECTOR_VERSION_RING_REMOVE,
		t_strdup_printf("DIRECTOR-REMOVE\t%s\t%u\n",
				net_ip2addr(&removed_host->ip),
				removed_host->port));

	/* disconnect any connections to the host (never when removing
	   ourself) */
	if (!removed_host->self) {
		conns = array_get(&dir->connections, &count);
		while (i < count) {
			conn = conns[i];
			if (director_connection_get_host(conn) != removed_host) {
				i++;
				continue;
			}
			/* deinit drops conn from the array: re-read it */
			director_connection_deinit(&conn, "Removing from ring");
			conns = array_get(&dir->connections, &count);
		}
	}

	if (dir->right == NULL)
		director_connect(dir, "Reconnecting after director was removed");
	director_sync(dir);
}
/* Send a HOST command describing a mail host to the ring (except towards
   src). With orig_src == NULL the update originates from us and consumes a
   new sequence number.

   Optional fields depend on the ring's lowest protocol version. If the
   host has a non-empty tag that the ring can't represent, the host is
   instead removed locally (director_remove_host() with src == NULL) and
   nothing is sent. */
static void
director_send_host(struct director *dir, struct director_host *src,
		   struct director_host *orig_src,
		   struct mail_host *host)
{
	const char *tag = mail_host_get_tag(host);
	const unsigned int min_version = dir->ring_min_version;
	string_t *line;

	if (orig_src == NULL) {
		orig_src = dir->self_host;
		orig_src->last_seq++;
	}

	line = t_str_new(128);
	str_printfa(line, "HOST\t%s\t%u\t%u\t%s\t%u",
		    net_ip2addr(&orig_src->ip), orig_src->port,
		    orig_src->last_seq,
		    net_ip2addr(&host->ip), host->vhost_count);

	if (min_version < DIRECTOR_VERSION_TAGS_V2 && tag[0] != '\0') {
		if (min_version >= DIRECTOR_VERSION_TAGS) {
			i_error("Ring has directors that support mixed versions of tags - removing host %s with tag '%s'",
				net_ip2addr(&host->ip), tag);
		} else {
			i_error("Ring has directors that don't support tags - removing host %s with tag '%s'",
				net_ip2addr(&host->ip), tag);
		}
		director_remove_host(dir, NULL, NULL, host);
		return;
	}
	if (min_version >= DIRECTOR_VERSION_TAGS_V2) {
		str_append_c(line, '\t');
		str_append_tabescaped(line, tag);
	}

	if (min_version >= DIRECTOR_VERSION_UPDOWN) {
		str_printfa(line, "\t%c%ld\t", host->down ? 'D' : 'U',
			    (long)host->last_updown_change);
		/* add any further version checks here - these directors ignore
		   any extra unknown arguments */
		if (host->hostname != NULL)
			str_append_tabescaped(line, host->hostname);
	}
	str_append_c(line, '\n');
	director_update_send(dir, src, str_c(line));
}
void director_resend_hosts(struct director *dir)
{
struct mail_host *const *hostp;
array_foreach(mail_hosts_get(dir->mail_hosts), hostp)
director_send_host(dir, dir->self_host, NULL, *hostp);
}
/* Propagate a new or changed mail host to the ring and start a ring sync.
   While we have neighbours, the host is flagged desynced until the ring
   has synced again. */
void director_update_host(struct director *dir, struct director_host *src,
			  struct director_host *orig_src,
			  struct mail_host *host)
{
	/* update state in case this is the first mail host being added */
	director_set_state_changed(dir);

	dir_debug("Updating host %s vhost_count=%u "
		  "down=%d last_updown_change=%ld (hosts_hash=%u)",
		  net_ip2addr(&host->ip), host->vhost_count, host->down ? 1 : 0,
		  (long)host->last_updown_change,
		  mail_hosts_hash(dir->mail_hosts));

	/* mark the host desynced until ring is synced again. except if we're
	   alone in the ring that never happens.

	   This must happen before director_send_host(). If the ring can't
	   represent the host's tag, director_send_host() calls
	   director_remove_host() -> mail_host_remove(host), which presumably
	   frees it, so host must not be touched afterwards. */
	if (dir->right != NULL || dir->left != NULL)
		host->desynced = TRUE;
	director_send_host(dir, src, orig_src, host);
	director_sync(dir);
}
/* Remove a mail host.
   - If src is set, HOST-REMOVE is first announced to the ring, with a new
     sequence number when we're the originator.
   - With src == NULL the removal is local only.
   - In both cases the host is removed from its tag's user directory, the
     host itself is removed, and a ring sync starts. */
void director_remove_host(struct director *dir, struct director_host *src,
			  struct director_host *orig_src,
			  struct mail_host *host)
{
	struct user_directory *users = host->tag->users;
	const char *cmd;

	if (src != NULL) {
		if (orig_src == NULL) {
			orig_src = dir->self_host;
			orig_src->last_seq++;
		}
		cmd = t_strdup_printf("HOST-REMOVE\t%s\t%u\t%u\t%s\n",
				      net_ip2addr(&orig_src->ip), orig_src->port,
				      orig_src->last_seq, net_ip2addr(&host->ip));
		director_update_send(dir, src, cmd);
	}

	user_directory_remove_host(users, host);
	mail_host_remove(host);
	director_sync(dir);
}
/* Announce HOST-FLUSH for a mail host to the ring, remove the host from
   its tag's user directory and start a ring sync. Unlike
   director_remove_host(), the mail host itself is kept. */
void director_flush_host(struct director *dir, struct director_host *src,
			 struct director_host *orig_src,
			 struct mail_host *host)
{
	struct user_directory *users = host->tag->users;
	const char *cmd;

	if (orig_src == NULL) {
		orig_src = dir->self_host;
		orig_src->last_seq++;
	}
	cmd = t_strdup_printf("HOST-FLUSH\t%s\t%u\t%u\t%s\n",
			      net_ip2addr(&orig_src->ip), orig_src->port,
			      orig_src->last_seq, net_ip2addr(&host->ip));
	director_update_send(dir, src, cmd);

	user_directory_remove_host(users, host);
	director_sync(dir);
}
/* Announce a non-weak user -> host assignment (USER) to the ring. */
void director_update_user(struct director *dir, struct director_host *src,
			  struct user *user)
{
	const char *cmd;

	i_assert(src != NULL);
	i_assert(!user->weak);

	cmd = t_strdup_printf("USER\t%u\t%s\n", user->username_hash,
			      net_ip2addr(&user->host->ip));
	director_update_send(dir, src, cmd);
}
/* Announce a weak user assignment (USER-WEAK) to the ring. src_conn is the
   connection the update arrived from. It is only used in a two-director
   ring, where the command must be bounced back to its originator. */
void director_update_user_weak(struct director *dir, struct director_host *src,
			       struct director_connection *src_conn,
			       struct director_host *orig_src,
			       struct user *user)
{
	const char *cmd;
	bool bounce_back;

	i_assert(src != NULL);
	i_assert(user->weak);

	if (orig_src == NULL) {
		orig_src = dir->self_host;
		orig_src->last_seq++;
	}
	cmd = t_strdup_printf("USER-WEAK\t%s\t%u\t%u\t%u\t%s\n",
			      net_ip2addr(&orig_src->ip), orig_src->port,
			      orig_src->last_seq, user->username_hash,
			      net_ip2addr(&user->host->ip));

	/* only two directors in this ring and we're forwarding
	   USER-WEAK from one director back to itself via another
	   so it sees we've received it. we can't use
	   director_update_send() for this, because it doesn't send
	   data back to the source. */
	bounce_back = src != dir->self_host &&
		dir->left != NULL && dir->right != NULL &&
		director_connection_get_host(dir->left) ==
		director_connection_get_host(dir->right);

	if (!bounce_back)
		director_update_send(dir, src, cmd);
	else if (dir->right == src_conn)
		director_connection_send(dir->left, cmd);
	else if (dir->left == src_conn)
		director_connection_send(dir->right, cmd);
	else
		i_unreached();
}
/* defined further below in this file */
static void director_user_move_finished(struct director *dir);

/* Log the output of a failed flush program. The first line is always
   logged, using a placeholder when the program wrote nothing; each further
   line gets its own log entry. Finishes ctx->reply via
   iostream_temp_finish(), so ctx->reply must be non-NULL. */
static void
director_flush_user_log_failure(struct director_kill_context *ctx)
{
	struct istream *is = iostream_temp_finish(&ctx->reply, (size_t)-1);
	char *data;

	i_stream_set_return_partial_line(is, TRUE);
	data = i_stream_read_next_line(is);
	i_error("%s: Failed to flush user hash %u in host %s: %s",
		ctx->socket_path,
		ctx->username_hash,
		net_ip2addr(&ctx->host_ip),
		data == NULL ? "(no output to stdout)" : data);
	while ((data = i_stream_read_next_line(is)) != NULL) {
		i_error("%s: Failed to flush user hash %u in host %s: %s",
			ctx->socket_path,
			ctx->username_hash,
			net_ip2addr(&ctx->host_ip),
			data);
	}
	i_stream_unref(&is);
}

/* Completion callback of the flush program started by
   director_flush_user().

   result == 1 means the flush succeeded, so the post-kill delay is skipped.
   result == 0 means it failed, and the program's output is logged. The
   reply stream and program client are released when they exist.

   If the user was freed while the flush was running, director_user_freed()
   left ctx alive because callback_pending was set. In that case ctx
   (including socket_path) is freed here and the move is accounted as
   finished, matching director_kill_user_callback(). Without that,
   users_moving_count would never drop back. */
static void
director_flush_user_continue(int result, struct director_kill_context *ctx)
{
	struct director *dir = ctx->dir;
	struct user *user;

	ctx->callback_pending = FALSE;
	user = user_directory_lookup(ctx->tag->users, ctx->username_hash);

	if (ctx->reply != NULL) {
		if (result == 0)
			director_flush_user_log_failure(ctx);
		else
			o_stream_unref(&ctx->reply);
	}
	if (ctx->pclient != NULL)
		program_client_destroy(&ctx->pclient);

	if (!DIRECTOR_KILL_CONTEXT_IS_VALID(user, ctx)) {
		/* user was already freed - ignore */
		dir_debug("User %u freed while flushing, result=%d",
			  ctx->username_hash, result);
		i_assert(ctx->to_move == NULL);
		director_user_move_finished(dir);
		i_free(ctx->socket_path);
		i_free(ctx);
	} else {
		/* ctx is freed later via user->kill_ctx */
		dir_debug("Flushing user %u finished, result=%d",
			  ctx->username_hash, result);
		director_user_kill_finish_delayed(dir, user, result == 1);
	}
}
/* Run the director_flush_socket program for a user whose connections have
   been killed everywhere. Completion continues in
   director_flush_user_continue().

   The flush is skipped, going straight to the post-kill delay, when:
   - no flush socket is configured,
   - the user had no old host, or
   - this director didn't initiate the move.

   Any setup failure also falls back to the post-kill delay. */
static void
director_flush_user(struct director *dir, struct user *user)
{
	struct director_kill_context *ctx = user->kill_ctx;
	struct var_expand_table tab[] = {
		{ 'i', net_ip2addr(&user->host->ip), "ip" },
		{ 'h', user->host->hostname, "host" },
		{ '\0', NULL, NULL }
	};
	const char *error;

	/* Execute flush script, if set. Only the director that started the
	   user moving will call the flush script. Having each director do it
	   would be redundant since they're all supposed to be performing the
	   same flush task to the same backend.

	   Flushing is also not triggered if we're moving a user that we just
	   created due to the user move. This means that the user doesn't have
	   an old host, so we couldn't really even perform any flushing on the
	   backend. */
	if (*dir->set->director_flush_socket == '\0' ||
	    ctx->old_host_ip.family == 0 ||
	    !ctx->kill_is_self_initiated) {
		director_user_kill_finish_delayed(dir, user, FALSE);
		return;
	}

	ctx->host_ip = user->host->ip;

	string_t *s_sock = str_new(default_pool, 32);
	if (var_expand(s_sock, dir->set->director_flush_socket, tab, &error) <= 0) {
		i_error("Failed to expand director_flush_socket=%s: %s",
			dir->set->director_flush_socket, error);
		/* s_sock lives in default_pool - free it explicitly */
		str_free(&s_sock);
		director_user_kill_finish_delayed(dir, user, FALSE);
		return;
	}
	/* ownership of the expanded path moves to ctx; it's freed in
	   director_user_move_free() */
	ctx->socket_path = str_free_without_data(&s_sock);

	struct program_client_settings set = {
		.client_connect_timeout_msecs = 10000,
		.dns_client_socket_path = DIRECTOR_DNS_SOCKET_PATH,
	};

	restrict_access_init(&set.restrict_set);

	const char *const args[] = {
		"FLUSH",
		t_strdup_printf("%u", user->username_hash),
		net_ip2addr(&ctx->old_host_ip),
		net_ip2addr(&user->host->ip),
		ctx->old_host_down ? "down" : "up",
		dec2str(ctx->old_host_vhost_count),
		NULL
	};

	ctx->kill_state = USER_KILL_STATE_FLUSHING;
	dir_debug("Flushing user %u via %s", user->username_hash,
		  ctx->socket_path);

	if (program_client_create(ctx->socket_path, args, &set, FALSE,
				  &ctx->pclient, &error) != 0) {
		i_error("%s: Failed to flush user hash %u in host %s: %s",
			ctx->socket_path,
			user->username_hash,
			net_ip2addr(&user->host->ip),
			error);
		/* ctx->reply doesn't exist yet, so don't route this through
		   director_flush_user_continue() (it would try to finish a
		   NULL reply stream). The user is still valid here: continue
		   exactly like a failed flush would. */
		if (ctx->pclient != NULL)
			program_client_destroy(&ctx->pclient);
		director_user_kill_finish_delayed(dir, user, FALSE);
		return;
	}

	ctx->reply =
		iostream_temp_create_named("/tmp", 0,
					   t_strdup_printf("flush response from %s",
							   net_ip2addr(&user->host->ip)));
	o_stream_set_no_error_handling(ctx->reply, TRUE);
	program_client_set_output(ctx->pclient, ctx->reply);

	/* ctx is the callback context now; director_user_freed() must not
	   free it while the program runs */
	ctx->callback_pending = TRUE;
	program_client_run_async(ctx->pclient, director_flush_user_continue, ctx);
}
/* Account for one finished (or abandoned) user move and schedule a state
   change notification. */
static void director_user_move_finished(struct director *dir)
{
	i_assert(dir->users_moving_count > 0);

	dir->users_moving_count -= 1;
	director_set_state_changed(dir);
}
/* Free the user's kill context (move timeout and flush socket path
   included), detach it from the user and count the move as finished. */
static void director_user_move_free(struct user *user)
{
	struct director_kill_context *kill_ctx = user->kill_ctx;
	struct director *dir;

	/* assert before dereferencing kill_ctx */
	i_assert(kill_ctx != NULL);
	dir = kill_ctx->dir;

	dir_debug("User %u move finished at state=%s", user->username_hash,
		  user_kill_state_names[kill_ctx->kill_state]);

	timeout_remove(&kill_ctx->to_move);
	i_free(kill_ctx->socket_path);
	i_free(kill_ctx);
	user->kill_ctx = NULL;

	director_user_move_finished(dir);
}
static void
director_user_kill_finish_delayed_to(struct user *user)
{
i_assert(user->kill_ctx != NULL);
i_assert(user->kill_ctx->kill_state == USER_KILL_STATE_DELAY);
director_user_move_free(user);
}
static void
director_user_kill_finish_delayed(struct director *dir, struct user *user,
bool skip_delay)
{
if (skip_delay) {
user->kill_ctx->kill_state = USER_KILL_STATE_NONE;
director_user_move_free(user);
return;
}
user->kill_ctx->kill_state = USER_KILL_STATE_DELAY;
/* wait for a while for the kills to finish in the backend server,
so there are no longer any processes running for the user before we
start letting new in connections to the new server. */
timeout_remove(&user->kill_ctx->to_move);
user->kill_ctx->to_move =
timeout_add(dir->set->director_user_kick_delay * 1000,
director_user_kill_finish_delayed_to, user);
}
/* Our local kill of the user's connections is done (self: we started the
   move).
   - Alone in the ring: proceed directly to flushing.
   - We started the move, or another director's USER-KILLED already
     arrived: send USER-KILLED to the right and wait for everyone.
   - Otherwise: wait for the notification first. */
static void
director_finish_user_kill(struct director *dir, struct user *user, bool self)
{
	struct director_kill_context *kill_ctx = user->kill_ctx;

	i_assert(kill_ctx != NULL);
	i_assert(kill_ctx->kill_state != USER_KILL_STATE_FLUSHING);
	i_assert(kill_ctx->kill_state != USER_KILL_STATE_DELAY);

	dir_debug("User %u kill finished - %sstate=%s", user->username_hash,
		  self ? "we started it " : "",
		  user_kill_state_names[kill_ctx->kill_state]);

	if (dir->right == NULL) {
		/* we're alone */
		director_flush_user(dir, user);
		return;
	}

	if (!self &&
	    kill_ctx->kill_state != USER_KILL_STATE_KILLING_NOTIFY_RECEIVED) {
		/* continue once USER-KILLED arrives
		   (see director_user_tag_killed()) */
		i_assert(kill_ctx->kill_state == USER_KILL_STATE_KILLING);
		kill_ctx->kill_state = USER_KILL_STATE_KILLED_WAITING_FOR_NOTIFY;
		return;
	}

	director_connection_send(dir->right, t_strdup_printf(
		"USER-KILLED\t%u\n", user->username_hash));
	kill_ctx->kill_state = USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE;
}
/* log_throttle callback: logs the number of user kill failures it is
   given (presumably those suppressed while throttling - confirm against
   the log_throttle setup). */
static void director_user_kill_fail_throttled(unsigned int new_events_count,
					      void *context ATTR_UNUSED)
{
	const unsigned int failed_users = new_events_count;

	i_error("Failed to kill %u users' connections", failed_users);
}
/* IPC callback for the KICK-DIRECTOR-HASH sent by director_kill_user().

   This is an asynchronous notification about the user being killed; there
   are no guarantees about what happened to the user in the mean time.
   Errors are logged (throttled) and otherwise treated like success.

   If the user (or its kill context) is gone, ctx was left to us: the move
   is finished and ctx freed. Otherwise the kill proceeds via
   director_finish_user_kill(). */
static void director_kill_user_callback(enum ipc_client_cmd_state state,
					const char *data, void *context)
{
	struct director_kill_context *ctx = context;
	struct user *user;

	if (state == IPC_CLIENT_CMD_STATE_REPLY) {
		/* shouldn't get here. the command reply isn't finished yet. */
		return;
	}
	if (state == IPC_CLIENT_CMD_STATE_ERROR &&
	    log_throttle_accept(user_kill_fail_throttle)) {
		i_error("Failed to kill user %u connections: %s",
			ctx->username_hash, data);
	}
	/* on error we can't really do anything but continue anyway */

	ctx->callback_pending = FALSE;

	user = user_directory_lookup(ctx->tag->users, ctx->username_hash);
	if (DIRECTOR_KILL_CONTEXT_IS_VALID(user, ctx)) {
		i_assert(ctx->kill_state == USER_KILL_STATE_KILLING ||
			 ctx->kill_state == USER_KILL_STATE_KILLING_NOTIFY_RECEIVED);
		/* we were still waiting for the kill notification */
		director_finish_user_kill(ctx->dir, user,
					  ctx->kill_is_self_initiated);
		return;
	}

	/* user was already freed - ignore */
	i_assert(ctx->to_move == NULL);
	director_user_move_finished(ctx->dir);
	i_free(ctx);
}
/* log_throttle callback: logs the number of timed-out user moves it is
   given (presumably those suppressed while throttling - confirm against
   the log_throttle setup). */
static void director_user_move_throttled(unsigned int new_events_count,
					 void *context ATTR_UNUSED)
{
	const unsigned int timed_out_users = new_events_count;

	i_error("%u users' move timed out, their state may now be inconsistent",
		timed_out_users);
}
static void director_user_move_timeout(struct user *user)
{
i_assert(user->kill_ctx != NULL);
i_assert(user->kill_ctx->kill_state != USER_KILL_STATE_DELAY);
if (log_throttle_accept(user_move_throttle)) {
i_error("Finishing user %u move timed out, "
"its state may now be inconsistent (state=%s)",
user->username_hash,
user_kill_state_names[user->kill_ctx->kill_state]);
}
if (user->kill_ctx->kill_state == USER_KILL_STATE_FLUSHING) {
o_stream_unref(&user->kill_ctx->reply);
program_client_destroy(&user->kill_ctx->pclient);
}
director_user_move_free(user);
}
/* Start killing a user's existing connections as part of a user move.

   Setup:
   - A kill context is attached to the user.
   - The move is counted in users_moving_count.
   - A move timeout is armed.

   The kill itself:
   - Connections are kicked via the IPC proxy only when the user really
     changed host, or when forced_kick is set.
   - Otherwise the kill is finished immediately.

   If the user is already being moved, the earlier move just continues. */
void director_kill_user(struct director *dir, struct director_host *src,
			struct user *user, struct mail_tag *tag,
			struct mail_host *old_host, bool forced_kick)
{
	struct director_kill_context *ctx;
	bool need_kick;

	if (USER_IS_BEING_KILLED(user)) {
		/* User is being moved again before the previous move
		   finished. We'll just continue wherever we left off
		   earlier. */
		dir_debug("User %u move restarted - previous kill_state=%s",
			  user->username_hash,
			  user_kill_state_names[user->kill_ctx->kill_state]);
		return;
	}

	ctx = i_new(struct director_kill_context, 1);
	user->kill_ctx = ctx;
	ctx->dir = dir;
	ctx->tag = tag;
	ctx->username_hash = user->username_hash;
	ctx->kill_is_self_initiated = src->self;
	if (old_host != NULL) {
		ctx->old_host_ip = old_host->ip;
		ctx->old_host_down = old_host->down;
		ctx->old_host_vhost_count = old_host->vhost_count;
	}

	dir->users_moving_count++;
	ctx->to_move = timeout_add(DIRECTOR_USER_MOVE_TIMEOUT_MSECS,
				   director_user_move_timeout, user);
	ctx->kill_state = USER_KILL_STATE_KILLING;

	need_kick = forced_kick ||
		(old_host != NULL && old_host != user->host);
	if (!need_kick) {
		/* a) we didn't even know about the user before now.
		      don't bother performing a local kick, since it wouldn't
		      kick anything.
		   b) our host was already correct. notify others that we have
		      killed the user, but don't really do it. */
		director_finish_user_kill(ctx->dir, user,
					  ctx->kill_is_self_initiated);
		return;
	}

	ctx->callback_pending = TRUE;
	ipc_client_cmd(dir->ipc_proxy,
		       t_strdup_printf("proxy\t*\tKICK-DIRECTOR-HASH\t%u",
				       user->username_hash),
		       director_kill_user_callback, ctx);
}
/* Move a user (by username hash) to the given mail host and propagate the
   move with USER-MOVE. The overall protocol:

   1. move this user's host, and set its "killing" flag to delay all of
      its future connections until all directors have killed the
      connections and notified us about it.
   2. tell the other directors about the move
   3. once user kill callback is called, tell the other directors
      with USER-KILLED that we're done killing the user.
   4. when some director gets a duplicate USER-KILLED, it's
      responsible for notifying all directors that user is completely
      killed.
   5. after receiving USER-KILLED-EVERYWHERE notification,
      new connections are again allowed for the user. */
void director_move_user(struct director *dir, struct director_host *src,
			struct director_host *orig_src,
			unsigned int username_hash, struct mail_host *host)
{
	struct user_directory *users = host->tag->users;
	struct mail_host *old_host = NULL;
	struct user *user;
	const char *cmd;

	user = user_directory_lookup(users, username_hash);
	if (user == NULL) {
		dir_debug("User %u move started: User was nonexistent",
			  username_hash);
		user = user_directory_add(users, username_hash,
					  host, ioloop_time);
	} else {
		old_host = user->host;
		if (old_host == host) {
			/* User is already in the wanted host, but another
			   director didn't think so. We'll need to finish the
			   move without killing any of our connections. */
			user->timestamp = ioloop_time;
			dir_debug("User %u move forwarded: host is already %s",
				  username_hash, net_ip2addr(&host->ip));
		} else {
			/* user is looked up via the new host's tag, so if it's
			   found the old tag has to be the same. */
			i_assert(old_host->tag == host->tag);
			old_host->user_count--;
			user->host = host;
			host->user_count++;
			user->timestamp = ioloop_time;
			dir_debug("User %u move started: host %s -> %s",
				  username_hash, net_ip2addr(&old_host->ip),
				  net_ip2addr(&host->ip));
		}
	}

	if (orig_src == NULL) {
		orig_src = dir->self_host;
		orig_src->last_seq++;
	}
	cmd = t_strdup_printf("USER-MOVE\t%s\t%u\t%u\t%u\t%s\n",
			      net_ip2addr(&orig_src->ip), orig_src->port,
			      orig_src->last_seq, user->username_hash,
			      net_ip2addr(&user->host->ip));
	director_update_send(dir, src, cmd);

	/* kill the user only after sending the USER-MOVE, because the kill
	   may finish instantly. */
	director_kill_user(dir, src, user, host->tag, old_host, FALSE);
}
/* IPC callback for the kick commands below. The kick result is ignored. */
static void
director_kick_user_callback(enum ipc_client_cmd_state state ATTR_UNUSED,
			    const char *data ATTR_UNUSED,
			    void *context ATTR_UNUSED)
{
	/* nothing to do */
}
/* Kick a user by name. KICK is sent to the proxies via the IPC proxy, and
   USER-KICK is forwarded to ring members that support it. */
void director_kick_user(struct director *dir, struct director_host *src,
			struct director_host *orig_src, const char *username)
{
	string_t *buf = t_str_new(64);

	str_append(buf, "proxy\t*\tKICK\t");
	str_append_tabescaped(buf, username);
	ipc_client_cmd(dir->ipc_proxy, str_c(buf),
		       director_kick_user_callback, (void *)NULL);

	if (orig_src == NULL) {
		orig_src = dir->self_host;
		orig_src->last_seq++;
	}

	/* reuse the buffer for the ring command */
	str_truncate(buf, 0);
	str_printfa(buf, "USER-KICK\t%s\t%u\t%u\t",
		    net_ip2addr(&orig_src->ip), orig_src->port,
		    orig_src->last_seq);
	str_append_tabescaped(buf, username);
	str_append_c(buf, '\n');
	director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK,
				     str_c(buf));
}
/* Kick users matching an alternative field=value. KICK-ALT is sent to the
   proxies via the IPC proxy, and USER-KICK-ALT is forwarded to ring
   members that support it. */
void director_kick_user_alt(struct director *dir, struct director_host *src,
			    struct director_host *orig_src,
			    const char *field, const char *value)
{
	string_t *buf = t_str_new(64);

	str_append(buf, "proxy\t*\tKICK-ALT\t");
	str_append_tabescaped(buf, field);
	str_append_c(buf, '\t');
	str_append_tabescaped(buf, value);
	ipc_client_cmd(dir->ipc_proxy, str_c(buf),
		       director_kick_user_callback, (void *)NULL);

	if (orig_src == NULL) {
		orig_src = dir->self_host;
		orig_src->last_seq++;
	}

	/* reuse the buffer for the ring command */
	str_truncate(buf, 0);
	str_printfa(buf, "USER-KICK-ALT\t%s\t%u\t%u\t",
		    net_ip2addr(&orig_src->ip), orig_src->port,
		    orig_src->last_seq);
	str_append_tabescaped(buf, field);
	str_append_c(buf, '\t');
	str_append_tabescaped(buf, value);
	str_append_c(buf, '\n');
	director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK_ALT,
				     str_c(buf));
}
/* Kick a user by username hash, except connections to except_ip.
   KICK-DIRECTOR-HASH goes to the proxies via the IPC proxy, and
   USER-KICK-HASH is forwarded to ring members that support it. */
void director_kick_user_hash(struct director *dir, struct director_host *src,
			     struct director_host *orig_src,
			     unsigned int username_hash,
			     const struct ip_addr *except_ip)
{
	const char *local_cmd, *ring_cmd;

	local_cmd = t_strdup_printf("proxy\t*\tKICK-DIRECTOR-HASH\t%u\t%s",
				    username_hash, net_ip2addr(except_ip));
	ipc_client_cmd(dir->ipc_proxy, local_cmd,
		       director_kick_user_callback, (void *)NULL);

	if (orig_src == NULL) {
		orig_src = dir->self_host;
		orig_src->last_seq++;
	}
	ring_cmd = t_strdup_printf("USER-KICK-HASH\t%s\t%u\t%u\t%u\t%s\n",
				   net_ip2addr(&orig_src->ip), orig_src->port,
				   orig_src->last_seq, username_hash,
				   net_ip2addr(except_ip));
	director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK,
				     ring_cmd);
}
/* Forward USER-KILLED-EVERYWHERE for the user hash to the ring, consuming
   a new sequence number when we're the originator. */
static void
director_send_user_killed_everywhere(struct director *dir,
				     struct director_host *src,
				     struct director_host *orig_src,
				     unsigned int username_hash)
{
	const char *cmd;

	if (orig_src == NULL) {
		orig_src = dir->self_host;
		orig_src->last_seq++;
	}
	cmd = t_strdup_printf("USER-KILLED-EVERYWHERE\t%s\t%u\t%u\t%u\n",
			      net_ip2addr(&orig_src->ip), orig_src->port,
			      orig_src->last_seq, username_hash);
	director_update_send(dir, src, cmd);
}
static void
director_user_tag_killed(struct director *dir, struct mail_tag *tag,
unsigned int username_hash)
{
struct user *user;
user = user_directory_lookup(tag->users, username_hash);
if (user == NULL || !USER_IS_BEING_KILLED(user))
return;
switch (user->kill_ctx->kill_state) {
case USER_KILL_STATE_KILLING:
user->kill_ctx->kill_state = USER_KILL_STATE_KILLING_NOTIFY_RECEIVED;
break;
case USER_KILL_STATE_KILLED_WAITING_FOR_NOTIFY:
director_finish_user_kill(dir, user, TRUE);
break;
case USER_KILL_STATE_KILLING_NOTIFY_RECEIVED:
dir_debug("User %u kill_state=%s - ignoring USER-KILLED",
username_hash, user_kill_state_names[user->kill_ctx->kill_state]);
break;
case USER_KILL_STATE_NONE:
case USER_KILL_STATE_FLUSHING:
case USER_KILL_STATE_DELAY:
/* move restarted. state=none can also happen if USER-MOVE was
sent while we were still moving. send back
USER-KILLED-EVERYWHERE to avoid hangs. */
director_send_user_killed_everywhere(dir, dir->self_host, NULL,
username_hash);
break;
case USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE:
director_user_killed_everywhere(dir, dir->self_host,
NULL, username_hash);
break;
}
}
/* Handle USER-KILLED for a username hash in every tag's user directory. */
void director_user_killed(struct director *dir, unsigned int username_hash)
{
	struct mail_tag *const *tags;
	unsigned int i, count;

	tags = array_get(mail_hosts_get_tags(dir->mail_hosts), &count);
	for (i = 0; i < count; i++)
		director_user_tag_killed(dir, tags[i], username_hash);
}
/* Handle USER-KILLED-EVERYWHERE for one tag's user directory. Only a user
   waiting for everyone proceeds: it is flushed, and the notification is
   forwarded. Any other situation is only debug-logged. */
static void
director_user_tag_killed_everywhere(struct director *dir,
				    struct mail_tag *tag,
				    struct director_host *src,
				    struct director_host *orig_src,
				    unsigned int username_hash)
{
	struct user *user = user_directory_lookup(tag->users, username_hash);

	if (user == NULL) {
		dir_debug("User %u no longer exists - ignoring USER-KILLED-EVERYWHERE",
			  username_hash);
	} else if (!USER_IS_BEING_KILLED(user)) {
		dir_debug("User %u is no longer being killed - ignoring USER-KILLED-EVERYWHERE",
			  username_hash);
	} else if (user->kill_ctx->kill_state != USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE) {
		dir_debug("User %u kill_state=%s - ignoring USER-KILLED-EVERYWHERE",
			  username_hash,
			  user_kill_state_names[user->kill_ctx->kill_state]);
	} else {
		director_flush_user(dir, user);
		director_send_user_killed_everywhere(dir, src, orig_src,
						     username_hash);
	}
}
/* Handle USER-KILLED-EVERYWHERE for a username hash in every tag's user
   directory. */
void director_user_killed_everywhere(struct director *dir,
				     struct director_host *src,
				     struct director_host *orig_src,
				     unsigned int username_hash)
{
	struct mail_tag *const *tags;
	unsigned int i, count;

	tags = array_get(mail_hosts_get_tags(dir->mail_hosts), &count);
	for (i = 0; i < count; i++) {
		director_user_tag_killed_everywhere(dir, tags[i], src,
						    orig_src, username_hash);
	}
}
static void director_state_callback_timeout(struct director *dir)
{
timeout_remove(&dir->to_callback);
dir->state_change_callback(dir);
}
/* Schedule the state change callback. Repeated calls before it runs are
   coalesced into a single callback. */
void director_set_state_changed(struct director *dir)
{
	/* we may get called to here from various places. use a timeout to
	   make sure the state callback is called with a clean state. */
	if (dir->to_callback != NULL)
		return;
	dir->to_callback = timeout_add(0, director_state_callback_timeout, dir);
}
/* Send cmd to every connection except those leading to src, regardless of
   the peers' protocol versions. */
void director_update_send(struct director *dir, struct director_host *src,
			  const char *cmd)
{
	const unsigned int any_minor_version = 0;

	director_update_send_version(dir, src, any_minor_version, cmd);
}
/* Send cmd to every director connection except those leading to src,
   skipping peers whose minor protocol version is below min_version. */
void director_update_send_version(struct director *dir,
				  struct director_host *src,
				  unsigned int min_version, const char *cmd)
{
	struct director_connection *const *connp;

	i_assert(src != NULL);

	array_foreach(&dir->connections, connp) {
		struct director_connection *conn = *connp;

		if (director_connection_get_host(conn) == src)
			continue;
		if (director_connection_get_minor_version(conn) < min_version)
			continue;
		director_connection_send(conn, cmd);
	}
}
/* Callback passed to mail_hosts_init(), invoked when a user is freed.
   Releases any in-progress kill/move context attached to the user. */
static void director_user_freed(struct user *user)
{
	if (user->kill_ctx == NULL)
		return;

	/* director_user_expire is very short. user expired before
	   moving the user finished or timed out. */
	if (!user->kill_ctx->callback_pending) {
		director_user_move_free(user);
		return;
	}

	/* kill_ctx is still referenced as a pending callback's parameter:
	   just stop the move timeout now and leave the free for later. */
	timeout_remove(&user->kill_ctx->to_move);
}
/* Allocate a director listening on listen_ip:listen_port. callback is
   stored as the state-change callback. The returned object is released
   with director_deinit(). */
struct director *
director_init(const struct director_settings *set,
	      const struct ip_addr *listen_ip, in_port_t listen_port,
	      director_state_change_callback_t *callback)
{
	struct director *dir = i_new(struct director, 1);

	/* identity and configuration */
	dir->set = set;
	dir->self_ip = *listen_ip;
	dir->self_port = listen_port;
	dir->state_change_callback = callback;
	dir->ring_min_version = DIRECTOR_VERSION_MINOR;

	/* containers */
	i_array_init(&dir->dir_hosts, 16);
	i_array_init(&dir->pending_requests, 16);
	i_array_init(&dir->connections, 8);

	/* subsystems */
	dir->mail_hosts = mail_hosts_init(set->director_user_expire,
					  director_user_freed);
	dir->ipc_proxy = ipc_client_init(DIRECTOR_IPC_PROXY_PATH);
	return dir;
}
/* Tear down a director created by director_init() and set *_dir to NULL.
   The order matters: connections are closed first, then the mail hosts,
   the IPC client and all pending timeouts, and finally the director hosts
   and arrays. */
void director_deinit(struct director **_dir)
{
	struct director *dir = *_dir;
	struct director_host *const *hostp, *host;
	struct director_connection *conn, *const *connp;

	*_dir = NULL;
	/* NOTE(review): this loop only terminates because
	   director_connection_deinit() presumably removes conn from
	   dir->connections - confirm */
	while (array_count(&dir->connections) > 0) {
		connp = array_idx(&dir->connections, 0);
		conn = *connp;
		director_connection_deinit(&conn, "Shutting down");
	}
	mail_hosts_deinit(&dir->mail_hosts);
	/* director_init() never sets orig_config_hosts, so this assumes
	   mail_hosts_deinit() tolerates a NULL list - TODO confirm */
	mail_hosts_deinit(&dir->orig_config_hosts);
	ipc_client_deinit(&dir->ipc_proxy);
	timeout_remove(&dir->to_reconnect);
	timeout_remove(&dir->to_handshake_warning);
	timeout_remove(&dir->to_request);
	timeout_remove(&dir->to_sync);
	timeout_remove(&dir->to_remove_dirs);
	timeout_remove(&dir->to_callback);
	/* as above: relies on director_host_free() removing the host from
	   dir->dir_hosts (assumption, not visible here) */
	while (array_count(&dir->dir_hosts) > 0) {
		hostp = array_idx(&dir->dir_hosts, 0);
		host = *hostp;
		director_host_free(&host);
	}
	array_free(&dir->pending_requests);
	array_free(&dir->dir_hosts);
	array_free(&dir->connections);
	i_free(dir);
}
/* printf-style debug logging that is a no-op unless director_debug is set.
   The message is formatted on the data stack and emitted via i_debug(). */
void dir_debug(const char *fmt, ...)
{
	if (director_debug) {
		va_list ap;

		va_start(ap, fmt);
		T_BEGIN {
			const char *msg = t_strdup_vprintf(fmt, ap);
			i_debug("%s", msg);
		} T_END;
		va_end(ap);
	}
}
/* Iteration state for walking the users of every mail tag in turn. */
struct director_user_iter {
	/* director whose mail_hosts tags are iterated */
	struct director *dir;
	/* iterator over the current tag's users; NULL between tags */
	struct user_directory_iter *user_iter;
	/* index of the current tag in mail_hosts_get_tags() */
	unsigned int tag_idx;
};
/* Start iterating over all users of all tags of dir. Release the iterator
   with director_iterate_users_deinit(). */
struct director_user_iter *director_iterate_users_init(struct director *dir)
{
	struct director_user_iter *iter;

	/* i_new() zero-fills, so tag_idx starts at 0 and user_iter at NULL */
	iter = i_new(struct director_user_iter, 1);
	iter->dir = dir;
	return iter;
}
struct user *director_iterate_users_next(struct director_user_iter *iter)
{
const ARRAY_TYPE(mail_tag) *tags;
struct user *user;
i_assert(iter != NULL);
if (iter->user_iter == NULL) {
tags = mail_hosts_get_tags(iter->dir->mail_hosts);
if (iter->tag_idx >= array_count(tags))
return NULL;
struct mail_tag *const *tagp = array_idx(tags, iter->tag_idx);
iter->user_iter = user_directory_iter_init((*tagp)->users);
}
user = user_directory_iter_next(iter->user_iter);
if (user == NULL) {
user_directory_iter_deinit(&iter->user_iter);
iter->tag_idx++;
return director_iterate_users_next(iter);
} else
return user;
}
/* Free an iterator from director_iterate_users_init() and set *_iter to
   NULL. A per-tag iterator that is still open is released as well. */
void director_iterate_users_deinit(struct director_user_iter **_iter)
{
	struct director_user_iter *iter;

	i_assert(_iter != NULL && *_iter != NULL);
	iter = *_iter;
	*_iter = NULL;

	if (iter->user_iter != NULL) {
		/* iteration was stopped mid-tag */
		user_directory_iter_deinit(&iter->user_iter);
	}
	i_free(iter);
}
/* Compute the director hash of username into *hash_r by passing the
   director_username_hash setting to mail_user_hash().
   Returns TRUE on success. On failure the error is logged and FALSE is
   returned. */
bool
director_get_username_hash(struct director *dir, const char *username,
			   unsigned int *hash_r)
{
	const char *error;

	if (mail_user_hash(username, dir->set->director_username_hash, hash_r,
			   &error))
		return TRUE;
	/* report the setting that was actually being expanded */
	i_error("Failed to expand director_username_hash=%s: %s",
		dir->set->director_username_hash, error);
	return FALSE;
}
void directors_init(void)
{
user_move_throttle =
log_throttle_init(&director_log_throttle_settings,
director_user_move_throttled, NULL);
user_kill_fail_throttle =
log_throttle_init(&director_log_throttle_settings,
director_user_kill_fail_throttled, NULL);
}
/* Release the log throttles created by directors_init(). */
void directors_deinit(void)
{
	/* the two throttles are independent of each other */
	log_throttle_deinit(&user_move_throttle);
	log_throttle_deinit(&user_kill_fail_throttle);
}