director.c revision b0421c7397be2146988ee3afb5dcc491c01206cc
/* Copyright (c) 2010-2017 Dovecot authors, see the included COPYING file */
#include "lib.h"
#include "ioloop.h"
#include "array.h"
#include "str.h"
#include "strescape.h"
#include "log-throttle.h"
#include "ipc-client.h"
#include "program-client.h"
#include "var-expand.h"
#include "istream.h"
#include "ostream.h"
#include "iostream-temp.h"
#include "mail-user-hash.h"
#include "user-directory.h"
#include "mail-host.h"
#include "director-host.h"
#include "director-connection.h"
#include "director.h"
/* Path names for the IPC proxy and the DNS client.
   NOTE(review): presumably socket names resolved relative to the service's
   base directory - confirm against the code that uses them. */
#define DIRECTOR_IPC_PROXY_PATH "ipc"
#define DIRECTOR_DNS_SOCKET_PATH "dns-client"
/* Timing constants. The unit (seconds or milliseconds) is part of each
   name. */
#define DIRECTOR_RECONNECT_RETRY_SECS 60
#define DIRECTOR_RING_MIN_WAIT_SECS 20
#define DIRECTOR_QUICK_RECONNECT_TIMEOUT_MSECS 1000
/* Global debug flag. Code in this file checks it and returns early when it
   is FALSE, so the work guarded by it only happens with debugging enabled. */
bool director_debug;
"none",
"killing",
"notify-received",
"waiting-for-notify",
"waiting-for-everyone",
"flushing",
"delay",
};
/* Log throttle handles. Judging by the names they cover the "user move"
   and "user kill failed" messages - NOTE(review): the code that creates and
   uses them is not visible here, confirm. */
static struct log_throttle *user_move_throttle;
static struct log_throttle *user_kill_fail_throttle;
/* Throttle settings defined for this file: the threshold is 100 events per
   interval. Members that are not listed are zero-initialized. */
static const struct log_throttle_settings director_log_throttle_settings = {
.throttle_at_max_per_interval = 100,
};
static void
bool skip_delay);
{
return FALSE;
return FALSE;
return TRUE;
}
{
struct director_host *const *hosts;
unsigned int i, count;
for (i = 0; i < count; i++) {
return;
}
}
i_fatal("director_servers doesn't list ourself");
}
{
return;
if (!director_is_self_ip_set(dir))
i_fatal("director_servers doesn't list ourself (%s:%u)",
}
}
{
struct director_host *const *hosts;
unsigned int i, count;
for (i = 0; i < count; i++) {
return i;
}
i_unreached();
}
static bool
struct director_host *host)
{
struct director_connection *const *connp;
return TRUE;
}
return FALSE;
}
static void
const char *reason)
{
if (host->last_network_failure > 0) {
}
if (host->last_protocol_failure > 0) {
}
i_info("Connecting to %s:%u (as %s%s): %s",
}
const char *reason)
{
int fd;
return 0;
if (fd == -1) {
return -1;
}
/* Reset timestamp so that director_connect() won't skip this host
while we're still trying to connect to it */
host->last_network_failure = 0;
return 0;
}
static struct director_host *
{
if (count == 1) {
/* self */
return NULL;
}
for (i = 0; i < count; i++) {
return host;
}
/* self, with some removed hosts */
return NULL;
}
{
}
{
struct director_host *const *hostp;
/* don't assume we're alone until we've attempted to connect
to others for a while */
if (dir->ring_first_alone != 0 &&
return FALSE;
if (dir->ring_first_alone == 0)
/* reset all failures and try again */
(*hostp)->last_network_failure = 0;
(*hostp)->last_protocol_failure = 0;
}
return TRUE;
}
{
struct director_host *const *hosts;
/* try to connect to first working server on our right side.
the left side is supposed to connect to us. */
for (i = 1; i < count; i++) {
continue;
/* connection failed recently, don't try retrying here */
continue;
}
/* the director recently sent invalid protocol data,
don't try retrying yet */
continue;
}
/* success */
return;
}
}
return;
/* we're the only one */
if (count > 1) {
i_warning("director: Couldn't connect to right side, "
"we must be the only director left");
}
/* since we couldn't connect to it,
it must have failed recently */
i_warning("director: Assuming %s is dead, disconnecting",
"This connection is dead?");
}
if (!dir->ring_handshaked)
else
}
{
if (dir->ring_handshake_warning_sent) {
i_warning("Directors have been connected, "
"continuing delayed requests");
}
dir_debug("Director ring handshaked");
}
{
if (preferred_host == NULL) {
/* all directors have been removed, try again later */
} else if (cur_host != preferred_host) {
"Reconnect attempt to preferred director");
} else {
/* the connection hasn't finished sync yet.
keep this timeout for now. */
}
}
{
struct director_host *host;
if (dir->ring_handshake_warning_sent) {
i_warning("Ring is synced, continuing delayed requests "
"(syncing took %d secs, hosts_hash=%u)",
}
/* try to reconnect to preferred host later */
dir->to_reconnect =
}
}
unsigned int timestamp, unsigned int hosts_hash)
{
if (minor_version > 0 &&
/* only minor_version>0 supports extra parameters */
}
/* ping our connections in case either of them is hanging.
if one is, we want to know it fast. */
}
{
/* send a new SYNC in case the previous one got dropped */
return TRUE;
}
return FALSE;
}
{
if (director_resend_sync(dir))
}
{
if (dir->ring_synced) {
}
} else {
}
}
{
/* we're synced again when we receive this SYNC back */
/* we're alone. if we're already synced,
don't become unsynced. */
return;
}
if (dir->sync_frozen) {
return;
}
dir_debug("Ring is desynced (seq=%u, no right connection)",
return;
}
dir_debug("Ring is desynced (seq=%u, sending SYNC to %s)",
/* send PINGs to our connections more rapidly until we've synced again.
if the connection has actually died, we don't need to wait (and
delay requests) for as long to detect it */
}
{
struct director_connection *const *connp;
}
{
struct director_connection *const *connp;
if (dir->sync_pending) {
}
}
{
const char *cmd;
if (log) {
i_info("Adding director %s to ring (requested by %s)",
}
}
{
unsigned int i, count;
for (i = 0; i < count; ) {
} else {
i++;
}
}
}
struct director_host *src)
{
unsigned int i, count;
const char *cmd;
i_info("Removing director %s from ring (requested by %s)",
/* others will just disconnect us */
return;
}
if (!removed_host->self) {
/* mark the host as removed and fully remove it later. this
delay is needed, because the removal may trigger director
reconnections, which may send the director back and we don't
want to re-add it */
}
}
/* if our left or right side gets removed, notify them first
before disconnecting. */
removed_host->port);
/* disconnect any connections to the host */
for (i = 0; i < count; ) {
i++;
else {
}
}
}
static void
struct director_host *orig_src,
{
}
} else if (host_tag[0] != '\0' &&
i_error("Ring has directors that don't support tags - removing host %s with tag '%s'",
} else {
i_error("Ring has directors that support mixed versions of tags - removing host %s with tag '%s'",
}
return;
}
(long)host->last_updown_change);
/* add any further version checks here - these directors ignore
any extra unknown arguments */
}
}
{
}
struct director_host *orig_src,
{
/* update state in case this is the first mail host being added */
dir_debug("Updating host %s vhost_count=%u "
"down=%d last_updown_change=%ld (hosts_hash=%u)",
(long)host->last_updown_change,
/* mark the host desynced until the ring is synced again, except if we're
alone in the ring, because then a sync never happens. */
}
struct director_host *orig_src,
{
}
"HOST-REMOVE\t%s\t%u\t%u\t%s\n",
}
}
struct director_host *orig_src,
{
}
"HOST-FLUSH\t%s\t%u\t%u\t%s\n",
}
{
}
struct director_connection *src_conn,
struct director_host *orig_src,
{
const char *cmd;
}
/* only two directors in this ring and we're forwarding
USER-WEAK from one director back to itself via another
so it sees we've received it. we can't use
director_update_send() for this, because it doesn't send
data back to the source. */
else
i_unreached();
} else {
}
}
static void
{
ctx->username_hash);
if (result == 0) {
char *data;
i_error("%s: Failed to flush user hash %u in host %s: %s",
i_error("%s: Failed to flush user hash %u in host %s: %s",
data);
}
i_stream_unref(&is);
} else {
}
/* user was already freed - ignore */
dir_debug("User %u freed while flushing, result=%d",
} else {
/* ctx is freed later via user->kill_ctx */
dir_debug("Flushing user %u finished, result=%d",
}
}
static void
{
struct var_expand_table tab[] = {
};
const char *error;
/* Execute flush script, if set. Only the director that started the
user moving will call the flush script. Having each director do it
would be redundant since they're all supposed to be performing the
same flush task to the same backend.
Flushing is also not triggered if we're moving a user that we just
created due to the user move. This means that the user doesn't have
an old host, so we couldn't really even perform any flushing on the
backend. */
return;
}
i_error("Failed to expand director_flush_socket=%s: %s",
return;
}
struct program_client_settings set = {
.client_connect_timeout_msecs = 10000,
};
const char *const args[] = {
"FLUSH",
};
ctx->socket_path);
i_error("%s: Failed to flush user hash %u in host %s: %s",
error);
return;
}
iostream_temp_create_named("/tmp", 0,
t_strdup_printf("flush response from %s",
}
{
}
{
}
static void
{
}
static void
bool skip_delay)
{
if (skip_delay) {
return;
}
/* wait for a while for the kills to finish in the backend server,
so there are no longer any processes running for the user before we
start letting new connections in to the new server. */
}
static void
{
/* we're alone */
} else if (self ||
} else {
}
}
/* Callback that receives a count of new events; the context argument is
   unused.
   NOTE(review): presumably invoked by user_kill_fail_throttle once messages
   have been throttled - confirm where the callback is registered. */
static void director_user_kill_fail_throttled(unsigned int new_events_count,
void *context ATTR_UNUSED)
{
}
{
/* this is an asynchronous notification about user being killed.
there are no guarantees about what might have happened to the user
in the meantime. */
switch (state) {
/* shouldn't get here. the command reply isn't finished yet. */
return;
case IPC_CLIENT_CMD_STATE_OK:
break;
i_error("Failed to kill user %u connections: %s",
}
/* we can't really do anything but continue anyway */
break;
}
/* user was already freed - ignore */
} else {
/* we were still waiting for the kill notification */
}
}
/* Callback that receives a count of new events and logs a single error
   saying that this many users' moves timed out. The context argument is
   unused.
   NOTE(review): presumably invoked by user_move_throttle - confirm where
   the callback is registered. */
static void director_user_move_throttled(unsigned int new_events_count,
void *context ATTR_UNUSED)
{
i_error("%u users' move timed out, their state may now be inconsistent",
}
{
i_error("Finishing user %u move timed out, "
"its state may now be inconsistent (state=%s)",
}
}
}
{
struct director_kill_context *ctx;
const char *cmd;
if (USER_IS_BEING_KILLED(user)) {
/* User is being moved again before the previous move
finished. We'll just continue wherever we left off
earlier. */
dir_debug("User %u move restarted - previous kill_state=%s",
return;
}
}
} else {
/* a) we didn't even know about the user before now.
don't bother performing a local kick, since it wouldn't
kick anything.
b) our host was already correct. notify others that we have
killed the user, but don't really do it. */
}
}
struct director_host *orig_src,
{
/* 1. move this user's host, and set its "killing" flag to delay all of
its future connections until all directors have killed the
connections and notified us about it.
2. tell the other directors about the move
3. once user kill callback is called, tell the other directors
with USER-KILLED that we're done killing the user.
4. when some director gets a duplicate USER-KILLED, it's
responsible for notifying all directors that user is completely
killed.
5. after receiving USER-KILLED-EVERYWHERE notification,
new connections are again allowed for the user.
*/
dir_debug("User %u move started: User was nonexistent",
host, ioloop_time);
/* User is already in the wanted host, but another director
didn't think so. We'll need to finish the move without
killing any of our connections. */
dir_debug("User %u move forwarded: host is already %s",
} else {
/* user is looked up via the new host's tag, so if it's found
the old tag has to be the same. */
dir_debug("User %u move started: host %s -> %s",
}
}
"USER-MOVE\t%s\t%u\t%u\t%u\t%s\n",
/* kill the user only after sending the USER-MOVE, because the kill
may finish instantly. */
}
static void
const char *data ATTR_UNUSED,
void *context ATTR_UNUSED)
{
}
{
director_kick_user_callback, (void *)NULL);
}
str_truncate(cmd, 0);
}
struct director_host *orig_src,
{
director_kick_user_callback, (void *)NULL);
}
str_truncate(cmd, 0);
}
struct director_host *orig_src,
unsigned int username_hash,
{
const char *cmd;
director_kick_user_callback, (void *)NULL);
}
}
static void
struct director_host *src,
struct director_host *orig_src,
unsigned int username_hash)
{
}
"USER-KILLED-EVERYWHERE\t%s\t%u\t%u\t%u\n",
}
static void
unsigned int username_hash)
{
return;
case USER_KILL_STATE_KILLING:
break;
break;
dir_debug("User %u kill_state=%s - ignoring USER-KILLED",
break;
case USER_KILL_STATE_NONE:
case USER_KILL_STATE_FLUSHING:
case USER_KILL_STATE_DELAY:
/* move restarted. state=none can also happen if USER-MOVE was
sent while we were still moving. send back
USER-KILLED-EVERYWHERE to avoid hangs. */
break;
break;
}
}
{
}
static void
struct director_host *src,
struct director_host *orig_src,
unsigned int username_hash)
{
dir_debug("User %u no longer exists - ignoring USER-KILLED-EVERYWHERE",
return;
}
if (!USER_IS_BEING_KILLED(user)) {
dir_debug("User %u is no longer being killed - ignoring USER-KILLED-EVERYWHERE",
return;
}
dir_debug("User %u kill_state=%s - ignoring USER-KILLED-EVERYWHERE",
return;
}
}
struct director_host *src,
struct director_host *orig_src,
unsigned int username_hash)
{
}
}
{
}
{
/* we may get called here from various places. use a timeout to
make sure the state callback is called with a clean state. */
dir->to_callback =
}
}
const char *cmd)
{
}
struct director_host *src,
unsigned int min_version, const char *cmd)
{
struct director_connection *const *connp;
}
}
{
/* director_user_expire is very short. user expired before
moving the user finished or timed out. */
/* kill_ctx is used as a callback parameter.
only remove the timeout and finish the free later. */
} else {
}
}
}
struct director *
{
return dir;
}
{
}
}
}
{
if (!director_debug)
return;
T_BEGIN {
} T_END;
}
/* State of an iteration over users; advanced with
   director_iterate_users_next(). */
struct director_user_iter {
/* NOTE(review): presumably the index of the tag currently being
   iterated - confirm against the iteration code. */
unsigned int tag_idx;
/* Iterator over a user directory. */
struct user_directory_iter *user_iter;
};
{
return iter;
}
{
return NULL;
}
return director_iterate_users_next(iter);
} else
return user;
}
{
}
bool
unsigned int *hash_r)
{
const char *error;
&error))
return TRUE;
i_error("Failed to expand director_user_expire=%s: %s",
return FALSE;
}
/* Module-level initialization entry point; takes no arguments and returns
   nothing.
   NOTE(review): presumably sets up user_move_throttle and
   user_kill_fail_throttle - confirm. */
void directors_init(void)
{
}
/* Module-level cleanup entry point, the counterpart of directors_init();
   takes no arguments and returns nothing.
   NOTE(review): presumably releases what directors_init() set up -
   confirm. */
void directors_deinit(void)
{
}