/* solr-connection.c revision df1713bd29d29a3e3f3ebfdf05f929525825a7d3 */
/* Copyright (c) 2006-2012 Dovecot authors, see the included COPYING file */
/* curl: 7.16.0 curl_multi_timeout */
#include "lib.h"
#include "array.h"
#include "hash.h"
#include "str.h"
#include "strescape.h"
#include "solr-connection.h"
#include <curl/curl.h>
#include <expat.h>
enum solr_xml_response_state {
SOLR_XML_RESPONSE_STATE_ROOT,
SOLR_XML_RESPONSE_STATE_RESPONSE,
SOLR_XML_RESPONSE_STATE_RESULT,
SOLR_XML_RESPONSE_STATE_DOC,
SOLR_XML_RESPONSE_STATE_CONTENT
};
enum solr_xml_content_state {
SOLR_XML_CONTENT_STATE_NONE = 0,
SOLR_XML_CONTENT_STATE_UID,
SOLR_XML_CONTENT_STATE_SCORE,
SOLR_XML_CONTENT_STATE_MAILBOX,
SOLR_XML_CONTENT_STATE_NAMESPACE,
SOLR_XML_CONTENT_STATE_UIDVALIDITY
};
/* State shared by the expat callbacks while a lookup reply is parsed. */
struct solr_lookup_xml_context {
/* position in the response/result/doc hierarchy */
enum solr_xml_response_state state;
/* field that incoming character data belongs to */
enum solr_xml_content_state content_state;
/* number of currently open XML elements */
int depth;
/* fields of the document being parsed; they are reset whenever a new
   "doc" element starts */
uint32_t uid, uidvalidity;
float score;
/* allocated with i_strdup()-style functions, NULL when not seen */
char *mailbox, *ns;
/* pool that the returned results are allocated from */
pool_t result_pool;
/* box_id -> solr_result */
struct hash_table *mailboxes;
/* every solr_result created so far, in creation order */
ARRAY_DEFINE(results, struct solr_result *);
};
/* An update request in progress, created by solr_connection_post_begin()
   and freed by solr_connection_post_end(). */
struct solr_connection_post {
/* connection the request is sent over */
struct solr_connection *conn;
/* buffer currently handed to curl_output_func(): size bytes in total, of
   which pos bytes have already been copied out */
const unsigned char *data;
size_t size, pos;
/* request URL; kept here because old curl versions don't copy it */
char *url;
/* set once sending has failed; later solr_connection_post_more() calls
   return immediately */
unsigned int failed:1;
};
/* A connection to a Solr server, shared by lookups and updates. */
struct solr_connection {
/* easy handle used for every request */
CURL *curl;
/* multi handle; the easy handle is attached to it for the duration of
   a post */
CURLM *curlm;
/* receives curl's error text (CURLOPT_ERRORBUFFER) */
char curl_errorbuf[CURL_ERROR_SIZE];
/* request headers: headers is installed by default, headers_post is
   swapped in while posting */
struct curl_slist *headers, *headers_post;
/* parser that all response bodies are fed to */
XML_Parser xml_parser;
/* url is the base URL given to solr_connection_init(); last_sent_url is
   the URL of the most recent select request, kept because old curl
   versions don't copy it */
char *url, *last_sent_url;
/* text captured from the first response header line by
   curl_header_func(), used in error messages; NULL when none has been
   seen since it was last cleared */
char *http_failure;
/* enables CURLOPT_VERBOSE */
unsigned int debug:1;
/* set between solr_connection_post_begin() and solr_connection_post_end() */
unsigned int posting:1;
/* set when XML parsing fails; cleared at the start of each select */
unsigned int xml_failed:1;
};
/* CURLOPT_READFUNCTION callback: copies the next piece of the buffer queued
   in struct solr_connection_post into curl's upload buffer.

   data has room for element_size * nmemb bytes and context is the
   struct solr_connection_post set with CURLOPT_READDATA. Returns the number
   of bytes copied, which is 0 once the queued buffer has been consumed or
   when no buffer is queued. */
static size_t
curl_output_func(void *data, size_t element_size, size_t nmemb, void *context)
{
	struct solr_connection_post *post = context;
	size_t left = post->size - post->pos;
	size_t size = element_size * nmemb;

	/* @UNSAFE */
	if (size > left)
		size = left;
	if (size > 0) {
		/* solr_connection_post_end() flushes with data=NULL and
		   size=0. memcpy() must not be given a NULL pointer even
		   for a zero-length copy, so skip it entirely then. */
		memcpy(data, post->data + post->pos, size);
		post->pos += size;
	}
	return size;
}
/* Feeds size bytes of response data to the connection's XML parser.
   done=TRUE marks the end of the document; data may then be NULL with
   size 0.

   Returns 0 if the input was accepted (XML_ERROR_FINISHED is also treated
   as success), -1 if parsing failed now or had already failed earlier. The
   first failure is logged and remembered in conn->xml_failed. */
static int solr_xml_parse(struct solr_connection *conn,
			  const void *data, size_t size, bool done)
{
	const char *input = data;
	enum XML_Error err;
	int line, col;

	if (conn->xml_failed)
		return -1;

	if (XML_Parse(conn->xml_parser, input, size, done))
		return 0;

	err = XML_GetErrorCode(conn->xml_parser);
	if (err == XML_ERROR_FINISHED)
		return 0;

	line = XML_GetCurrentLineNumber(conn->xml_parser);
	col = XML_GetCurrentColumnNumber(conn->xml_parser);
	/* the final call passes data=NULL, which must not reach %s */
	i_error("fts_solr: Invalid XML input at %d:%d: %s "
		"(near: %.*s)", line, col, XML_ErrorString(err),
		input == NULL ? 0 : (int)I_MIN(size, 128),
		input == NULL ? "" : input);
	conn->xml_failed = TRUE;
	return -1;
}
/* CURLOPT_WRITEFUNCTION callback: passes the received response body to the
   XML parser. context is the struct solr_connection set with
   CURLOPT_WRITEDATA. The full byte count is always returned, whether or
   not parsing succeeded; the parse result is deliberately ignored here. */
static size_t
curl_input_func(void *data, size_t element_size, size_t nmemb, void *context)
{
	const size_t received = element_size * nmemb;
	struct solr_connection *conn = context;

	(void)solr_xml_parse(conn, data, received, FALSE);
	return received;
}
/* CURLOPT_HEADERFUNCTION callback: remembers the first header line that is
   received after conn->http_failure was last cleared, so that it can be
   shown in error messages.

   If the text following the line's first space begins with a digit, only
   that part is stored; otherwise the whole line is stored. Always returns
   the full byte count. */
static size_t
curl_header_func(void *data, size_t element_size, size_t nmemb, void *context)
{
	struct solr_connection *conn = context;
	const unsigned char *line = data;
	const size_t len = element_size * nmemb;
	size_t start = 0;

	if (conn->http_failure != NULL) {
		/* already have one for this request */
		return len;
	}

	/* find the first space and step past it */
	while (start < len && line[start] != ' ')
		start++;
	if (start < len)
		start++;

	/* nothing after the space, or it's not a number: keep everything */
	if (start == len || line[start] < '0' || line[start] > '9')
		start = 0;

	conn->http_failure = i_strndup(line + start, len - start);
	return len;
}
/* Creates a connection object for the Solr server at the given base URL.
   debug enables curl's verbose output. Dies with FATAL_OUTOFMEM if the curl
   handles or the XML parser can't be allocated. The result is released with
   solr_connection_deinit(). */
struct solr_connection *solr_connection_init(const char *url, bool debug)
{
	struct solr_connection *conn = i_new(struct solr_connection, 1);
	struct curl_slist *post_headers;

	conn->url = i_strdup(url);
	conn->debug = debug;

	conn->curlm = curl_multi_init();
	conn->curl = curl_easy_init();
	if (conn->curl == NULL || conn->curlm == NULL) {
		i_fatal_status(FATAL_OUTOFMEM,
			       "fts_solr: Failed to allocate curl");
	}

	/* options that stay the same for every request */
	curl_easy_setopt(conn->curl, CURLOPT_ERRORBUFFER, conn->curl_errorbuf);
	if (conn->debug)
		curl_easy_setopt(conn->curl, CURLOPT_VERBOSE, 1L);
	curl_easy_setopt(conn->curl, CURLOPT_NOPROGRESS, 1L);
	curl_easy_setopt(conn->curl, CURLOPT_NOSIGNAL, 1L);

	/* callbacks for request body, response body and response headers */
	curl_easy_setopt(conn->curl, CURLOPT_READFUNCTION, curl_output_func);
	curl_easy_setopt(conn->curl, CURLOPT_WRITEFUNCTION, curl_input_func);
	curl_easy_setopt(conn->curl, CURLOPT_WRITEDATA, conn);
	curl_easy_setopt(conn->curl, CURLOPT_HEADERFUNCTION, curl_header_func);
	curl_easy_setopt(conn->curl, CURLOPT_HEADERDATA, conn);

	/* header list used by default */
	conn->headers = curl_slist_append(NULL, "Content-Type: text/xml");

	/* header list that is switched in while posting */
	post_headers = curl_slist_append(NULL, "Content-Type: text/xml");
	post_headers = curl_slist_append(post_headers,
					 "Transfer-Encoding: chunked");
	post_headers = curl_slist_append(post_headers, "Expect:");
	conn->headers_post = post_headers;

	curl_easy_setopt(conn->curl, CURLOPT_HTTPHEADER, conn->headers);

	conn->xml_parser = XML_ParserCreate("UTF-8");
	if (conn->xml_parser == NULL) {
		i_fatal_status(FATAL_OUTOFMEM,
			       "fts_solr: Failed to allocate XML parser");
	}
	return conn;
}
/* Frees the connection and everything it owns: the curl handles, both
   header lists, the XML parser and all strings. conn must not be used
   afterwards. */
void solr_connection_deinit(struct solr_connection *conn)
{
	curl_multi_cleanup(conn->curlm);
	curl_easy_cleanup(conn->curl);
	/* the easy handle referenced these lists, so free them only after
	   the handle is gone */
	curl_slist_free_all(conn->headers);
	curl_slist_free_all(conn->headers_post);
	XML_ParserFree(conn->xml_parser);
	i_free(conn->http_failure);
	i_free(conn->last_sent_url);
	i_free(conn->url);
	i_free(conn);
}
/* Appends str to dest in URL-escaped form, as produced by
   curl_easy_escape(). Dies with FATAL_OUTOFMEM if curl can't produce the
   escaped string. */
void solr_connection_http_escape(struct solr_connection *conn, string_t *dest,
				 const char *str)
{
	char *encoded;

	/* length 0 makes curl use strlen(str) */
	encoded = curl_easy_escape(conn->curl, str, 0);
	if (encoded == NULL) {
		/* would otherwise crash in str_append() */
		i_fatal_status(FATAL_OUTOFMEM,
			       "fts_solr: curl_easy_escape() failed");
	}
	str_append(dest, encoded);
	curl_free(encoded);
}
/* attrs is a NULL-terminated list of alternating attribute names and
   values. Returns the value of the first attribute called "name", or an
   empty string if there is none. */
static const char *attrs_get_name(const char **attrs)
{
	const char **pair = attrs;

	while (*pair != NULL) {
		if (strcmp(pair[0], "name") == 0)
			return pair[1];
		pair += 2;
	}
	return "";
}
static void
solr_lookup_xml_start(void *context, const char *name, const char **attrs)
{
struct solr_lookup_xml_context *ctx = context;
const char *name_attr;
i_assert(ctx->depth >= (int)ctx->state);
ctx->depth++;
if (ctx->depth - 1 > (int)ctx->state) {
/* skipping over unwanted elements */
return;
}
/* response -> result -> doc */
switch (ctx->state) {
case SOLR_XML_RESPONSE_STATE_ROOT:
if (strcmp(name, "response") == 0)
ctx->state++;
break;
case SOLR_XML_RESPONSE_STATE_RESPONSE:
if (strcmp(name, "result") == 0)
ctx->state++;
break;
case SOLR_XML_RESPONSE_STATE_RESULT:
if (strcmp(name, "doc") == 0) {
ctx->state++;
ctx->uid = 0;
ctx->score = 0;
i_free_and_null(ctx->mailbox);
i_free_and_null(ctx->ns);
ctx->uidvalidity = 0;
}
break;
case SOLR_XML_RESPONSE_STATE_DOC:
name_attr = attrs_get_name(attrs);
if (strcmp(name_attr, "uid") == 0)
ctx->content_state = SOLR_XML_CONTENT_STATE_UID;
else if (strcmp(name_attr, "score") == 0)
ctx->content_state = SOLR_XML_CONTENT_STATE_SCORE;
else if (strcmp(name_attr, "box") == 0)
ctx->content_state = SOLR_XML_CONTENT_STATE_MAILBOX;
else if (strcmp(name_attr, "ns") == 0)
ctx->content_state = SOLR_XML_CONTENT_STATE_NAMESPACE;
else if (strcmp(name_attr, "uidv") == 0)
ctx->content_state = SOLR_XML_CONTENT_STATE_UIDVALIDITY;
else
break;
ctx->state++;
break;
case SOLR_XML_RESPONSE_STATE_CONTENT:
break;
}
}
/* Returns the result entry for box_id. If there isn't one yet, it is
   created from ctx->result_pool, registered in ctx->mailboxes and appended
   to ctx->results. */
static struct solr_result *
solr_result_get(struct solr_lookup_xml_context *ctx, const char *box_id)
{
	struct solr_result *result;

	result = hash_table_lookup(ctx->mailboxes, box_id);
	if (result == NULL) {
		/* the hash key needs a copy that lives as long as the result */
		char *key = p_strdup(ctx->result_pool, box_id);

		result = p_new(ctx->result_pool, struct solr_result, 1);
		result->box_id = key;
		p_array_init(&result->uids, ctx->result_pool, 32);
		p_array_init(&result->scores, ctx->result_pool, 32);

		hash_table_insert(ctx->mailboxes, key, result);
		array_append(&ctx->results, &result, 1);
	}
	return result;
}
/* Adds the document whose fields were just collected into ctx to the
   result of its mailbox. Documents without a uid are logged and skipped.
   A score is recorded only when it is nonzero. Uses the data stack for the
   temporary key, so the caller should have a data stack frame open. */
static void solr_lookup_add_doc(struct solr_lookup_xml_context *ctx)
{
	struct solr_result *result;
	struct fts_score_map *score_entry;
	const char *box_id;

	if (ctx->uid == 0) {
		i_error("fts_solr: Query didn't return uid");
		return;
	}

	if (ctx->mailbox == NULL) {
		/* the lookup was done from a single mailbox */
		box_id = "";
	} else if (ctx->uidvalidity == 0) {
		/* new style lookup: the mailbox field is the id */
		box_id = ctx->mailbox;
	} else {
		/* old style lookup: uidvalidity \001 mailbox [\001 ns] */
		string_t *key = t_str_new(64);

		str_printfa(key, "%u\001", ctx->uidvalidity);
		str_append(key, ctx->mailbox);
		if (ctx->ns != NULL)
			str_printfa(key, "\001%s", ctx->ns);
		box_id = str_c(key);
	}

	result = solr_result_get(ctx, box_id);
	seq_range_array_add(&result->uids, 0, ctx->uid);

	if (ctx->score == 0)
		return;
	score_entry = array_append_space(&result->scores);
	score_entry->uid = ctx->uid;
	score_entry->score = ctx->score;
}
/* expat end-element handler for lookup replies. When the element that
   closes is the one the current state was entered through, the state moves
   back up one level; leaving a doc also adds the collected document to the
   results. */
static void solr_lookup_xml_end(void *context, const char *name ATTR_UNUSED)
{
	struct solr_lookup_xml_context *ctx = context;
	bool leaving_state;

	i_assert(ctx->depth >= (int)ctx->state);

	if (ctx->mailbox == NULL &&
	    ctx->state == SOLR_XML_RESPONSE_STATE_CONTENT &&
	    ctx->content_state == SOLR_XML_CONTENT_STATE_MAILBOX) {
		/* the mailbox element had no text: mailbox is the
		   namespace prefix */
		ctx->mailbox = i_strdup("");
	}

	leaving_state = ctx->depth == (int)ctx->state;
	ctx->depth--;
	if (!leaving_state)
		return;

	if (ctx->state == SOLR_XML_RESPONSE_STATE_DOC) {
		T_BEGIN {
			solr_lookup_add_doc(ctx);
		} T_END;
	}
	ctx->content_state = SOLR_XML_CONTENT_STATE_NONE;
	ctx->state--;
}
/* Parses the first len bytes of str as an unsigned decimal number.

   Returns 0 and stores the number in *value_r on success. Returns -1
   without touching *value_r if a non-digit is found or if the number
   doesn't fit into uint32_t. A len of 0 (or less) yields the value 0. */
static int uint32_parse(const char *str, int len, uint32_t *value_r)
{
	const uint32_t max = (uint32_t)-1;
	uint32_t value = 0, digit;
	int i;

	for (i = 0; i < len; i++) {
		if (str[i] < '0' || str[i] > '9')
			return -1;
		digit = (uint32_t)(str[i] - '0');
		/* value*10 + digit would wrap around: reject instead of
		   silently returning a wrong number */
		if (value > (max - digit) / 10)
			return -1;
		value = value * 10 + digit;
	}
	*value_r = value;
	return 0;
}
/* Appends the first len bytes of str to the allocated string *dest,
   allocating it if it is still NULL. The temporary copy needed for the
   concatenation lives in its own data stack frame, so nothing accumulates
   on the data stack while a long reply is being parsed. */
static void solr_lookup_xml_append(char **dest, const char *str, int len)
{
	char *joined;

	if (*dest == NULL) {
		*dest = i_strndup(str, len);
		return;
	}
	T_BEGIN {
		joined = i_strconcat(*dest, t_strndup(str, len), NULL);
	} T_END;
	i_free(*dest);
	*dest = joined;
}

/* expat character data handler for lookup replies. Stores the text into
   the field selected by ctx->content_state; text outside a recognized
   field is ignored. str is not NUL-terminated, only len bytes are valid. */
static void solr_lookup_xml_data(void *context, const char *str, int len)
{
	struct solr_lookup_xml_context *ctx = context;

	switch (ctx->content_state) {
	case SOLR_XML_CONTENT_STATE_NONE:
		break;
	case SOLR_XML_CONTENT_STATE_UID:
		if (uint32_parse(str, len, &ctx->uid) < 0)
			i_error("fts_solr: received invalid uid");
		break;
	case SOLR_XML_CONTENT_STATE_SCORE:
		T_BEGIN {
			ctx->score = strtod(t_strndup(str, len), NULL);
		} T_END;
		break;
	case SOLR_XML_CONTENT_STATE_MAILBOX:
		/* this may be called multiple times, for example if input
		   contains '&' characters */
		solr_lookup_xml_append(&ctx->mailbox, str, len);
		break;
	case SOLR_XML_CONTENT_STATE_NAMESPACE:
		/* may arrive in several pieces just like the mailbox */
		solr_lookup_xml_append(&ctx->ns, str, len);
		break;
	case SOLR_XML_CONTENT_STATE_UIDVALIDITY:
		if (uint32_parse(str, len, &ctx->uidvalidity) < 0)
			i_error("fts_solr: received invalid uidvalidity");
		break;
	}
}
/* Runs a lookup: sends GET <base url>select?<query> and parses the XML
   reply into per-mailbox results allocated from pool.

   Returns 0 on success and -1 if the request failed, the HTTP status
   wasn't 200 or the reply couldn't be parsed. *box_results_r is set only
   when a 200 reply was received (even if parsing it failed). An extra
   empty element is appended after the results, presumably so that callers
   can treat the array as NULL-terminated. */
int solr_connection_select(struct solr_connection *conn, const char *query,
			   pool_t pool, struct solr_result ***box_results_r)
{
	struct solr_lookup_xml_context solr_lookup_context;
	CURLcode ret;
	long httpret = 0;
	int parse_ret = -1;
	bool got_reply = FALSE;

	i_assert(!conn->posting);

	memset(&solr_lookup_context, 0, sizeof(solr_lookup_context));
	solr_lookup_context.result_pool = pool;
	solr_lookup_context.mailboxes =
		hash_table_create(default_pool, default_pool, 0,
				  str_hash, (hash_cmp_callback_t *)strcmp);
	p_array_init(&solr_lookup_context.results, pool, 32);

	i_free_and_null(conn->http_failure);
	conn->xml_failed = FALSE;
	XML_ParserReset(conn->xml_parser, "UTF-8");
	XML_SetElementHandler(conn->xml_parser,
			      solr_lookup_xml_start, solr_lookup_xml_end);
	XML_SetCharacterDataHandler(conn->xml_parser, solr_lookup_xml_data);
	XML_SetUserData(conn->xml_parser, &solr_lookup_context);

	/* curl v7.16 and older don't strdup() the URL */
	i_free(conn->last_sent_url);
	conn->last_sent_url = i_strconcat(conn->url, "select?", query, NULL);

	curl_easy_setopt(conn->curl, CURLOPT_URL, conn->last_sent_url);
	ret = curl_easy_perform(conn->curl);
	if (ret != CURLE_OK) {
		i_error("fts_solr: HTTP GET failed: %s",
			conn->curl_errorbuf);
	} else {
		curl_easy_getinfo(conn->curl, CURLINFO_RESPONSE_CODE, &httpret);
		if (httpret != 200) {
			i_error("fts_solr: Lookup failed: %s",
				conn->http_failure != NULL ?
				conn->http_failure :
				"(no HTTP status received)");
		} else {
			/* tell the parser that the document is complete */
			parse_ret = solr_xml_parse(conn, NULL, 0, TRUE);
			got_reply = TRUE;
		}
	}

	/* parsing state is no longer needed, whether or not the lookup
	   succeeded. the results hold their own copies of the mailbox
	   ids. */
	hash_table_destroy(&solr_lookup_context.mailboxes);
	i_free(solr_lookup_context.mailbox);
	i_free(solr_lookup_context.ns);

	if (got_reply) {
		(void)array_append_space(&solr_lookup_context.results);
		*box_results_r =
			array_idx_modifiable(&solr_lookup_context.results, 0);
	}
	return parse_ret;
}
/* Starts an update request (POST to <base url>update) and returns its
   handle. The body is supplied with solr_connection_post_more() and the
   request is finished with solr_connection_post_end(), which also frees
   the handle. Only one post may be active on a connection at a time. If
   the request can't be started, the error is logged and the returned
   handle is marked as failed. */
struct solr_connection_post *
solr_connection_post_begin(struct solr_connection *conn)
{
	struct solr_connection_post *post;
	CURLMcode merr;

	post = i_new(struct solr_connection_post, 1);
	post->conn = conn;

	i_assert(!conn->posting);
	conn->posting = TRUE;

	i_free_and_null(conn->http_failure);

	curl_easy_setopt(conn->curl, CURLOPT_READDATA, post);
	merr = curl_multi_add_handle(conn->curlm, conn->curl);
	if (merr != CURLM_OK) {
		i_error("fts_solr: curl_multi_add_handle() failed: %s",
			curl_multi_strerror(merr));
		post->failed = TRUE;
		return post;
	}

	/* curl v7.16 and older don't strdup() the URL */
	post->url = i_strconcat(conn->url, "update", NULL);
	curl_easy_setopt(conn->curl, CURLOPT_URL, post->url);
	curl_easy_setopt(conn->curl, CURLOPT_HTTPHEADER, conn->headers_post);
	curl_easy_setopt(conn->curl, CURLOPT_POST, 1L);
	XML_ParserReset(conn->xml_parser, "UTF-8");
	return post;
}
/* Sends size bytes of data as the next part of the request body and
   returns once curl has consumed all of it. Called with size 0 it instead
   drives the transfer until no handle is running anymore, which is how
   solr_connection_post_end() finishes the request.

   Errors are logged and recorded in post->failed; once that is set,
   further calls return immediately. data must stay valid until this
   function returns. */
void solr_connection_post_more(struct solr_connection_post *post,
			       const unsigned char *data, size_t size)
{
	fd_set fdread;
	fd_set fdwrite;
	fd_set fdexcep;
	struct timeval timeout_tv;
	long timeout;
	CURLMsg *msg;
	CURLMcode merr;
	int ret, handles, maxfd, n;

	i_assert(post->conn->posting);

	if (post->failed)
		return;

	post->data = data;
	post->size = size;
	post->pos = 0;

	for (;;) {
		merr = curl_multi_perform(post->conn->curlm, &handles);
		if (merr == CURLM_CALL_MULTI_PERFORM)
			continue;
		if (merr != CURLM_OK) {
			i_error("fts_solr: curl_multi_perform() failed: %s",
				curl_multi_strerror(merr));
			break;
		}
		if ((post->pos == post->size && post->size != 0) ||
		    (handles == 0 && post->size == 0)) {
			/* everything sent successfully */
			return;
		}
		msg = curl_multi_info_read(post->conn->curlm, &n);
		if (msg != NULL && msg->msg == CURLMSG_DONE &&
		    msg->data.result != CURLE_OK) {
			i_error("fts_solr: curl post failed: %s",
				curl_easy_strerror(msg->data.result));
			break;
		}
		if (handles == 0) {
			/* the transfer is over, but curl never asked for
			   the rest of our data. waiting can't change that
			   anymore. */
			i_error("fts_solr: curl post finished before "
				"all data was sent");
			break;
		}

		/* everything wasn't sent - wait. just use select,
		   since libcurl interface is easiest with it. */
		FD_ZERO(&fdread);
		FD_ZERO(&fdwrite);
		FD_ZERO(&fdexcep);

		merr = curl_multi_fdset(post->conn->curlm, &fdread, &fdwrite,
					&fdexcep, &maxfd);
		if (merr != CURLM_OK) {
			i_error("fts_solr: curl_multi_fdset() failed: %s",
				curl_multi_strerror(merr));
			break;
		}

		merr = curl_multi_timeout(post->conn->curlm, &timeout);
		if (merr != CURLM_OK) {
			i_error("fts_solr: curl_multi_timeout() failed: %s",
				curl_multi_strerror(merr));
			break;
		}
		if (maxfd < 0 && (timeout < 0 || timeout > 100)) {
			/* curl has nothing we could wait on right now.
			   that's not an error: just sleep a short while
			   (select() with empty sets) and try again. */
			timeout = 100;
		}

		if (timeout < 0) {
			timeout_tv.tv_sec = 1;
			timeout_tv.tv_usec = 0;
		} else {
			timeout_tv.tv_sec = timeout / 1000;
			timeout_tv.tv_usec = (timeout % 1000) * 1000;
		}
		ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout_tv);
		if (ret < 0) {
			i_error("fts_solr: select() failed: %m");
			break;
		}
	}
	post->failed = TRUE;
}
/* Finishes the update request started with solr_connection_post_begin():
   waits for the transfer to complete, restores the connection's settings
   for lookups and frees post.

   Returns 0 if everything was sent and the server replied with HTTP 200,
   -1 otherwise. */
int solr_connection_post_end(struct solr_connection_post *post)
{
	struct solr_connection *conn = post->conn;
	long httpret = 0;
	int ret;

	i_assert(conn->posting);

	/* with an empty buffer this keeps driving the transfer until no
	   handle is running anymore */
	solr_connection_post_more(post, NULL, 0);
	/* check the failure state only now, so that an error during the
	   final flush is noticed as well */
	ret = post->failed ? -1 : 0;

	curl_easy_getinfo(conn->curl, CURLINFO_RESPONSE_CODE, &httpret);
	if (httpret != 200 && ret == 0) {
		i_error("fts_solr: Indexing failed: %s",
			conn->http_failure != NULL ? conn->http_failure :
			"(no HTTP status received)");
		ret = -1;
	}

	/* put the easy handle back into the state lookups expect */
	curl_easy_setopt(conn->curl, CURLOPT_READDATA, NULL);
	curl_easy_setopt(conn->curl, CURLOPT_POST, (long)0);
	curl_easy_setopt(conn->curl, CURLOPT_HTTPHEADER, conn->headers);
	(void)curl_multi_remove_handle(conn->curlm, conn->curl);

	i_free(post->url);
	i_free(post);

	conn->posting = FALSE;
	return ret;
}
/* Convenience wrapper: posts the NUL-terminated string cmd as a complete
   update request. Returns what solr_connection_post_end() returns. */
int solr_connection_post(struct solr_connection *conn, const char *cmd)
{
	struct solr_connection_post *request;
	const unsigned char *body = (const unsigned char *)cmd;

	request = solr_connection_post_begin(conn);
	solr_connection_post_more(request, body, strlen(cmd));
	return solr_connection_post_end(request);
}