driver-cassandra.c revision 2599a77a28bde0653fa090802424469904d518ee
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose/* Copyright (c) 2015-2017 Dovecot authors, see the included COPYING file */
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose ((db)->api.state != SQL_DB_STATE_DISCONNECTED && \
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#define CASSANDRA_FALLBACK_MAX_RETRY_MSECS (1000*60)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#define CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS (5*1000)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosetypedef void driver_cassandra_callback_t(CassFuture *future, void *context);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic const char *cassandra_query_type_names[CASSANDRA_QUERY_TYPE_COUNT] = {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int id;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CassConsistency read_consistency, write_consistency, delete_consistency;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CassConsistency read_fallback_consistency, write_fallback_consistency, delete_fallback_consistency;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int connect_timeout_msecs, request_timeout_msecs;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int heartbeat_interval_secs, idle_timeout_secs;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct timeval first_fallback_sent[CASSANDRA_QUERY_TYPE_COUNT];
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose time_t last_fallback_warning[CASSANDRA_QUERY_TYPE_COUNT];
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int fallback_failures[CASSANDRA_QUERY_TYPE_COUNT];
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose /* for synchronous queries: */
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CassConsistency consistency, fallback_consistency;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int row_count;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Boseextern const struct sql_result driver_cassandra_result;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic struct {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose const char *name;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_CONSISTENCY_LOCAL_QUORUM, "local-quorum" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_CONSISTENCY_LOCAL_SERIAL, "local-serial" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic struct {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose const char *name;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_result_send_query(struct cassandra_result *result);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_send_queries(struct cassandra_db *db);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void result_finish(struct cassandra_result *result);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic int consistency_parse(const char *str, CassConsistency *consistency_r)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int i;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose for (i = 0; i < N_ELEMENTS(cass_consistency_names); i++) {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if (strcmp(cass_consistency_names[i].name, str) == 0) {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose *consistency_r = cass_consistency_names[i].consistency;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic int log_level_parse(const char *str, CassLogLevel *log_level_r)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int i;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose for (i = 0; i < N_ELEMENTS(cass_log_level_names); i++) {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if (strcmp(cass_log_level_names[i].name, str) == 0) {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose *log_level_r = cass_log_level_names[i].log_level;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_set_state(struct cassandra_db *db, enum sql_db_state state)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose /* switch back to original ioloop in case the caller wants to
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_close(struct cassandra_db *db, const char *error)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose /* running a sync query, stop it */
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_log_error(CassFuture *future, const char *str)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose cass_future_error_message(future, &message, &size);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose i_error("cassandra: %s: %.*s", str, (int)size, message);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_future_callback(CassFuture *future ATTR_UNUSED,
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose /* this isn't the main thread - communicate with main thread by
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose writing the callback id to the pipe */
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if (write_full(cb->db->fd_pipe[1], &cb->id, sizeof(cb->id)) < 0)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void cassandra_callback_run(struct cassandra_callback *cb)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_input_id(struct cassandra_db *db, unsigned int id)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose /* usually there are only a few callbacks, so don't bother with using
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose a hash table */
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose i_panic("cassandra: Received unknown ID %u", id);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_input(struct cassandra_db *db)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose else if (ret == 0)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose i_error("cassandra: read(pipe) returned wrong amount of data");
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose /* success */
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose for (i = 0; i < count && db->api.state != SQL_DB_STATE_DISCONNECTED; i++)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosedriver_cassandra_set_callback(CassFuture *future, struct cassandra_db *db,
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose cass_future_set_callback(future, driver_cassandra_future_callback, cb);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void connect_callback(CassFuture *future, void *context)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if ((rc = cass_future_error_code(future)) != CASS_OK) {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose "Couldn't connect to Cassandra");
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose driver_cassandra_close(db, "Couldn't connect to Cassandra");
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose driver_cassandra_set_state(db, SQL_DB_STATE_IDLE);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose /* driver_cassandra_sync_init() waiting for connection to
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic int driver_cassandra_connect(struct sql_db *_db)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct cassandra_db *db = (struct cassandra_db *)_db;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose i_assert(db->api.state == SQL_DB_STATE_DISCONNECTED);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose driver_cassandra_set_state(db, SQL_DB_STATE_CONNECTING);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose future = cass_session_connect_keyspace(db->session, db->cluster, db->keyspace);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose driver_cassandra_set_callback(future, db, connect_callback, db);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_disconnect(struct sql_db *_db)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct cassandra_db *db = (struct cassandra_db *)_db;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic const char *
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosedriver_cassandra_escape_string(struct sql_db *db ATTR_UNUSED,
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose const char *string)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int i;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_parse_connect_string(struct cassandra_db *db,
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose bool read_fallback_set = FALSE, write_fallback_set = FALSE, delete_fallback_set = FALSE;
*args);
if (!read_fallback_set)
if (!write_fallback_set)
if (!delete_fallback_set)
const char *error;
int fd;
T_BEGIN {
} T_END;
unsigned int i, count;
for (i = 0; i < count; i++) {
i_unreached();
const char *str;
long long reply_usecs;
T_BEGIN {
} T_END;
if (free_result)
const char *errmsg;
int msecs;
if (failure_count == 0)
return FALSE;
int ret;
if (ret < 0)
return ret;
unsigned int i, count;
for (i = 0; i < count; i++) {
static struct sql_result *
case SQL_DB_STATE_CONNECTING:
case SQL_DB_STATE_BUSY:
i_unreached();
return &sql_not_connected_result;
case SQL_DB_STATE_IDLE:
return result;
static struct sql_result *
return result;
const unsigned char *output;
void *output_dup;
const char *type;
case CASS_VALUE_TYPE_INT: {
case CASS_VALUE_TYPE_BIGINT: {
const char *str;
return ret;
i_unreached();
i_unreached();
unsigned int idx)
const char *const *strp;
return *strp;
const char *const *strp;
return (const void *)*strp;
i_unreached();
static struct sql_transaction_context *
const char *error)
int ret = 0;
return ret;
int ret = 0;
const char **error_r)
unsigned int *affected_rows)
.flags = 0,
void driver_cassandra_init(void);
void driver_cassandra_deinit(void);
void driver_cassandra_init(void)
void driver_cassandra_deinit(void)