director.c revision a93c909cfa1139226891d198dc6f7a7c98509b49
951N/A/* Copyright (c) 2010-2015 Dovecot authors, see the included COPYING file */
951N/A
951N/A#include "lib.h"
951N/A#include "ioloop.h"
951N/A#include "array.h"
951N/A#include "str.h"
951N/A#include "strescape.h"
951N/A#include "ipc-client.h"
951N/A#include "user-directory.h"
951N/A#include "mail-host.h"
951N/A#include "director-host.h"
951N/A#include "director-connection.h"
951N/A#include "director.h"
951N/A
/* Path handed to ipc_client_init() for the IPC proxy connection. */
#define DIRECTOR_IPC_PROXY_PATH "ipc"

/* director_connect() skips hosts whose last network failure is newer
   than this many seconds. */
#define DIRECTOR_RECONNECT_RETRY_SECS 60
/* Delay before director_reconnect_timeout() runs when the ring got synced
   while our right side isn't the preferred host. */
#define DIRECTOR_RECONNECT_TIMEOUT_MSECS (30*1000)
/* A user move that hasn't finished within this time is abandoned by
   director_user_move_timeout(). */
#define DIRECTOR_USER_MOVE_TIMEOUT_MSECS (30*1000)
/* If the ring stays unsynced this long, director_sync_timeout() fires. */
#define DIRECTOR_SYNC_TIMEOUT_MSECS (5*1000)
/* director_wait_for_others() keeps retrying until more than this many
   seconds have passed since it first found no host to connect to. */
#define DIRECTOR_RING_MIN_WAIT_SECS 20
/* Retry interval used by director_wait_for_others(). */
#define DIRECTOR_QUICK_RECONNECT_TIMEOUT_MSECS (1*1000)
/* Delay between marking a director as removed and actually freeing it. */
#define DIRECTOR_DELAYED_DIR_REMOVE_MSECS (30*1000)

/* When TRUE, dir_debug() output is enabled. */
bool director_debug;
951N/A
951N/Astatic bool director_is_self_ip_set(struct director *dir)
951N/A{
951N/A struct ip_addr ip;
951N/A
951N/A net_get_ip_any4(&ip);
951N/A if (net_ip_compare(&dir->self_ip, &ip))
951N/A return FALSE;
951N/A
951N/A net_get_ip_any6(&ip);
951N/A if (net_ip_compare(&dir->self_ip, &ip))
951N/A return FALSE;
951N/A
951N/A return TRUE;
951N/A}
951N/A
951N/Astatic void director_find_self_ip(struct director *dir)
951N/A{
951N/A struct director_host *const *hosts;
951N/A unsigned int i, count;
951N/A
951N/A hosts = array_get(&dir->dir_hosts, &count);
951N/A for (i = 0; i < count; i++) {
951N/A if (net_try_bind(&hosts[i]->ip) == 0) {
951N/A dir->self_ip = hosts[i]->ip;
951N/A return;
951N/A }
951N/A }
951N/A i_fatal("director_servers doesn't list ourself");
951N/A}
951N/A
951N/Avoid director_find_self(struct director *dir)
951N/A{
951N/A if (dir->self_host != NULL)
951N/A return;
951N/A
951N/A if (!director_is_self_ip_set(dir))
951N/A director_find_self_ip(dir);
951N/A
951N/A dir->self_host = director_host_lookup(dir, &dir->self_ip,
951N/A dir->self_port);
951N/A if (dir->self_host == NULL) {
951N/A i_fatal("director_servers doesn't list ourself (%s:%u)",
951N/A net_ip2addr(&dir->self_ip), dir->self_port);
951N/A }
951N/A dir->self_host->self = TRUE;
951N/A}
951N/A
951N/Astatic unsigned int director_find_self_idx(struct director *dir)
951N/A{
951N/A struct director_host *const *hosts;
951N/A unsigned int i, count;
951N/A
951N/A i_assert(dir->self_host != NULL);
951N/A
951N/A hosts = array_get(&dir->dir_hosts, &count);
951N/A for (i = 0; i < count; i++) {
951N/A if (hosts[i] == dir->self_host)
951N/A return i;
951N/A }
951N/A i_unreached();
951N/A}
951N/A
951N/Astatic bool
951N/Adirector_has_outgoing_connection(struct director *dir,
951N/A struct director_host *host)
951N/A{
951N/A struct director_connection *const *connp;
951N/A
951N/A array_foreach(&dir->connections, connp) {
951N/A if (director_connection_get_host(*connp) == host &&
951N/A !director_connection_is_incoming(*connp))
951N/A return TRUE;
951N/A }
951N/A return FALSE;
951N/A}
951N/A
951N/Aint director_connect_host(struct director *dir, struct director_host *host)
951N/A{
951N/A in_port_t port;
951N/A int fd;
951N/A
951N/A if (director_has_outgoing_connection(dir, host))
951N/A return 0;
951N/A
951N/A if (director_debug) {
951N/A string_t *str = t_str_new(128);
951N/A
951N/A str_printfa(str, "Connecting to %s:%u (as %s",
951N/A net_ip2addr(&host->ip), host->port,
951N/A net_ip2addr(&dir->self_ip));
951N/A if (host->last_network_failure > 0) {
951N/A str_printfa(str, ", last network failure %ds ago",
951N/A (int)(ioloop_time - host->last_network_failure));
951N/A }
951N/A if (host->last_protocol_failure > 0) {
951N/A str_printfa(str, ", last protocol failure %ds ago",
951N/A (int)(ioloop_time - host->last_protocol_failure));
951N/A }
951N/A dir_debug("%s", str_c(str));
951N/A }
951N/A port = dir->test_port != 0 ? dir->test_port : host->port;
951N/A fd = net_connect_ip(&host->ip, port, &dir->self_ip);
951N/A if (fd == -1) {
951N/A host->last_network_failure = ioloop_time;
951N/A i_error("connect(%s) failed: %m", host->name);
951N/A return -1;
951N/A }
951N/A /* Reset timestamp so that director_connect() won't skip this host
951N/A while we're still trying to connect to it */
951N/A host->last_network_failure = 0;
951N/A
951N/A (void)director_connection_init_out(dir, fd, host);
951N/A return 0;
951N/A}
951N/A
951N/Astatic struct director_host *
951N/Adirector_get_preferred_right_host(struct director *dir)
951N/A{
951N/A struct director_host *const *hosts, *host;
951N/A unsigned int i, count, self_idx;
951N/A
951N/A hosts = array_get(&dir->dir_hosts, &count);
951N/A if (count == 1) {
951N/A /* self */
951N/A return NULL;
951N/A }
951N/A
951N/A self_idx = director_find_self_idx(dir);
951N/A for (i = 0; i < count; i++) {
951N/A host = hosts[(self_idx + i + 1) % count];
951N/A if (!host->removed)
951N/A return host;
951N/A }
951N/A /* self, with some removed hosts */
951N/A return NULL;
951N/A}
951N/A
951N/Astatic bool director_wait_for_others(struct director *dir)
951N/A{
951N/A struct director_host *const *hostp;
951N/A
951N/A /* don't assume we're alone until we've attempted to connect
951N/A to others for a while */
951N/A if (dir->ring_first_alone != 0 &&
951N/A ioloop_time - dir->ring_first_alone > DIRECTOR_RING_MIN_WAIT_SECS)
951N/A return FALSE;
951N/A
951N/A if (dir->ring_first_alone == 0)
951N/A dir->ring_first_alone = ioloop_time;
951N/A /* reset all failures and try again */
951N/A array_foreach(&dir->dir_hosts, hostp) {
951N/A (*hostp)->last_network_failure = 0;
951N/A (*hostp)->last_protocol_failure = 0;
951N/A }
951N/A if (dir->to_reconnect != NULL)
951N/A timeout_remove(&dir->to_reconnect);
951N/A dir->to_reconnect = timeout_add(DIRECTOR_QUICK_RECONNECT_TIMEOUT_MSECS,
951N/A director_connect, dir);
951N/A return TRUE;
951N/A}
951N/A
951N/Avoid director_connect(struct director *dir)
951N/A{
951N/A struct director_host *const *hosts;
951N/A unsigned int i, count, self_idx;
951N/A
951N/A self_idx = director_find_self_idx(dir);
951N/A
951N/A /* try to connect to first working server on our right side.
951N/A the left side is supposed to connect to us. */
951N/A hosts = array_get(&dir->dir_hosts, &count);
951N/A for (i = 1; i < count; i++) {
951N/A unsigned int idx = (self_idx + i) % count;
951N/A
951N/A if (hosts[idx]->removed)
951N/A continue;
951N/A
951N/A if (hosts[idx]->last_network_failure +
951N/A DIRECTOR_RECONNECT_RETRY_SECS > ioloop_time) {
951N/A /* connection failed recently, don't try retrying here */
951N/A continue;
951N/A }
951N/A if (hosts[idx]->last_protocol_failure +
951N/A DIRECTOR_PROTOCOL_FAILURE_RETRY_SECS > ioloop_time) {
951N/A /* the director recently sent invalid protocol data,
951N/A don't try retrying yet */
951N/A continue;
951N/A }
951N/A
951N/A if (director_connect_host(dir, hosts[idx]) == 0) {
951N/A /* success */
951N/A return;
951N/A }
951N/A }
951N/A
951N/A if (count > 1 && director_wait_for_others(dir))
951N/A return;
951N/A
951N/A /* we're the only one */
951N/A if (count > 1) {
951N/A i_warning("director: Couldn't connect to right side, "
951N/A "we must be the only director left");
951N/A }
951N/A if (dir->left != NULL) {
951N/A /* since we couldn't connect to it,
951N/A it must have failed recently */
951N/A i_warning("director: Assuming %s is dead, disconnecting",
951N/A director_connection_get_name(dir->left));
951N/A director_connection_deinit(&dir->left,
951N/A "This connection is dead?");
951N/A }
951N/A dir->ring_min_version = DIRECTOR_VERSION_MINOR;
951N/A if (!dir->ring_handshaked)
951N/A director_set_ring_handshaked(dir);
951N/A else
951N/A director_set_ring_synced(dir);
951N/A}
951N/A
951N/Avoid director_set_ring_handshaked(struct director *dir)
951N/A{
951N/A i_assert(!dir->ring_handshaked);
951N/A
951N/A if (dir->to_handshake_warning != NULL)
951N/A timeout_remove(&dir->to_handshake_warning);
951N/A if (dir->ring_handshake_warning_sent) {
951N/A i_warning("Directors have been connected, "
951N/A "continuing delayed requests");
951N/A dir->ring_handshake_warning_sent = FALSE;
951N/A }
951N/A dir_debug("Director ring handshaked");
951N/A
951N/A dir->ring_handshaked = TRUE;
951N/A director_set_ring_synced(dir);
951N/A}
951N/A
951N/Astatic void director_reconnect_timeout(struct director *dir)
951N/A{
951N/A struct director_host *cur_host, *preferred_host =
951N/A director_get_preferred_right_host(dir);
951N/A
951N/A cur_host = dir->right == NULL ? NULL :
951N/A director_connection_get_host(dir->right);
951N/A
951N/A if (preferred_host == NULL) {
951N/A /* all directors have been removed, try again later */
951N/A } else if (cur_host != preferred_host)
951N/A (void)director_connect_host(dir, preferred_host);
951N/A else {
951N/A /* the connection hasn't finished sync yet.
951N/A keep this timeout for now. */
951N/A }
951N/A}
951N/A
951N/Avoid director_set_ring_synced(struct director *dir)
951N/A{
951N/A struct director_host *host;
951N/A
951N/A i_assert(!dir->ring_synced);
951N/A i_assert((dir->left != NULL && dir->right != NULL) ||
951N/A (dir->left == NULL && dir->right == NULL));
951N/A
951N/A if (dir->to_handshake_warning != NULL)
951N/A timeout_remove(&dir->to_handshake_warning);
951N/A if (dir->ring_handshake_warning_sent) {
951N/A i_warning("Ring is synced, continuing delayed requests "
951N/A "(syncing took %d secs, hosts_hash=%u)",
951N/A (int)(ioloop_time - dir->ring_last_sync_time),
951N/A mail_hosts_hash(dir->mail_hosts));
951N/A dir->ring_handshake_warning_sent = FALSE;
951N/A }
951N/A
951N/A host = dir->right == NULL ? NULL :
951N/A director_connection_get_host(dir->right);
951N/A
951N/A if (dir->to_reconnect != NULL)
951N/A timeout_remove(&dir->to_reconnect);
951N/A if (host != director_get_preferred_right_host(dir)) {
951N/A /* try to reconnect to preferred host later */
951N/A dir->to_reconnect =
951N/A timeout_add(DIRECTOR_RECONNECT_TIMEOUT_MSECS,
951N/A director_reconnect_timeout, dir);
951N/A }
951N/A
951N/A if (dir->left != NULL)
951N/A director_connection_set_synced(dir->left, TRUE);
951N/A if (dir->right != NULL)
951N/A director_connection_set_synced(dir->right, TRUE);
951N/A if (dir->to_sync != NULL)
951N/A timeout_remove(&dir->to_sync);
951N/A dir->ring_synced = TRUE;
951N/A dir->ring_last_sync_time = ioloop_time;
951N/A mail_hosts_set_synced(dir->mail_hosts);
951N/A director_set_state_changed(dir);
951N/A}
951N/A
951N/Avoid director_sync_send(struct director *dir, struct director_host *host,
951N/A uint32_t seq, unsigned int minor_version,
951N/A unsigned int timestamp, unsigned int hosts_hash)
951N/A{
951N/A string_t *str;
951N/A
951N/A str = t_str_new(128);
951N/A str_printfa(str, "SYNC\t%s\t%u\t%u",
951N/A net_ip2addr(&host->ip), host->port, seq);
951N/A if (minor_version > 0 &&
951N/A director_connection_get_minor_version(dir->right) > 0) {
951N/A /* only minor_version>0 supports extra parameters */
951N/A str_printfa(str, "\t%u\t%u\t%u", minor_version,
951N/A timestamp, hosts_hash);
951N/A }
951N/A str_append_c(str, '\n');
951N/A director_connection_send(dir->right, str_c(str));
951N/A
951N/A /* ping our connections in case either of them are hanging.
951N/A if they are, we want to know it fast. */
951N/A if (dir->left != NULL)
951N/A director_connection_ping(dir->left);
951N/A director_connection_ping(dir->right);
951N/A}
951N/A
951N/Abool director_resend_sync(struct director *dir)
951N/A{
951N/A if (!dir->ring_synced && dir->left != NULL && dir->right != NULL) {
951N/A /* send a new SYNC in case the previous one got dropped */
951N/A dir->self_host->last_sync_timestamp = ioloop_time;
951N/A director_sync_send(dir, dir->self_host, dir->sync_seq,
951N/A DIRECTOR_VERSION_MINOR, ioloop_time,
951N/A mail_hosts_hash(dir->mail_hosts));
951N/A if (dir->to_sync != NULL)
951N/A timeout_reset(dir->to_sync);
951N/A return TRUE;
951N/A }
951N/A return FALSE;
951N/A}
951N/A
951N/Astatic void director_sync_timeout(struct director *dir)
951N/A{
951N/A i_assert(!dir->ring_synced);
951N/A
951N/A if (director_resend_sync(dir))
951N/A i_error("Ring SYNC appears to have got lost, resending");
951N/A}
951N/A
951N/Avoid director_set_ring_unsynced(struct director *dir)
951N/A{
951N/A if (dir->ring_synced) {
951N/A dir->ring_synced = FALSE;
951N/A dir->ring_last_sync_time = ioloop_time;
951N/A }
951N/A
951N/A if (dir->to_sync == NULL) {
951N/A dir->to_sync = timeout_add(DIRECTOR_SYNC_TIMEOUT_MSECS,
951N/A director_sync_timeout, dir);
951N/A } else {
951N/A timeout_reset(dir->to_sync);
951N/A }
951N/A}
951N/A
951N/Astatic void director_sync(struct director *dir)
951N/A{
951N/A /* we're synced again when we receive this SYNC back */
951N/A dir->sync_seq++;
951N/A if (dir->right == NULL && dir->left == NULL) {
951N/A /* we're alone. if we're already synced,
951N/A don't become unsynced. */
951N/A return;
951N/A }
951N/A director_set_ring_unsynced(dir);
951N/A
951N/A if (dir->sync_frozen) {
951N/A dir->sync_pending = TRUE;
951N/A return;
951N/A }
951N/A if (dir->right == NULL) {
951N/A i_assert(!dir->ring_synced ||
951N/A (dir->left == NULL && dir->right == NULL));
951N/A return;
951N/A }
951N/A
951N/A dir_debug("Ring is desynced (seq=%u, sending SYNC to %s)",
951N/A dir->sync_seq, dir->right == NULL ? "(nowhere)" :
951N/A director_connection_get_name(dir->right));
951N/A
951N/A /* send PINGs to our connections more rapidly until we've synced again.
951N/A if the connection has actually died, we don't need to wait (and
951N/A delay requests) for as long to detect it */
951N/A if (dir->left != NULL)
951N/A director_connection_set_synced(dir->left, FALSE);
951N/A director_connection_set_synced(dir->right, FALSE);
951N/A director_sync_send(dir, dir->self_host, dir->sync_seq,
951N/A DIRECTOR_VERSION_MINOR, ioloop_time,
951N/A mail_hosts_hash(dir->mail_hosts));
951N/A}
951N/A
951N/Avoid director_sync_freeze(struct director *dir)
951N/A{
951N/A struct director_connection *const *connp;
951N/A
951N/A i_assert(!dir->sync_frozen);
951N/A i_assert(!dir->sync_pending);
951N/A
951N/A array_foreach(&dir->connections, connp)
951N/A director_connection_cork(*connp);
951N/A dir->sync_frozen = TRUE;
951N/A}
951N/A
951N/Avoid director_sync_thaw(struct director *dir)
951N/A{
951N/A struct director_connection *const *connp;
951N/A
951N/A i_assert(dir->sync_frozen);
951N/A
951N/A dir->sync_frozen = FALSE;
951N/A if (dir->sync_pending) {
951N/A dir->sync_pending = FALSE;
951N/A director_sync(dir);
951N/A }
951N/A array_foreach(&dir->connections, connp)
951N/A director_connection_uncork(*connp);
951N/A}
951N/A
951N/Avoid director_notify_ring_added(struct director_host *added_host,
951N/A struct director_host *src)
951N/A{
951N/A const char *cmd;
951N/A
951N/A cmd = t_strdup_printf("DIRECTOR\t%s\t%u\n",
951N/A net_ip2addr(&added_host->ip), added_host->port);
951N/A director_update_send(added_host->dir, src, cmd);
951N/A}
951N/A
951N/Astatic void director_delayed_dir_remove_timeout(struct director *dir)
951N/A{
951N/A struct director_host *const *hosts, *host;
951N/A unsigned int i, count;
951N/A
951N/A timeout_remove(&dir->to_remove_dirs);
951N/A
951N/A hosts = array_get(&dir->dir_hosts, &count);
951N/A for (i = 0; i < count; ) {
951N/A if (hosts[i]->removed) {
951N/A host = hosts[i];
951N/A director_host_free(&host);
951N/A hosts = array_get(&dir->dir_hosts, &count);
951N/A } else {
951N/A i++;
951N/A }
951N/A }
951N/A}
951N/A
951N/Avoid director_ring_remove(struct director_host *removed_host,
951N/A struct director_host *src)
951N/A{
951N/A struct director *dir = removed_host->dir;
951N/A struct director_connection *const *conns, *conn;
951N/A unsigned int i, count;
951N/A const char *cmd;
951N/A
951N/A if (removed_host->self) {
951N/A /* others will just disconnect us */
951N/A return;
951N/A }
951N/A
951N/A /* mark the host as removed and fully remove it later. this delay is
951N/A needed, because the removal may trigger director reconnections,
951N/A which may send the director back and we don't want to re-add it */
951N/A removed_host->removed = TRUE;
951N/A if (dir->to_remove_dirs == NULL) {
951N/A dir->to_remove_dirs =
951N/A timeout_add(DIRECTOR_DELAYED_DIR_REMOVE_MSECS,
951N/A director_delayed_dir_remove_timeout, dir);
951N/A }
951N/A
951N/A /* disconnect any connections to the host */
951N/A conns = array_get(&dir->connections, &count);
951N/A for (i = 0; i < count; ) {
951N/A conn = conns[i];
951N/A if (director_connection_get_host(conn) != removed_host)
951N/A i++;
951N/A else {
951N/A director_connection_deinit(&conn, "Removing from ring");
951N/A conns = array_get(&dir->connections, &count);
951N/A }
951N/A }
951N/A if (dir->right == NULL)
951N/A director_connect(dir);
951N/A
951N/A cmd = t_strdup_printf("DIRECTOR-REMOVE\t%s\t%u\n",
951N/A net_ip2addr(&removed_host->ip),
951N/A removed_host->port);
951N/A director_update_send_version(dir, src,
951N/A DIRECTOR_VERSION_RING_REMOVE, cmd);
951N/A}
951N/A
951N/Astatic void
951N/Adirector_send_host(struct director *dir, struct director_host *src,
951N/A struct director_host *orig_src,
951N/A struct mail_host *host)
951N/A{
951N/A const char *host_tag = mail_host_get_tag(host);
951N/A string_t *str;
951N/A
951N/A if (orig_src == NULL) {
951N/A orig_src = dir->self_host;
951N/A orig_src->last_seq++;
951N/A }
951N/A
951N/A str = t_str_new(128);
951N/A str_printfa(str, "HOST\t%s\t%u\t%u\t%s\t%u",
951N/A net_ip2addr(&orig_src->ip), orig_src->port,
951N/A orig_src->last_seq,
951N/A net_ip2addr(&host->ip), host->vhost_count);
951N/A if (dir->ring_min_version >= DIRECTOR_VERSION_TAGS_V2) {
951N/A str_append_c(str, '\t');
951N/A str_append_tabescaped(str, host_tag);
951N/A } else if (host_tag[0] != '\0' &&
951N/A dir->ring_min_version < DIRECTOR_VERSION_TAGS_V2) {
951N/A if (dir->ring_min_version < DIRECTOR_VERSION_TAGS) {
951N/A i_error("Ring has directors that don't support tags - removing host %s with tag '%s'",
951N/A net_ip2addr(&host->ip), host_tag);
951N/A } else {
951N/A i_error("Ring has directors that support mixed versions of tags - removing host %s with tag '%s'",
951N/A net_ip2addr(&host->ip), host_tag);
951N/A }
951N/A director_remove_host(dir, NULL, NULL, host);
951N/A return;
951N/A }
951N/A if (dir->ring_min_version >= DIRECTOR_VERSION_UPDOWN) {
951N/A str_printfa(str, "\t%c%ld\t", host->down ? 'D' : 'U',
951N/A (long)host->last_updown_change);
951N/A /* add any further version checks here - these directors ignore
951N/A any extra unknown arguments */
951N/A if (host->hostname != NULL)
951N/A str_append_tabescaped(str, host->hostname);
951N/A }
951N/A str_append_c(str, '\n');
951N/A director_update_send(dir, src, str_c(str));
951N/A}
951N/A
951N/Avoid director_resend_hosts(struct director *dir)
951N/A{
951N/A struct mail_host *const *hostp;
951N/A
951N/A array_foreach(mail_hosts_get(dir->mail_hosts), hostp)
951N/A director_send_host(dir, dir->self_host, NULL, *hostp);
951N/A}
951N/A
951N/Avoid director_update_host(struct director *dir, struct director_host *src,
951N/A struct director_host *orig_src,
951N/A struct mail_host *host)
951N/A{
951N/A /* update state in case this is the first mail host being added */
951N/A director_set_state_changed(dir);
951N/A
951N/A dir_debug("Updating host %s vhost_count=%u "
951N/A "down=%d last_updown_change=%ld (hosts_hash=%u)",
951N/A net_ip2addr(&host->ip), host->vhost_count, host->down,
951N/A (long)host->last_updown_change,
951N/A mail_hosts_hash(dir->mail_hosts));
951N/A
951N/A director_send_host(dir, src, orig_src, host);
951N/A
951N/A /* mark the host desynced until ring is synced again. except if we're
951N/A alone in the ring that never happens. */
951N/A if (dir->right != NULL || dir->left != NULL)
951N/A host->desynced = TRUE;
951N/A director_sync(dir);
951N/A}
951N/A
951N/Avoid director_remove_host(struct director *dir, struct director_host *src,
951N/A struct director_host *orig_src,
951N/A struct mail_host *host)
951N/A{
951N/A if (src != NULL) {
951N/A if (orig_src == NULL) {
951N/A orig_src = dir->self_host;
951N/A orig_src->last_seq++;
951N/A }
951N/A
951N/A director_update_send(dir, src, t_strdup_printf(
951N/A "HOST-REMOVE\t%s\t%u\t%u\t%s\n",
951N/A net_ip2addr(&orig_src->ip), orig_src->port,
951N/A orig_src->last_seq, net_ip2addr(&host->ip)));
951N/A }
951N/A
951N/A user_directory_remove_host(dir->users, host);
951N/A mail_host_remove(host);
951N/A director_sync(dir);
951N/A}
951N/A
951N/Avoid director_flush_host(struct director *dir, struct director_host *src,
951N/A struct director_host *orig_src,
951N/A struct mail_host *host)
951N/A{
951N/A if (orig_src == NULL) {
951N/A orig_src = dir->self_host;
951N/A orig_src->last_seq++;
951N/A }
951N/A
951N/A director_update_send(dir, src, t_strdup_printf(
951N/A "HOST-FLUSH\t%s\t%u\t%u\t%s\n",
951N/A net_ip2addr(&orig_src->ip), orig_src->port, orig_src->last_seq,
951N/A net_ip2addr(&host->ip)));
951N/A user_directory_remove_host(dir->users, host);
951N/A director_sync(dir);
951N/A}
951N/A
951N/Avoid director_update_user(struct director *dir, struct director_host *src,
951N/A struct user *user)
951N/A{
951N/A i_assert(src != NULL);
951N/A
951N/A i_assert(!user->weak);
951N/A director_update_send(dir, src, t_strdup_printf("USER\t%u\t%s\n",
951N/A user->username_hash, net_ip2addr(&user->host->ip)));
951N/A}
951N/A
951N/Avoid director_update_user_weak(struct director *dir, struct director_host *src,
951N/A struct director_connection *src_conn,
951N/A struct director_host *orig_src,
951N/A struct user *user)
951N/A{
951N/A const char *cmd;
951N/A
951N/A i_assert(src != NULL);
951N/A i_assert(user->weak);
951N/A
951N/A if (orig_src == NULL) {
951N/A orig_src = dir->self_host;
951N/A orig_src->last_seq++;
951N/A }
951N/A
951N/A cmd = t_strdup_printf("USER-WEAK\t%s\t%u\t%u\t%u\t%s\n",
951N/A net_ip2addr(&orig_src->ip), orig_src->port, orig_src->last_seq,
951N/A user->username_hash, net_ip2addr(&user->host->ip));
951N/A
951N/A if (src != dir->self_host && dir->left != NULL && dir->right != NULL &&
951N/A director_connection_get_host(dir->left) ==
951N/A director_connection_get_host(dir->right)) {
951N/A /* only two directors in this ring and we're forwarding
951N/A USER-WEAK from one director back to itself via another
951N/A so it sees we've received it. we can't use
951N/A director_update_send() for this, because it doesn't send
951N/A data back to the source. */
951N/A if (dir->right == src_conn)
951N/A director_connection_send(dir->left, cmd);
951N/A else if (dir->left == src_conn)
951N/A director_connection_send(dir->right, cmd);
951N/A else
951N/A i_unreached();
951N/A } else {
951N/A director_update_send(dir, src, cmd);
951N/A }
951N/A}
951N/A
/* Context for the delayed "user kill finished" timeout. Allocated by
   director_user_kill_finish_delayed() and freed by
   director_user_kill_finish_delayed_to(). */
struct director_user_kill_finish_ctx {
	/* director whose state_change_callback is invoked at the end */
	struct director *dir;
	/* user whose kill_state is being reset */
	struct user *user;
};
951N/A
951N/Astatic void
951N/Adirector_user_kill_finish_delayed_to(struct director_user_kill_finish_ctx *ctx)
951N/A{
951N/A i_assert(ctx->user->kill_state == USER_KILL_STATE_DELAY);
951N/A
951N/A ctx->user->kill_state = USER_KILL_STATE_NONE;
951N/A timeout_remove(&ctx->user->to_move);
951N/A
951N/A ctx->dir->state_change_callback(ctx->dir);
951N/A i_free(ctx);
951N/A}
951N/A
951N/Astatic void
951N/Adirector_user_kill_finish_delayed(struct director *dir, struct user *user)
951N/A{
951N/A struct director_user_kill_finish_ctx *ctx;
951N/A
951N/A ctx = i_new(struct director_user_kill_finish_ctx, 1);
951N/A ctx->dir = dir;
951N/A ctx->user = user;
951N/A
951N/A user->kill_state = USER_KILL_STATE_DELAY;
951N/A timeout_remove(&user->to_move);
951N/A
951N/A /* wait for a while for the kills to finish in the backend server,
951N/A so there are no longer any processes running for the user before we
951N/A start letting new in connections to the new server. */
951N/A user->to_move = timeout_add(dir->set->director_user_kick_delay * 1000,
951N/A director_user_kill_finish_delayed_to, ctx);
951N/A}
951N/A
951N/Astruct director_kill_context {
951N/A struct director *dir;
951N/A unsigned int username_hash;
951N/A bool self;
951N/A};
951N/A
951N/Astatic void
951N/Adirector_finish_user_kill(struct director *dir, struct user *user, bool self)
951N/A{
951N/A if (dir->right == NULL) {
951N/A /* we're alone */
951N/A director_user_kill_finish_delayed(dir, user);
951N/A } else if (self ||
951N/A user->kill_state == USER_KILL_STATE_KILLING_NOTIFY_RECEIVED) {
951N/A director_connection_send(dir->right, t_strdup_printf(
951N/A "USER-KILLED\t%u\n", user->username_hash));
951N/A user->kill_state = USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE;
951N/A } else {
951N/A i_assert(user->kill_state == USER_KILL_STATE_KILLING);
951N/A user->kill_state = USER_KILL_STATE_KILLED_WAITING_FOR_NOTIFY;
951N/A }
951N/A}
951N/A
951N/Astatic void director_kill_user_callback(enum ipc_client_cmd_state state,
951N/A const char *data, void *context)
951N/A{
951N/A struct director_kill_context *ctx = context;
951N/A struct user *user;
951N/A
951N/A switch (state) {
951N/A case IPC_CLIENT_CMD_STATE_REPLY:
951N/A return;
951N/A case IPC_CLIENT_CMD_STATE_OK:
951N/A break;
951N/A case IPC_CLIENT_CMD_STATE_ERROR:
951N/A i_error("Failed to kill user %u connections: %s",
951N/A ctx->username_hash, data);
951N/A /* we can't really do anything but continue anyway */
951N/A break;
951N/A }
951N/A
951N/A user = user_directory_lookup(ctx->dir->users, ctx->username_hash);
951N/A if (user == NULL || user->kill_state == USER_KILL_STATE_NONE)
951N/A return;
951N/A
951N/A director_finish_user_kill(ctx->dir, user, ctx->self);
951N/A}
951N/A
951N/Astatic void director_user_move_timeout(struct user *user)
951N/A{
951N/A i_error("Finishing user %u move timed out, "
951N/A "its state may now be inconsistent", user->username_hash);
951N/A
951N/A user->kill_state = USER_KILL_STATE_NONE;
951N/A timeout_remove(&user->to_move);
951N/A}
951N/A
951N/Avoid director_move_user(struct director *dir, struct director_host *src,
951N/A struct director_host *orig_src,
951N/A unsigned int username_hash, struct mail_host *host)
951N/A{
951N/A struct user *user;
951N/A const char *cmd;
951N/A struct director_kill_context *ctx;
951N/A
951N/A /* 1. move this user's host, and set its "killing" flag to delay all of
951N/A its future connections until all directors have killed the
951N/A connections and notified us about it.
951N/A
951N/A 2. tell the other directors about the move
951N/A
951N/A 3. once user kill callback is called, tell the other directors
951N/A with USER-KILLED that we're done killing the user.
951N/A
951N/A 4. when some director gets a duplicate USER-KILLED, it's
951N/A responsible for notifying all directors that user is completely
951N/A killed.
951N/A
951N/A 5. after receiving USER-KILLED-EVERYWHERE notification,
951N/A new connections are again allowed for the user.
951N/A */
951N/A user = user_directory_lookup(dir->users, username_hash);
951N/A if (user == NULL) {
951N/A user = user_directory_add(dir->users, username_hash,
951N/A host, ioloop_time);
951N/A } else {
951N/A if (user->host == host) {
951N/A /* user is already in this host */
951N/A return;
951N/A }
951N/A user->host->user_count--;
951N/A user->host = host;
951N/A user->host->user_count++;
951N/A user->timestamp = ioloop_time;
951N/A }
951N/A if (user->kill_state == USER_KILL_STATE_NONE) {
951N/A ctx = i_new(struct director_kill_context, 1);
951N/A ctx->dir = dir;
951N/A ctx->username_hash = username_hash;
951N/A ctx->self = src->self;
951N/A
951N/A user->to_move = timeout_add(DIRECTOR_USER_MOVE_TIMEOUT_MSECS,
951N/A director_user_move_timeout, user);
951N/A user->kill_state = USER_KILL_STATE_KILLING;
951N/A cmd = t_strdup_printf("proxy\t*\tKICK-DIRECTOR-HASH\t%u",
951N/A username_hash);
951N/A ipc_client_cmd(dir->ipc_proxy, cmd,
951N/A director_kill_user_callback, ctx);
951N/A }
951N/A
951N/A if (orig_src == NULL) {
951N/A orig_src = dir->self_host;
951N/A orig_src->last_seq++;
951N/A }
951N/A director_update_send(dir, src, t_strdup_printf(
951N/A "USER-MOVE\t%s\t%u\t%u\t%u\t%s\n",
951N/A net_ip2addr(&orig_src->ip), orig_src->port, orig_src->last_seq,
951N/A user->username_hash, net_ip2addr(&user->host->ip)));
951N/A}
951N/A
951N/Astatic void
951N/Adirector_kick_user_callback(enum ipc_client_cmd_state state ATTR_UNUSED,
951N/A const char *data ATTR_UNUSED,
951N/A void *context ATTR_UNUSED)
951N/A{
951N/A}
951N/A
951N/Avoid director_kick_user(struct director *dir, struct director_host *src,
951N/A struct director_host *orig_src, const char *username)
951N/A{
951N/A const char *cmd;
951N/A
951N/A cmd = t_strdup_printf("proxy\t*\tKICK\t%s", username);
951N/A ipc_client_cmd(dir->ipc_proxy, cmd,
951N/A director_kick_user_callback, (void *)NULL);
951N/A
951N/A if (orig_src == NULL) {
951N/A orig_src = dir->self_host;
951N/A orig_src->last_seq++;
951N/A }
951N/A cmd = t_strdup_printf("USER-KICK\t%s\t%u\t%u\t%s\n",
951N/A net_ip2addr(&orig_src->ip), orig_src->port, orig_src->last_seq,
951N/A username);
951N/A director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK, cmd);
951N/A}
951N/A
951N/Avoid director_kick_user_hash(struct director *dir, struct director_host *src,
951N/A struct director_host *orig_src,
951N/A unsigned int username_hash,
951N/A const struct ip_addr *except_ip)
951N/A{
951N/A const char *cmd;
951N/A
951N/A cmd = t_strdup_printf("proxy\t*\tKICK-DIRECTOR-HASH\t%u\t%s",
951N/A username_hash, net_ip2addr(except_ip));
951N/A ipc_client_cmd(dir->ipc_proxy, cmd,
951N/A director_kick_user_callback, (void *)NULL);
951N/A
951N/A if (orig_src == NULL) {
951N/A orig_src = dir->self_host;
951N/A orig_src->last_seq++;
951N/A }
cmd = t_strdup_printf("USER-KICK-HASH\t%s\t%u\t%u\t%u\t%s\n",
net_ip2addr(&orig_src->ip), orig_src->port, orig_src->last_seq,
username_hash, net_ip2addr(except_ip));
director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK, cmd);
}
/* Handle a "user killed" notification for username_hash. Unknown users
   are ignored; otherwise the action depends on the user's kill state:

   - KILLING: remember that the notify has been received
   - KILLED_WAITING_FOR_NOTIFY: continue finishing the kill
   - KILLED_WAITING_FOR_EVERYONE: announce that the user has been killed
     everywhere, with ourself as the origin
   - NONE, DELAY, KILLING_NOTIFY_RECEIVED: nothing is done */
void director_user_killed(struct director *dir, unsigned int username_hash)
{
	struct user *user = user_directory_lookup(dir->users, username_hash);

	if (user == NULL)
		return;

	switch (user->kill_state) {
	case USER_KILL_STATE_NONE:
	case USER_KILL_STATE_DELAY:
	case USER_KILL_STATE_KILLING_NOTIFY_RECEIVED:
		/* nothing to do in these states */
		break;
	case USER_KILL_STATE_KILLING:
		user->kill_state = USER_KILL_STATE_KILLING_NOTIFY_RECEIVED;
		break;
	case USER_KILL_STATE_KILLED_WAITING_FOR_NOTIFY:
		director_finish_user_kill(dir, user, TRUE);
		break;
	case USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE:
		director_user_killed_everywhere(dir, dir->self_host,
						NULL, username_hash);
		break;
	}
}
/* Handle the "user killed everywhere" event. Ignored unless the user
   exists and is in USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE. Otherwise
   the delayed kill finish is started and USER-KILLED-EVERYWHERE is sent
   to the ring. A NULL orig_src means we are the origin and our last_seq
   is incremented. */
void director_user_killed_everywhere(struct director *dir,
				     struct director_host *src,
				     struct director_host *orig_src,
				     unsigned int username_hash)
{
	struct user *user;
	const char *cmd;

	user = user_directory_lookup(dir->users, username_hash);
	if (user == NULL)
		return;
	if (user->kill_state != USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE)
		return;

	director_user_kill_finish_delayed(dir, user);

	if (orig_src == NULL) {
		orig_src = dir->self_host;
		orig_src->last_seq++;
	}
	cmd = t_strdup_printf("USER-KILLED-EVERYWHERE\t%s\t%u\t%u\t%u\n",
		net_ip2addr(&orig_src->ip), orig_src->port, orig_src->last_seq,
		user->username_hash);
	director_update_send(dir, src, cmd);
}
/* Zero-delay timeout callback scheduled by director_set_state_changed():
   removes the timeout and invokes the state change callback. */
static void director_state_callback_timeout(struct director *dir)
{
	timeout_remove(&dir->to_callback);

	dir->state_change_callback(dir);
}
/* Request the state change callback to be called. The call is deferred
   through a zero-delay timeout; while one is already pending, further
   requests do nothing. */
void director_set_state_changed(struct director *dir)
{
	if (dir->to_callback != NULL)
		return;

	/* this function gets called from various places. deferring the
	   callback makes sure it is run with a clean state. */
	dir->to_callback =
		timeout_add(0, director_state_callback_timeout, dir);
}
/* Same as director_update_send_version() with a minimum version of 0,
   i.e. no connection is skipped because of its minor version. */
void director_update_send(struct director *dir, struct director_host *src,
			  const char *cmd)
{
	const unsigned int any_version = 0;

	director_update_send_version(dir, src, any_version, cmd);
}
/* Send `cmd` to every director connection, except to those whose host is
   `src` and those whose minor version is below min_version. `src` must be
   non-NULL. */
void director_update_send_version(struct director *dir,
				  struct director_host *src,
				  unsigned int min_version, const char *cmd)
{
	struct director_connection *const *connp;

	i_assert(src != NULL);

	array_foreach(&dir->connections, connp) {
		/* never send the data back to where it came from */
		if (director_connection_get_host(*connp) == src)
			continue;
		if (director_connection_get_minor_version(*connp) < min_version)
			continue;
		director_connection_send(*connp, cmd);
	}
}
/* Allocate and initialize a director.

   set:         settings; the pointer is stored, not copied
   listen_ip:   copied to self_ip
   listen_port: stored as self_port
   callback:    stored as the state change callback

   Returns the new director. Release it with director_deinit(). */
struct director *
director_init(const struct director_settings *set,
	      const struct ip_addr *listen_ip, in_port_t listen_port,
	      director_state_change_callback_t *callback)
{
	struct director *dir = i_new(struct director, 1);

	dir->set = set;
	dir->self_ip = *listen_ip;
	dir->self_port = listen_port;
	dir->state_change_callback = callback;
	dir->ring_min_version = DIRECTOR_VERSION_MINOR;

	i_array_init(&dir->dir_hosts, 16);
	i_array_init(&dir->pending_requests, 16);
	i_array_init(&dir->connections, 8);

	dir->users = user_directory_init(set->director_user_expire,
					 set->director_username_hash);
	dir->mail_hosts = mail_hosts_init(set->director_consistent_hashing);
	dir->ipc_proxy = ipc_client_init(DIRECTOR_IPC_PROXY_PATH);
	return dir;
}
/* Tear down a director and set *_dir to NULL. In order: all connections
   are closed, the user directory, mail hosts and IPC client are
   deinitialized, pending timeouts are removed, all director hosts are
   freed and finally the arrays and the director itself are freed. */
void director_deinit(struct director **_dir)
{
	struct director *dir = *_dir;
	struct director_host *const *first_hostp, *host;
	struct director_connection *conn, *const *first_connp;

	*_dir = NULL;

	/* always take the first element and let the callee remove it
	   from the array */
	while (array_count(&dir->connections) > 0) {
		first_connp = array_idx(&dir->connections, 0);
		conn = *first_connp;
		director_connection_deinit(&conn, "Shutting down");
	}

	user_directory_deinit(&dir->users);
	mail_hosts_deinit(&dir->mail_hosts);
	mail_hosts_deinit(&dir->orig_config_hosts);
	ipc_client_deinit(&dir->ipc_proxy);

	if (dir->to_reconnect != NULL)
		timeout_remove(&dir->to_reconnect);
	if (dir->to_handshake_warning != NULL)
		timeout_remove(&dir->to_handshake_warning);
	if (dir->to_request != NULL)
		timeout_remove(&dir->to_request);
	if (dir->to_sync != NULL)
		timeout_remove(&dir->to_sync);
	if (dir->to_remove_dirs != NULL)
		timeout_remove(&dir->to_remove_dirs);
	if (dir->to_callback != NULL)
		timeout_remove(&dir->to_callback);

	while (array_count(&dir->dir_hosts) > 0) {
		first_hostp = array_idx(&dir->dir_hosts, 0);
		host = *first_hostp;
		director_host_free(&host);
	}

	array_free(&dir->pending_requests);
	array_free(&dir->dir_hosts);
	array_free(&dir->connections);
	i_free(dir);
}
/* printf-style debug logging. Does nothing unless director_debug is set;
   otherwise the formatted message is passed to i_debug(). */
void dir_debug(const char *fmt, ...)
{
	va_list args;

	if (director_debug) {
		va_start(args, fmt);
		T_BEGIN {
			i_debug("%s", t_strdup_vprintf(fmt, args));
		} T_END;
		va_end(args);
	}
}