director-connection.c revision 468c28dfb03613ab8d487b5aebc985a969193ace
/* Copyright (c) 2010-2013 Dovecot authors, see the included COPYING file */
/*
Handshaking:
Incoming director connections send:
VERSION
ME
<wait for DONE from remote handshake>
DONE
<make this connection our "left" connection, potentially disconnecting
another one>
Outgoing director connections send:
VERSION
ME
[0..n] DIRECTOR
HOST-HAND-START
[0..n] HOST
HOST-HAND-END
[0..n] USER
<possibly other non-handshake commands between USERs>
DONE
<wait for DONE from remote>
<make this connection our "right" connection, potentially disconnecting
another one>
*/
#include "lib.h"
#include "ioloop.h"
#include "array.h"
#include "net.h"
#include "istream.h"
#include "ostream.h"
#include "str.h"
#include "master-service.h"
#include "mail-host.h"
#include "director.h"
#include "director-host.h"
#include "director-request.h"
#include "user-directory.h"
#include "director-connection.h"
#include <stdlib.h>
#include <unistd.h>
#define MAX_INBUF_SIZE 1024
/* Max idling time before "ME" command must have been received,
or we'll disconnect. */
/* Max time to wait for USERs in handshake to be sent. With a lot of users the
kernel may quickly eat up everything we send, while the receiver is busy
parsing the data. */
/* Max idling time before "DONE" command must have been received,
or we'll disconnect. */
/* How long to wait for PONG for an idling connection */
/* Maximum time to wait for PONG reply */
/* How long to wait to send PING when connection is idle */
/* How long to wait before sending PING while waiting for SYNC reply */
#define DIRECTOR_CONNECTION_PING_SYNC_INTERVAL_MSECS 1000
/* If outgoing director connection exists for less than this many seconds,
mark the host as failed so we won't try to reconnect to it immediately */
#define DIRECTOR_SUCCESS_MIN_CONNECT_SECS 40
#define DIRECTOR_WAIT_DISCONNECT_SECS 10
#define DIRECTOR_HANDSHAKE_WARN_SECS 29
#define DIRECTOR_MAX_SYNC_SEQ_DUPLICATES 4
#endif
#endif
#define CMD_IS_USER_HANDHAKE(args) \
struct director_connection {
/* Per-connection state for one director-to-director link, covering both
incoming and outgoing connections. Most of the state below consists of
one-bit bitfield flags tracking handshake and ping progress. */
/* NOTE(review): presumably a human-readable connection name used in log
prefixes such as "director(%s)" -- confirm */
char *name;
/* Minor protocol version; an unsigned int accessor elsewhere in this file
returns it as conn->minor_version. */
unsigned int minor_version;
/* for incoming connections the director host isn't known until
ME-line is received */
struct director_host *host;
/* this is set only for wrong connections: */
struct director_host *connect_request_to;
/* NOTE(review): presumably the connection's socket descriptor; the
disconnect path logs "close(director connection) failed" -- confirm */
int fd;
/* NOTE(review): presumably the iterator used while sending the USER
list during the handshake -- confirm */
struct user_directory_iter *user_iter;
/* set during command execution */
/* NOTE(review): the comment above does not describe "in"; the field it
originally documented appears to be missing from this copy -- confirm.
"in" itself presumably marks an incoming (vs. outgoing) connection. */
unsigned int in:1;
/* NOTE(review): meaning not determinable from this copy -- confirm */
unsigned int connected:1;
/* VERSION line has been received; handshake processing fails (-1)
while this is still unset. */
unsigned int version_received:1;
/* ME line has been received; a second ME is rejected, and the handshake
timeout message distinguishes "ME timed out" from "DONE timed out"
based on this flag. */
unsigned int me_received:1;
/* Full handshake (DONE) has been received; until then input lines are
routed through the handshake parser first. */
unsigned int handshake_received:1;
/* When set, incoming HOST commands are ignored. The remote is sending
hosts in a handshake, but it doesn't have a completed ring and we do. */
unsigned int ignore_host_events:1;
/* NOTE(review): presumably set while the remote is between
HOST-HAND-START and HOST-HAND-END -- confirm */
unsigned int handshake_sending_hosts:1;
/* A PING has been sent and a PONG is awaited. PONGs received while this
is unset are ignored, and ping timers are not re-armed while it is set. */
unsigned int ping_waiting:1;
/* NOTE(review): meaning not determinable from this copy -- confirm */
unsigned int synced:1;
/* NOTE(review): presumably marks an incoming connection that was told
to connect elsewhere ("should use %s instead") -- confirm */
unsigned int wrong_host:1;
/* We are verifying that our current left connection still works. When
its PONG arrives, wrong incoming connections are told to connect to it. */
unsigned int verifying_left:1;
/* NOTE(review): meaning not determinable from this copy -- confirm */
unsigned int users_unsorted:1;
};
const char *reason);
static void
{
}
static void
{
i_error("director(%s): Connect timed out (%u secs)",
i_error("director(%s): Sending handshake (%u secs)",
} else if (!conn->me_received) {
i_error("director(%s): Handshaking ME timed out (%u secs)",
} else {
i_error("director(%s): Handshaking DONE timed out (%u secs)",
}
}
static void
{
unsigned int msecs;
}
{
"Timeout waiting for disconnect after CONNECT");
}
struct director_host *host)
{
const char *connect_str;
return;
/* wait for a while for the remote to disconnect, so it will hopefully
see our CONNECT command. we'll also log the warning later to avoid
multiple log lines about it. */
}
{
/* we're connected to both directors. see if the ring is
finished by sending a SYNC. if we get it back, it's done. */
}
}
{
/* make sure this is the correct incoming connection */
i_error("Connection from self, dropping");
return FALSE;
/* no conflicts yet */
i_warning("Replacing left director connection %s with %s",
/* we're waiting to verify if our current left is still
working. if we don't receive a PONG, the current left
gets disconnected and a new left gets assigned. if we do
receive a PONG, we'll wait until the current left
disconnects us and then reassign the new left. */
return TRUE;
/* the old connection is the correct one.
refer the client there (FIXME: do we ever get here?) */
return TRUE;
} else {
/* this new connection is the correct one, but wait until the
old connection gets disconnected before using this one.
that guarantees that the director inserting itself into
the ring has finished handshaking its left side, so the
switch will be fast. */
return TRUE;
}
return TRUE;
}
{
/* either use this or disconnect it */
if (!director_connection_assign_left(conn)) {
/* we don't want this */
"Unwanted incoming connection");
break;
}
}
}
}
{
struct director_connection *const *connp;
return TRUE;
}
return FALSE;
}
{
/* see if we should disconnect or keep the existing
connection. */
/* the old connection is the correct one */
i_warning("Aborting incorrect outgoing connection to %s "
"(already connected to correct one: %s)",
return FALSE;
}
i_warning("Replacing right director connection %s with %s",
}
return TRUE;
}
static bool
const char *const *args,
{
return FALSE;
}
return FALSE;
}
return FALSE;
}
return TRUE;
}
const char *const *args)
{
const char *connect_str;
unsigned int port;
return FALSE;
if (conn->me_received) {
return FALSE;
}
i_error("Remote director thinks it's someone else "
"(connected to %s:%u, remote says it's %s:%u)",
return FALSE;
}
return TRUE;
/* Incoming connection:
a) we don't have an established ring yet. make sure we're connecting
to our right side (which might become our left side).
b) it's our current "left" connection. the previous connection
is most likely dead.
c) we have an existing ring. tell our current "left" to connect to
it with CONNECT command.
d) the incoming connection doesn't belong to us at all, refer it
elsewhere with CONNECT. however, before disconnecting it verify
first that our left side is actually still functional.
*/
/* the host shouldn't be removed at this point, but if for some
reason it is we don't want to crash */
/* make sure we don't keep old sequence values across restarts */
if (next_comm_attempt > ioloop_time) {
/* the director recently sent invalid protocol data,
don't try retrying yet */
i_error("director(%s): Remote sent invalid protocol data recently, "
"waiting %u secs before allowing further communication",
return FALSE;
/* a) - just in case the left is also our right side reset
its failed state, so we can connect to it */
/* b) */
"Replacing with new incoming connection");
/* c) */
} else {
/* d) */
}
return TRUE;
}
static bool
{
return TRUE;
}
if (!weak) {
/* removing user's weakness */
dir_debug("user refresh: %u weakness removed",
} else {
/* weak user marked again as weak */
}
} else if (weak &&
/* mark the user as weak */
} else if (weak) {
dir_debug("user refresh: %u weak update to %s ignored, "
"we recently changed it to %s",
/* update to the same host */
/* host conflict for a user that is already near expiring. we can
assume that the other director had already dropped this user
and we should have as well. use the new host. */
dir_debug("user refresh: %u is nearly expired, "
"replacing host %s with %s", username_hash,
} else {
/* non-weak user received a non-weak update with
conflicting host. this shouldn't happen. */
"is being redirected to two hosts: %s and %s",
if (!conn->handshake_received) {
(long)timestamp);
}
/* we want all the directors to redirect the user to same
server, but we don't want two directors fighting over which
server it belongs to, so always use the lower IP address */
/* change the host. we'll also need to remove the user
from the old host's user_count, because we can't
keep track of the user for more than one host */
} else {
/* keep the host */
}
}
}
}
dir_debug("user refresh: %u refreshed timeout to %ld",
if (unset_weak_user) {
/* user is no longer weak. handle pending requests for
this user if there are any */
}
return ret;
}
static bool
const char *const *args)
{
unsigned int username_hash, timestamp;
bool weak;
return FALSE;
}
i_error("director(%s): USER used unknown host %s in handshake",
return FALSE;
}
}
return TRUE;
}
static bool
const char *const *args)
{
unsigned int username_hash;
/* NOTE: if more parameters are added, update also
CMD_IS_USER_HANDHAKE() macro */
return FALSE;
}
/* we probably just removed this host. */
return TRUE;
}
}
return TRUE;
}
const char *const *args)
{
struct director_host *host;
unsigned int port;
return FALSE;
/* ignore updates to ourself */
return TRUE;
}
/* ignore re-adds of removed directors */
return TRUE;
}
/* already have this. just reset its last_network_failure
timestamp, since it might be up now. */
host->last_network_failure = 0;
/* it also may have been restarted, reset last_seq */
}
} else {
/* save the director and forward it */
}
if (forward) {
}
return TRUE;
}
const char *const *args)
{
struct director_host *host;
unsigned int port;
return FALSE;
return TRUE;
}
static bool
const char *const *args)
{
unsigned int remote_ring_completed;
return FALSE;
}
/* clear everything we have and use only what remote sends us */
while (array_count(hosts) > 0) {
}
/* ignore whatever remote sends */
}
return TRUE;
}
static int
const char *const **_args,
struct director_host **host_r)
{
struct director_host *host;
return -1;
}
/* director is already gone, but we can't be sure if this
command was sent everywhere. re-send it as if it was from
ourself. */
} else {
/* already seen this */
return 1;
}
}
return 0;
}
static bool
const char *const *args)
{
struct director_host *dir_host;
unsigned int username_hash;
int ret;
/* note that unlike other commands we don't want to just ignore
duplicate commands */
return FALSE;
return FALSE;
}
/* we probably just removed this host. */
return TRUE;
}
if (ret == 0)
;
/* We originated this USER-WEAK request. The entire ring has seen
it and there weren't any conflicts. Make the user non-weak. */
dir_debug("user refresh: %u Our USER-WEAK seen by the entire ring",
} else {
/* The original USER-WEAK sender will send a new non-weak USER
update saying what really happened. We'll still need to forward
this around the ring to the origin so it also knows it has
travelled through the ring. */
dir_debug("user refresh: %u Remote USER-WEAK from %s seen by the entire ring, ignoring",
weak_forward = TRUE;
}
weak_forward) {
else {
}
}
return TRUE;
}
static bool ATTR_NULL(3)
struct director_host *dir_host)
{
unsigned int vhost_count;
bool update;
return FALSE;
}
if (conn->ignore_host_events) {
/* remote is sending hosts in a handshake, but it doesn't have
a completed ring and we do. */
return TRUE;
}
} else {
}
if (update) {
host, vhost_count);
}
return TRUE;
}
static bool
const char *const *args)
{
}
static bool
{
struct director_host *dir_host;
int ret;
return ret > 0;
}
static bool
const char *const *args)
{
struct director_host *dir_host;
int ret;
return ret > 0;
return FALSE;
}
return TRUE;
}
static bool
const char *const *args)
{
struct director_host *dir_host;
int ret;
return ret > 0;
return FALSE;
}
return TRUE;
}
static bool
const char *const *args)
{
struct director_host *dir_host;
unsigned int username_hash;
int ret;
return ret > 0;
return FALSE;
}
}
return TRUE;
}
static bool
const char *const *args)
{
unsigned int username_hash;
return FALSE;
}
return TRUE;
}
static bool
const char *const *args)
{
struct director_host *dir_host;
unsigned int username_hash;
int ret;
return ret > 0;
return FALSE;
}
return TRUE;
}
{
/* we sent our user list before receiving remote's */
}
else
}
/* the host is up now, make sure we can connect to it immediately
if needed */
/* handshaked to left side. tell it we've received the
whole handshake. */
/* tell the "right" director about the "left" one */
t_strdup_printf("DIRECTOR\t%s\t%u\n",
/* this is our "left" side. */
return director_connection_assign_left(conn);
} else {
/* handshaked to "right" side. */
return director_connection_assign_right(conn);
}
}
static int
{
/* both incoming and outgoing connections get VERSION and ME */
i_error("director(%s): Wrong protocol in socket "
"(%s vs %s)",
return -1;
i_error("director(%s): Incompatible protocol version: "
return -1;
}
return 1;
}
if (!conn->version_received) {
return -1;
}
if (!conn->me_received) {
return -1;
}
/* incoming connections get a HOST list */
if (conn->handshake_sending_hosts) {
return 1;
}
return -1;
}
"Host list is only for incoming connections");
return -1;
}
}
/* both get DONE */
return 0;
}
static bool
struct director_host *host,
unsigned int timestamp)
{
if (minor_version > DIRECTOR_VERSION_MINOR) {
/* we're not up to date */
}
/* stale SYNC event */
return FALSE;
}
/* sync_seq increases when we get disconnected, so we must be
successfully connected to both directions */
if (!dir->ring_handshaked) {
/* the ring is handshaked */
} else if (dir->ring_synced) {
/* duplicate SYNC (which was sent just in case the
previous one got lost) */
} else {
dir_debug("Ring is synced (%s sent seq=%u)",
}
} else {
/* stale SYNC event */
return FALSE;
} else if (++host->last_sync_seq_counter >
/* we've received this too many times already */
return FALSE;
}
/* forward it to the connection on right */
}
}
return TRUE;
}
const char *const *args)
{
struct director_host *host;
return FALSE;
}
return FALSE;
}
}
/* find the originating director. if we don't see it, it was already
removed and we can ignore this sync. */
return TRUE;
}
(void)director_resend_sync(dir);
return TRUE;
}
const char *const *args)
{
struct director_host *host;
unsigned int port;
return FALSE;
}
/* reset failure timestamp so we'll actually try to connect there. */
host->last_network_failure = 0;
/* remote suggests us to connect elsewhere */
/* the old connection is the correct one */
dir_debug("Ignoring CONNECT request to %s (current right is %s)",
return TRUE;
}
dir_debug("Received CONNECT request to %s, "
} else {
dir_debug("Received CONNECT request to %s, "
"replacing current right %s",
}
/* connect here */
return TRUE;
}
{
}
}
{
if (!conn->ping_waiting)
return TRUE;
if (conn->verifying_left) {
/* our left side is functional. tell all the wrong
incoming connections to connect to it instead. */
}
}
return TRUE;
}
static bool
{
int ret;
if (!conn->handshake_received) {
if (ret > 0)
return TRUE;
if (ret < 0) {
/* invalid commands during handshake,
we probably don't want to reconnect here */
return FALSE;
}
/* allow also other commands during handshake */
}
return TRUE;
}
return director_cmd_pong(conn);
i_warning("Director %s disconnected us with reason: %s",
return FALSE;
}
return FALSE;
}
static bool
const char *line)
{
bool ret;
return FALSE;
}
return ret;
}
static void
{
i_warning("Director %s tried to connect to us, "
"should use %s instead",
return;
}
}
if (!conn->me_received)
else if (!conn->handshake_received)
}
{
char *line;
bool ret;
case 0:
return;
case -1:
/* disconnected */
return;
case -2:
/* buffer full */
return;
}
/* just read everything the remote sends, and wait for it
to disconnect. we mainly just want the remote to read the
CONNECT we sent it. */
return;
}
T_BEGIN {
} T_END;
if (!ret) {
"Invalid input: %s", line));
break;
}
}
}
{
struct director_host *const *hostp;
continue;
}
}
static void
{
}
}
{
int ret;
T_BEGIN {
} T_END;
/* continue later */
return ret;
}
}
}
/* we received remote's list of users before sending ours */
}
return ret;
}
{
int ret;
/* still handshaking USER list */
if (ret < 0)
else
return ret;
}
}
static struct director_connection *
{
struct director_connection *conn;
return conn;
}
{
"ME\t%s\t%u\n",
}
struct director_connection *
{
struct director_connection *conn;
return conn;
}
{
int err;
return;
}
if (director_connection_send_users(conn) == 0)
}
struct director_connection *
struct director_host *host)
{
struct director_connection *conn;
/* make sure we don't keep old sequence values across restarts */
return conn;
}
const char *remote_reason)
{
unsigned int i, count;
dir_debug("Disconnecting from %s: %s",
}
if (*remote_reason != '\0' &&
"QUIT\t%s\n", remote_reason));
}
for (i = 0; i < count; i++) {
break;
}
}
/* if there is already another handshaked incoming connection,
use it as the new "left" */
}
i_error("close(director connection) failed: %m");
/* we aren't synced until we're again connected to a ring */
}
}
{
/* connection didn't exist for very long, assume it has a
network problem */
}
}
const char *reason)
{
}
const char *data)
{
return;
if (director_debug) T_BEGIN {
} T_END;
if (ret < 0)
else {
i_error("director(%s): Output buffer full, "
}
}
}
static void
{
}
{
i_error("director(%s): PONG reply not received although other "
}
{
if (conn->ping_waiting)
return;
}
{
}
struct director_host *
{
}
{
return conn->handshake_received;
}
{
}
unsigned int
{
return conn->minor_version;
}
{
}
{
}
bool synced)
{
return;
/* switch ping timeout, unless we're already waiting for PONG */
if (conn->ping_waiting)
return;
}