/* master-connection.c revision feaa6a3d82ea61496ced1f83a726ff33047c7da2 */
/* Copyright (c) 2011-2012 Dovecot authors, see the included COPYING file */
#include "lib.h"
#include "ioloop.h"
#include "istream.h"
#include "write-full.h"
#include "strescape.h"
#include "process-title.h"
#include "master-service.h"
#include "master-service-settings.h"
#include "mail-namespace.h"
#include "mail-storage-private.h"
#include "mail-storage-service.h"
#include "mail-search-build.h"
#include "master-connection.h"
#include <unistd.h>
/* Protocol version used on the master connection. In the code visible here
   only the major version is checked, via version_string_verify(). */
#define INDEXER_PROTOCOL_MAJOR_VERSION 1
#define INDEXER_PROTOCOL_MINOR_VERSION 0
/* Handshake written to the connection fd by master_connection_create():
   a VERSION line followed by a line holding one unsigned number (%u), which
   is filled with master_service_get_process_limit(). Note that the "1\t0"
   is spelled out literally here instead of being built from the two version
   macros above, so they have to be kept in sync by hand. */
#define INDEXER_WORKER_HANDSHAKE "VERSION\tindexer-worker-master\t1\t0\n%u\n"
/* Name that the peer's version line is verified against. */
#define INDEXER_MASTER_NAME "indexer-master-worker"
/* State of a single connection on which indexing requests are received and
   progress/result lines are written back. */
struct master_connection {
/* storage service context given to master_connection_create(); used for
   looking up the user of each request */
struct mail_storage_service_ctx *storage_service;
/* connection fd: read through 'input', written to directly with
   write_full(), closed in master_connection_destroy() */
int fd;
/* IO_READ handler calling master_connection_input() */
struct io *io;
/* input stream created on top of fd; requests are read from it line by line */
struct istream *input;
/* NOTE(review): never assigned or used in the code visible here */
struct ostream *output;
/* set once the peer's version line has been received and verified */
unsigned int version_received:1;
};
/* Update the process title to describe what this worker is doing. Does
   nothing unless the verbose_proctitle setting is enabled.

   username == NULL: "[idling]"
   seq1 == 0:        "[<username> <mailbox>]"
   otherwise:        "[<username> <mailbox> - <seq1>/<seq2>]" */
static void ATTR_NULL(1, 2)
indexer_worker_refresh_proctitle(const char *username, const char *mailbox,
				 uint32_t seq1, uint32_t seq2)
{
	const char *title;

	if (!master_service_settings_get(master_service)->verbose_proctitle)
		return;

	if (username == NULL) {
		title = "[idling]";
	} else if (seq1 != 0) {
		/* progress is known */
		title = t_strdup_printf("[%s %s - %u/%u]",
					username, mailbox, seq1, seq2);
	} else {
		title = t_strdup_printf("[%s %s]", username, mailbox);
	}
	process_title_set(title);
}
/* Precache the mails of a mailbox that aren't in the cache yet, i.e. the
   sequences from status.last_cached_seq+1 up to status.messages.
   index_mailbox() calls this only after a successful mailbox_sync().

   After every 100 precached mails the progress percentage is written to
   conn->fd as "<percentage>\n" if it has changed and is still below 100
   (the caller sends the final "100\n" or "-1\n"), and the process title is
   refreshed.

   Returns 0 on success and -1 on failure. Every failure is logged with the
   storage's last error, so that the "-1" reply sent to the master can be
   matched to a reason in the logs. */
static int
index_mailbox_precache(struct master_connection *conn, struct mailbox *box)
{
	struct mail_storage *storage = mailbox_get_storage(box);
	const char *username = mail_storage_get_user(storage)->username;
	const char *box_vname = mailbox_get_vname(box);
	struct mailbox_status status;
	struct mailbox_transaction_context *trans;
	struct mail_search_args *search_args;
	struct mail_search_context *ctx;
	struct mail *mail;
	struct mailbox_metadata metadata;
	uint32_t seq;
	/* two digits + LF + NUL: only percentages below 100 are ever written */
	char percentage_str[2+1+1];
	unsigned int counter = 0, max, percentage, percentage_sent = 0;
	int ret = 0;

	if (mailbox_get_metadata(box, MAILBOX_METADATA_PRECACHE_FIELDS,
				 &metadata) < 0) {
		i_error("Mailbox %s: Precache-fields lookup failed: %s",
			box_vname, mail_storage_get_last_error(storage, NULL));
		return -1;
	}
	mailbox_get_open_status(box, STATUS_MESSAGES | STATUS_LAST_CACHED_SEQ,
				&status);

	seq = status.last_cached_seq + 1;
	if (seq > status.messages) {
		/* everything is already cached. don't build an inverted
		   sequence range or let max become 0 / wrap around. */
		i_info("Indexed %u messages in %s", counter, box_vname);
		return 0;
	}

	trans = mailbox_transaction_begin(box, MAILBOX_TRANSACTION_FLAG_NO_CACHE_DEC);
	search_args = mail_search_build_init();
	mail_search_build_add_seqset(search_args, seq, status.messages);
	ctx = mailbox_search_init(trans, search_args, NULL,
				  metadata.precache_fields, NULL);
	mail_search_args_unref(&search_args);

	max = status.messages - seq + 1;
	while (mailbox_search_next(ctx, &mail)) {
		mail_precache(mail);
		if (++counter % 100 == 0) {
			/* 64bit multiplication so that huge mailboxes can't
			   overflow the intermediate result */
			percentage = (unsigned int)
				((uint64_t)counter * 100 / max);
			if (percentage != percentage_sent && percentage < 100) {
				percentage_sent = percentage;
				if (i_snprintf(percentage_str,
					       sizeof(percentage_str), "%u\n",
					       percentage) < 0)
					i_unreached();
				/* progress updates are best-effort */
				(void)write_full(conn->fd, percentage_str,
						 strlen(percentage_str));
			}
			indexer_worker_refresh_proctitle(username, box_vname,
							 counter, max);
		}
	}
	if (mailbox_search_deinit(&ctx) < 0) {
		i_error("Mailbox %s: Mail search failed: %s",
			box_vname, mail_storage_get_last_error(storage, NULL));
		ret = -1;
	}
	/* commit even after a failed search so that whatever was precached
	   so far is kept */
	if (mailbox_transaction_commit(&trans) < 0) {
		i_error("Mailbox %s: Transaction commit failed: %s",
			box_vname, mail_storage_get_last_error(storage, NULL));
		ret = -1;
	}
	if (ret == 0)
		i_info("Indexed %u messages in %s", counter, box_vname);
	return ret;
}
/* Sync and optionally precache a single mailbox of the given user.

   mailbox:         name used both for finding the namespace and for
                    allocating the mailbox
   max_recent_msgs: if non-zero, the mailbox is opened first and skipped
                    when it has more than this many recent messages
   what:            flag characters: 'o' adds MAILBOX_SYNC_FLAG_OPTIMIZE to
                    the sync, 'i' precaches the mails after a successful sync

   Returns 0 on success or when the mailbox was skipped (indexes disabled,
   too many recent messages), -1 on failure. */
static int
index_mailbox(struct master_connection *conn, struct mail_user *user,
	      const char *mailbox, unsigned int max_recent_msgs,
	      const char *what)
{
	struct mail_namespace *ns;
	struct mailbox *box;
	struct mailbox_status status;
	const char *path, *errstr;
	enum mail_error error;
	enum mailbox_sync_flags sync_flags = MAILBOX_SYNC_FLAG_FULL_READ;
	int ret = 0;

	ns = mail_namespace_find(user->namespaces, mailbox);
	if (ns == NULL) {
		i_error("Namespace not found for mailbox %s", mailbox);
		return -1;
	}

	box = mailbox_alloc(ns->list, mailbox, 0);
	path = mailbox_get_path_to(box, MAILBOX_LIST_PATH_TYPE_INDEX);
	if (*path == '\0') {
		i_info("Indexes disabled for Mailbox %s, skipping", mailbox);
		mailbox_free(&box);
		return 0;
	}

	if (max_recent_msgs != 0) {
		/* index only if there aren't too many recent messages.
		   don't bother syncing the mailbox, that alone can take a
		   while with large maildirs. */
		if (mailbox_open(box) < 0) {
			i_error("Opening mailbox %s failed: %s", mailbox,
				mail_storage_get_last_error(mailbox_get_storage(box), NULL));
			mailbox_free(&box);
			return -1;
		}
		mailbox_get_open_status(box, STATUS_RECENT, &status);
		if (status.recent > max_recent_msgs) {
			/* too many recent messages - skipping isn't an error */
			mailbox_free(&box);
			return 0;
		}
	}

	if (strchr(what, 'o') != NULL)
		sync_flags |= MAILBOX_SYNC_FLAG_OPTIMIZE;

	if (mailbox_sync(box, sync_flags) < 0) {
		errstr = mail_storage_get_last_error(mailbox_get_storage(box),
						     &error);
		/* a missing mailbox is logged only as debug, but it's still
		   reported as a failure to the caller */
		if (error != MAIL_ERROR_NOTFOUND) {
			i_error("Syncing mailbox %s failed: %s",
				mailbox, errstr);
		} else if (user->mail_debug) {
			i_debug("Syncing mailbox %s failed: %s",
				mailbox, errstr);
		}
		ret = -1;
	} else if (strchr(what, 'i') != NULL) {
		if (index_mailbox_precache(conn, box) < 0)
			ret = -1;
	}
	mailbox_free(&box);
	return ret;
}
/* Handle one request line. The line consists of four tab-escaped fields:

     <username> <mailbox> <max_recent_msgs> [i][o]

   The user is looked up, the mailbox is indexed with index_mailbox() and
   the final status is written to the connection fd: "100\n" on success,
   "-1\n" on failure.

   Returns -1 if the line was invalid, otherwise the result of writing the
   status reply. */
static int
master_connection_input_line(struct master_connection *conn, const char *line)
{
	const char *const *args = t_strsplit_tabescaped(line);
	struct mail_storage_service_input input;
	struct mail_storage_service_user *service_user;
	struct mail_user *user;
	const char *username, *mailbox, *what, *reply, *error;
	unsigned int max_recent_msgs;
	int ret;

	if (str_array_length(args) != 4 ||
	    str_to_uint(args[2], &max_recent_msgs) < 0 || args[3][0] == '\0') {
		i_error("Invalid input from master: %s", line);
		return -1;
	}
	username = args[0];
	mailbox = args[1];
	what = args[3];

	memset(&input, 0, sizeof(input));
	input.module = "mail";
	input.service = "indexer-worker";
	input.username = username;

	if (mail_storage_service_lookup_next(conn->storage_service, &input,
					     &service_user, &user, &error) > 0) {
		indexer_worker_refresh_proctitle(user->username, mailbox, 0, 0);
		ret = index_mailbox(conn, user, mailbox,
				    max_recent_msgs, what);
		/* back to idling */
		indexer_worker_refresh_proctitle(NULL, NULL, 0, 0);
		mail_user_unref(&user);
		mail_storage_service_user_free(&service_user);
	} else {
		i_error("User %s lookup failed: %s", username, error);
		ret = -1;
	}

	if (ret < 0)
		reply = "-1\n";
	else
		reply = "100\n";
	return write_full(conn->fd, reply, strlen(reply));
}
/* IO_READ callback of the connection. Reads more input, verifies the
   peer's version line if that hasn't been done yet and then handles every
   complete request line that is available. The whole service is stopped if
   reading fails, the version doesn't match or handling a line fails. */
static void master_connection_input(struct master_connection *conn)
{
	const char *line;
	int ret;

	if (i_stream_read(conn->input) < 0) {
		master_service_stop(master_service);
		return;
	}

	if (!conn->version_received) {
		line = i_stream_next_line(conn->input);
		if (line == NULL) {
			/* version line not fully received yet */
			return;
		}
		if (!version_string_verify(line, INDEXER_MASTER_NAME,
					   INDEXER_PROTOCOL_MAJOR_VERSION)) {
			i_error("Indexer master not compatible with this master "
				"(mixed old and new binaries?)");
			master_service_stop(master_service);
			return;
		}
		conn->version_received = TRUE;
	}

	for (;;) {
		line = i_stream_next_line(conn->input);
		if (line == NULL)
			break;

		/* each request gets its own data stack frame */
		T_BEGIN {
			ret = master_connection_input_line(conn, line);
		} T_END;
		if (ret < 0) {
			master_service_stop(master_service);
			return;
		}
	}
}
/* Create a connection for the given fd: set up the input stream and the
   read handler, and write the handshake (version line + this service's
   process limit) to the fd. A failed handshake write is ignored here.

   The returned connection is freed with master_connection_destroy(). */
struct master_connection *
master_connection_create(int fd, struct mail_storage_service_ctx *storage_service)
{
	struct master_connection *conn;
	const char *handshake;
	unsigned int process_limit;

	conn = i_new(struct master_connection, 1);
	conn->fd = fd;
	conn->storage_service = storage_service;
	conn->input = i_stream_create_fd(conn->fd, (size_t)-1, FALSE);
	conn->io = io_add(conn->fd, IO_READ, master_connection_input, conn);

	process_limit = master_service_get_process_limit(master_service);
	handshake = t_strdup_printf(INDEXER_WORKER_HANDSHAKE, process_limit);
	(void)write_full(conn->fd, handshake, strlen(handshake));
	return conn;
}
/* Destroy the connection and set the caller's pointer to NULL: the read
   handler and the input stream are removed, the fd is closed (a close()
   failure is only logged) and the master service is told that the client
   connection is gone. */
void master_connection_destroy(struct master_connection **_conn)
{
	struct master_connection *dying = *_conn;

	*_conn = NULL;

	io_remove(&dying->io);
	i_stream_destroy(&dying->input);
	if (close(dying->fd) < 0)
		i_error("close(master conn) failed: %m");
	i_free(dying);

	master_service_client_connection_destroyed(master_service);
}