director-connection.c revision 02c335c23bf5fa225a467c19f2c063fb0dc7b8c3
/* Copyright (c) 2010-2016 Dovecot authors, see the included COPYING file */
/*
Handshaking:
Incoming director connections send:
VERSION
ME
<wait for DONE from remote handshake>
DONE
<make this connection our "left" connection, potentially disconnecting
another one>
Outgoing director connections send:
VERSION
ME
[0..n] DIRECTOR
HOST-HAND-START
[0..n] HOST
HOST-HAND-END
[0..n] USER
<possibly other non-handshake commands between USERs>
DONE
<wait for DONE from remote>
<make this connection our "right" connection, potentially disconnecting
another one>
*/
#include "lib.h"
#include "ioloop.h"
#include "array.h"
#include "net.h"
#include "istream.h"
#include "ostream.h"
#include "str.h"
#include "strescape.h"
#include "master-service.h"
#include "mail-host.h"
#include "director.h"
#include "director-host.h"
#include "director-request.h"
#include "user-directory.h"
#include "director-connection.h"
#include <unistd.h>
#define MAX_INBUF_SIZE 1024
/* NOTE(review): the timeout comments below are not followed by their
   #define lines in this extract; the constants they describe appear to be
   missing and must be restored from the full file before it can build. */
/* Max idling time before "ME" command must have been received,
or we'll disconnect. */
/* Max time to wait for USERs in handshake to be sent. With a lot of users the
kernel may quickly eat up everything we send, while the receiver is busy
parsing the data. */
/* Max idling time before "DONE" command must have been received,
or we'll disconnect. */
/* How long to wait for PONG for an idling connection */
/* Maximum time to wait for PONG reply */
/* How long to wait to send PING when connection is idle */
/* How long to wait before sending PING while waiting for SYNC reply */
#define DIRECTOR_CONNECTION_PING_SYNC_INTERVAL_MSECS 1000
/* If outgoing director connection exists for less than this many seconds,
mark the host as failed so we won't try to reconnect to it immediately */
#define DIRECTOR_SUCCESS_MIN_CONNECT_SECS 40
#define DIRECTOR_WAIT_DISCONNECT_SECS 10
#define DIRECTOR_HANDSHAKE_WARN_SECS 29
#define DIRECTOR_MAX_SYNC_SEQ_DUPLICATES 4
/* If we receive SYNCs with a timestamp this many seconds higher than the last
valid received SYNC timestamp, assume that we lost the director's restart
notification and reset the last_sync_seq */
/* NOTE(review): the comment above describes a SYNC-timestamp reset threshold
   whose #define is missing here. The constant below is a different value:
   the threshold compared against `diff` before the
   "Director %s clock differs from ours" warning in the ME handler. */
#define DIRECTOR_MAX_CLOCK_DIFF_WARN_SECS 1
/* NOTE(review): the two #endif lines below have no matching #if in this
   extract. CMD_IS_USER_HANDHAKE has no body: its trailing backslash splices
   the following #define line into the macro, and the preprocessor rejects
   that ('#' not followed by a macro parameter). Restore the missing lines
   from the full file. The "HANDHAKE" spelling is also referenced in a comment
   in the USER handler, so rename both together if correcting it. */
#endif
#endif
#define CMD_IS_USER_HANDHAKE(args) \
#define DIRECTOR_OPT_CONSISTENT_HASHING "consistent-hashing"
struct director_connection {
char *name;
unsigned int minor_version;
/* for incoming connections the director host isn't known until
ME-line is received */
struct director_host *host;
/* this is set only for wrong connections: */
struct director_host *connect_request_to;
int fd;
struct user_directory_iter *user_iter;
/* set during command execution */
unsigned int in:1;
unsigned int connected:1;
unsigned int version_received:1;
unsigned int me_received:1;
unsigned int handshake_received:1;
unsigned int ignore_host_events:1;
unsigned int handshake_sending_hosts:1;
unsigned int ping_waiting:1;
unsigned int synced:1;
unsigned int wrong_host:1;
unsigned int verifying_left:1;
unsigned int users_unsorted:1;
unsigned int done_pending:1;
};
/* NOTE(review): the lines below are only the tails of forward declarations;
   their first lines are missing from this extract. The two
   `const char *reason);` tails match the parameter lists of two functions
   near the end of this file. The `static void ... const char *errstr);`
   prototype matches the disconnect-logging function defined further down. */
const char *reason);
const char *reason);
static void
const char *errstr);
/* NOTE(review): an empty function body whose signature is missing from
   this extract. */
{
}
/* Handshake-timeout reporting (signature line missing from this extract).
   Logs which handshake stage was still pending when the timeout fired:
   connecting, sending the handshake, waiting for ME, or waiting for DONE.
   Each message includes the elapsed seconds.
   NOTE(review): the first two branch conditions, the log arguments and any
   follow-up action (e.g. disconnecting) are not visible here. */
static void
{
i_error("director(%s): Connect timed out (%u secs)",
i_error("director(%s): Sending handshake (%u secs)",
} else if (!conn->me_received) {
i_error("director(%s): Handshaking ME timed out (%u secs)",
} else {
i_error("director(%s): Handshaking DONE timed out (%u secs)",
}
}
/* NOTE(review): fragment whose signature and body are missing; only a local
   `msecs` is visible. Presumably computes and (re)arms a ping timeout -
   confirm against the full file. */
static void
{
unsigned int msecs;
}
/* Fragment (signature missing). The visible string is the disconnect reason
   used when the remote has not disconnected after we sent it CONNECT. */
{
"Timeout waiting for disconnect after CONNECT");
}
/* Fragment (first signature line missing). Presumably sends a CONNECT
   command (built in connect_str) telling the peer to connect to `host`
   instead of us. Per the comment below, it then waits for the peer to
   disconnect. NOTE(review): the early `return;` has no visible condition. */
struct director_host *host)
{
const char *connect_str;
return;
/* wait for a while for the remote to disconnect, so it will hopefully
see our CONNECT command. we'll also log the warning later to avoid
multiple log lines about it. */
}
/* Fragment (signature missing). Runs once we are connected to both
   directors; per the comment below, it sends a SYNC to check whether the
   ring is complete. */
{
/* we're connected to both directors. see if the ring is
finished by sending a SYNC. if we get it back, it's done. */
}
}
/* Fragment (signature missing). Decides whether an incoming connection
   becomes our "left" side. Returns FALSE to reject it (e.g. a connection
   from ourself); the caller then disconnects it as unwanted. Returns TRUE
   when the connection is accepted, including the cases where switching is
   deferred until the old left connection goes away.
   NOTE(review): the branch conditions are missing from this extract. */
{
/* make sure this is the correct incoming connection */
i_error("Connection from self, dropping");
return FALSE;
/* no conflicts yet */
i_warning("Replacing left director connection %s with %s",
/* we're waiting to verify if our current left is still
working. if we don't receive a PONG, the current left
gets disconnected and a new left gets assigned. if we do
receive a PONG, we'll wait until the current left
disconnects us and then reassign the new left. */
return TRUE;
/* the old connection is the correct one.
refer the client there (FIXME: do we ever get here?) */
return TRUE;
} else {
/* this new connection is the correct one, but wait until the
old connection gets disconnected before using this one.
that guarantees that the director inserting itself into
the ring has finished handshaking its left side, so the
switch will be fast. */
return TRUE;
}
return TRUE;
}
/* Fragment (signature missing). Walks candidate connections and offers each
   one to director_connection_assign_left(). A rejected connection is
   disconnected with "Unwanted incoming connection", and the loop then stops
   (`break`). NOTE(review): presumably it stops because disconnecting
   modifies the list being iterated - confirm. */
{
/* either use this or disconnect it */
if (!director_connection_assign_left(conn)) {
/* we don't want this */
"Unwanted incoming connection");
break;
}
}
}
}
/* Fragment (signature missing). Iterates connections via `connp` and
   returns TRUE on the first match, FALSE otherwise. The match condition is
   missing from this extract. */
{
struct director_connection *const *connp;
return TRUE;
}
return FALSE;
}
/* Fragment (signature missing). Decides whether this outgoing connection
   becomes our "right" side. Returns FALSE, aborting this connection, when
   the existing right connection is the correct one. Otherwise it logs any
   replacement of the old right connection and returns TRUE. */
{
/* see if we should disconnect or keep the existing
connection. */
/* the old connection is the correct one */
i_warning("Aborting incorrect outgoing connection to %s "
"(already connected to correct one: %s)",
return FALSE;
}
i_warning("Replacing right director connection %s with %s",
}
return TRUE;
}
/* Fragment (signature lines partly missing). Parses values out of `args`;
   the parsing calls are missing from this extract. Visible behavior: three
   failure checks that return FALSE, and TRUE on success. */
static bool
const char *const *args,
{
return FALSE;
}
return FALSE;
}
return FALSE;
}
return TRUE;
}
/* Fragment (first signature line missing); presumably the ME command
   handler. Visible behavior:
   - rejects a duplicate ME;
   - rejects a peer that claims a different address than the one we
     connected to;
   - warns when the peer's clock differs from ours by more than
     DIRECTOR_MAX_CLOCK_DIFF_WARN_SECS;
   - refuses peers that recently sent invalid protocol data;
   - classifies an incoming connection into cases a)-d), described below.
   Returns FALSE to reject the connection and TRUE to continue. */
const char *const *args)
{
const char *connect_str;
return FALSE;
if (conn->me_received) {
return FALSE;
}
i_error("Remote director thinks it's someone else "
"(connected to %s:%u, remote says it's %s:%u)",
return FALSE;
}
int diff;
return FALSE;
}
/* NOTE(review): the second half of this condition is missing; presumably it
   checks the opposite direction (a large negative diff) - confirm. */
if (diff > DIRECTOR_MAX_CLOCK_DIFF_WARN_SECS ||
i_warning("Director %s clock differs from ours by %d secs",
}
}
/* NOTE(review): the condition for this early return is missing. Given the
   comment that follows, it presumably skips the incoming-connection
   handling for outgoing connections - confirm. */
return TRUE;
/* Incoming connection:
a) we don't have an established ring yet. make sure we're connecting
to our right side (which might become our left side).
b) it's our current "left" connection. the previous connection
is most likely dead.
c) we have an existing ring. tell our current "left" to connect to
it with CONNECT command.
d) the incoming connection doesn't belong to us at all, refer it
elsewhere with CONNECT. however, before disconnecting it verify
first that our left side is actually still functional.
*/
/* the host shouldn't be removed at this point, but if for some
reason it is we don't want to crash */
/* make sure we don't keep old sequence values across restarts */
if (next_comm_attempt > ioloop_time) {
/* the director recently sent invalid protocol data,
don't try retrying yet */
i_error("director(%s): Remote sent invalid protocol data recently, "
"waiting %u secs before allowing further communication",
return FALSE;
/* a) - just in case the left is also our right side reset
its failed state, so we can connect to it */
/* b) */
"Replacing with new incoming connection");
/* c) */
} else {
/* d) */
}
return TRUE;
}
/* Fragment (signature missing). Reconciles a USER update (user hash, host,
   timestamp, weak flag) with our local view of the user:
   - handles weak/non-weak transitions;
   - accepts a new host for users that are nearly expired;
   - preserves the host of users that are being moved;
   - on a genuine conflict, always picks the lower IP address (see comment
     below) so directors don't fight over the user.
   It also handles any pending requests once the user stops being weak, and
   returns `ret`.
   NOTE(review): most conditions and statements are missing from this
   extract; the return convention of `ret` is not visible - confirm. */
static bool
{
return TRUE;
}
if (!weak) {
/* removing user's weakness */
dir_debug("user refresh: %u weakness removed",
} else {
/* weak user marked again as weak */
}
} else if (weak &&
/* mark the user as weak */
} else if (weak) {
dir_debug("user refresh: %u weak update to %s ignored, "
"we recently changed it to %s",
/* update to the same host */
/* host conflict for a user that is already near expiring. we can
assume that the other director had already dropped this user
and we should have as well. use the new host. */
dir_debug("user refresh: %u is nearly expired, "
"replacing host %s with %s", username_hash,
/* user is still being moved - ignore conflicting host updates
from other directors who don't yet know about the move. */
dir_debug("user refresh: %u is being moved, "
"preserve its host %s instead of replacing with %s",
} else {
/* non-weak user received a non-weak update with
conflicting host. this shouldn't happen. */
"is being redirected to two hosts: %s and %s",
if (!conn->handshake_received) {
(long)timestamp);
}
/* we want all the directors to redirect the user to same
server, but we don't want two directors fighting over which
server it belongs to, so always use the lower IP address */
/* change the host. we'll also need to remove the user
from the old host's user_count, because we can't
keep track of the user for more than one host */
} else {
/* keep the host */
}
/* especially IMAP connections can take a long time to die.
make sure we kill off the connections in the wrong
backends. */
}
}
}
dir_debug("user refresh: %u refreshed timeout to %ld",
if (unset_weak_user) {
/* user is no longer weak. handle pending requests for
this user if there are any */
}
return ret;
}
/* Fragment (first signature line missing); presumably handles USER lines
   received during the handshake. Returns FALSE on invalid input, including a
   USER that refers to a host we don't know ("USER used unknown host %s in
   handshake"), and TRUE otherwise. */
static bool
const char *const *args)
{
unsigned int username_hash, timestamp;
bool weak;
return FALSE;
}
i_error("director(%s): USER used unknown host %s in handshake",
return FALSE;
}
}
return TRUE;
}
/* Fragment (first signature line missing); presumably handles USER updates
   after the handshake. Returns FALSE on invalid arguments. When the
   referenced host is unknown it returns TRUE and ignores the update; the
   comment below attributes that case to a host we just removed. */
static bool
const char *const *args)
{
unsigned int username_hash;
/* NOTE: if more parameters are added, update also
CMD_IS_USER_HANDHAKE() macro */
return FALSE;
}
/* we probably just removed this host. */
return TRUE;
}
}
return TRUE;
}
/* Fragment (first signature line missing); presumably the DIRECTOR command
   handler. Returns FALSE on invalid arguments. It ignores updates about
   ourself and re-adds of removed directors. For an already known director
   it clears last_network_failure (and resets its state); otherwise it saves
   the new director. The update is then forwarded around the ring, unless
   that could loop forever (see the comment below). */
const char *const *args)
{
struct director_host *host;
return FALSE;
/* ignore updates to ourself */
return TRUE;
}
/* ignore re-adds of removed directors */
return TRUE;
}
/* already have this. just reset its last_network_failure
timestamp, since it might be up now. */
host->last_network_failure = 0;
/* it also may have been restarted, reset its state */
} else {
/* save the director and forward it */
}
/* just forward this to the entire ring until it reaches back to
itself. some hosts may see this twice, but that's the only way to
guarantee that it gets seen by everyone. resetting the host multiple
times may cause us to handle its commands multiple times, but the
commands can handle that. however, we need to also handle a
situation where the added director never comes back - we don't want
to send the director information in a loop forever. */
dir_debug("Received DIRECTOR update for a host where we should be connected to. "
"Not forwarding it since it's probably crashed.");
} else {
}
return TRUE;
}
/* Fragment (first signature line missing); presumably a director-removal
   handler. Only the failure `return FALSE;` and the final `return TRUE;`
   are visible. */
const char *const *args)
{
struct director_host *host;
return FALSE;
return TRUE;
}
/* Fragment (first signature line missing); presumably handles
   HOST-HAND-START. It reads a remote_ring_completed value (FALSE on
   failure). One branch clears all our hosts so that only what the remote
   sends is used; the other ignores whatever the remote sends. */
static bool
const char *const *args)
{
unsigned int remote_ring_completed;
return FALSE;
}
/* clear everything we have and use only what remote sends us */
/* NOTE(review): the loop body is missing from this extract; as shown the
   loop never changes `hosts` and would not terminate. */
while (array_count(hosts) > 0) {
}
/* ignore whatever remote sends */
}
return TRUE;
}
/* Fragment (signature lines partly missing); presumably checks whether a
   ring command has already been seen. Visible return values:
   -1 (presumably invalid arguments),
    1 when the command was already seen,
    0 when it should be processed.
   If the originating director is already gone, the command is re-sent as if
   it came from us. Callers in this file do `return ret > 0;` on a non-zero
   result, so 1 counts as handled and -1 as failure. */
static int
const char *const **_args,
struct director_host **host_r)
{
unsigned int seq;
struct director_host *host;
return -1;
}
/* director is already gone, but we can't be sure if this
command was sent everywhere. re-send it as if it was from
ourself. */
} else {
/* already seen this */
return 1;
}
}
return 0;
}
/* Fragment (first signature line missing); presumably the USER-WEAK handler.
   Unlike other commands, it does not simply ignore duplicates (see the
   comment below). Once the request has travelled the whole ring, it either
   makes the user non-weak (if we originated it) or forwards it back toward
   its origin. Returns FALSE on invalid input and TRUE otherwise. */
static bool
const char *const *args)
{
struct director_host *dir_host;
unsigned int username_hash;
int ret;
/* note that unlike other commands we don't want to just ignore
duplicate commands */
return FALSE;
return FALSE;
}
/* we probably just removed this host. */
return TRUE;
}
/* NOTE(review): the empty statement after `if (ret == 0)` is presumably
   intentional (nothing to do yet for a command not seen before). The
   `else if` that continued this chain is missing from this extract. */
if (ret == 0)
;
/* We originated this USER-WEAK request. The entire ring has seen
it and there weren't any conflicts. Make the user non-weak. */
dir_debug("user refresh: %u Our USER-WEAK seen by the entire ring",
} else {
/* The original USER-WEAK sender will send a new non-weak USER
update saying what really happened. We'll still need to forward
this around the ring to the origin so it also knows it has
travelled through the ring. */
dir_debug("user refresh: %u Remote USER-WEAK from %s seen by the entire ring, ignoring",
weak_forward = TRUE;
}
weak_forward) {
else {
}
}
return TRUE;
}
/* Fragment (first signature line missing); presumably the shared HOST update
   handler. It:
   - requires at least 2 arguments (optional 3rd-5th fields are parsed);
   - ignores HOST updates while conn->ignore_host_events is set;
   - rejects host tags from older directors with incompatible tagging
     support;
   - logs tag changes and updates that arrive before a previous one
     finished;
   - applies or ignores the update depending on `update`.
   ATTR_NULL(3) presumably marks the third parameter (dir_host) as
   nullable - confirm. */
static bool ATTR_NULL(3)
struct director_host *dir_host)
{
unsigned int arg_count, vhost_count;
time_t last_updown_change = 0;
if (arg_count < 2 ||
return FALSE;
}
/* NOTE(review): the statements governed by `if (arg_count >= 3)` and
   `if (arg_count >= 5)` are missing from this extract. As shown, each of
   those `if`s would take the following `if` as its body. Restore the missing
   lines before compiling. */
if (arg_count >= 3)
if (arg_count >= 4) {
return FALSE;
}
}
if (arg_count >= 5)
if (conn->ignore_host_events) {
/* remote is sending hosts in a handshake, but it doesn't have
a completed ring and we do. */
return TRUE;
}
director_cmd_error(conn, "Received a host tag from older director version with incompatible tagging support");
return FALSE;
}
} else {
i_error("director(%s): Host %s changed tag from '%s' to '%s'",
}
}
i_warning("director(%s): Host %s is being updated before previous update had finished - "
"setting to state=%s vhosts=%u",
/* make the change appear to come from us, so it
reaches the full ring */
}
}
if (update) {
} else {
dir_debug("Ignoring host %s update vhost_count=%u "
"down=%d last_updown_change=%ld (hosts_hash=%u)",
(long)last_updown_change,
}
return TRUE;
}
/* Fragment (first signature line missing); the body is missing entirely.
   Presumably the HOST handler used during the handshake. */
static bool
const char *const *args)
{
}
/* Fragment (signature missing). Returns `ret > 0` after a seen-check; the
   call and its condition are missing from this extract. */
static bool
{
struct director_host *dir_host;
int ret;
return ret > 0;
}
/* Fragments of seven ring-command handlers; the first signature line of
   each is missing from this extract. Handlers with `ret` follow the same
   visible pattern:
   - `return ret > 0;` when a seen-check result is non-zero (the call and
     condition are missing);
   - `return FALSE;` on argument errors;
   - `return TRUE;` once handled.
   The sixth handler (no dir_host/ret locals) only shows the argument-error
   and success returns.
   NOTE(review): which commands these are cannot be determined from the
   visible lines. */
static bool
const char *const *args)
{
struct director_host *dir_host;
int ret;
return ret > 0;
return FALSE;
}
return TRUE;
}
static bool
const char *const *args)
{
struct director_host *dir_host;
int ret;
return ret > 0;
return FALSE;
}
return TRUE;
}
static bool
const char *const *args)
{
struct director_host *dir_host;
unsigned int username_hash;
int ret;
return ret > 0;
return FALSE;
}
}
return TRUE;
}
static bool
const char *const *args)
{
struct director_host *dir_host;
int ret;
return ret > 0;
return FALSE;
}
return TRUE;
}
static bool
const char *const *args)
{
struct director_host *dir_host;
unsigned int username_hash;
int ret;
return ret > 0;
return FALSE;
}
return TRUE;
}
static bool
const char *const *args)
{
unsigned int username_hash;
return FALSE;
}
return TRUE;
}
static bool
const char *const *args)
{
struct director_host *dir_host;
unsigned int username_hash;
int ret;
return ret > 0;
return FALSE;
}
return TRUE;
}
/* Fragment (signature missing); presumably handles DONE at the end of the
   handshake. The visible tail makes this connection our "left" side or our
   "right" side and returns the corresponding assign function's result. On
   the left side it also sends a DIRECTOR line about the left director to
   our "right" director.
   NOTE(review): the `else` below has lost its statement in this extract. */
{
/* we sent our user list before receiving remote's */
}
else
/* the host is up now, make sure we can connect to it immediately
if needed */
/* handshaked to left side. tell it we've received the
whole handshake. */
/* tell the "right" director about the "left" one */
t_strdup_printf("DIRECTOR\t%s\t%u\n",
/* this is our "left" side. */
return director_connection_assign_left(conn);
} else {
/* handshaked to "right" side. */
return director_connection_assign_right(conn);
}
}
/* Fragment (first signature line missing); presumably handles the handshake
   options line. Returns -1 with an error when the two directors'
   director_consistent_hashing settings differ, and 1 (handled) otherwise. */
static int
const char *const *args)
{
bool consistent_hashing = FALSE;
unsigned int i;
}
i_error("director(%s): director_consistent_hashing settings differ between directors",
return -1;
}
return 1;
}
/* Fragment (first signature line missing); presumably dispatches handshake
   commands. Return convention, matching the dispatcher's `ret > 0` /
   `ret < 0` checks:
    1 = handshake command handled;
   -1 = invalid (wrong/invalid/incompatible VERSION, commands before VERSION
        or ME, a HOST list on the wrong kind of connection);
    0 = not a handshake command, so the caller processes it normally.
   On a valid VERSION it sends a pending DONE, if any. */
static int
{
unsigned int major_version;
/* both incoming and outgoing connections get VERSION and ME */
i_error("director(%s): Wrong protocol in socket "
"(%s vs %s)",
return -1;
i_error("director(%s): Invalid protocol version: "
return -1;
} else if (major_version != DIRECTOR_VERSION_MAJOR) {
i_error("director(%s): Incompatible protocol version: "
return -1;
}
/* NOTE(review): `return FALSE;` in this int-returning function evaluates to
   0, which under the -1/0/1 convention means "not a handshake command"
   rather than an error. Its condition is missing here; verify which result
   was intended. */
return FALSE;
}
if (conn->done_pending) {
if (director_connection_send_done(conn) < 0)
return -1;
}
return 1;
}
if (!conn->version_received) {
return -1;
}
if (!conn->me_received) {
return -1;
}
/* incoming connections get a HOST list */
if (conn->handshake_sending_hosts) {
return 1;
}
return -1;
}
"Host list is only for incoming connections");
return -1;
}
}
/* both get DONE */
return 0;
}
/* Fragment (first signature line missing); presumably processes a SYNC that
   originated from `host` (visible parameters: host, timestamp, hosts_hash).
   Returns TRUE when processing reaches the tail, where per the comment the
   SYNC is forwarded to the connection on the right. Returns FALSE when the
   SYNC is stale or a too-frequent duplicate, or when hosts_hash shows the
   host lists are out of sync (in which case a resend is triggered or
   logged).
   NOTE(review): many conditions and log arguments are missing from this
   extract. The DIRECTOR_MAX_SYNC_SEQ_DUPLICATES comparison presumably
   continues on the line missing after `++host->last_sync_seq_counter >` -
   confirm. */
static bool
struct director_host *host,
unsigned int timestamp, unsigned int hosts_hash)
{
if (minor_version > DIRECTOR_VERSION_MINOR) {
/* we're not up to date */
}
/* stale SYNC event */
return FALSE;
}
/* sync_seq increases when we get disconnected, so we must be
successfully connected to both directions */
/* hosts changed while our own SYNC was travelling around the ring: log it
   and resend. The log message keeps a space before "(seq=" so the two
   concatenated literals don't run together. */
if (hosts_hash != 0 &&
i_error("director(%s): Hosts unexpectedly changed during SYNC reply - resending "
"(seq=%u, old hosts_hash=%u, new hosts_hash=%u)",
(void)director_resend_sync(dir);
return FALSE;
}
if (!dir->ring_handshaked) {
/* the ring is handshaked */
} else if (dir->ring_synced) {
/* duplicate SYNC (which was sent just in case the
previous one got lost) */
} else {
dir_debug("Ring is synced (%s sent seq=%u, hosts_hash=%u)",
}
} else {
/* stale SYNC event */
dir_debug("Ignore stale SYNC event for %s "
"(seq %u < %u, timestamp=%u)",
return FALSE;
i_warning("Last SYNC seq for %s appears to be stale, reseting "
"(seq=%u, timestamp=%u -> seq=%u, timestamp=%u)",
dir_debug("Update SYNC for %s "
"(seq=%u, timestamp=%u -> seq=%u, timestamp=%u)",
} else if (++host->last_sync_seq_counter >
/* we've received this too many times already */
dir_debug("Ignore duplicate #%u SYNC event for %s "
"(seq=%u, timestamp %u <= %u)",
return FALSE;
}
if (hosts_hash != 0 &&
dir_debug("Ignore director %s stale SYNC request whose hosts don't match us "
"(seq=%u, remote hosts_hash=%u, my hosts_hash=%u)",
return FALSE;
}
/* we'll get here only if we received a SYNC twice
with the same wrong hosts_hash. FIXME: this gets
triggered unnecessarily sometimes if hosts are
changing rapidly. */
i_error("director(%s): Director %s SYNC request hosts don't match us - resending hosts "
"(seq=%u, remote hosts_hash=%u, my hosts_hash=%u)",
return FALSE;
}
host->desynced_hosts_hash = 0;
/* forward it to the connection on right */
}
}
return TRUE;
}
/* Fragment (first signature line missing); presumably the SYNC command
   handler. It requires at least 3 arguments and performs further
   validations; every failure returns FALSE. It then looks up the
   originating director (a missing one was already removed, so the SYNC is
   ignored) and may resend our own SYNC. Returns TRUE once the SYNC has been
   handled. */
const char *const *args)
{
struct director_host *host;
unsigned int hosts_hash = 0;
if (arg_count < 3 ||
return FALSE;
}
return FALSE;
}
return FALSE;
}
return FALSE;
}
/* find the originating director. if we don't see it, it was already
removed and we can ignore this sync. */
return TRUE;
}
(void)director_resend_sync(dir);
return TRUE;
}
/* Fragment (first signature line missing); presumably the CONNECT command
   handler. It clears the target host's last_network_failure so a connection
   is actually attempted. It keeps the current right connection if that is
   already the correct one; otherwise it logs and connects to the suggested
   host. The only visible FALSE return comes before the host handling
   (presumably argument validation). */
const char *const *args)
{
struct director_host *host;
return FALSE;
}
/* reset failure timestamp so we'll actually try to connect there. */
host->last_network_failure = 0;
/* remote suggests us to connect elsewhere */
/* the old connection is the correct one */
dir_debug("Ignoring CONNECT request to %s (current right is %s)",
return TRUE;
}
dir_debug("Received CONNECT request to %s, "
} else {
dir_debug("Received CONNECT request to %s, "
"replacing current right %s",
}
/* connect here */
return TRUE;
}
/* NOTE(review): fragment of a function whose signature and body are
   missing; only braces remain. */
{
}
}
/* Fragment (signature missing); presumably the PONG handler, since the
   dispatcher below calls director_cmd_pong(conn). A PONG with no PING
   outstanding is accepted and ignored. If we were verifying our left side,
   the PONG confirms it works and wrong incoming connections are redirected
   to it. All visible returns are TRUE. */
{
if (!conn->ping_waiting)
return TRUE;
if (conn->verifying_left) {
/* our left side is functional. tell all the wrong
incoming connections to connect to it instead. */
}
}
return TRUE;
}
/* Fragment (first signature line missing); presumably dispatches one parsed
   command. Before the handshake completes, the command first goes through
   the handshake handler:
   - ret > 0: handled;
   - ret < 0: invalid during the handshake, returns FALSE;
   - ret == 0: other commands are allowed through.
   PONG is delegated to director_cmd_pong(). A remote-supplied disconnect
   reason (presumably QUIT) is logged and FALSE returned. Input that matches
   nothing falls through to the final `return FALSE`. */
static bool
{
int ret;
if (!conn->handshake_received) {
if (ret > 0)
return TRUE;
if (ret < 0) {
/* invalid commands during handshake,
we probably don't want to reconnect here */
return FALSE;
}
/* allow also other commands during handshake */
}
return TRUE;
}
return director_cmd_pong(conn);
i_warning("Director %s disconnected us with reason: %s",
return FALSE;
}
return FALSE;
}
/* Fragment (first signature line missing); presumably parses one received
   line and dispatches it. Returns FALSE early (condition missing) and
   otherwise the dispatch result `ret`. */
static bool
const char *line)
{
bool ret;
return FALSE;
}
return ret;
}
/* Fragment (first signature line missing); logs why a connection was lost.
   In one case (presumably a wrong connection, cf. connect_request_to) it
   only warns that the peer "should use %s instead" and returns. Otherwise
   the message depends on whether errstr is empty and on how far the
   handshake got (before ME, before DONE).
   NOTE(review): the `if`/`else` statements below have lost their bodies in
   this extract. */
static void
const char *errstr)
{
i_warning("Director %s tried to connect to us, "
"should use %s instead",
return;
}
if (errstr[0] == '\0')
else
}
if (!conn->me_received)
else if (!conn->handshake_received)
}
/* Fragment (signature missing); presumably the input callback. Read results:
   0 = nothing yet, -1 = disconnected, -2 = buffer full; each case returns
   after handling. While waiting for the remote to disconnect after our
   CONNECT, input is read and ignored. Otherwise lines are processed one at a
   time, stopping with an "Invalid input: %s" error on the first line whose
   handling fails. */
{
char *line;
bool ret;
case 0:
return;
case -1:
/* disconnected */
return;
case -2:
/* buffer full */
return;
}
/* just read everything the remote sends, and wait for it
to disconnect. we mainly just want the remote to read the
CONNECT we sent it. */
return;
}
T_BEGIN {
} T_END;
if (!ret) {
"Invalid input: %s", line));
break;
}
}
}
/* Fragment (signature missing); presumably sends our known directors to the
   peer. Some entries are skipped with `continue` under a condition missing
   from this extract. */
{
struct director_host *const *hostp;
continue;
}
}
/* Fragment (first signature line missing); presumably sends our host list.
   When send_updowns is set, each host's last_updown_change timestamp is
   included. */
static void
{
bool send_updowns;
}
if (send_updowns) {
(long)host->last_updown_change);
}
}
}
/* Fragment (signature missing). Returns -1 with an error when
   director_consistent_hashing=yes but the peer's director version is too
   old for it, and 0 otherwise (the `;` is the empty "OK" branch). */
{
;
} else {
i_error("director(%s): Director version is too old for supporting director_consistent_hashing=yes",
return -1;
}
return 0;
}
/* Fragment (signature missing); presumably sends the USER list. It returns
   `ret` early when output must wait ("continue later"). Once the list is
   finished it sends DONE if VERSION was already received; -1 if sending DONE
   fails. NOTE(review): the full return convention of `ret` is not visible -
   confirm. */
{
int ret;
T_BEGIN {
} T_END;
/* continue later */
return ret;
}
}
}
/* NOTE(review): this `if` has lost its statement in this extract. By analogy
   with the pending-DONE handling after VERSION in the handshake handler, it
   presumably marks DONE as pending - confirm. */
if (!conn->version_received)
else {
if (director_connection_send_done(conn) < 0)
return -1;
}
/* we received remote's list of users before sending ours */
}
return ret;
}
/* Fragment (signature missing); presumably the output-flush callback. While
   the handshake USER list is still being sent, it continues sending and
   handles failures (ret < 0) separately from success. Returns `ret`. */
{
int ret;
/* still handshaking USER list */
if (ret < 0) {
} else {
}
return ret;
}
}
/* Fragment (signature line missing); presumably allocates and initializes
   the parts of a connection shared by incoming and outgoing connections,
   and returns it. */
static struct director_connection *
{
struct director_connection *conn;
return conn;
}
/* Fragment (signature missing); formats the "ME\t%s\t%u\t%lld\n" handshake
   line. The fields are presumably our IP, port and a timestamp, matching the
   address and clock-difference checks in the ME handler - confirm. */
{
"ME\t%s\t%u\t%lld\n",
}
/* Fragment (signature line missing); presumably creates a connection object
   for an incoming socket and returns it. */
struct director_connection *
{
struct director_connection *conn;
return conn;
}
/* Fragment (signature missing); presumably runs when an outgoing connect
   completes. It returns early on failure (see `err`) and otherwise starts
   sending the user list.
   NOTE(review): the `if` below has lost its statement in this extract. */
{
int err;
return;
}
if (director_connection_send_users(conn) == 0)
}
/* Fragment (first signature line missing); presumably creates an outgoing
   connection to `host`. Per the comment below it resets per-host sequence
   state, and it returns the new connection. */
struct director_connection *
struct director_host *host)
{
struct director_connection *conn;
/* make sure we don't keep old sequence values across restarts */
return conn;
}
/* Fragment (first signature line missing); presumably tears down a
   connection. It:
   - logs "Disconnecting from %s: %s";
   - sends "QUIT\t<reason>" to the peer when remote_reason is non-empty
     (the rest of that condition is missing);
   - searches a list by index (presumably to remove this connection);
   - may promote another handshaked incoming connection to be the new
     "left";
   - closes the socket, logging close() failures;
   - marks us unsynced until we rejoin a ring. */
const char *remote_reason)
{
unsigned int i, count;
dir_debug("Disconnecting from %s: %s",
}
if (*remote_reason != '\0' &&
"QUIT\t%s\n", remote_reason));
}
for (i = 0; i < count; i++) {
break;
}
}
/* if there is already another handshaked incoming connection,
use it as the new "left" */
}
i_error("close(director connection) failed: %m");
/* we aren't synced until we're again connected to a ring */
}
}
/* Fragment (first signature line missing); presumably handles a lost
   connection. Per the comment below, a connection that existed only briefly
   is treated as a network problem with the host. */
const char *reason)
{
/* connection didn't exist for very long, assume it has a
network problem */
}
}
/* Fragment (first signature line missing); the body is missing entirely. */
const char *reason)
{
}
/* Fragment (first signature line missing); presumably writes `data` to the
   peer. It returns early under a condition missing here and logs the data
   when director_debug is set. A failed or short write is logged; "Output
   buffer full" is the message for the short-write case.
   NOTE(review): the `if (ret < 0)` below has lost its statement in this
   extract. */
const char *data)
{
return;
if (director_debug) T_BEGIN {
} T_END;
if (ret < 0)
else {
i_error("director(%s): Output buffer full, "
}
} else {
}
}
/* Fragment (signature line missing); the body is missing entirely. */
static void
{
}
/* Fragment (signature missing); logs that a PONG reply was not received
   (the message continues on a line missing from this extract). */
{
i_error("director(%s): PONG reply not received although other "
}
/* Fragment (signature missing); presumably sends a PING. Does nothing while
   a PING is already outstanding. */
{
if (conn->ping_waiting)
return;
}
/* Fragment (signature missing); body missing. */
{
}
/* Fragment (signature line missing); presumably returns the connection's
   director host. */
struct director_host *
{
}
/* Fragment (signature missing): returns TRUE once the handshake has
   completed (handshake_received). */
{
return conn->handshake_received;
}
/* Fragment (signature missing); body missing. */
{
}
/* Fragment (signature missing); body missing. */
{
}
/* Fragment (signature line missing): returns the remote's minor protocol
   version. */
unsigned int
{
return conn->minor_version;
}
/* Fragment (signature missing); body missing. */
{
}
/* Fragment (signature missing); body missing. */
{
}
/* Fragment (first signature line missing); presumably updates the synced
   state. It returns early under a condition missing here. Per the comment
   below it then switches the ping timeout, unless a PONG is already
   awaited. */
bool synced)
{
return;
/* switch ping timeout, unless we're already waiting for PONG */
if (conn->ping_waiting)
return;
}