/* director-connection.c revision 2acc1162990b5be76d2b4923a0fcfcfdcdc65d10 */
/* Copyright (c) 2010 Dovecot authors, see the included COPYING file */
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen#include "lib.h"
d9fdacd5fb3e07997e5c389739d2054f0c8441d8Timo Sirainen#include "ioloop.h"
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen#include "array.h"
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen#include "network.h"
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen#include "istream.h"
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen#include "ostream.h"
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen#include "str.h"
fea541eec46707f9b01bd3cbc981d73c1e808a54Timo Sirainen#include "llist.h"
6a19e109ee8c5a6f688da83a86a7f6abeb71abddTimo Sirainen#include "master-service.h"
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen#include "mail-host.h"
a53cb86b4d733d9c48ee4d285bed477c80825804Timo Sirainen#include "director.h"
a53cb86b4d733d9c48ee4d285bed477c80825804Timo Sirainen#include "director-host.h"
e4fb5bfcdff32d337d054cce36e00e1cdfaae9f8Timo Sirainen#include "director-request.h"
e4fb5bfcdff32d337d054cce36e00e1cdfaae9f8Timo Sirainen#include "user-directory.h"
e4fb5bfcdff32d337d054cce36e00e1cdfaae9f8Timo Sirainen#include "director-connection.h"
31ddc75584c5cde53d2e78a737587f2e7fdcb0d2Timo Sirainen
31ddc75584c5cde53d2e78a737587f2e7fdcb0d2Timo Sirainen#include <stdlib.h>
31ddc75584c5cde53d2e78a737587f2e7fdcb0d2Timo Sirainen#include <unistd.h>
89b548af722113acb5d63dfffb44423cb60f91e4Timo Sirainen
/* Protocol name and version checked against the remote's VERSION line */
#define DIRECTOR_VERSION_NAME "director"
#define DIRECTOR_VERSION_MAJOR 1
#define DIRECTOR_VERSION_MINOR 0

/* Stream buffer limits, in bytes */
#define MAX_INBUF_SIZE 1024
#define MAX_OUTBUF_SIZE (1024*1024*10)
#define OUTBUF_FLUSH_THRESHOLD (1024*128)
/* Max idling time while connecting/handshaking before disconnecting */
#define DIRECTOR_CONNECTION_INIT_TIMEOUT_MSECS (2*1000)
/* How long to wait for PONG after PING request */
#define DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS (2*1000)
/* How long to wait to send PING when connection is idle */
#define DIRECTOR_CONNECTION_PING_INTERVAL_MSECS (15*1000)
/* How long to wait before sending PING while waiting for SYNC reply */
#define DIRECTOR_CONNECTION_SYNC_TIMEOUT_MSECS 1000
6a19e109ee8c5a6f688da83a86a7f6abeb71abddTimo Sirainen
/* State of a single connection between this director and another one. */
struct director_connection {
	/* linked list pointers */
	struct director_connection *prev, *next;

	/* director that owns this connection */
	struct director *dir;
	/* name used as the "director(%s)" prefix in log messages */
	char *name;

	/* for incoming connections the director host isn't known until
	   ME-line is received */
	struct director_host *host;

	int fd;
	struct io *io;
	struct istream *input;
	struct ostream *output;
	/* to_ping runs director_connection_ping() once the handshake is done */
	struct timeout *to, *to_ping;

	struct user_directory_iter *user_iter;

	/* set for incoming connections, clear for outgoing ones */
	unsigned int in:1;
	unsigned int connected:1;
	/* a valid VERSION line has been received */
	unsigned int version_received:1;
	/* the ME line has been processed */
	unsigned int me_received:1;
	/* DONE has been received, i.e. the remote finished its handshake */
	unsigned int handshake_received:1;
	/* HOST lines from the remote are parsed but not applied */
	unsigned int ignore_host_events:1;
	/* between HOST-HAND-START and HOST-HAND-END */
	unsigned int handshake_sending_hosts:1;
	unsigned int ping_waiting:1;
	unsigned int sync_ping:1;
};
7e94cf9d70ce9fdeccb7a85ff400b899e6386f36Timo Sirainen
/* Forward declarations for functions used before they are defined. */
static void director_connection_ping(struct director_connection *conn);
static void director_connection_disconnected(struct director_connection **conn);
16c89b1260c9d07c01c83a9219424d3727069b2eTimo Sirainen
16c89b1260c9d07c01c83a9219424d3727069b2eTimo Sirainenstatic bool
16c89b1260c9d07c01c83a9219424d3727069b2eTimo Sirainendirector_args_parse_ip_port(struct director_connection *conn,
16c89b1260c9d07c01c83a9219424d3727069b2eTimo Sirainen const char *const *args,
16c89b1260c9d07c01c83a9219424d3727069b2eTimo Sirainen struct ip_addr *ip_r, unsigned int *port_r)
16c89b1260c9d07c01c83a9219424d3727069b2eTimo Sirainen{
16c89b1260c9d07c01c83a9219424d3727069b2eTimo Sirainen if (net_addr2ip(args[0], ip_r) < 0) {
16c89b1260c9d07c01c83a9219424d3727069b2eTimo Sirainen i_error("director(%s): Command has invalid IP address: %s",
16c89b1260c9d07c01c83a9219424d3727069b2eTimo Sirainen conn->name, args[0]);
16c89b1260c9d07c01c83a9219424d3727069b2eTimo Sirainen return FALSE;
c27f03fa8fd2ef4acd1db814fae7d90e0eb9d3aeTimo Sirainen }
9315dd69233d554452df0c12bc57002d2042a8f4Timo Sirainen if (str_to_uint(args[1], port_r) < 0) {
9315dd69233d554452df0c12bc57002d2042a8f4Timo Sirainen i_error("director(%s): Command has invalid port: %s",
9315dd69233d554452df0c12bc57002d2042a8f4Timo Sirainen conn->name, args[1]);
c27f03fa8fd2ef4acd1db814fae7d90e0eb9d3aeTimo Sirainen return FALSE;
c27f03fa8fd2ef4acd1db814fae7d90e0eb9d3aeTimo Sirainen }
c27f03fa8fd2ef4acd1db814fae7d90e0eb9d3aeTimo Sirainen return TRUE;
c27f03fa8fd2ef4acd1db814fae7d90e0eb9d3aeTimo Sirainen}
c27f03fa8fd2ef4acd1db814fae7d90e0eb9d3aeTimo Sirainen
c27f03fa8fd2ef4acd1db814fae7d90e0eb9d3aeTimo Sirainenstatic bool director_cmd_me(struct director_connection *conn,
c27f03fa8fd2ef4acd1db814fae7d90e0eb9d3aeTimo Sirainen const char *const *args)
c27f03fa8fd2ef4acd1db814fae7d90e0eb9d3aeTimo Sirainen{
c27f03fa8fd2ef4acd1db814fae7d90e0eb9d3aeTimo Sirainen struct director *dir = conn->dir;
c27f03fa8fd2ef4acd1db814fae7d90e0eb9d3aeTimo Sirainen struct director_host *host;
c27f03fa8fd2ef4acd1db814fae7d90e0eb9d3aeTimo Sirainen const char *connect_str;
c27f03fa8fd2ef4acd1db814fae7d90e0eb9d3aeTimo Sirainen struct ip_addr ip;
c27f03fa8fd2ef4acd1db814fae7d90e0eb9d3aeTimo Sirainen unsigned int port;
c27f03fa8fd2ef4acd1db814fae7d90e0eb9d3aeTimo Sirainen
de12ff295bb3d0873b4dced5840612cbacd635efTimo Sirainen if (!director_args_parse_ip_port(conn, args, &ip, &port))
da985034a708db2f61394b30d117050ae6829ee5Timo Sirainen return FALSE;
da985034a708db2f61394b30d117050ae6829ee5Timo Sirainen
da985034a708db2f61394b30d117050ae6829ee5Timo Sirainen if (!conn->in && (!net_ip_compare(&conn->host->ip, &ip) ||
da985034a708db2f61394b30d117050ae6829ee5Timo Sirainen conn->host->port != port)) {
c27f03fa8fd2ef4acd1db814fae7d90e0eb9d3aeTimo Sirainen i_error("Remote director thinks it's someone else "
c27f03fa8fd2ef4acd1db814fae7d90e0eb9d3aeTimo Sirainen "(connected to %s:%u, remote says it's %s:%u)",
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen net_ip2addr(&conn->host->ip), conn->host->port,
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen net_ip2addr(&ip), port);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen return FALSE;
1b3bb8d39686ed24730cbc31cc9a33dc62c8c6c3Timo Sirainen }
5a07b37a9df398b5189c14872a600384208ab74bTimo Sirainen host = director_host_get(dir, &ip, port);
8e7da21696c9f8a6d5e601243fb6172ec85d47b2Timo Sirainen conn->me_received = TRUE;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen if (!conn->in)
7797aa2479e99aeb71057b7a2584b2cb72e4d3f8Timo Sirainen return TRUE;
8d80659e504ffb34bb0c6a633184fece35751b18Timo Sirainen
8d80659e504ffb34bb0c6a633184fece35751b18Timo Sirainen i_free(conn->name);
7e94cf9d70ce9fdeccb7a85ff400b899e6386f36Timo Sirainen conn->name = i_strdup_printf("%s/left", host->name);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen conn->host = host;
b2105c78f0fd58281317e6d777ded860f33153a3Timo Sirainen /* make sure we don't keep old sequence values across restarts */
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen host->last_seq = 0;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
5a07b37a9df398b5189c14872a600384208ab74bTimo Sirainen connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
fddec1bf093b45eaedcece13c649b811208e0547Timo Sirainen net_ip2addr(&host->ip), host->port);
8d80659e504ffb34bb0c6a633184fece35751b18Timo Sirainen /* make sure this is the correct incoming connection */
16c89b1260c9d07c01c83a9219424d3727069b2eTimo Sirainen if (host->self) {
41bb0aa8e357876bc9a1916a37c9e3e78e5f8185Timo Sirainen /* probably we're trying to find our own ip. it's no */
41bb0aa8e357876bc9a1916a37c9e3e78e5f8185Timo Sirainen i_error("director(%s): Connection from self, dropping",
41bb0aa8e357876bc9a1916a37c9e3e78e5f8185Timo Sirainen host->name);
41bb0aa8e357876bc9a1916a37c9e3e78e5f8185Timo Sirainen return FALSE;
41bb0aa8e357876bc9a1916a37c9e3e78e5f8185Timo Sirainen } else if (dir->left == NULL) {
190237ce467d2389dfb809874b0fec86d3c7968dTimo Sirainen /* no conflicts yet */
31ddc75584c5cde53d2e78a737587f2e7fdcb0d2Timo Sirainen } else if (dir->left->host == host) {
41bb0aa8e357876bc9a1916a37c9e3e78e5f8185Timo Sirainen i_warning("director(%s): Dropping existing connection "
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen "in favor of its new connection", host->name);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen director_connection_deinit(&dir->left);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen } else {
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen if (director_host_cmp_to_self(dir->left->host, host,
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen dir->self_host) < 0) {
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen /* the old connection is the correct one.
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen refer the client there. */
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen director_connection_send(conn, t_strdup_printf(
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen "CONNECT\t%s\t%u\n",
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen net_ip2addr(&dir->left->host->ip),
1175f27441385a7011629f295f42708f9a3a4ffcTimo Sirainen dir->left->host->port));
7797aa2479e99aeb71057b7a2584b2cb72e4d3f8Timo Sirainen /* also make sure that the connection is alive */
8d80659e504ffb34bb0c6a633184fece35751b18Timo Sirainen director_connection_ping(dir->left);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen return FALSE;
8d80659e504ffb34bb0c6a633184fece35751b18Timo Sirainen }
6a19e109ee8c5a6f688da83a86a7f6abeb71abddTimo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen /* this new connection is the correct one. disconnect the old
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen one, but before that tell it to connect to the new one.
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen that message might not reach it, so also send the same
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen message to right side. */
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen i_warning("Replacing director connection %s with %s",
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen dir->left->host->name, host->name);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen director_connection_send(dir->left, connect_str);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen (void)o_stream_flush(dir->left->output);
40ef82c46f6652412b068ebcdac7c3e74840a284Timo Sirainen director_connection_deinit(&dir->left);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen }
d9fdacd5fb3e07997e5c389739d2054f0c8441d8Timo Sirainen dir->left = conn;
d9fdacd5fb3e07997e5c389739d2054f0c8441d8Timo Sirainen
d9fdacd5fb3e07997e5c389739d2054f0c8441d8Timo Sirainen /* tell the ring's right side to connect to this new director. */
13c6532dc104d23061e6901783ceb1ff8872c206Timo Sirainen if (dir->right != NULL) {
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen if (dir->left->host != dir->right->host)
e86d0d34fe365da4c7ca4312d575bfcbf3a01c0eTimo Sirainen director_connection_send(dir->right, connect_str);
e86d0d34fe365da4c7ca4312d575bfcbf3a01c0eTimo Sirainen else {
e86d0d34fe365da4c7ca4312d575bfcbf3a01c0eTimo Sirainen /* there are only two directors, and we already have
e86d0d34fe365da4c7ca4312d575bfcbf3a01c0eTimo Sirainen a connection to this server. */
e86d0d34fe365da4c7ca4312d575bfcbf3a01c0eTimo Sirainen }
41e1c7380edda701719d8ce1fb4d465d2ec4c84dTimo Sirainen } else {
8d80659e504ffb34bb0c6a633184fece35751b18Timo Sirainen /* there are only two directors. connect to the other one. */
bb10ebcf076c959c752f583746d83805d7686df8Timo Sirainen (void)director_connect_host(dir, host);
41e1c7380edda701719d8ce1fb4d465d2ec4c84dTimo Sirainen }
16c89b1260c9d07c01c83a9219424d3727069b2eTimo Sirainen return TRUE;
690af4a90eaf8611c2573d34126bb7a852c50a44Timo Sirainen}
690af4a90eaf8611c2573d34126bb7a852c50a44Timo Sirainen
8d80659e504ffb34bb0c6a633184fece35751b18Timo Sirainenstatic bool
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainendirector_user_refresh(struct director *dir, unsigned int username_hash,
8d80659e504ffb34bb0c6a633184fece35751b18Timo Sirainen struct mail_host *host, time_t timestamp,
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen struct user **user_r)
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen{
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen struct user *user;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen bool ret = FALSE;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen user = user_directory_lookup(dir->users, username_hash);
31ddc75584c5cde53d2e78a737587f2e7fdcb0d2Timo Sirainen if (user == NULL) {
369a1084c500a9df7448ffa9409ce32e42060bc2Timo Sirainen *user_r = user_directory_add(dir->users, username_hash,
31ddc75584c5cde53d2e78a737587f2e7fdcb0d2Timo Sirainen host, timestamp);
8887bf3757d51d73887dd20b1db3334d867d3817Timo Sirainen return TRUE;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen }
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen if (timestamp == ioloop_time && (time_t)user->timestamp != timestamp) {
b9ce555e8624a5593b3bfd81b572b7d2e1e1fca5Timo Sirainen user_directory_refresh(dir->users, user);
ed3ce1282f6bc35d20e82c2c23a2990c8dfe876fTimo Sirainen ret = TRUE;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen }
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
6a19e109ee8c5a6f688da83a86a7f6abeb71abddTimo Sirainen if (user->host != host) {
6a19e109ee8c5a6f688da83a86a7f6abeb71abddTimo Sirainen i_error("User hash %u is being redirected to two hosts: "
59151b71059df1190acd75d8717ed04a7920c862Timo Sirainen "%s and %s", username_hash,
59151b71059df1190acd75d8717ed04a7920c862Timo Sirainen net_ip2addr(&user->host->ip),
59151b71059df1190acd75d8717ed04a7920c862Timo Sirainen net_ip2addr(&host->ip));
519e0a461271843833a2b42626ad93f6e7ddc497Timo Sirainen
519e0a461271843833a2b42626ad93f6e7ddc497Timo Sirainen /* we want all the directors to redirect the user to same
6a19e109ee8c5a6f688da83a86a7f6abeb71abddTimo Sirainen server, but we don't want two directors fighting over which
5626ae5e3316eced244adb6485c0927f1c7fdc41Timo Sirainen server it belongs to, so always use the lower IP address */
5626ae5e3316eced244adb6485c0927f1c7fdc41Timo Sirainen if (net_ip_cmp(&user->host->ip, &host->ip) > 0) {
519e0a461271843833a2b42626ad93f6e7ddc497Timo Sirainen /* change the host. we'll also need to remove the user
519e0a461271843833a2b42626ad93f6e7ddc497Timo Sirainen from the old host's user_count, because we can't
f23298fea47eecbeded985ee2537a34c4c4ef56bTimo Sirainen keep track of the user for more than one host */
f23298fea47eecbeded985ee2537a34c4c4ef56bTimo Sirainen user->host->user_count--;
519e0a461271843833a2b42626ad93f6e7ddc497Timo Sirainen user->host = host;
519e0a461271843833a2b42626ad93f6e7ddc497Timo Sirainen user->host->user_count++;
6a19e109ee8c5a6f688da83a86a7f6abeb71abddTimo Sirainen }
7e94cf9d70ce9fdeccb7a85ff400b899e6386f36Timo Sirainen ret = TRUE;
7e94cf9d70ce9fdeccb7a85ff400b899e6386f36Timo Sirainen }
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen *user_r = user;
e4b09b008ab544eb8994beecbfffefa21d855e43Timo Sirainen return ret;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen}
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainenstatic bool
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainendirector_handshake_cmd_user(struct director_connection *conn,
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen const char *const *args)
6ef7e31619edfaa17ed044b45861d106a86191efTimo Sirainen{
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen unsigned int username_hash, timestamp;
5c1a8aee989af87bddefd71e2aa83aa2bd695155Timo Sirainen struct ip_addr ip;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen struct mail_host *host;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen struct user *user;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
6ef7e31619edfaa17ed044b45861d106a86191efTimo Sirainen if (str_array_length(args) != 3 ||
6ef7e31619edfaa17ed044b45861d106a86191efTimo Sirainen str_to_uint(args[0], &username_hash) < 0 ||
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen net_addr2ip(args[1], &ip) < 0 ||
2af769daebd83719ac696a440e06f6020471cec0Timo Sirainen str_to_uint(args[2], &timestamp) < 0) {
d9fdacd5fb3e07997e5c389739d2054f0c8441d8Timo Sirainen i_error("director(%s): Invalid USER handshake args",
d9fdacd5fb3e07997e5c389739d2054f0c8441d8Timo Sirainen conn->name);
a53cb86b4d733d9c48ee4d285bed477c80825804Timo Sirainen return FALSE;
abbe0657a4d6271740d41cf1de55e8686f5769fcTimo Sirainen }
d30da25fb6be1f1c667d93767c9194000194b618Timo Sirainen
abbe0657a4d6271740d41cf1de55e8686f5769fcTimo Sirainen host = mail_host_lookup(conn->dir->mail_hosts, &ip);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen if (host == NULL) {
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen i_error("director(%s): USER used unknown host %s in handshake",
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen conn->name, args[1]);
6ef7e31619edfaa17ed044b45861d106a86191efTimo Sirainen return FALSE;
75ef04fc62a3955d3a5310410e09735cbd4e972bTimo Sirainen }
75ef04fc62a3955d3a5310410e09735cbd4e972bTimo Sirainen
75ef04fc62a3955d3a5310410e09735cbd4e972bTimo Sirainen director_user_refresh(conn->dir, username_hash, host, timestamp, &user);
75ef04fc62a3955d3a5310410e09735cbd4e972bTimo Sirainen return TRUE;
75ef04fc62a3955d3a5310410e09735cbd4e972bTimo Sirainen}
75ef04fc62a3955d3a5310410e09735cbd4e972bTimo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainenstatic bool
d5cebe7f98e63d4e2822863ef2faa4971e8b3a5dTimo Sirainendirector_cmd_user(struct director_connection *conn, const char *const *args)
8e7da21696c9f8a6d5e601243fb6172ec85d47b2Timo Sirainen{
375fac6f5a29f687fe8117f814cd7e594b0a1dd2Timo Sirainen unsigned int username_hash;
375fac6f5a29f687fe8117f814cd7e594b0a1dd2Timo Sirainen struct ip_addr ip;
bbf796c17f02538058d7559bfe96d677e5b55015Timo Sirainen struct mail_host *host;
bbf796c17f02538058d7559bfe96d677e5b55015Timo Sirainen struct user *user;
bbf796c17f02538058d7559bfe96d677e5b55015Timo Sirainen
bbf796c17f02538058d7559bfe96d677e5b55015Timo Sirainen if (str_array_length(args) != 2 ||
bbf796c17f02538058d7559bfe96d677e5b55015Timo Sirainen str_to_uint(args[0], &username_hash) < 0 ||
bbf796c17f02538058d7559bfe96d677e5b55015Timo Sirainen net_addr2ip(args[1], &ip) < 0) {
bbf796c17f02538058d7559bfe96d677e5b55015Timo Sirainen i_error("director(%s): Invalid USER args", conn->name);
bbf796c17f02538058d7559bfe96d677e5b55015Timo Sirainen return FALSE;
6a19e109ee8c5a6f688da83a86a7f6abeb71abddTimo Sirainen }
7797aa2479e99aeb71057b7a2584b2cb72e4d3f8Timo Sirainen
7797aa2479e99aeb71057b7a2584b2cb72e4d3f8Timo Sirainen host = mail_host_lookup(conn->dir->mail_hosts, &ip);
6a19e109ee8c5a6f688da83a86a7f6abeb71abddTimo Sirainen if (host == NULL) {
6a19e109ee8c5a6f688da83a86a7f6abeb71abddTimo Sirainen /* we probably just removed this host. */
fa5957ffc9b676bfd649fa9953e63e72ee4ebeb4Timo Sirainen return TRUE;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen }
1e47cfede3a0b62654105daab00e97b5d660bc6bTimo Sirainen
1e47cfede3a0b62654105daab00e97b5d660bc6bTimo Sirainen if (director_user_refresh(conn->dir, username_hash,
16c89b1260c9d07c01c83a9219424d3727069b2eTimo Sirainen host, ioloop_time, &user))
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen director_update_user(conn->dir, conn->host, user);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen return TRUE;
fea541eec46707f9b01bd3cbc981d73c1e808a54Timo Sirainen}
fea541eec46707f9b01bd3cbc981d73c1e808a54Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainenstatic bool director_cmd_director(struct director_connection *conn,
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen const char *const *args)
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen{
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen struct director_host *host;
31ddc75584c5cde53d2e78a737587f2e7fdcb0d2Timo Sirainen struct ip_addr ip;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen unsigned int port;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen if (!director_args_parse_ip_port(conn, args, &ip, &port))
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen return FALSE;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen host = director_host_lookup(conn->dir, &ip, port);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen if (host != NULL) {
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen /* already have this, skip */
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen return TRUE;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen }
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
44ff75ca53188056ff5a3e50428e3f2078800b3cTimo Sirainen /* save the director and forward it */
44ff75ca53188056ff5a3e50428e3f2078800b3cTimo Sirainen director_host_add(conn->dir, &ip, port);
44ff75ca53188056ff5a3e50428e3f2078800b3cTimo Sirainen director_connection_send(conn->dir->right,
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen t_strdup_printf("DIRECTOR\t%s\t%u\n", net_ip2addr(&ip), port));
return TRUE;
}
/* Handle HOST-HAND-START, which begins the remote's list of mail hosts in
   a handshake. args[0] is non-zero if the remote has a completed ring.

   - Remote ring completed, ours not handshaked: drop all of our mail hosts
     so that only the remote's list is used.
   - Remote ring not completed, ours handshaked: ignore the hosts the remote
     is about to send.
   Returns FALSE if the argument is missing or not a number. */
static bool
director_cmd_host_hand_start(struct director_connection *conn,
			     const char *const *args)
{
	const ARRAY_TYPE(mail_host) *hosts;
	struct mail_host *const *hostp;
	unsigned int remote_ring_completed;

	/* args[0] is NULL when the command came without parameters */
	if (args == NULL || args[0] == NULL ||
	    str_to_uint(args[0], &remote_ring_completed) < 0) {
		i_error("director(%s): Invalid HOST-HAND-START args",
			conn->name);
		return FALSE;
	}

	if (remote_ring_completed && !conn->dir->ring_handshaked) {
		/* clear everything we have and use only what remote sends us */
		hosts = mail_hosts_get(conn->dir->mail_hosts);
		/* the loop relies on director_remove_host() removing the
		   host from this array */
		while (array_count(hosts) > 0) {
			hostp = array_idx(hosts, 0);
			director_remove_host(conn->dir, NULL, NULL, *hostp);
		}
	} else if (!remote_ring_completed && conn->dir->ring_handshaked) {
		/* ignore whatever remote sends */
		conn->ignore_host_events = TRUE;
	}
	conn->handshake_sending_hosts = TRUE;
	return TRUE;
}
static int
director_cmd_is_seen(struct director_connection *conn,
const char *const **_args,
struct director_host **host_r)
{
const char *const *args = *_args;
struct ip_addr ip;
unsigned int port, seq;
struct director_host *host;
if (str_array_length(args) < 3 ||
net_addr2ip(args[0], &ip) < 0 ||
str_to_uint(args[1], &port) < 0 ||
str_to_uint(args[2], &seq) < 0) {
i_error("director(%s): Command is missing parameters",
conn->name);
return -1;
}
*_args = args + 3;
host = director_host_lookup(conn->dir, &ip, port);
if (host == NULL) {
/* director is already gone, but we can't be sure if this
command was sent everywhere. re-send it as if it was from
ourself. */
*host_r = NULL;
} else {
if (seq <= host->last_seq) {
/* already seen this */
return 1;
}
*host_r = host;
host->last_seq = seq;
}
return 0;
}
static bool
director_cmd_host_int(struct director_connection *conn, const char *const *args,
struct director_host *dir_host)
{
struct mail_host *host;
struct ip_addr ip;
unsigned int vhost_count;
bool update;
if (str_array_length(args) != 2 ||
net_addr2ip(args[0], &ip) < 0 ||
str_to_uint(args[1], &vhost_count) < 0) {
i_error("director(%s): Invalid HOST args", conn->name);
return FALSE;
}
if (conn->ignore_host_events) {
/* remote is sending hosts in a handshake, but it doesn't have
a completed ring and we do. */
i_assert(conn->handshake_sending_hosts);
return TRUE;
}
host = mail_host_lookup(conn->dir->mail_hosts, &ip);
if (host == NULL) {
host = mail_host_add_ip(conn->dir->mail_hosts, &ip);
update = TRUE;
} else {
update = host->vhost_count != vhost_count;
}
if (update) {
mail_host_set_vhost_count(conn->dir->mail_hosts,
host, vhost_count);
director_update_host(conn->dir, conn->host, dir_host, host);
}
return TRUE;
}
/* HOST line received while the remote is sending its host list during a
   handshake. These lines have no originating-director prefix, so the
   arguments are passed as they are and the originating director is NULL. */
static bool
director_cmd_host_handshake(struct director_connection *conn,
 const char *const *args)
{
 return director_cmd_host_int(conn, args, NULL);
}
static bool
director_cmd_host(struct director_connection *conn, const char *const *args)
{
struct director_host *dir_host;
int ret;
if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
return ret > 0;
return director_cmd_host_int(conn, args, dir_host);
}
static bool
director_cmd_host_remove(struct director_connection *conn,
const char *const *args)
{
struct director_host *dir_host;
struct mail_host *host;
struct ip_addr ip;
int ret;
if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
return ret > 0;
if (str_array_length(args) != 1 ||
net_addr2ip(args[0], &ip) < 0) {
i_error("director(%s): Invalid HOST-REMOVE args", conn->name);
return FALSE;
}
host = mail_host_lookup(conn->dir->mail_hosts, &ip);
if (host != NULL)
director_remove_host(conn->dir, conn->host, dir_host, host);
return TRUE;
}
static bool
director_cmd_host_flush(struct director_connection *conn,
const char *const *args)
{
struct director_host *dir_host;
struct mail_host *host;
struct ip_addr ip;
unsigned int seq;
int ret;
if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
return ret > 0;
if (str_array_length(args) != 2 ||
net_addr2ip(args[0], &ip) < 0 ||
str_to_uint(args[1], &seq) < 0) {
i_error("director(%s): Invalid HOST-FLUSH args", conn->name);
return FALSE;
}
host = mail_host_lookup(conn->dir->mail_hosts, &ip);
if (host != NULL)
director_flush_host(conn->dir, conn->host, dir_host, host);
return TRUE;
}
/* Called when DONE is received, i.e. the remote has sent its whole
   handshake. Marks the connection handshaked, acknowledges and announces an
   incoming (left) director, starts a SYNC round once both neighbours are
   handshaked, and (re)starts the idle PING timer. */
static void director_handshake_cmd_done(struct director_connection *conn)
{
	struct director *dir = conn->dir;
	const char *msg;

	if (dir->debug)
		i_debug("Handshaked to %s", conn->host->name);

	conn->handshake_received = TRUE;
	if (conn->in) {
		/* handshaked to left side. tell it we've received the
		   whole handshake. */
		director_connection_send(conn, "DONE\n");

		/* tell the right director about the left one */
		if (dir->right != NULL) {
			msg = t_strdup_printf("DIRECTOR\t%s\t%u\n",
					      net_ip2addr(&conn->host->ip),
					      conn->host->port);
			director_connection_send(dir->right, msg);
		}
	}

	if (dir->left != NULL && dir->right != NULL &&
	    dir->left->handshake_received && dir->right->handshake_received) {
		/* we're connected to both directors. see if the ring is
		   finished by sending a SYNC. if we get it back, it's done. */
		dir->ring_synced = FALSE;
		dir->sync_seq++;
		msg = t_strdup_printf("SYNC\t%s\t%u\t%u\n",
				      net_ip2addr(&dir->self_ip),
				      dir->self_port, dir->sync_seq);
		director_connection_send(dir->right, msg);
	}

	/* replace any existing ping timer with the idle interval */
	if (conn->to_ping != NULL)
		timeout_remove(&conn->to_ping);
	conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_INTERVAL_MSECS,
				    director_connection_ping, conn);
}
/* Dispatch one input line received before the handshake has finished.
   cmd is the command name and args its NULL-terminated parameter list.

   The checks below are order dependent: VERSION must come first, most
   other commands require ME to have been received, and which commands are
   accepted depends on whether the connection is incoming (conn->in) and
   on whether a host list is currently being sent.

   Returns TRUE if the line was accepted. Returns FALSE for an invalid or
   unexpected command, and also after a CONNECT referral
   (presumably the caller then closes the connection - confirm against
   the caller). */
static bool
director_connection_handle_handshake(struct director_connection *conn,
 const char *cmd, const char *const *args)
{
 struct director_host *host;
 struct ip_addr ip;
 unsigned int port;
 /* both incoming and outgoing connections get VERSION and ME */
 if (strcmp(cmd, "VERSION") == 0 && str_array_length(args) >= 3) {
 if (strcmp(args[0], DIRECTOR_VERSION_NAME) != 0) {
 i_error("director(%s): Wrong protocol in socket "
 "(%s vs %s)",
 conn->name, args[0], DIRECTOR_VERSION_NAME);
 return FALSE;
 } else if (atoi(args[1]) != DIRECTOR_VERSION_MAJOR) {
 /* NOTE(review): atoi() returns int but is printed with %u,
    and trailing garbage after the number is accepted */
 i_error("director(%s): Incompatible protocol version: "
 "%u vs %u", conn->name, atoi(args[1]),
 DIRECTOR_VERSION_MAJOR);
 return FALSE;
 }
 conn->version_received = TRUE;
 return TRUE;
 }
 /* anything else (including a VERSION line with too few parameters)
    is rejected until a valid VERSION has been seen */
 if (!conn->version_received) {
 i_error("director(%s): Incompatible protocol", conn->name);
 return FALSE;
 }
 /* ME is handled only once; a repeated ME falls through to the
    "Invalid handshake command" error at the end */
 if (strcmp(cmd, "ME") == 0 && !conn->me_received &&
 str_array_length(args) == 2)
 return director_cmd_me(conn, args);
 /* only outgoing connections get a CONNECT reference */
 if (!conn->in && strcmp(cmd, "CONNECT") == 0 &&
 str_array_length(args) == 2) {
 /* remote wants us to connect elsewhere */
 if (!director_args_parse_ip_port(conn, args, &ip, &port))
 return FALSE;
 /* forget the current right side before connecting to the
    referred host; FALSE is returned for this connection */
 conn->dir->right = NULL;
 host = director_host_get(conn->dir, &ip, port);
 (void)director_connect_host(conn->dir, host);
 return FALSE;
 }
 /* only incoming connections get DIRECTOR and HOST lists */
 if (conn->in && strcmp(cmd, "DIRECTOR") == 0 && conn->me_received)
 return director_cmd_director(conn, args);
 if (strcmp(cmd, "HOST") == 0) {
 /* allow hosts from all connections always,
 this could be an host update */
 if (conn->handshake_sending_hosts)
 return director_cmd_host_handshake(conn, args);
 else
 return director_cmd_host(conn, args);
 }
 /* end of the remote's host list: stop ignoring host events */
 if (conn->handshake_sending_hosts &&
 strcmp(cmd, "HOST-HAND-END") == 0) {
 conn->ignore_host_events = FALSE;
 conn->handshake_sending_hosts = FALSE;
 return TRUE;
 }
 if (conn->in && strcmp(cmd, "HOST-HAND-START") == 0 &&
 conn->me_received)
 return director_cmd_host_hand_start(conn, args);
 /* only incoming connections get a full USER list, but outgoing
 connections can also receive USER updates during handshake and
 it wouldn't be safe to ignore them. */
 if (strcmp(cmd, "USER") == 0 && conn->me_received) {
 if (conn->in)
 return director_handshake_cmd_user(conn, args);
 else
 return director_cmd_user(conn, args);
 }
 /* both get DONE */
 if (strcmp(cmd, "DONE") == 0 && !conn->handshake_received &&
 !conn->handshake_sending_hosts) {
 director_handshake_cmd_done(conn);
 return TRUE;
 }
 /* nothing matched: unknown command, or one not allowed in this state */
 i_error("director(%s): Invalid handshake command: %s "
 "(in=%d me_received=%d)", conn->name, cmd,
 conn->in, conn->me_received);
 return FALSE;
}
/* Handle a SYNC command. args holds the originating director's IP and port
   followed by a sequence number; line is the full input line, which is
   forwarded unchanged when the SYNC was started by another director.

   - Originator no longer known: the SYNC is dropped.
   - We are the originator and seq matches dir->sync_seq: the SYNC went
     around the ring, so the ring is marked handshaked (the first time) or
     synced.
   - Otherwise the line is forwarded to the right side, if there is one.
   Returns FALSE only when the arguments are invalid. */
static bool director_connection_sync(struct director_connection *conn,
				     const char *const *args, const char *line)
{
	struct director *dir = conn->dir;
	struct director_host *host;
	struct ip_addr ip;
	unsigned int port, seq;

	/* director_args_parse_ip_port() returns a bool, so its failure has
	   to be tested with '!'; comparing it with "< 0" never matches and
	   would let ip/port be used uninitialized */
	if (str_array_length(args) != 3 ||
	    !director_args_parse_ip_port(conn, args, &ip, &port) ||
	    str_to_uint(args[2], &seq) < 0) {
		i_error("director(%s): Invalid SYNC args", conn->name);
		return FALSE;
	}

	/* find the originating director. if we don't see it, it was already
	   removed and we can ignore this sync. */
	host = director_host_lookup(dir, &ip, port);
	if (host == NULL)
		return TRUE;

	if (host->self) {
		if (dir->sync_seq != seq) {
			/* stale SYNC event */
			return TRUE;
		}

		if (!dir->ring_handshaked) {
			/* the ring is handshaked */
			director_set_ring_handshaked(dir);
		} else if (dir->ring_synced) {
			i_error("Received SYNC from %s (seq=%u) "
				"while already synced", conn->name, seq);
			return TRUE;
		} else {
			if (dir->debug) {
				i_debug("Ring is synced (%s sent seq=%u)",
					conn->name, seq);
			}
			director_set_ring_synced(dir);
		}
		return TRUE;
	}

	/* forward it to the connection on right */
	if (dir->right != NULL) {
		director_connection_send(dir->right,
					 t_strconcat(line, "\n", NULL));
	}
	return TRUE;
}
/* Handle a "CONNECT <ip> <port>" command received after the handshake:
   the remote suggests that we connect to the given director. The request
   is ignored if the host is unknown, or if director_host_cmp_to_self()
   reports that the current right side connection is already the correct
   one.

   Returns FALSE if the arguments are invalid, TRUE otherwise. */
static bool director_cmd_connect(struct director_connection *conn,
				 const char *const *args)
{
	struct director *dir = conn->dir;
	struct director_host *host;
	struct ip_addr ip;
	unsigned int port;

	/* director_args_parse_ip_port() signals failure with a false return
	   value (the handshake CONNECT handling tests it with '!'), so a
	   "< 0" comparison would never catch a parse error and would let
	   uninitialized ip/port be used below. */
	if (str_array_length(args) != 2 ||
	    !director_args_parse_ip_port(conn, args, &ip, &port)) {
		i_error("director(%s): Invalid CONNECT args", conn->name);
		return FALSE;
	}

	host = director_host_lookup(dir, &ip, port);
	if (host == NULL) {
		i_error("Received CONNECT request to unknown host %s:%u",
			net_ip2addr(&ip), port);
		return TRUE;
	}

	/* remote suggests us to connect elsewhere */
	if (dir->right != NULL &&
	    director_host_cmp_to_self(host, dir->right->host,
				      dir->self_host) <= 0) {
		/* the old connection is the correct one */
		if (dir->debug) {
			i_debug("Ignoring CONNECT request to %s "
				"(current right is %s)",
				host->name, dir->right->name);
		}
		return TRUE;
	}

	if (dir->debug) {
		if (dir->right == NULL) {
			i_debug("Received CONNECT request to %s, "
				"initializing right", host->name);
		} else {
			i_debug("Received CONNECT request to %s, "
				"replacing current right %s",
				host->name, dir->right->name);
		}
	}

	/* connect here */
	(void)director_connect_host(dir, host);
	return TRUE;
}
/* Handle a PONG reply. If a PING was outstanding, clear the waiting flag
   and replace the current ping timeout with one that runs
   director_connection_ping() after the regular ping interval.
   Unsolicited PONGs are ignored. Always returns TRUE. */
static bool director_cmd_pong(struct director_connection *conn)
{
	if (conn->ping_waiting) {
		conn->ping_waiting = FALSE;

		timeout_remove(&conn->to_ping);
		conn->to_ping =
			timeout_add(DIRECTOR_CONNECTION_PING_INTERVAL_MSECS,
				    director_connection_ping, conn);
	}
	return TRUE;
}
/* Parse and dispatch one TAB-separated input line. The first field is the
   command name, the remaining fields are its arguments.

   PING and PONG are handled in every state. Until the handshake has been
   received everything else goes to the handshake handler; afterwards the
   regular command set is accepted.

   Returns FALSE if the line was invalid and the connection should be
   dropped, TRUE otherwise. */
static bool
director_connection_handle_line(struct director_connection *conn,
				const char *line)
{
	const char *const *fields = t_strsplit(line, "\t");
	const char *cmd = fields[0];
	const char *const *args = fields + 1;

	if (cmd == NULL) {
		i_error("director(%s): Received empty line", conn->name);
		return FALSE;
	}

	/* ping/pong is always handled */
	if (strcmp(cmd, "PING") == 0) {
		director_connection_send(conn, "PONG\n");
		return TRUE;
	}
	if (strcmp(cmd, "PONG") == 0)
		return director_cmd_pong(conn);

	if (!conn->handshake_received) {
		if (director_connection_handle_handshake(conn, cmd, args))
			return TRUE;

		/* invalid commands during handshake,
		   we probably don't want to reconnect here */
		if (conn->host != NULL)
			conn->host->last_failed = ioloop_time;
		return FALSE;
	}

	/* commands accepted once the handshake is complete */
	if (strcmp(cmd, "USER") == 0)
		return director_cmd_user(conn, args);
	else if (strcmp(cmd, "HOST") == 0)
		return director_cmd_host(conn, args);
	else if (strcmp(cmd, "HOST-REMOVE") == 0)
		return director_cmd_host_remove(conn, args);
	else if (strcmp(cmd, "HOST-FLUSH") == 0)
		return director_cmd_host_flush(conn, args);
	else if (strcmp(cmd, "DIRECTOR") == 0)
		return director_cmd_director(conn, args);
	else if (strcmp(cmd, "SYNC") == 0)
		return director_connection_sync(conn, args, line);
	else if (strcmp(cmd, "CONNECT") == 0)
		return director_cmd_connect(conn, args);

	i_error("director(%s): Unknown command (in this state): %s",
		conn->name, cmd);
	return FALSE;
}
/* Input callback: read available data from the connection and process it
   line by line. Any input resets the ping timeout. A read error, an
   overlong input buffer or an invalid line disconnects the connection.
   Director syncing is frozen while the lines are being handled. */
static void director_connection_input(struct director_connection *conn)
{
	struct director *dir = conn->dir;
	char *line;
	bool ok;
	ssize_t nread;

	if (conn->to_ping != NULL)
		timeout_reset(conn->to_ping);

	nread = i_stream_read(conn->input);
	if (nread == 0) {
		/* nothing new to process */
		return;
	}
	if (nread == -1) {
		/* disconnected */
		i_error("Director %s disconnected%s", conn->name,
			conn->handshake_received ? "" :
			" before handshake finished");
		director_connection_disconnected(&conn);
		return;
	}
	if (nread == -2) {
		/* buffer full */
		i_error("BUG: Director %s sent us more than %d bytes",
			conn->name, MAX_INBUF_SIZE);
		director_connection_disconnected(&conn);
		return;
	}

	director_sync_freeze(dir);
	for (;;) {
		line = i_stream_next_line(conn->input);
		if (line == NULL)
			break;

		T_BEGIN {
			ok = director_connection_handle_line(conn, line);
		} T_END;

		if (!ok) {
			/* conn must not be touched after this */
			director_connection_disconnected(&conn);
			break;
		}
	}
	director_sync_thaw(dir);
}
/* Append one "DIRECTOR <ip> <port>" line to str for every entry in the
   director's dir_hosts array. Nothing is sent here; the caller sends the
   resulting string. */
static void director_connection_send_directors(struct director_connection *conn,
					       string_t *str)
{
	struct director_host *const *hostp;

	array_foreach(&conn->dir->dir_hosts, hostp) {
		const struct director_host *host = *hostp;

		str_printfa(str, "DIRECTOR\t%s\t%u\n",
			    net_ip2addr(&host->ip), host->port);
	}
}
/* Append the mail host list to str: a HOST-HAND-START line, one
   "HOST <ip> <vhost count>" line per mail host and a HOST-HAND-END line.
   Both the start and the end line carry the current ring_handshaked
   value. */
static void
director_connection_send_hosts(struct director_connection *conn, string_t *str)
{
	struct director *dir = conn->dir;
	struct mail_host *const *hostp;

	str_printfa(str, "HOST-HAND-START\t%u\n", dir->ring_handshaked);
	array_foreach(mail_hosts_get(dir->mail_hosts), hostp) {
		const struct mail_host *host = *hostp;

		str_printfa(str, "HOST\t%s\t%u\n",
			    net_ip2addr(&host->ip), host->vhost_count);
	}
	str_printfa(str, "HOST-HAND-END\t%u\n", dir->ring_handshaked);
}
/* Send the USER list of the handshake using conn->user_iter, skipping
   users that no longer have connections. Whenever the output buffer
   reaches OUTBUF_FLUSH_THRESHOLD it is flushed; if the flush doesn't
   complete, the function returns and is expected to be called again later
   (director_connection_output() does so while user_iter is set).

   Once all users are sent, the iterator is freed, "DONE" is sent and
   reading input from the connection is enabled.

   The output stream is corked for the duration of the call and uncorked
   on every return path.

   Returns the result of the last o_stream_flush() call. */
static int director_connection_send_users(struct director_connection *conn)
{
	struct user *user;
	int ret;

	o_stream_cork(conn->output);
	while ((user = user_directory_iter_next(conn->user_iter)) != NULL) {
		if (!user_directory_user_has_connections(conn->dir->users,
							 user)) {
			/* user is already expired */
			continue;
		}

		T_BEGIN {
			const char *line;

			line = t_strdup_printf("USER\t%u\t%s\t%u\n",
					       user->username_hash,
					       net_ip2addr(&user->host->ip),
					       user->timestamp);
			director_connection_send(conn, line);
		} T_END;

		if (o_stream_get_buffer_used_size(conn->output) >=
		    OUTBUF_FLUSH_THRESHOLD) {
			if ((ret = o_stream_flush(conn->output)) <= 0) {
				/* continue later. balance the cork above so
				   the stream isn't left corked while we wait
				   to be called again. */
				o_stream_uncork(conn->output);
				return ret;
			}
		}
	}
	user_directory_iter_deinit(&conn->user_iter);
	director_connection_send(conn, "DONE\n");

	/* the handshake has been sent, start reading input */
	i_assert(conn->io == NULL);
	conn->io = io_add(conn->fd, IO_READ, director_connection_input, conn);

	ret = o_stream_flush(conn->output);
	o_stream_uncork(conn->output);
	return ret;
}
/* Output stream flush callback. While the handshake's user list is still
   being sent (user_iter is set), continue sending it; otherwise just
   flush the stream. Returns the result of whichever was done. */
static int director_connection_output(struct director_connection *conn)
{
	if (conn->user_iter == NULL)
		return o_stream_flush(conn->output);

	return director_connection_send_users(conn);
}
/* Timeout callback installed by director_connection_init_common(): the
   connect or the handshake took too long. Marks the host (if known) as
   failed at the current time, logs which phase timed out and disconnects. */
static void
director_connection_init_timeout(struct director_connection *conn)
{
	if (conn->host != NULL)
		conn->host->last_failed = ioloop_time;

	if (conn->connected)
		i_error("director(%s): Handshaking timed out", conn->name);
	else
		i_error("director(%s): Connect timed out", conn->name);

	director_connection_disconnected(&conn);
}
/* Allocate a connection for the given fd and do the setup shared by
   incoming and outgoing connections: create the input/output streams,
   register the output flush callback, start the init timeout and add the
   connection to the director's connection list. */
static struct director_connection *
director_connection_init_common(struct director *dir, int fd)
{
	struct director_connection *conn = i_new(struct director_connection, 1);

	conn->fd = fd;
	conn->dir = dir;

	/* streams */
	conn->input = i_stream_create_fd(conn->fd, MAX_INBUF_SIZE, FALSE);
	conn->output = o_stream_create_fd(conn->fd, MAX_OUTBUF_SIZE, FALSE);
	o_stream_set_flush_callback(conn->output,
				    director_connection_output, conn);

	/* to_ping initially holds the connect/handshake timeout */
	conn->to_ping = timeout_add(DIRECTOR_CONNECTION_INIT_TIMEOUT_MSECS,
				    director_connection_init_timeout, conn);

	DLLIST_PREPEND(&dir->connections, conn);
	return conn;
}
/* Send the beginning of the handshake: a VERSION line with the protocol
   name and major/minor version, followed by an ME line with our own
   IP address and port. */
static void director_connection_send_handshake(struct director_connection *conn)
{
	const char *greeting;

	greeting = t_strdup_printf(
		"VERSION\t"DIRECTOR_VERSION_NAME"\t%u\t%u\n"
		"ME\t%s\t%u\n",
		DIRECTOR_VERSION_MAJOR, DIRECTOR_VERSION_MINOR,
		net_ip2addr(&conn->dir->self_ip), conn->dir->self_port);
	director_connection_send(conn, greeting);
}
/* Create a connection object for an incoming connection on fd from the
   given IP. The connection is immediately treated as connected, named
   "<ip>/in", starts reading input and sends our handshake. */
struct director_connection *
director_connection_init_in(struct director *dir, int fd,
			    const struct ip_addr *ip)
{
	struct director_connection *conn =
		director_connection_init_common(dir, fd);

	conn->in = TRUE;
	conn->connected = TRUE;
	conn->name = i_strdup_printf("%s/in", net_ip2addr(ip));

	conn->io = io_add(conn->fd, IO_READ, director_connection_input, conn);
	director_connection_send_handshake(conn);
	return conn;
}
static void director_connection_connected(struct director_connection *conn)
{
struct director *dir = conn->dir;
string_t *str = t_str_new(1024);
int err;
if ((err = net_geterror(conn->fd)) != 0) {
conn->host->last_failed = ioloop_time;
i_error("director(%s): connect() failed: %s", conn->name,
strerror(err));
director_connection_disconnected(&conn);
return;
}
conn->connected = TRUE;
if (dir->right != NULL) {
/* see if we should disconnect or keep the existing
connection. */
if (director_host_cmp_to_self(conn->host, dir->right->host,
dir->self_host) <= 0) {
/* the old connection is the correct one */
director_connection_deinit(&conn);
return;
}
director_connection_deinit(&dir->right);
}
dir->right = conn;
i_free(conn->name);
conn->name = i_strdup_printf("%s/right", conn->host->name);
io_remove(&conn->io);
director_connection_send_handshake(conn);
director_connection_send_directors(conn, str);
director_connection_send_hosts(conn, str);
director_connection_send(conn, str_c(str));
conn->user_iter = user_directory_iter_init(dir->users);
(void)director_connection_send_users(conn);
}
/* Create a connection object for an outgoing connection on fd to the
   given director host. The connection is named "<host>/out" and
   director_connection_connected() is called once the socket becomes
   readable. The host's last_seq is reset to 0. */
struct director_connection *
director_connection_init_out(struct director *dir, int fd,
			     struct director_host *host)
{
	struct director_connection *conn;

	/* make sure we don't keep old sequence values across restarts */
	host->last_seq = 0;

	conn = director_connection_init_common(dir, fd);
	conn->host = host;
	conn->name = i_strdup_printf("%s/out", host->name);

	/* use IO_READ instead of IO_WRITE, so that we don't assign
	   dir->right until remote has actually sent some data */
	conn->io = io_add(conn->fd, IO_READ,
			  director_connection_connected, conn);
	return conn;
}
/* Destroy the connection and set *_conn to NULL. The connection is
   unlinked from the director (connection list, left/right pointers), its
   user iterator, timeouts, I/O handler and streams are released, the fd
   is closed and the memory is freed. For incoming connections the master
   service is told that a client connection was destroyed.

   If the director ends up without a left or a right connection, its sync
   sequence is bumped and the ring is marked as not synced. */
void director_connection_deinit(struct director_connection **_conn)
{
	struct director_connection *conn = *_conn;
	struct director *dir = conn->dir;

	*_conn = NULL;

	/* detach from the director */
	DLLIST_REMOVE(&dir->connections, conn);
	if (dir->left == conn)
		dir->left = NULL;
	if (dir->right == conn)
		dir->right = NULL;

	/* release per-connection resources */
	if (conn->user_iter != NULL)
		user_directory_iter_deinit(&conn->user_iter);
	if (conn->to != NULL)
		timeout_remove(&conn->to);
	if (conn->to_ping != NULL)
		timeout_remove(&conn->to_ping);
	if (conn->io != NULL)
		io_remove(&conn->io);
	i_stream_unref(&conn->input);
	o_stream_unref(&conn->output);
	if (close(conn->fd) < 0)
		i_error("close(director connection) failed: %m");

	if (conn->in)
		master_service_client_connection_destroyed(master_service);
	i_free(conn->name);
	i_free(conn);

	if (!(dir->left != NULL && dir->right != NULL)) {
		/* we aren't synced until we're again connected to a ring */
		dir->sync_seq++;
		dir->ring_synced = FALSE;
	}
}
/* Destroy the connection (setting *_conn to NULL) and, if the director
   has no right side connection afterwards, start connecting again. */
void director_connection_disconnected(struct director_connection **_conn)
{
	/* remember the director, the connection is freed by deinit */
	struct director *dir = (*_conn)->dir;

	director_connection_deinit(_conn);

	if (dir->right == NULL)
		director_connect(dir);
}
/* Timeout callback scheduled by director_connection_send() after a failed
   write: disconnect the connection. */
static void director_connection_timeout(struct director_connection *conn)
{
	struct director_connection *dead_conn = conn;

	director_connection_disconnected(&dead_conn);
}
/* Send the NUL-terminated data to the connection. Nothing is sent if the
   output stream is already closed or the connection isn't connected yet.

   If the data can't be sent in full (write error or full output buffer),
   the error is logged, the output stream is closed and a zero-delay
   timeout is added to disconnect the connection, rather than
   disconnecting here directly. */
void director_connection_send(struct director_connection *conn,
			      const char *data)
{
	unsigned int len = strlen(data);
	off_t ret;

	if (conn->output->closed || !conn->connected)
		return;

	ret = o_stream_send(conn->output, data, len);
	if (ret == (off_t)len) {
		/* everything was sent or buffered */
		return;
	}

	if (ret >= 0) {
		i_error("director(%s): Output buffer full, "
			"disconnecting", conn->name);
	} else {
		i_error("director(%s): write() failed: %m", conn->name);
	}
	o_stream_close(conn->output);
	conn->to = timeout_add(0, director_connection_timeout, conn);
}
/* Send data to the connection, unless the connection's host is
   skip_host. */
void director_connection_send_except(struct director_connection *conn,
				     struct director_host *skip_host,
				     const char *data)
{
	if (conn->host == skip_host)
		return;

	director_connection_send(conn, data);
}
/* Timeout callback set by director_connection_ping(): no reply arrived
   in time for our PING, so log it and disconnect. */
static void director_connection_ping_timeout(struct director_connection *conn)
{
	struct director_connection *dead_conn = conn;

	i_error("director(%s): Ping timed out, disconnecting", conn->name);
	director_connection_disconnected(&dead_conn);
}
/* Timeout callback: send a PING to the remote, unless one is already
   outstanding. The current ping timeout is replaced with one that
   disconnects if no reply arrives within the ping timeout. The sync_ping
   flag is always cleared. */
static void director_connection_ping(struct director_connection *conn)
{
	conn->sync_ping = FALSE;

	if (!conn->ping_waiting) {
		if (conn->to_ping != NULL)
			timeout_remove(&conn->to_ping);
		conn->to_ping =
			timeout_add(DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS,
				    director_connection_ping_timeout, conn);

		director_connection_send(conn, "PING\n");
		conn->ping_waiting = TRUE;
	}
}
/* Returns the connection's name. The string is owned by the connection
   (it is freed in director_connection_deinit()). */
const char *director_connection_get_name(struct director_connection *conn)
{
	const char *name = conn->name;

	return name;
}
/* Returns the director host associated with the connection. This can be
   NULL (other code in this file checks conn->host for NULL before using
   it). */
struct director_host *
director_connection_get_host(struct director_connection *conn)
{
	struct director_host *host = conn->host;

	return host;
}
/* Find the first outgoing (non-incoming) connection in the director's
   connection list whose host is the given one. Returns NULL if there is
   no such connection. */
struct director_connection *
director_connection_find_outgoing(struct director *dir,
				  struct director_host *host)
{
	struct director_connection *cur = dir->connections;

	while (cur != NULL) {
		if (!cur->in && cur->host == host)
			return cur;
		cur = cur->next;
	}
	return NULL;
}
/* Cork the connection's output stream. Pair with
   director_connection_uncork(). */
void director_connection_cork(struct director_connection *conn)
{
	o_stream_cork(conn->output);
}
/* Uncork the connection's output stream. Pair with
   director_connection_cork(). */
void director_connection_uncork(struct director_connection *conn)
{
	o_stream_uncork(conn->output);
}
/* Replace the current ping timeout with the sync timeout, after which
   director_connection_ping() is run, and set the sync_ping flag. Does
   nothing if a PING is already outstanding or the flag is already set. */
void director_connection_wait_sync(struct director_connection *conn)
{
	/* switch to faster ping timeout. avoid resetting the timeout if
	   it's already fast. */
	if (!conn->ping_waiting && !conn->sync_ping) {
		if (conn->to_ping != NULL)
			timeout_remove(&conn->to_ping);
		conn->to_ping =
			timeout_add(DIRECTOR_CONNECTION_SYNC_TIMEOUT_MSECS,
				    director_connection_ping, conn);
		conn->sync_ping = TRUE;
	}
}
/* Destroy all of the director's connections. Unlike
   director_connection_disconnected(), this doesn't try to reconnect. */
void director_connections_deinit(struct director *dir)
{
	struct director_connection *conn;

	while ((conn = dir->connections) != NULL) {
		/* advance the list head before the connection is freed */
		dir->connections = conn->next;
		director_connection_deinit(&conn);
	}
}