dict-redis.c revision 35f3b7e05afecacd0332c210c6e253911c2813d8
5a580c3a38ced62d4bcc95b8ac7c4f2935b5d294Timo Sirainen/* Copyright (c) 2008-2013 Dovecot authors, see the included COPYING redis */
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen#define REDIS_DEFAULT_LOOKUP_TIMEOUT_MSECS (1000*30)
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen /* expecting $-1 / $<size> followed by GET reply */
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen /* expecting +QUEUED */
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen /* expecting +OK reply for DISCARD */
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen /* expecting *<nreplies> */
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen /* expecting EXEC reply */
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen unsigned int port;
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainenstatic struct connection_list *redis_connections;
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainenredis_input_state_add(struct redis_dict *dict, enum redis_input_state state)
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainenstatic void redis_input_state_remove(struct redis_dict *dict)
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainenstatic void redis_conn_destroy(struct connection *_conn)
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen struct redis_connection *conn = (struct redis_connection *)_conn;
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainenstatic void redis_wait(struct redis_dict *dict)
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen } while (array_count(&dict->input_states) > 0);
531ddc880d8c8d1d3b4b6c5cf8abb1abb4a2e0daTimo Sirainenstatic int redis_input_get(struct redis_connection *conn)
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen const unsigned char *data;
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen /* read the size first */
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen if (line[0] != '$' || str_to_uint(line+1, &conn->bytes_left) < 0) {
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen i_error("redis: Unexpected input (wanted $size): %s",
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen conn->bytes_left += 2; /* include trailing CRLF */
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen data = i_stream_get_data(conn->conn.input, &size);
531ddc880d8c8d1d3b4b6c5cf8abb1abb4a2e0daTimo Sirainen /* reply fully read - drop trailing CRLF */
531ddc880d8c8d1d3b4b6c5cf8abb1abb4a2e0daTimo Sirainen str_truncate(conn->last_reply, str_len(conn->last_reply)-2);
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainenstatic int redis_conn_input_more(struct redis_connection *conn)
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen states = array_get(&dict->input_states, &count);
531ddc880d8c8d1d3b4b6c5cf8abb1abb4a2e0daTimo Sirainen i_error("redis: Unexpected input (expected nothing): %s", line);
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen if (line[0] != '*' || str_to_uint(line+1, &num_replies) < 0)
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen reply = array_idx_modifiable(&dict->replies, 0);
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen i_error("redis: EXEC expected %u replies, not %u",
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen /* success, just ignore the actual reply */
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen reply = array_idx_modifiable(&dict->replies, 0);
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen /* if we're running in a dict-ioloop, we're handling a
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen synchronous commit and need to stop now */
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen i_error("redis: Unexpected input (state=%d): %s", state, line);
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainenstatic void redis_conn_input(struct connection *_conn)
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen struct redis_connection *conn = (struct redis_connection *)_conn;
531ddc880d8c8d1d3b4b6c5cf8abb1abb4a2e0daTimo Sirainen while ((ret = redis_conn_input_more(conn)) > 0) ;
7db7fbea5d8a07463b625f93d69166d56018dadfTimo Sirainenstatic void redis_conn_connected(struct connection *_conn, bool success)
aa8e63009c5aa3866bbf5a3e69a86b1ab480c4ddTimo Sirainen struct redis_connection *conn = (struct redis_connection *)_conn;
aa8e63009c5aa3866bbf5a3e69a86b1ab480c4ddTimo Sirainen net_ip2addr(&conn->dict->ip), conn->dict->port);
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainenstatic const struct connection_settings redis_conn_set = {
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainenstatic const struct connection_vfuncs redis_conn_vfuncs = {
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainenstatic const char *redis_escape_username(const char *username)
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen const char *p;
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen switch (*p) {
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainenredis_dict_init(struct dict *driver, const char *uri,
eca38954bcf972618f6b85932a3690acbd2b673aTimo Sirainen const char *base_dir ATTR_UNUSED, struct dict **dict_r,
eca38954bcf972618f6b85932a3690acbd2b673aTimo Sirainen const char **error_r)
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen const char *const *args;
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen dict->timeout_msecs = REDIS_DEFAULT_LOOKUP_TIMEOUT_MSECS;
eca38954bcf972618f6b85932a3690acbd2b673aTimo Sirainen *error_r = t_strdup_printf("Invalid port: %s",
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen } else if (strncmp(*args, "prefix=", 7) == 0) {
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen } else if (strncmp(*args, "timeout_msecs=", 14) == 0) {
8f2eb1ee9ec07661bd50275da99b5f351972a49aTimo Sirainen if (str_to_uint(*args+14, &dict->timeout_msecs) < 0) {
eca38954bcf972618f6b85932a3690acbd2b673aTimo Sirainen *error_r = t_strdup_printf("Unknown parameter: %s",
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen connection_init_client_ip(redis_connections, &dict->conn.conn,
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen dict->conn.last_reply = str_new(default_pool, 256);
8f2eb1ee9ec07661bd50275da99b5f351972a49aTimo Sirainen if (strchr(username, DICT_USERNAME_SEPARATOR) == NULL)
8f2eb1ee9ec07661bd50275da99b5f351972a49aTimo Sirainen /* escape the username */
8f2eb1ee9ec07661bd50275da99b5f351972a49aTimo Sirainen dict->username = i_strdup(redis_escape_username(username));
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainenstatic void redis_dict_deinit(struct dict *_dict)
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen struct redis_dict *dict = (struct redis_dict *)_dict;
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainenstatic void redis_dict_lookup_timeout(struct redis_dict *dict)
aa8e63009c5aa3866bbf5a3e69a86b1ab480c4ddTimo Sirainen i_error("redis: Lookup timed out in %u.%03u secs",
aa8e63009c5aa3866bbf5a3e69a86b1ab480c4ddTimo Sirainen dict->timeout_msecs/1000, dict->timeout_msecs%1000);
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainenstatic const char *
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainenredis_dict_get_full_key(struct redis_dict *dict, const char *key)
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen if (strncmp(key, DICT_PATH_SHARED, strlen(DICT_PATH_SHARED)) == 0)
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen else if (strncmp(key, DICT_PATH_PRIVATE, strlen(DICT_PATH_PRIVATE)) == 0) {
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen key = t_strdup_printf("%s%c%s", dict->username,
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen key = t_strconcat(dict->key_prefix, key, NULL);
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainenredis_dict_lookup_real(struct redis_dict *dict, pool_t pool,
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen const char *cmd;
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen connection_client_connect(&dict->conn.conn) < 0) {
aa8e63009c5aa3866bbf5a3e69a86b1ab480c4ddTimo Sirainen /* wait for connection */
aa8e63009c5aa3866bbf5a3e69a86b1ab480c4ddTimo Sirainen cmd = t_strdup_printf("*2\r\n$3\r\nGET\r\n$%d\r\n%s\r\n",
1d2c463d23f09f15727edae9c78b07ec6a7a27daTimo Sirainen o_stream_nsend_str(dict->conn.conn.output, cmd);
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen redis_input_state_add(dict, REDIS_INPUT_STATE_GET);
97d5d23d41aab07e0e7d98c1f9b834e325d7d1b3Timo Sirainen } while (array_count(&dict->input_states) > 0);
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen /* we failed in some way. make sure we disconnect since the
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen connection state isn't known anymore */
31257b47d47510ceb093a6b218810a1a5b830c55Timo Sirainen *value_r = p_strdup(pool, str_c(dict->conn.last_reply));
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainenstatic int redis_dict_lookup(struct dict *_dict, pool_t pool,
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen struct redis_dict *dict = (struct redis_dict *)_dict;
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen ret = redis_dict_lookup_real(dict, pool, key, value_r);
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen ret = redis_dict_lookup_real(dict, pool, key, value_r);
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen struct redis_dict *dict = (struct redis_dict *)_dict;
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen ctx = i_new(struct redis_dict_transaction_context, 1);
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen connection_client_connect(&dict->conn.conn) < 0) {
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen /* wait for connection */
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainenredis_transaction_commit(struct dict_transaction_context *_ctx, bool async,
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen (struct redis_dict_transaction_context *)_ctx;
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen struct redis_dict *dict = (struct redis_dict *)_ctx->dict;
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen unsigned int i;
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen /* make sure we're disconnected */
1d2c463d23f09f15727edae9c78b07ec6a7a27daTimo Sirainen "*1\r\n$4\r\nEXEC\r\n");
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen redis_input_state_add(dict, REDIS_INPUT_STATE_EXEC);
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen redis_input_state_add(dict, REDIS_INPUT_STATE_EXEC_REPLY);
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainenstatic void redis_transaction_rollback(struct dict_transaction_context *_ctx)
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen (struct redis_dict_transaction_context *)_ctx;
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen struct redis_dict *dict = (struct redis_dict *)_ctx->dict;
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen /* make sure we're disconnected */
1d2c463d23f09f15727edae9c78b07ec6a7a27daTimo Sirainen "*1\r\n$7\r\nDISCARD\r\n");
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen redis_input_state_add(dict, REDIS_INPUT_STATE_DISCARD);
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainenstatic int redis_check_transaction(struct redis_dict_transaction_context *ctx)
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen struct redis_dict *dict = (struct redis_dict *)ctx->ctx.dict;
6ac32bfcdcd9e96e9b6614914fc4b0a926dcfa69Timo Sirainen /* disconnected during transaction */
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI);
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen "*1\r\n$5\r\nMULTI\r\n") < 0) {
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainenstatic void redis_set(struct dict_transaction_context *_ctx,
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen (struct redis_dict_transaction_context *)_ctx;
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen struct redis_dict *dict = (struct redis_dict *)_ctx->dict;
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen const char *cmd;
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen cmd = t_strdup_printf("*3\r\n$3\r\nSET\r\n$%u\r\n%s\r\n$%u\r\n%s\r\n",
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen if (o_stream_send_str(dict->conn.conn.output, cmd) < 0)
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI);
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainenstatic void redis_unset(struct dict_transaction_context *_ctx,
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen const char *key)
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen (struct redis_dict_transaction_context *)_ctx;
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen struct redis_dict *dict = (struct redis_dict *)_ctx->dict;
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen const char *cmd;
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen cmd = t_strdup_printf("*2\r\n$3\r\nDEL\r\n$%u\r\n%s\r\n",
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen if (o_stream_send_str(dict->conn.conn.output, cmd) < 0)
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI);
2aed6c2062317d1750f59c5c88e77d9f10967462Timo Sirainenstatic void redis_append(struct dict_transaction_context *_ctx,
2aed6c2062317d1750f59c5c88e77d9f10967462Timo Sirainen (struct redis_dict_transaction_context *)_ctx;
2aed6c2062317d1750f59c5c88e77d9f10967462Timo Sirainen struct redis_dict *dict = (struct redis_dict *)_ctx->dict;
2aed6c2062317d1750f59c5c88e77d9f10967462Timo Sirainen const char *cmd;
2aed6c2062317d1750f59c5c88e77d9f10967462Timo Sirainen cmd = t_strdup_printf("*3\r\n$6\r\nAPPEND\r\n$%u\r\n%s\r\n$%u\r\n%s\r\n",
2aed6c2062317d1750f59c5c88e77d9f10967462Timo Sirainen if (o_stream_send_str(dict->conn.conn.output, cmd) < 0)
2aed6c2062317d1750f59c5c88e77d9f10967462Timo Sirainen redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI);
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainenstatic void redis_atomic_inc(struct dict_transaction_context *_ctx,
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen (struct redis_dict_transaction_context *)_ctx;
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen struct redis_dict *dict = (struct redis_dict *)_ctx->dict;
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen cmd = t_strdup_printf("*3\r\n$6\r\nINCRBY\r\n$%u\r\n%s\r\n$%u\r\n%s\r\n",
1c7df2f6d12f5e7f0cf1e73452d4ae216a3d092cTimo Sirainen if (o_stream_send_str(dict->conn.conn.output, cmd) < 0)