dict-redis.c revision 3fdae9cc0f5d6ce05afdaf1e2ab35fb5dbdf0d17
0N/A/* Copyright (c) 2008-2016 Dovecot authors, see the included COPYING redis */
0N/A
0N/A#include "lib.h"
0N/A#include "array.h"
0N/A#include "str.h"
0N/A#include "istream.h"
0N/A#include "ostream.h"
0N/A#include "connection.h"
0N/A#include "dict-private.h"
0N/A
0N/A#define REDIS_DEFAULT_PORT 6379
0N/A#define REDIS_DEFAULT_LOOKUP_TIMEOUT_MSECS (1000*30)
0N/A#define DICT_USERNAME_SEPARATOR '/'
0N/A
0N/Aenum redis_input_state {
0N/A /* expecting +OK reply for SELECT */
0N/A REDIS_INPUT_STATE_SELECT,
0N/A /* expecting $-1 / $<size> followed by GET reply */
0N/A REDIS_INPUT_STATE_GET,
0N/A /* expecting +QUEUED */
0N/A REDIS_INPUT_STATE_MULTI,
0N/A /* expecting +OK reply for DISCARD */
0N/A REDIS_INPUT_STATE_DISCARD,
0N/A /* expecting *<nreplies> */
0N/A REDIS_INPUT_STATE_EXEC,
0N/A /* expecting EXEC reply */
0N/A REDIS_INPUT_STATE_EXEC_REPLY
0N/A};
0N/A
0N/Astruct redis_connection {
0N/A struct connection conn;
0N/A struct redis_dict *dict;
0N/A
0N/A string_t *last_reply;
0N/A unsigned int bytes_left;
0N/A bool value_not_found;
0N/A bool value_received;
0N/A};
0N/A
0N/Astruct redis_dict_reply {
0N/A unsigned int reply_count;
0N/A dict_transaction_commit_callback_t *callback;
0N/A void *context;
0N/A};
0N/A
0N/Astruct redis_dict {
0N/A struct dict dict;
0N/A char *username, *key_prefix, *expire_value;
0N/A unsigned int timeout_msecs, db_id;
0N/A
0N/A struct ioloop *ioloop, *prev_ioloop;
0N/A struct redis_connection conn;
0N/A
0N/A ARRAY(enum redis_input_state) input_states;
0N/A ARRAY(struct redis_dict_reply) replies;
0N/A
0N/A bool connected;
0N/A bool transaction_open;
0N/A bool db_id_set;
0N/A};
0N/A
0N/Astruct redis_dict_transaction_context {
0N/A struct dict_transaction_context ctx;
0N/A unsigned int cmd_count;
0N/A char *error;
0N/A};
0N/A
0N/Astatic struct connection_list *redis_connections;
0N/A
0N/Astatic void
0N/Aredis_input_state_add(struct redis_dict *dict, enum redis_input_state state)
0N/A{
0N/A array_append(&dict->input_states, &state, 1);
0N/A}
0N/A
0N/Astatic void redis_input_state_remove(struct redis_dict *dict)
0N/A{
0N/A array_delete(&dict->input_states, 0, 1);
0N/A}
0N/A
0N/Astatic void redis_callback(struct redis_dict *dict,
0N/A const struct redis_dict_reply *reply,
0N/A const struct dict_commit_result *result)
0N/A{
0N/A if (reply->callback != NULL) {
0N/A if (dict->prev_ioloop != NULL) {
0N/A /* Don't let callback see that we've created our
0N/A internal ioloop in case it wants to add some ios
0N/A or timeouts. */
0N/A current_ioloop = dict->prev_ioloop;
0N/A }
0N/A reply->callback(result, reply->context);
0N/A if (dict->prev_ioloop != NULL)
0N/A current_ioloop = dict->ioloop;
0N/A }
0N/A}
0N/A
0N/Astatic void
0N/Aredis_disconnected(struct redis_connection *conn, const char *reason)
0N/A{
0N/A const struct dict_commit_result result = { -1, reason };
0N/A const struct redis_dict_reply *reply;
0N/A
0N/A conn->dict->db_id_set = FALSE;
0N/A conn->dict->connected = FALSE;
0N/A connection_disconnect(&conn->conn);
0N/A
0N/A array_foreach(&conn->dict->replies, reply)
0N/A redis_callback(conn->dict, reply, &result);
0N/A array_clear(&conn->dict->replies);
0N/A array_clear(&conn->dict->input_states);
0N/A
0N/A if (conn->dict->ioloop != NULL)
0N/A io_loop_stop(conn->dict->ioloop);
0N/A}
0N/A
0N/Astatic void redis_conn_destroy(struct connection *_conn)
0N/A{
0N/A struct redis_connection *conn = (struct redis_connection *)_conn;
0N/A
0N/A redis_disconnected(conn, connection_disconnect_reason(_conn));
0N/A}
0N/A
0N/Astatic void redis_dict_wait_timeout(struct redis_dict *dict)
0N/A{
0N/A const char *reason = t_strdup_printf(
0N/A "redis: Commit timed out in %u.%03u secs",
0N/A dict->timeout_msecs/1000, dict->timeout_msecs%1000);
0N/A redis_disconnected(&dict->conn, reason);
0N/A}
0N/A
0N/Astatic void redis_wait(struct redis_dict *dict)
0N/A{
0N/A struct timeout *to;
0N/A
0N/A i_assert(dict->ioloop == NULL);
0N/A
0N/A dict->prev_ioloop = current_ioloop;
0N/A dict->ioloop = io_loop_create();
0N/A to = timeout_add(dict->timeout_msecs, redis_dict_wait_timeout, dict);
0N/A connection_switch_ioloop(&dict->conn.conn);
0N/A
0N/A do {
0N/A io_loop_run(dict->ioloop);
0N/A } while (array_count(&dict->input_states) > 0);
0N/A
0N/A timeout_remove(&to);
io_loop_set_current(dict->prev_ioloop);
connection_switch_ioloop(&dict->conn.conn);
io_loop_set_current(dict->ioloop);
io_loop_destroy(&dict->ioloop);
dict->prev_ioloop = NULL;
}
static int redis_input_get(struct redis_connection *conn, const char **error_r)
{
const unsigned char *data;
size_t size;
const char *line;
if (conn->bytes_left == 0) {
/* read the size first */
line = i_stream_next_line(conn->conn.input);
if (line == NULL)
return 0;
if (strcmp(line, "$-1") == 0) {
conn->value_received = TRUE;
conn->value_not_found = TRUE;
if (conn->dict->ioloop != NULL)
io_loop_stop(conn->dict->ioloop);
redis_input_state_remove(conn->dict);
return 1;
}
if (line[0] != '$' || str_to_uint(line+1, &conn->bytes_left) < 0) {
*error_r = t_strdup_printf(
"redis: Unexpected input (wanted $size): %s", line);
return -1;
}
conn->bytes_left += 2; /* include trailing CRLF */
}
data = i_stream_get_data(conn->conn.input, &size);
if (size > conn->bytes_left)
size = conn->bytes_left;
str_append_n(conn->last_reply, data, size);
conn->bytes_left -= size;
i_stream_skip(conn->conn.input, size);
if (conn->bytes_left > 0)
return 0;
/* reply fully read - drop trailing CRLF */
conn->value_received = TRUE;
str_truncate(conn->last_reply, str_len(conn->last_reply)-2);
if (conn->dict->ioloop != NULL)
io_loop_stop(conn->dict->ioloop);
redis_input_state_remove(conn->dict);
return 1;
}
static int
redis_conn_input_more(struct redis_connection *conn, const char **error_r)
{
struct redis_dict *dict = conn->dict;
struct redis_dict_reply *reply;
const enum redis_input_state *states;
enum redis_input_state state;
unsigned int count, num_replies;
const char *line;
states = array_get(&dict->input_states, &count);
if (count == 0) {
line = i_stream_next_line(conn->conn.input);
if (line == NULL)
return 0;
*error_r = t_strdup_printf(
"redis: Unexpected input (expected nothing): %s", line);
return -1;
}
state = states[0];
if (state == REDIS_INPUT_STATE_GET)
return redis_input_get(conn, error_r);
line = i_stream_next_line(conn->conn.input);
if (line == NULL)
return 0;
redis_input_state_remove(dict);
switch (state) {
case REDIS_INPUT_STATE_GET:
i_unreached();
case REDIS_INPUT_STATE_SELECT:
case REDIS_INPUT_STATE_MULTI:
case REDIS_INPUT_STATE_DISCARD:
if (line[0] != '+')
break;
return 1;
case REDIS_INPUT_STATE_EXEC:
if (line[0] != '*' || str_to_uint(line+1, &num_replies) < 0)
break;
reply = array_idx_modifiable(&dict->replies, 0);
i_assert(reply->reply_count > 0);
if (reply->reply_count != num_replies) {
*error_r = t_strdup_printf(
"redis: EXEC expected %u replies, not %u",
reply->reply_count, num_replies);
return -1;
}
return 1;
case REDIS_INPUT_STATE_EXEC_REPLY:
if (*line != '+' && *line != ':')
break;
/* success, just ignore the actual reply */
reply = array_idx_modifiable(&dict->replies, 0);
i_assert(reply->reply_count > 0);
if (--reply->reply_count == 0) {
const struct dict_commit_result result = { 1, NULL };
redis_callback(dict, reply, &result);
array_delete(&dict->replies, 0, 1);
/* if we're running in a dict-ioloop, we're handling a
synchronous commit and need to stop now */
if (array_count(&dict->replies) == 0 &&
conn->dict->ioloop != NULL)
io_loop_stop(conn->dict->ioloop);
}
return 1;
}
*error_r = t_strdup_printf("redis: Unexpected input (state=%d): %s", state, line);
return -1;
}
static void redis_conn_input(struct connection *_conn)
{
struct redis_connection *conn = (struct redis_connection *)_conn;
const char *error = NULL;
int ret;
switch (i_stream_read(_conn->input)) {
case 0:
return;
case -1:
redis_disconnected(conn, i_stream_get_error(_conn->input));
return;
default:
break;
}
while ((ret = redis_conn_input_more(conn, &error)) > 0) ;
if (ret < 0) {
i_assert(error != NULL);
redis_disconnected(conn, error);
}
}
static void redis_conn_connected(struct connection *_conn, bool success)
{
struct redis_connection *conn = (struct redis_connection *)_conn;
if (!success) {
i_error("redis: connect(%s) failed: %m", _conn->name);
} else {
conn->dict->connected = TRUE;
}
if (conn->dict->ioloop != NULL)
io_loop_stop(conn->dict->ioloop);
}
static const struct connection_settings redis_conn_set = {
.input_max_size = (size_t)-1,
.output_max_size = (size_t)-1,
.client = TRUE
};
static const struct connection_vfuncs redis_conn_vfuncs = {
.destroy = redis_conn_destroy,
.input = redis_conn_input,
.client_connected = redis_conn_connected
};
static const char *redis_escape_username(const char *username)
{
const char *p;
string_t *str = t_str_new(64);
for (p = username; *p != '\0'; p++) {
switch (*p) {
case DICT_USERNAME_SEPARATOR:
str_append(str, "\\-");
break;
case '\\':
str_append(str, "\\\\");
break;
default:
str_append_c(str, *p);
}
}
return str_c(str);
}
static int
redis_dict_init(struct dict *driver, const char *uri,
const struct dict_settings *set,
struct dict **dict_r, const char **error_r)
{
struct redis_dict *dict;
struct ip_addr ip;
unsigned int secs;
in_port_t port = REDIS_DEFAULT_PORT;
const char *const *args, *unix_path = NULL;
int ret = 0;
if (redis_connections == NULL) {
redis_connections =
connection_list_init(&redis_conn_set,
&redis_conn_vfuncs);
}
dict = i_new(struct redis_dict, 1);
if (net_addr2ip("127.0.0.1", &ip) < 0)
i_unreached();
dict->timeout_msecs = REDIS_DEFAULT_LOOKUP_TIMEOUT_MSECS;
dict->key_prefix = i_strdup("");
args = t_strsplit(uri, ":");
for (; *args != NULL; args++) {
if (strncmp(*args, "path=", 5) == 0) {
unix_path = *args + 5;
} else if (strncmp(*args, "host=", 5) == 0) {
if (net_addr2ip(*args+5, &ip) < 0) {
*error_r = t_strdup_printf("Invalid IP: %s",
*args+5);
ret = -1;
}
} else if (strncmp(*args, "port=", 5) == 0) {
if (net_str2port(*args+5, &port) < 0) {
*error_r = t_strdup_printf("Invalid port: %s",
*args+5);
ret = -1;
}
} else if (strncmp(*args, "prefix=", 7) == 0) {
i_free(dict->key_prefix);
dict->key_prefix = i_strdup(*args + 7);
} else if (strncmp(*args, "db=", 3) == 0) {
if (str_to_uint(*args+3, &dict->db_id) < 0) {
*error_r = t_strdup_printf(
"Invalid db number: %s", *args+3);
ret = -1;
}
} else if (strncmp(*args, "expire_secs=", 12) == 0) {
const char *value = *args + 12;
if (str_to_uint(value, &secs) < 0 || secs == 0) {
*error_r = t_strdup_printf(
"Invalid expire_secs: %s", value);
ret = -1;
}
i_free(dict->expire_value);
dict->expire_value = i_strdup(value);
} else if (strncmp(*args, "timeout_msecs=", 14) == 0) {
if (str_to_uint(*args+14, &dict->timeout_msecs) < 0) {
*error_r = t_strdup_printf(
"Invalid timeout_msecs: %s", *args+14);
ret = -1;
}
} else {
*error_r = t_strdup_printf("Unknown parameter: %s",
*args);
ret = -1;
}
}
if (ret < 0) {
i_free(dict->key_prefix);
i_free(dict);
return -1;
}
if (unix_path != NULL) {
connection_init_client_unix(redis_connections, &dict->conn.conn,
unix_path);
} else {
connection_init_client_ip(redis_connections, &dict->conn.conn,
&ip, port);
}
dict->dict = *driver;
dict->conn.last_reply = str_new(default_pool, 256);
dict->conn.dict = dict;
i_array_init(&dict->input_states, 4);
i_array_init(&dict->replies, 4);
if (strchr(set->username, DICT_USERNAME_SEPARATOR) == NULL)
dict->username = i_strdup(set->username);
else {
/* escape the username */
dict->username = i_strdup(redis_escape_username(set->username));
}
*dict_r = &dict->dict;
return 0;
}
static void redis_dict_deinit(struct dict *_dict)
{
struct redis_dict *dict = (struct redis_dict *)_dict;
if (array_count(&dict->input_states) > 0) {
i_assert(dict->connected);
redis_wait(dict);
}
connection_deinit(&dict->conn.conn);
str_free(&dict->conn.last_reply);
array_free(&dict->replies);
array_free(&dict->input_states);
i_free(dict->expire_value);
i_free(dict->key_prefix);
i_free(dict->username);
i_free(dict);
if (redis_connections->connections == NULL)
connection_list_deinit(&redis_connections);
}
static void redis_dict_lookup_timeout(struct redis_dict *dict)
{
const char *reason = t_strdup_printf(
"redis: Lookup timed out in %u.%03u secs",
dict->timeout_msecs/1000, dict->timeout_msecs%1000);
redis_disconnected(&dict->conn, reason);
}
static const char *
redis_dict_get_full_key(struct redis_dict *dict, const char *key)
{
if (strncmp(key, DICT_PATH_SHARED, strlen(DICT_PATH_SHARED)) == 0)
key += strlen(DICT_PATH_SHARED);
else if (strncmp(key, DICT_PATH_PRIVATE, strlen(DICT_PATH_PRIVATE)) == 0) {
key = t_strdup_printf("%s%c%s", dict->username,
DICT_USERNAME_SEPARATOR,
key + strlen(DICT_PATH_PRIVATE));
} else {
i_unreached();
}
if (*dict->key_prefix != '\0')
key = t_strconcat(dict->key_prefix, key, NULL);
return key;
}
static void redis_dict_select_db(struct redis_dict *dict)
{
const char *cmd, *db_str;
if (dict->db_id_set)
return;
dict->db_id_set = TRUE;
if (dict->db_id == 0) {
/* 0 is the default */
return;
}
db_str = dec2str(dict->db_id);
cmd = t_strdup_printf("*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
(int)strlen(db_str), db_str);
o_stream_nsend_str(dict->conn.conn.output, cmd);
redis_input_state_add(dict, REDIS_INPUT_STATE_SELECT);
}
static int redis_dict_lookup(struct dict *_dict, pool_t pool, const char *key,
const char **value_r, const char **error_r)
{
struct redis_dict *dict = (struct redis_dict *)_dict;
struct timeout *to;
const char *cmd;
key = redis_dict_get_full_key(dict, key);
dict->conn.value_received = FALSE;
dict->conn.value_not_found = FALSE;
i_assert(dict->ioloop == NULL);
dict->prev_ioloop = current_ioloop;
dict->ioloop = io_loop_create();
connection_switch_ioloop(&dict->conn.conn);
if (dict->conn.conn.fd_in == -1 &&
connection_client_connect(&dict->conn.conn) < 0) {
i_error("redis: Couldn't connect to %s", dict->conn.conn.name);
} else {
to = timeout_add(dict->timeout_msecs,
redis_dict_lookup_timeout, dict);
if (!dict->connected) {
/* wait for connection */
io_loop_run(dict->ioloop);
}
if (dict->connected) {
redis_dict_select_db(dict);
cmd = t_strdup_printf("*2\r\n$3\r\nGET\r\n$%d\r\n%s\r\n",
(int)strlen(key), key);
o_stream_nsend_str(dict->conn.conn.output, cmd);
str_truncate(dict->conn.last_reply, 0);
redis_input_state_add(dict, REDIS_INPUT_STATE_GET);
do {
io_loop_run(dict->ioloop);
} while (array_count(&dict->input_states) > 0);
}
timeout_remove(&to);
}
io_loop_set_current(dict->prev_ioloop);
connection_switch_ioloop(&dict->conn.conn);
io_loop_set_current(dict->ioloop);
io_loop_destroy(&dict->ioloop);
dict->prev_ioloop = NULL;
if (!dict->conn.value_received) {
/* we failed in some way. make sure we disconnect since the
connection state isn't known anymore */
*error_r = "redis: Communication failure";
redis_disconnected(&dict->conn, *error_r);
return -1;
}
if (dict->conn.value_not_found)
return 0;
*value_r = p_strdup(pool, str_c(dict->conn.last_reply));
return 1;
}
static struct dict_transaction_context *
redis_transaction_init(struct dict *_dict)
{
struct redis_dict *dict = (struct redis_dict *)_dict;
struct redis_dict_transaction_context *ctx;
i_assert(!dict->transaction_open);
dict->transaction_open = TRUE;
ctx = i_new(struct redis_dict_transaction_context, 1);
ctx->ctx.dict = _dict;
if (dict->conn.conn.fd_in == -1 &&
connection_client_connect(&dict->conn.conn) < 0) {
i_error("redis: Couldn't connect to %s",
dict->conn.conn.name);
} else if (!dict->connected) {
/* wait for connection */
redis_wait(dict);
}
if (dict->connected)
redis_dict_select_db(dict);
return &ctx->ctx;
}
static void
redis_transaction_commit(struct dict_transaction_context *_ctx, bool async,
dict_transaction_commit_callback_t *callback,
void *context)
{
struct redis_dict_transaction_context *ctx =
(struct redis_dict_transaction_context *)_ctx;
struct redis_dict *dict = (struct redis_dict *)_ctx->dict;
struct redis_dict_reply *reply;
unsigned int i;
struct dict_commit_result result = { .ret = 1 };
i_assert(dict->transaction_open);
dict->transaction_open = FALSE;
if (ctx->error != NULL) {
/* make sure we're disconnected */
redis_disconnected(&dict->conn, ctx->error);
result.ret = -1;
result.error = ctx->error;
} else if (_ctx->changed) {
i_assert(ctx->cmd_count > 0);
o_stream_nsend_str(dict->conn.conn.output,
"*1\r\n$4\r\nEXEC\r\n");
reply = array_append_space(&dict->replies);
reply->callback = callback;
reply->context = context;
reply->reply_count = ctx->cmd_count;
redis_input_state_add(dict, REDIS_INPUT_STATE_EXEC);
for (i = 0; i < ctx->cmd_count; i++)
redis_input_state_add(dict, REDIS_INPUT_STATE_EXEC_REPLY);
if (async) {
i_free(ctx);
return;
}
redis_wait(dict);
}
callback(&result, context);
i_free(ctx->error);
i_free(ctx);
}
static void redis_transaction_rollback(struct dict_transaction_context *_ctx)
{
struct redis_dict_transaction_context *ctx =
(struct redis_dict_transaction_context *)_ctx;
struct redis_dict *dict = (struct redis_dict *)_ctx->dict;
struct redis_dict_reply *reply;
i_assert(dict->transaction_open);
dict->transaction_open = FALSE;
if (ctx->error != NULL) {
/* make sure we're disconnected */
redis_disconnected(&dict->conn, ctx->error);
} else if (_ctx->changed) {
o_stream_nsend_str(dict->conn.conn.output,
"*1\r\n$7\r\nDISCARD\r\n");
reply = array_append_space(&dict->replies);
reply->reply_count = 1;
redis_input_state_add(dict, REDIS_INPUT_STATE_DISCARD);
}
i_free(ctx->error);
i_free(ctx);
}
static int redis_check_transaction(struct redis_dict_transaction_context *ctx)
{
struct redis_dict *dict = (struct redis_dict *)ctx->ctx.dict;
if (ctx->error != NULL)
return -1;
if (!dict->connected) {
ctx->error = i_strdup("Disconnected during transaction");
return -1;
}
if (ctx->ctx.changed)
return 0;
redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI);
if (o_stream_send_str(dict->conn.conn.output,
"*1\r\n$5\r\nMULTI\r\n") < 0) {
ctx->error = i_strdup_printf("write() failed: %s",
o_stream_get_error(dict->conn.conn.output));
return -1;
}
return 0;
}
static void
redis_append_expire(struct redis_dict_transaction_context *ctx,
string_t *cmd, const char *key)
{
struct redis_dict *dict = (struct redis_dict *)ctx->ctx.dict;
if (dict->expire_value == NULL)
return;
str_printfa(cmd, "*3\r\n$6\r\nEXPIRE\r\n$%u\r\n%s\r\n$%u\r\n%s\r\n",
(unsigned int)strlen(key), key,
(unsigned int)strlen(dict->expire_value),
dict->expire_value);
redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI);
ctx->cmd_count++;
}
static void redis_set(struct dict_transaction_context *_ctx,
const char *key, const char *value)
{
struct redis_dict_transaction_context *ctx =
(struct redis_dict_transaction_context *)_ctx;
struct redis_dict *dict = (struct redis_dict *)_ctx->dict;
string_t *cmd;
if (redis_check_transaction(ctx) < 0)
return;
key = redis_dict_get_full_key(dict, key);
cmd = t_str_new(128);
str_printfa(cmd, "*3\r\n$3\r\nSET\r\n$%u\r\n%s\r\n$%u\r\n%s\r\n",
(unsigned int)strlen(key), key,
(unsigned int)strlen(value), value);
redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI);
ctx->cmd_count++;
redis_append_expire(ctx, cmd, key);
if (o_stream_send(dict->conn.conn.output, str_data(cmd), str_len(cmd)) < 0) {
ctx->error = i_strdup_printf("write() failed: %s",
o_stream_get_error(dict->conn.conn.output));
}
}
static void redis_unset(struct dict_transaction_context *_ctx,
const char *key)
{
struct redis_dict_transaction_context *ctx =
(struct redis_dict_transaction_context *)_ctx;
struct redis_dict *dict = (struct redis_dict *)_ctx->dict;
const char *cmd;
if (redis_check_transaction(ctx) < 0)
return;
key = redis_dict_get_full_key(dict, key);
cmd = t_strdup_printf("*2\r\n$3\r\nDEL\r\n$%u\r\n%s\r\n",
(unsigned int)strlen(key), key);
if (o_stream_send_str(dict->conn.conn.output, cmd) < 0) {
ctx->error = i_strdup_printf("write() failed: %s",
o_stream_get_error(dict->conn.conn.output));
}
redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI);
ctx->cmd_count++;
}
static void redis_atomic_inc(struct dict_transaction_context *_ctx,
const char *key, long long diff)
{
struct redis_dict_transaction_context *ctx =
(struct redis_dict_transaction_context *)_ctx;
struct redis_dict *dict = (struct redis_dict *)_ctx->dict;
const char *diffstr;
string_t *cmd;
if (redis_check_transaction(ctx) < 0)
return;
key = redis_dict_get_full_key(dict, key);
diffstr = t_strdup_printf("%lld", diff);
cmd = t_str_new(128);
str_printfa(cmd, "*3\r\n$6\r\nINCRBY\r\n$%u\r\n%s\r\n$%u\r\n%s\r\n",
(unsigned int)strlen(key), key,
(unsigned int)strlen(diffstr), diffstr);
redis_input_state_add(dict, REDIS_INPUT_STATE_MULTI);
ctx->cmd_count++;
redis_append_expire(ctx, cmd, key);
if (o_stream_send(dict->conn.conn.output, str_data(cmd), str_len(cmd)) < 0) {
ctx->error = i_strdup_printf("write() failed: %s",
o_stream_get_error(dict->conn.conn.output));
}
}
struct dict dict_driver_redis = {
.name = "redis",
{
redis_dict_init,
redis_dict_deinit,
NULL,
redis_dict_lookup,
NULL,
NULL,
NULL,
redis_transaction_init,
redis_transaction_commit,
redis_transaction_rollback,
redis_set,
redis_unset,
redis_atomic_inc,
NULL
}
};