director.c revision 3751234328db786e53680f4df21a4d10b446e252
/* Copyright (c) 2010-2016 Dovecot authors, see the included COPYING file */
#include "lib.h"
#include "ioloop.h"
#include "array.h"
#include "str.h"
#include "strescape.h"
#include "log-throttle.h"
#include "ipc-client.h"
#include "program-client.h"
#include "var-expand.h"
#include "istream.h"
#include "ostream.h"
#include "iostream-temp.h"
#include "user-directory.h"
#include "mail-host.h"
#include "director-host.h"
#include "director-connection.h"
#include "director.h"
/* Socket names used by the director.
   NOTE(review): presumably relative socket paths for the IPC proxy and the
   DNS client services - the use sites are not visible here; confirm. */
#define DIRECTOR_IPC_PROXY_PATH "ipc"
#define DIRECTOR_DNS_SOCKET_PATH "dns-client"
/* Timing constants; the unit is given by each name's suffix (_SECS/_MSECS).
   NOTE(review): presumably these control how long a recently failed director
   is skipped, how long we wait before assuming we're alone in the ring, and
   the delay of a quick reconnect - confirm against the code using them. */
#define DIRECTOR_RECONNECT_RETRY_SECS 60
#define DIRECTOR_RING_MIN_WAIT_SECS 20
#define DIRECTOR_QUICK_RECONNECT_TIMEOUT_MSECS 1000
/* Global debug switch (external linkage). Code in this file tests it before
   doing debug-only work and returns early when it is unset. */
bool director_debug;
/* Log throttle handles private to this file.
   NOTE(review): presumably paired with the director_user_move_throttled()
   and director_user_kill_fail_throttled() callbacks defined later in this
   file; where they are created/destroyed is not visible here - confirm. */
static struct log_throttle *user_move_throttle;
static struct log_throttle *user_kill_fail_throttle;
/* Log throttle settings defined for this file. Only
   throttle_at_max_per_interval is set explicitly; every other member is
   zero-initialized by the designated initializer.
   NOTE(review): presumably the log-throttle library substitutes its own
   defaults for zero-valued members, and this is what both throttle handles
   in this file are created with - confirm in log-throttle.h and at the
   creation site. */
static const struct log_throttle_settings director_log_throttle_settings = {
.throttle_at_max_per_interval = 100,
};
static void
bool skip_delay);
{
return FALSE;
return FALSE;
return TRUE;
}
{
struct director_host *const *hosts;
unsigned int i, count;
for (i = 0; i < count; i++) {
return;
}
}
i_fatal("director_servers doesn't list ourself");
}
{
return;
if (!director_is_self_ip_set(dir))
i_fatal("director_servers doesn't list ourself (%s:%u)",
}
}
{
struct director_host *const *hosts;
unsigned int i, count;
for (i = 0; i < count; i++) {
return i;
}
i_unreached();
}
static bool
struct director_host *host)
{
struct director_connection *const *connp;
return TRUE;
}
return FALSE;
}
{
int fd;
return 0;
if (director_debug) {
if (host->last_network_failure > 0) {
}
if (host->last_protocol_failure > 0) {
}
}
if (fd == -1) {
return -1;
}
/* Reset timestamp so that director_connect() won't skip this host
while we're still trying to connect to it */
host->last_network_failure = 0;
return 0;
}
static struct director_host *
{
if (count == 1) {
/* self */
return NULL;
}
for (i = 0; i < count; i++) {
return host;
}
/* self, with some removed hosts */
return NULL;
}
{
struct director_host *const *hostp;
/* don't assume we're alone until we've attempted to connect
to others for a while */
if (dir->ring_first_alone != 0 &&
return FALSE;
if (dir->ring_first_alone == 0)
/* reset all failures and try again */
(*hostp)->last_network_failure = 0;
(*hostp)->last_protocol_failure = 0;
}
return TRUE;
}
{
struct director_host *const *hosts;
/* try to connect to first working server on our right side.
the left side is supposed to connect to us. */
for (i = 1; i < count; i++) {
continue;
/* connection failed recently, don't try retrying here */
continue;
}
/* the director recently sent invalid protocol data,
don't try retrying yet */
continue;
}
/* success */
return;
}
}
return;
/* we're the only one */
if (count > 1) {
i_warning("director: Couldn't connect to right side, "
"we must be the only director left");
}
/* since we couldn't connect to it,
it must have failed recently */
i_warning("director: Assuming %s is dead, disconnecting",
"This connection is dead?");
}
if (!dir->ring_handshaked)
else
}
{
if (dir->ring_handshake_warning_sent) {
i_warning("Directors have been connected, "
"continuing delayed requests");
}
dir_debug("Director ring handshaked");
}
{
if (preferred_host == NULL) {
/* all directors have been removed, try again later */
} else if (cur_host != preferred_host)
else {
/* the connection hasn't finished sync yet.
keep this timeout for now. */
}
}
{
struct director_host *host;
if (dir->ring_handshake_warning_sent) {
i_warning("Ring is synced, continuing delayed requests "
"(syncing took %d secs, hosts_hash=%u)",
}
/* try to reconnect to preferred host later */
dir->to_reconnect =
}
}
unsigned int timestamp, unsigned int hosts_hash)
{
if (minor_version > 0 &&
/* only minor_version>0 supports extra parameters */
}
/* ping our connections in case either of them is hanging.
if one is, we want to know about it quickly. */
}
{
/* send a new SYNC in case the previous one got dropped */
return TRUE;
}
return FALSE;
}
{
if (director_resend_sync(dir))
i_error("Ring SYNC appears to have got lost, resending");
}
{
if (dir->ring_synced) {
}
} else {
}
}
{
/* we're synced again when we receive this SYNC back */
/* we're alone. if we're already synced,
don't become unsynced. */
return;
}
if (dir->sync_frozen) {
return;
}
return;
}
dir_debug("Ring is desynced (seq=%u, sending SYNC to %s)",
/* send PINGs to our connections more rapidly until we've synced again.
if the connection has actually died, we don't need to wait (and
delay requests) for as long to detect it */
}
{
struct director_connection *const *connp;
}
{
struct director_connection *const *connp;
if (dir->sync_pending) {
}
}
struct director_host *src)
{
const char *cmd;
}
{
unsigned int i, count;
for (i = 0; i < count; ) {
} else {
i++;
}
}
}
struct director_host *src)
{
unsigned int i, count;
const char *cmd;
if (removed_host->self) {
/* others will just disconnect us */
return;
}
/* mark the host as removed and fully remove it later. this delay is
needed, because the removal may trigger director reconnections,
which may send the director back and we don't want to re-add it */
}
/* disconnect any connections to the host */
for (i = 0; i < count; ) {
i++;
else {
}
}
removed_host->port);
}
static void
struct director_host *orig_src,
{
}
} else if (host_tag[0] != '\0' &&
i_error("Ring has directors that don't support tags - removing host %s with tag '%s'",
} else {
i_error("Ring has directors that support mixed versions of tags - removing host %s with tag '%s'",
}
return;
}
(long)host->last_updown_change);
/* add any further version checks here - these directors ignore
any extra unknown arguments */
}
}
{
}
struct director_host *orig_src,
{
/* update state in case this is the first mail host being added */
dir_debug("Updating host %s vhost_count=%u "
"down=%d last_updown_change=%ld (hosts_hash=%u)",
(long)host->last_updown_change,
/* mark the host desynced until ring is synced again. except if we're
alone in the ring that never happens. */
}
struct director_host *orig_src,
{
}
"HOST-REMOVE\t%s\t%u\t%u\t%s\n",
}
}
struct director_host *orig_src,
{
}
"HOST-FLUSH\t%s\t%u\t%u\t%s\n",
}
{
}
struct director_connection *src_conn,
struct director_host *orig_src,
{
const char *cmd;
}
/* only two directors in this ring and we're forwarding
USER-WEAK from one director back to itself via another
so it sees we've received it. we can't use
director_update_send() for this, because it doesn't send
data back to the source. */
else
i_unreached();
} else {
}
}
/* Context carried while finishing a user kill.
   NOTE(review): the field roles below are inferred from the field names and
   from the "Failed to flush user hash %u in host %s" errors logged in this
   file; confirm against the functions that fill in and consume it. */
struct director_user_kill_finish_ctx {
/* hash identifying the user being handled */
unsigned int username_hash;
/* program client handle; presumably the running flush script - confirm */
struct program_client *pclient;
/* non-const string; presumably owned by this context and freed together
   with it - TODO confirm ownership */
char *socket_path;
};
static void
struct director_user_kill_finish_ctx *ctx)
{
result == 1);
if (result == 0) {
char *data;
i_error("%s: Failed to flush user hash %u in host %s: %s",
i_error("%s: Failed to flush user hash %u in host %s: %s",
data);
}
i_stream_unref(&is);
} else {
}
}
static void
{
struct var_expand_table tab[] = {
};
/* execute flush script, if set */
return;
}
struct director_user_kill_finish_ctx *ctx =
const char *error;
struct program_client_settings set = {
.client_connect_timeout_msecs = 10000,
};
const char *const args[] = {"FLUSH",
i_error("%s: Failed to flush user hash %u in host %s: %s",
error);
return;
}
iostream_temp_create_named("/tmp", 0,
t_strdup_printf("flush response from %s",
}
{
}
static void
{
}
static void
bool skip_delay)
{
struct director_user_kill_finish_ctx *ctx;
if (skip_delay) {
return;
}
/* wait for a while for the kills to finish in the backend server,
so there are no longer any processes running for the user before we
start letting new connections in to the new server. */
}
/* Context for a pending user kill request.
   NOTE(review): presumably handed to the asynchronous kill command and read
   back in its completion callback - the code doing so is not fully visible
   here; confirm. */
struct director_kill_context {
/* hash identifying the user whose connections are being killed */
unsigned int username_hash;
/* NOTE(review): presumably TRUE when this director itself initiated the
   kill/move rather than forwarding another director's request - confirm */
bool self;
};
static void
{
/* we're alone */
} else if (self ||
} else {
}
}
/* Throttle callback for user kill failures. The context argument is unused.
   NOTE(review): the body is empty in this view, so new_events_count is never
   reported, whereas the sibling director_user_move_throttled() logs its
   count with i_error() - confirm whether a log statement is missing here. */
static void director_user_kill_fail_throttled(unsigned int new_events_count,
void *context ATTR_UNUSED)
{
}
{
/* this is an asynchronous notification about user being killed.
there are no guarantees about what might have happened to the user
in the meantime. */
switch (state) {
/* shouldn't get here. the command reply isn't finished yet. */
return;
case IPC_CLIENT_CMD_STATE_OK:
break;
i_error("Failed to kill user %u connections: %s",
}
/* we can't really do anything but continue anyway */
break;
}
/* user was already freed - ignore */
/* we were still waiting for the kill notification */
} else {
/* we don't currently want to kill the user */
}
}
static void director_user_move_throttled(unsigned int new_events_count,
void *context ATTR_UNUSED)
{
i_error("%u users' move timed out, their state may now be inconsistent",
}
{
i_error("Finishing user %u move timed out, "
}
/* FIXME: shouldn't use global director, but for now there's no easy
way to get access to it otherwise */
}
struct director_host *orig_src,
{
const char *cmd;
struct director_kill_context *ctx;
/* 1. move this user's host, and set its "killing" flag to delay all of
its future connections until all directors have killed the
connections and notified us about it.
2. tell the other directors about the move
3. once user kill callback is called, tell the other directors
with USER-KILLED that we're done killing the user.
4. when some director gets a duplicate USER-KILLED, it's
responsible for notifying all directors that user is completely
killed.
5. after receiving USER-KILLED-EVERYWHERE notification,
new connections are again allowed for the user.
*/
host, ioloop_time);
} else {
/* user is already in this host */
return;
}
}
}
}
"USER-MOVE\t%s\t%u\t%u\t%u\t%s\n",
}
static void
const char *data ATTR_UNUSED,
void *context ATTR_UNUSED)
{
}
{
director_kick_user_callback, (void *)NULL);
}
}
struct director_host *orig_src,
{
director_kick_user_callback, (void *)NULL);
}
}
struct director_host *orig_src,
unsigned int username_hash,
{
const char *cmd;
director_kick_user_callback, (void *)NULL);
}
}
{
return;
switch (user->kill_state) {
case USER_KILL_STATE_KILLING:
break;
break;
case USER_KILL_STATE_NONE:
case USER_KILL_STATE_DELAY:
break;
break;
}
}
struct director_host *src,
struct director_host *orig_src,
unsigned int username_hash)
{
return;
}
"USER-KILLED-EVERYWHERE\t%s\t%u\t%u\t%u\n",
user->username_hash));
}
{
}
{
/* we may get called here from various places. use a timeout to
make sure the state callback is called with a clean state. */
dir->to_callback =
}
}
const char *cmd)
{
}
struct director_host *src,
unsigned int min_version, const char *cmd)
{
struct director_connection *const *connp;
}
}
struct director *
{
return dir;
}
{
}
}
}
{
if (!director_debug)
return;
T_BEGIN {
} T_END;
}
/* Module-level initialization hook for the director code.
   NOTE(review): the body is empty in this view, so nothing visible here sets
   up the file-scope user_move_throttle / user_kill_fail_throttle handles -
   confirm whether their initialization belongs in this function. */
void directors_init(void)
{
}
/* Module-level teardown hook, the counterpart of directors_init().
   NOTE(review): the body is empty in this view, so nothing visible here
   releases the file-scope log throttle handles - confirm whether their
   cleanup belongs in this function. */
void directors_deinit(void)
{
}