fts-storage.c revision c63c3c4d548416914b8c6734fe18dd69bb900775
/* Copyright (c) 2006-2008 Dovecot authors, see the included COPYING file */
#include "lib.h"
#include "ioloop.h"
#include "array.h"
#include "str.h"
#include "istream.h"
#include "message-parser.h"
#include "message-decoder.h"
#include "mail-search.h"
#include "mail-storage-private.h"
#include "fts-api-private.h"
#include "fts-storage.h"
#include "fts-plugin.h"
#include <stdlib.h>
/* Fetch this plugin's context (mailbox, search or transaction) that was
   registered under fts_storage_module. */
#define FTS_CONTEXT(obj) \
MODULE_CONTEXT(obj, fts_storage_module)
/* Fetch this plugin's per-mail context registered under fts_mail_module. */
#define FTS_MAIL_CONTEXT(obj) \
MODULE_CONTEXT(obj, fts_mail_module)
/* Maximum number of messages fts_build_more() indexes per call before it
   returns 0 so the nonblocking search can be retried later. */
#define FTS_SEARCH_NONBLOCK_COUNT 10
/* Minimum number of seconds between indexing progress notifications. */
#define FTS_BUILD_NOTIFY_INTERVAL_SECS 10
/* State of an incremental index build over the messages the backend has
   not indexed yet. Allocated by fts_build_init(), freed by
   fts_build_deinit(). */
struct fts_storage_build_context {
/* search over the not-yet-indexed sequence range in seqset */
struct mail_search_context *search_ctx;
struct mail_search_seqset seqset;
struct mail_search_arg search_arg;
/* mail object used to read each message being indexed */
struct mail *mail;
/* backend build handle from fts_backend_build_init() */
struct fts_backend_build_context *build;
/* search_start_time is set on the first fts_build_notify() call;
   last_notify is when fts_build_notify() last ran */
struct timeval search_start_time, last_notify;
/* UID of the message currently being indexed */
uint32_t uid;
/* headers of the current message; they are sent to the backend after the
   body (see fts_build_mail()) */
string_t *headers;
};
/* Per-transaction state of the fts plugin. It is attached to the
   transaction in fts_transaction_begin() and freed in
   fts_transaction_finish(). */
struct fts_transaction_context {
union mailbox_transaction_module_context module_ctx;
/* freed on commit/rollback if set */
struct fts_storage_build_context *build_ctx;
/* freed on commit/rollback when free_mail is set */
struct mail *mail;
/* NOTE(review): not referenced in the code visible here */
uint32_t last_uid;
unsigned int free_mail:1;
/* set when a mail was expunged in this transaction; makes
   fts_transaction_finish() call fts_backend_expunge_finish() */
unsigned int expunges:1;
};
/* Module context slots. fts_storage_module holds the plugin's
   per-mailbox, per-search and per-transaction contexts; fts_mail_module
   holds its per-mail contexts. */
static MODULE_CONTEXT_DEFINE_INIT(fts_storage_module,
&mail_storage_module_register);
static MODULE_CONTEXT_DEFINE_INIT(fts_mail_module, &mail_module_register);
/* Mailbox close hook.
   Deinitializes whichever FTS backends were created, closes the mailbox
   through the parent implementation and frees the per-mailbox context.
   Returns the parent close() result. */
static int fts_mailbox_close(struct mailbox *box)
{
	struct fts_mailbox *fbox = FTS_CONTEXT(box);
	int close_ret;

	if (fbox->backend_substr != NULL)
		fts_backend_deinit(&fbox->backend_substr);
	if (fbox->backend_fast != NULL)
		fts_backend_deinit(&fbox->backend_fast);

	/* the parent vfuncs live inside fbox, so call before freeing it */
	close_ret = fbox->module_ctx.super.close(box);
	i_free(fbox);
	return close_ret;
}
/* Send the headers buffered for the current message (ctx->uid) to the
   backend as a header block, then empty the buffer.
   Returns 0 on success or when nothing is buffered, and -1 if the backend
   fails. On failure the buffer is left as it was. */
static int fts_build_mail_flush_headers(struct fts_storage_build_context *ctx)
{
	size_t hdr_len = str_len(ctx->headers);

	if (hdr_len > 0) {
		if (fts_backend_build_more(ctx->build, ctx->uid,
					   str_data(ctx->headers), hdr_len,
					   TRUE) < 0)
			return -1;
		str_truncate(ctx->headers, 0);
	}
	return 0;
}
/* Decide whether the body of block's MIME part should be indexed.
   For now only text/xxx and message/rfc822 parts are indexed. */
static bool fts_build_want_index_part(const struct message_block *block)
{
	unsigned int indexable_flags =
		MESSAGE_PART_FLAG_TEXT | MESSAGE_PART_FLAG_MESSAGE_RFC822;

	return (block->part->flags & indexable_flags) != 0;
}
static void fts_build_mail_header(struct fts_storage_build_context *ctx,
const struct message_block *block)
{
const struct message_header_line *hdr = block->hdr;
/* hdr->full_value is always set because we get the block from
message_decoder */
str_append(ctx->headers, hdr->name);
str_append_n(ctx->headers, hdr->middle, hdr->middle_len);
str_append_n(ctx->headers, hdr->full_value, hdr->full_value_len);
if (!hdr->no_newline)
str_append_c(ctx->headers, '\n');
}
/* Index one message.
   Decoded body blocks of indexable parts (see fts_build_want_index_part())
   are sent to the backend while parsing. Header lines are buffered in
   ctx->headers and sent in one batch at the end.

   Returns 0 on success, and -1 if the message stream could not be opened
   or read, or if the backend failed. On failure the header buffer is
   cleared, so that this message's headers can't be attributed to another
   UID. */
static int fts_build_mail(struct fts_storage_build_context *ctx, uint32_t uid)
{
	struct istream *input;
	struct message_parser_ctx *parser;
	struct message_decoder_context *decoder;
	struct message_block raw_block, block;
	struct message_part *parts;
	int ret;

	ctx->uid = uid;

	if (mail_get_stream(ctx->mail, NULL, NULL, &input) < 0)
		return -1;

	parser = message_parser_init(pool_datastack_create(), input,
				     MESSAGE_HEADER_PARSER_FLAG_CLEAN_ONELINE,
				     0);
	decoder = message_decoder_init(TRUE);
	for (;;) {
		ret = message_parser_parse_next_block(parser, &raw_block);
		i_assert(ret != 0);
		if (ret < 0) {
			/* end of input: success unless the stream failed */
			if (input->stream_errno == 0)
				ret = 0;
			break;
		}
		if (raw_block.hdr == NULL && raw_block.size != 0 &&
		    !fts_build_want_index_part(&raw_block)) {
			/* skipping this body */
			continue;
		}

		if (!message_decoder_decode_next_block(decoder, &raw_block,
						       &block))
			continue;

		if (block.hdr != NULL)
			fts_build_mail_header(ctx, &block);
		else if (block.size == 0) {
			/* end of headers */
			str_append_c(ctx->headers, '\n');
		} else {
			if (fts_backend_build_more(ctx->build, ctx->uid,
						   block.data, block.size,
						   FALSE) < 0) {
				ret = -1;
				break;
			}
		}
	}
	if (message_parser_deinit(&parser, &parts) < 0)
		mail_set_cache_corrupted(ctx->mail, MAIL_FETCH_MESSAGE_PARTS);
	message_decoder_deinit(&decoder);

	if (ret == 0) {
		/* Index all headers at the end. This is required for Squat,
		   because it can handle only incremental UIDs. */
		ret = fts_build_mail_flush_headers(ctx);
	}
	if (ret < 0) {
		/* don't leave this message's headers in the shared buffer */
		str_truncate(ctx->headers, 0);
	}
	return ret;
}
/* Start indexing the messages that the build backend has not indexed yet.
   On success a build context is stored in fctx->build_ctx. Nothing is set
   up in these cases:
   - there are no new messages (checked again after the backend build
     init, in case the last indexed UID changed meanwhile);
   - the best search arg is a header lookup, which isn't worth updating
     the index for.
   Returns 0 on success (including the "nothing to do" cases), and -1 if
   the backend fails. */
static int fts_build_init(struct fts_search_context *fctx)
{
	struct mailbox_transaction_context *t = fctx->t;
	struct fts_backend *backend = fctx->build_backend;
	struct fts_storage_build_context *ctx;
	struct fts_backend_build_context *build;
	struct mail_search_seqset seqset;
	uint32_t last_uid, locked_last_uid;

	if (fts_backend_get_last_uid(backend, &last_uid) < 0)
		return -1;

	memset(&seqset, 0, sizeof(seqset));
	mailbox_get_uids(t->box, last_uid + 1, (uint32_t)-1,
			 &seqset.seq1, &seqset.seq2);
	if (seqset.seq1 == 0 || fctx->best_arg->type == SEARCH_HEADER) {
		/* either no new messages, or a header-only lookup that we
		   don't update the index for */
		return 0;
	}

	if (fts_backend_build_init(backend, &locked_last_uid, &build) < 0)
		return -1;

	if (locked_last_uid != last_uid) {
		/* the last indexed UID grew between the two lookups, so the
		   sequence range must be looked up again */
		i_assert(last_uid < locked_last_uid);
		mailbox_get_uids(t->box, locked_last_uid + 1, (uint32_t)-1,
				 &seqset.seq1, &seqset.seq2);
		if (seqset.seq1 == 0) {
			/* nothing new anymore */
			(void)fts_backend_build_deinit(&build);
			return 0;
		}
	}

	ctx = i_new(struct fts_storage_build_context, 1);
	ctx->build = build;
	ctx->seqset = seqset;
	ctx->search_arg.type = SEARCH_SEQSET;
	ctx->search_arg.value.seqset = &ctx->seqset;
	ctx->headers = str_new(default_pool, 512);
	ctx->mail = mail_alloc(t, 0, NULL);
	ctx->search_ctx = mailbox_search_init(t, NULL, &ctx->search_arg, NULL);

	fctx->build_ctx = ctx;
	return 0;
}
/* Finish an index build started by fts_build_init().
   Ends the search, frees the mail and closes the backend build context.
   If a progress notification was sent during the build, the client is
   also told that indexing has finished. *_ctx is always freed and set to
   NULL.
   Returns 0 on success, and -1 if the search or the backend reported an
   error. */
static int fts_build_deinit(struct fts_storage_build_context **_ctx)
{
	struct fts_storage_build_context *ctx = *_ctx;
	struct mailbox *box = ctx->mail->transaction->box;
	int ret = 0;

	*_ctx = NULL;

	if (mailbox_search_deinit(&ctx->search_ctx) < 0)
		ret = -1;
	mail_free(&ctx->mail);

	if (fts_backend_build_deinit(&ctx->build) < 0)
		ret = -1;

	/* fts_build_notify() sets search_start_time and last_notify to the
	   same time on its first call. last_notify then moves forward by at
	   least FTS_BUILD_NOTIFY_INTERVAL_SECS only when a progress
	   notification is due.

	   Comparing the two fields avoids a bogus "finished" message when the
	   build was cancelled before fts_build_notify() ever ran. In that case
	   search_start_time is still 0, so "ioloop_time - 0" would always
	   exceed the interval.

	   notify_ok may also be unset, just as fts_build_notify() allows. */
	if (box->storage->callbacks->notify_ok != NULL &&
	    ctx->last_notify.tv_sec - ctx->search_start_time.tv_sec >=
	    FTS_BUILD_NOTIFY_INTERVAL_SECS) {
		/* we notified at least once */
		box->storage->callbacks->
			notify_ok(box, "Mailbox indexing finished",
				  box->storage->callback_context);
	}
	str_free(&ctx->headers);
	i_free(ctx);
	return ret;
}
/* Report indexing progress to the client.
   The first call only records the start time, because a plugin may
   already have spent time indexing before this point. Later calls send
   "Indexed N% of the mailbox, ETA M:SS" through notify_ok, if that
   callback is set. ctx->last_notify is always updated. */
static void fts_build_notify(struct fts_storage_build_context *ctx)
{
	struct mailbox *box = ctx->mail->transaction->box;
	uint32_t total, done;
	float percentage;
	unsigned int msecs, secs;

	if (ctx->last_notify.tv_sec == 0) {
		/* set the search time in here, in case a plugin
		   already spent some time indexing the mailbox */
		ctx->search_start_time = ioloop_timeval;
	} else if (box->storage->callbacks->notify_ok != NULL) {
		total = ctx->seqset.seq2 - ctx->seqset.seq1;
		done = ctx->mail->seq - ctx->seqset.seq1;
		/* A single-message range (total == 0) would divide by zero,
		   and so would the ETA when no progress has been made yet.
		   The resulting inf/NaN converted to unsigned int is
		   undefined behavior. */
		if (total == 0)
			percentage = 100.0;
		else
			percentage = done * 100.0 / total;

		msecs = (ioloop_timeval.tv_sec -
			 ctx->search_start_time.tv_sec) * 1000 +
			(ioloop_timeval.tv_usec -
			 ctx->search_start_time.tv_usec) / 1000;
		if (percentage > 0.0)
			secs = (msecs / (percentage / 100.0) - msecs) / 1000;
		else
			secs = 0;
		T_BEGIN {
			const char *text;

			/* secs is unsigned, so use %u, not %d */
			text = t_strdup_printf("Indexed %d%% of the mailbox, "
					       "ETA %u:%02u", (int)percentage,
					       secs/60, secs%60);
			box->storage->callbacks->
				notify_ok(box, text,
					  box->storage->callback_context);
		} T_END;
	}
	ctx->last_notify = ioloop_timeval;
}
/* Index the next batch of up to FTS_SEARCH_NONBLOCK_COUNT messages.
   If FTS_BUILD_NOTIFY_INTERVAL_SECS have passed since the last
   notification, fts_build_notify() is called first.
   Returns 1 once mailbox_search_next() yields no more messages, 0 when
   the batch limit was reached (call again later), and -1 if indexing a
   message failed. */
static int fts_build_more(struct fts_storage_build_context *ctx)
{
	unsigned int indexed;
	int ret;

	if (ioloop_time - ctx->last_notify.tv_sec >=
	    FTS_BUILD_NOTIFY_INTERVAL_SECS)
		fts_build_notify(ctx);

	for (indexed = 0;
	     mailbox_search_next(ctx->search_ctx, ctx->mail) > 0; ) {
		T_BEGIN {
			ret = fts_build_mail(ctx, ctx->mail->uid);
		} T_END;
		if (ret < 0)
			return -1;
		if (++indexed == FTS_SEARCH_NONBLOCK_COUNT)
			return 0;
	}
	return 1;
}
/* Try to start index building for this search.
   Returns FALSE, without marking anything, when this process is already
   building the indexes for another command; the caller must retry later.
   Otherwise it marks the build as initialized and returns TRUE. That
   covers four cases:
   - there is no build backend;
   - fts_build_init() failed, after which the backend is dropped for
     building;
   - a build context was created;
   - the index was already up to date, in which case the FTS lookup is
     done right away. */
static bool fts_try_build_init(struct fts_search_context *fctx)
{
	if (fctx->build_backend != NULL &&
	    fts_backend_is_building(fctx->build_backend)) {
		/* this process is already building the indexes */
		return FALSE;
	}

	fctx->build_initialized = TRUE;
	if (fctx->build_backend == NULL)
		return TRUE;

	if (fts_build_init(fctx) < 0)
		fctx->build_backend = NULL;
	else if (fctx->build_ctx == NULL) {
		/* the index was up to date */
		fts_search_lookup(fctx);
	}
	return TRUE;
}
/* Search init hook.
   Initializes the search through the parent implementation and attaches a
   fresh fts_search_context to it. When at least one FTS backend exists,
   it also analyzes the search args and tries to start index building. */
static struct mail_search_context *
fts_mailbox_search_init(struct mailbox_transaction_context *t,
			const char *charset, struct mail_search_arg *args,
			const enum mail_sort_type *sort_program)
{
	struct fts_mailbox *fbox = FTS_CONTEXT(t->box);
	struct mail_search_context *ctx =
		fbox->module_ctx.super.search_init(t, charset, args,
						   sort_program);
	struct fts_search_context *fctx =
		i_new(struct fts_search_context, 1);

	fctx->fbox = fbox;
	fctx->t = t;
	fctx->args = args;
	fctx->charset = ctx->charset;
	MODULE_CONTEXT_SET(ctx, fts_storage_module, fctx);

	if (fbox->backend_substr != NULL || fbox->backend_fast != NULL) {
		fts_search_analyze(fctx);
		(void)fts_try_build_init(fctx);
	}
	return ctx;
}
/* Nonblocking search step.
   Any pending index build is driven first: ours, or waiting for another
   command in this process to finish its build. While building is in
   progress, *tryagain_r is set and 0 is returned. Once the indexes are
   built or not used, the parent implementation performs the search. */
static int fts_mailbox_search_next_nonblock(struct mail_search_context *ctx,
					    struct mail *mail, bool *tryagain_r)
{
	struct fts_mailbox *fbox = FTS_CONTEXT(ctx->transaction->box);
	struct fts_search_context *fctx = FTS_CONTEXT(ctx);
	int build_ret;

	if (!fctx->build_initialized && !fts_try_build_init(fctx)) {
		/* another command in this process is still building */
		*tryagain_r = TRUE;
		return 0;
	}

	if (fctx->build_ctx != NULL) {
		build_ret = fts_build_more(fctx->build_ctx);
		if (build_ret == 0) {
			/* more messages left to index */
			*tryagain_r = TRUE;
			return 0;
		}
		/* build finished (> 0) or failed (< 0) */
		(void)fts_build_deinit(&fctx->build_ctx);
		if (build_ret > 0)
			fts_search_lookup(fctx);
	}

	return fbox->module_ctx.super.search_next_nonblock(ctx, mail,
							   tryagain_r);
}
/* Mark every top-level full-text arg (TEXT, BODY and their _FAST
   variants) as matched (result = 1). Used when the current message comes
   from the backend's definite result set. */
static void
fts_mailbox_search_args_definite_set(struct fts_search_context *fctx)
{
	struct mail_search_arg *cur = fctx->args;

	while (cur != NULL) {
		if (cur->type == SEARCH_TEXT ||
		    cur->type == SEARCH_BODY ||
		    cur->type == SEARCH_BODY_FAST ||
		    cur->type == SEARCH_TEXT_FAST)
			cur->result = 1;
		cur = cur->next;
	}
}
/* Advance ctx->seq to the next message the search should examine, using
   the FTS lookup results when they exist.

   If fts_search_lookup() hasn't produced results (seqs_set is FALSE),
   this just calls the parent implementation. Otherwise only sequences in
   fctx->definite_seqs or fctx->maybe_seqs are visited:
   - the current range of one of the two lists is chosen;
   - ctx->seq is set to just before the wanted sequence;
   - the parent implementation is called to advance.
   If the parent lands on a different sequence than wanted, the search args
   are reset and the loop continues from that sequence. When the chosen
   sequence came from a definite range, the full-text args are marked as
   matched.

   Ranges are consumed in place by increasing seq1. A used-up range gets
   seq2 = 0, so the skip loops step over it.

   Returns 0 when both lists are exhausted; otherwise returns the parent's
   search_next_update_seq() result. */
static int fts_mailbox_search_next_update_seq(struct mail_search_context *ctx)
{
struct fts_mailbox *fbox = FTS_CONTEXT(ctx->transaction->box);
struct fts_search_context *fctx = FTS_CONTEXT(ctx);
struct seq_range *def_range, *maybe_range, *range;
unsigned int def_count, maybe_count;
uint32_t wanted_seq;
bool use_maybe;
int ret;
if (!fctx->seqs_set)
return fbox->module_ctx.super.search_next_update_seq(ctx);
wanted_seq = ctx->seq + 1;
/* fts_search_lookup() was called successfully */
for (;;) {
/* re-fetched each iteration; ranges are modified below */
def_range = array_get_modifiable(&fctx->definite_seqs,
&def_count);
maybe_range = array_get_modifiable(&fctx->maybe_seqs,
&maybe_count);
/* if we're ahead of current positions, skip them */
while (fctx->definite_idx < def_count &&
wanted_seq > def_range[fctx->definite_idx].seq2)
fctx->definite_idx++;
while (fctx->maybe_idx < maybe_count &&
wanted_seq > maybe_range[fctx->maybe_idx].seq2)
fctx->maybe_idx++;
/* use whichever is lower of definite/maybe */
if (fctx->definite_idx == def_count) {
if (fctx->maybe_idx == maybe_count) {
/* we're finished */
return 0;
}
use_maybe = TRUE;
} else if (fctx->maybe_idx == maybe_count) {
use_maybe = FALSE;
} else {
/* NOTE(review): this compares the maybe range's seq1 against
   the definite range's seq2, not its seq1. The result is the same
   only as long as definite and maybe ranges never overlap,
   presumably guaranteed by fts_search_lookup() - confirm. */
use_maybe = maybe_range[fctx->maybe_idx].seq1 <
def_range[fctx->definite_idx].seq2;
}
if (use_maybe)
range = maybe_range + fctx->maybe_idx;
else
range = def_range + fctx->definite_idx;
i_assert(range->seq1 <= range->seq2);
if (wanted_seq > range->seq1) {
/* current sequence is already larger than where
range begins, so use the current sequence. */
range->seq1 = wanted_seq+1;
} else {
/* jump forward to the start of the range and consume it */
wanted_seq = range->seq1;
range->seq1++;
}
/* range used up: seq2 = 0 makes the skip loops above pass it */
if (range->seq1 > range->seq2)
range->seq2 = 0;
/* ctx->seq points to previous sequence we want */
ctx->seq = wanted_seq - 1;
ret = fbox->module_ctx.super.search_next_update_seq(ctx);
if (ret <= 0 || wanted_seq == ctx->seq)
break;
/* the parent moved to a different sequence than wanted; continue
   from there with freshly reset search args */
wanted_seq = ctx->seq;
mail_search_args_reset(ctx->args, FALSE);
}
if (!use_maybe) {
/* we have definite results, update args */
fts_mailbox_search_args_definite_set(fctx);
}
return ret;
}
/* Search deinit hook.
   Aborts an unfinished index build, frees the FTS result arrays and the
   fts_search_context, and then deinitializes the search through the
   parent implementation. Returns the parent's result. */
static int fts_mailbox_search_deinit(struct mail_search_context *ctx)
{
	struct fts_mailbox *fbox = FTS_CONTEXT(ctx->transaction->box);
	struct fts_search_context *fctx = FTS_CONTEXT(ctx);

	/* build_ctx is still set if the search was cancelled mid-build */
	if (fctx->build_ctx != NULL)
		(void)fts_build_deinit(&fctx->build_ctx);

	if (array_is_created(&fctx->maybe_seqs))
		array_free(&fctx->maybe_seqs);
	if (array_is_created(&fctx->definite_seqs))
		array_free(&fctx->definite_seqs);
	i_free(fctx);

	return fbox->module_ctx.super.search_deinit(ctx);
}
/* Mail expunge hook.
   Records that this transaction expunged mail, passes the mail to each
   existing FTS backend's expunge handler, and then expunges it through
   the parent implementation. */
static void fts_mail_expunge(struct mail *_mail)
{
	struct mail_private *pmail = (struct mail_private *)_mail;
	union mail_module_context *mail_ctx = FTS_MAIL_CONTEXT(pmail);
	struct fts_mailbox *fbox = FTS_CONTEXT(_mail->box);
	struct fts_transaction_context *trans_ctx =
		FTS_CONTEXT(_mail->transaction);
	struct fts_backend *substr = fbox->backend_substr;
	struct fts_backend *fast = fbox->backend_fast;

	trans_ctx->expunges = TRUE;

	if (substr != NULL)
		fts_backend_expunge(substr, _mail);
	if (fast != NULL)
		fts_backend_expunge(fast, _mail);

	mail_ctx->super.expunge(_mail);
}
/* Mail alloc hook.
   Allocates the mail through the parent implementation. When any FTS
   backend exists, it also hooks the mail's expunge vfunc
   (fts_mail_expunge), saving the original vfuncs in a per-mail context
   allocated from the mail's pool. */
static struct mail *
fts_mail_alloc(struct mailbox_transaction_context *t,
	       enum mail_fetch_field wanted_fields,
	       struct mailbox_header_lookup_ctx *wanted_headers)
{
	struct fts_mailbox *fbox = FTS_CONTEXT(t->box);
	union mail_module_context *mail_ctx;
	struct mail_private *pmail;
	struct mail *new_mail;

	new_mail = fbox->module_ctx.super.mail_alloc(t, wanted_fields,
						     wanted_headers);
	if (fbox->backend_substr == NULL && fbox->backend_fast == NULL) {
		/* no FTS backends: nothing to hook */
		return new_mail;
	}

	pmail = (struct mail_private *)new_mail;
	mail_ctx = p_new(pmail->pool, union mail_module_context, 1);
	mail_ctx->super = pmail->v;
	pmail->v.expunge = fts_mail_expunge;
	MODULE_CONTEXT_SET_SELF(pmail, fts_mail_module, mail_ctx);
	return new_mail;
}
/* Create the FTS backends named in fbox->env. The list is split with
   t_strsplit() on the separator characters ", ".
   Names for which fts_backend_init() returns NULL are skipped. Each
   backend goes into the substring slot or the fast slot, depending on
   FTS_BACKEND_FLAG_SUBSTRING_LOOKUPS. Configuring two backends of the
   same kind is a fatal error. */
static void fts_box_backends_init(struct mailbox *box)
{
	struct fts_mailbox *fbox = FTS_CONTEXT(box);
	const char *const *names = t_strsplit(fbox->env, ", ");
	struct fts_backend *backend;
	unsigned int i;

	for (i = 0; names[i] != NULL; i++) {
		backend = fts_backend_init(names[i], box);
		if (backend == NULL)
			continue;

		if ((backend->flags &
		     FTS_BACKEND_FLAG_SUBSTRING_LOOKUPS) == 0) {
			if (fbox->backend_fast != NULL) {
				i_fatal("fts: duplicate fast backend: %s",
					names[i]);
			}
			fbox->backend_fast = backend;
		} else {
			if (fbox->backend_substr != NULL) {
				i_fatal("fts: duplicate substring backend: %s",
					names[i]);
			}
			fbox->backend_substr = backend;
		}
	}
}
/* Transaction begin hook.
   The FTS backends are created lazily, on the first transaction, because
   by then the mailbox has been synced at least once. A new
   fts_transaction_context is attached to every transaction. */
static struct mailbox_transaction_context *
fts_transaction_begin(struct mailbox *box,
		      enum mailbox_transaction_flags flags)
{
	struct fts_mailbox *fbox = FTS_CONTEXT(box);
	struct fts_transaction_context *trans_ctx =
		i_new(struct fts_transaction_context, 1);
	struct mailbox_transaction_context *trans;

	if (!fbox->backend_set) {
		fts_box_backends_init(box);
		fbox->backend_set = TRUE;
	}

	trans = fbox->module_ctx.super.transaction_begin(box, flags);
	MODULE_CONTEXT_SET(trans, fts_storage_module, trans_ctx);
	return trans;
}
static void
fts_storage_build_context_deinit(struct fts_storage_build_context *build_ctx)
{
(void)fts_backend_build_deinit(&build_ctx->build);
str_free(&build_ctx->headers);
i_free(build_ctx);
}
/* Common tail of commit and rollback.
   If the transaction expunged mail, tells the fast backend (when present)
   whether the expunges were committed. Then frees ft. */
static void
fts_transaction_finish(struct mailbox *box, struct fts_transaction_context *ft,
		       bool committed)
{
	struct fts_mailbox *fbox = FTS_CONTEXT(box);

	if (ft->expunges && fbox->backend_fast != NULL)
		fts_backend_expunge_finish(fbox->backend_fast, box, committed);
	i_free(ft);
}
/* Transaction rollback hook.
   Releases any build context and owned mail attached to the transaction,
   rolls back through the parent implementation, and then finishes the
   FTS transaction state as not committed. */
static void fts_transaction_rollback(struct mailbox_transaction_context *t)
{
	struct fts_transaction_context *ft = FTS_CONTEXT(t);
	struct mailbox *box = t->box;
	struct fts_mailbox *fbox = FTS_CONTEXT(box);

	if (ft->build_ctx != NULL) {
		fts_storage_build_context_deinit(ft->build_ctx);
		ft->build_ctx = NULL;
	}
	if (ft->free_mail)
		mail_free(&ft->mail);

	fbox->module_ctx.super.transaction_rollback(t);
	fts_transaction_finish(box, ft, FALSE);
}
/* Transaction commit hook.
   Releases any build context and owned mail attached to the transaction
   and commits through the parent implementation. The FTS transaction
   state is then finished as committed only if the parent commit returned
   0. Returns the parent's commit result. */
static int fts_transaction_commit(struct mailbox_transaction_context *t,
				  uint32_t *uid_validity_r,
				  uint32_t *first_saved_uid_r,
				  uint32_t *last_saved_uid_r)
{
	struct fts_transaction_context *ft = FTS_CONTEXT(t);
	struct mailbox *box = t->box;
	struct fts_mailbox *fbox = FTS_CONTEXT(box);
	int commit_ret;

	if (ft->build_ctx != NULL) {
		fts_storage_build_context_deinit(ft->build_ctx);
		ft->build_ctx = NULL;
	}
	if (ft->free_mail)
		mail_free(&ft->mail);

	commit_ret = fbox->module_ctx.super.
		transaction_commit(t, uid_validity_r,
				   first_saved_uid_r, last_saved_uid_r);
	fts_transaction_finish(box, ft, commit_ret == 0);
	return commit_ret;
}
/* mailbox_opened hook.
   Chains to the next hook first. If the FTS environment variable is set,
   it then installs the fts plugin's mailbox vfuncs, saving the originals
   in fbox->module_ctx.super, and attaches the per-mailbox context. The
   backends themselves are created later, in fts_transaction_begin(). */
void fts_mailbox_opened(struct mailbox *box)
{
	struct fts_mailbox *fbox;
	const char *fts_env;

	if (fts_next_hook_mailbox_opened != NULL)
		fts_next_hook_mailbox_opened(box);

	fts_env = getenv("FTS");
	if (fts_env != NULL) {
		fbox = i_new(struct fts_mailbox, 1);
		fbox->env = fts_env;
		fbox->module_ctx.super = box->v;

		box->v.close = fts_mailbox_close;
		box->v.search_init = fts_mailbox_search_init;
		box->v.search_next_nonblock = fts_mailbox_search_next_nonblock;
		box->v.search_next_update_seq =
			fts_mailbox_search_next_update_seq;
		box->v.search_deinit = fts_mailbox_search_deinit;
		box->v.mail_alloc = fts_mail_alloc;
		box->v.transaction_begin = fts_transaction_begin;
		box->v.transaction_rollback = fts_transaction_rollback;
		box->v.transaction_commit = fts_transaction_commit;

		MODULE_CONTEXT_SET(box, fts_storage_module, fbox);
	}
}