driver-cassandra.c revision ba4626cd5be3d225a7a89aa338d92b8fb411fd1c
02c335c23bf5fa225a467c19f2c063fb0dc7b8c3Timo Sirainen/* Copyright (c) 2015-2016 Dovecot authors, see the included COPYING file */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen ((db)->api.state != SQL_DB_STATE_DISCONNECTED && \
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen#define CASSANDRA_FALLBACK_WARN_INTERVAL_SECS 60
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen#define CASSANDRA_FALLBACK_FIRST_RETRY_MSECS 50
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen#define CASSANDRA_FALLBACK_MAX_RETRY_MSECS (1000*60)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainentypedef void driver_cassandra_callback_t(CassFuture *future, void *context);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainenstatic const char *cassandra_query_type_names[CASSANDRA_QUERY_TYPE_COUNT] = {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen unsigned int id;
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen CassConsistency read_consistency, write_consistency, delete_consistency;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen CassConsistency read_fallback_consistency, write_fallback_consistency, delete_fallback_consistency;
7cd055a212d44067e2d94452c05691d696c9f699Timo Sirainen unsigned int connect_timeout_msecs, request_timeout_msecs;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen struct timeval first_fallback_sent[CASSANDRA_QUERY_TYPE_COUNT];
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen time_t last_fallback_warning[CASSANDRA_QUERY_TYPE_COUNT];
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen unsigned int fallback_failures[CASSANDRA_QUERY_TYPE_COUNT];
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen /* for synchronous queries: */
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen CassConsistency consistency, fallback_consistency;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenextern const struct sql_db driver_cassandra_db;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenextern const struct sql_result driver_cassandra_result;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic struct {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen { CASS_CONSISTENCY_LOCAL_QUORUM, "local-quorum" },
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen { CASS_CONSISTENCY_EACH_QUORUM, "each-quorum" },
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen { CASS_CONSISTENCY_LOCAL_SERIAL, "local-serial" },
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainenstatic struct {
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainenstatic void driver_cassandra_result_send_query(struct cassandra_result *result);
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainenstatic void driver_cassandra_send_queries(struct cassandra_db *db);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void result_finish(struct cassandra_result *result);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic int consistency_parse(const char *str, CassConsistency *consistency_r)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen unsigned int i;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen for (i = 0; i < N_ELEMENTS(cass_consistency_names); i++) {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen if (strcmp(cass_consistency_names[i].name, str) == 0) {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen *consistency_r = cass_consistency_names[i].consistency;
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainenstatic int log_level_parse(const char *str, CassLogLevel *log_level_r)
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainen unsigned int i;
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainen for (i = 0; i < N_ELEMENTS(cass_log_level_names); i++) {
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainen if (strcmp(cass_log_level_names[i].name, str) == 0) {
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainen *log_level_r = cass_log_level_names[i].log_level;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_set_state(struct cassandra_db *db, enum sql_db_state state)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* switch back to original ioloop in case the caller wants to
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainenstatic void driver_cassandra_close(struct cassandra_db *db, const char *error)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* running a sync query, stop it */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_log_error(CassFuture *future, const char *str)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen cass_future_error_message(future, &message, &size);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_error("cassandra: %s: %.*s", str, (int)size, message);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_future_callback(CassFuture *future ATTR_UNUSED,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* this isn't the main thread - communicate with main thread by
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen writing the callback id to the pipe */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen if (write_full(cb->db->fd_pipe[1], &cb->id, sizeof(cb->id)) < 0)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void cassandra_callback_run(struct cassandra_callback *cb)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_input_id(struct cassandra_db *db, unsigned int id)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* usually there are only a few callbacks, so don't bother with using
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen a hash table */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_panic("cassandra: Received unknown ID %u", id);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_input(struct cassandra_db *db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen else if (ret == 0)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_error("cassandra: read(pipe) returned wrong amount of data");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* success */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen for (i = 0; i < count && db->api.state != SQL_DB_STATE_DISCONNECTED; i++)
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen driver_cassandra_close(db, "IPC pipe closed");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_set_callback(CassFuture *future, struct cassandra_db *db,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen cass_future_set_callback(future, driver_cassandra_future_callback, cb);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void connect_callback(CassFuture *future, void *context)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen if ((rc = cass_future_error_code(future)) != CASS_OK) {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen "Couldn't connect to Cassandra");
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen driver_cassandra_close(db, "Couldn't connect to Cassandra");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_set_state(db, SQL_DB_STATE_IDLE);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* driver_cassandra_sync_init() waiting for connection to
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic int driver_cassandra_connect(struct sql_db *_db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_assert(db->api.state == SQL_DB_STATE_DISCONNECTED);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_set_state(db, SQL_DB_STATE_CONNECTING);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen future = cass_session_connect_keyspace(db->session, db->cluster, db->keyspace);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_set_callback(future, db, connect_callback, db);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_disconnect(struct sql_db *_db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const char *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_escape_string(struct sql_db *db ATTR_UNUSED,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen unsigned int i;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_parse_connect_string(struct cassandra_db *db,
f7d018e7e0980044e3d537958126e44ef4c45056Timo Sirainen const char *const *args, *key, *value, *error;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen bool read_fallback_set = FALSE, write_fallback_set = FALSE, delete_fallback_set = FALSE;
6ea145a99eeee923602f04d3c9183bbdba6cd190Timo Sirainen db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
6ea145a99eeee923602f04d3c9183bbdba6cd190Timo Sirainen db->write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen db->delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
7cd055a212d44067e2d94452c05691d696c9f699Timo Sirainen db->connect_timeout_msecs = SQL_CONNECT_TIMEOUT_SECS*1000;
7cd055a212d44067e2d94452c05691d696c9f699Timo Sirainen db->request_timeout_msecs = SQL_QUERY_TIMEOUT_SECS*1000;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen args = t_strsplit_spaces(connect_string, " ");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_fatal("cassandra: Missing value in connect string: %s",
e8434aad92ea6ff1c915b708294dbd0c7ff5908dMichael M Slusarz i_fatal("cassandra: Invalid port: %s", value);
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen } else if (strcmp(key, "read_consistency") == 0) {
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen if (consistency_parse(value, &db->read_consistency) < 0)
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen i_fatal("cassandra: Unknown read_consistency: %s", value);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen } else if (strcmp(key, "read_fallback_consistency") == 0) {
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen if (consistency_parse(value, &db->read_fallback_consistency) < 0)
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen i_fatal("cassandra: Unknown read_fallback_consistency: %s", value);
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen } else if (strcmp(key, "write_consistency") == 0) {
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen if (consistency_parse(value, &db->write_consistency) < 0)
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen i_fatal("cassandra: Unknown write_consistency: %s", value);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen } else if (strcmp(key, "write_fallback_consistency") == 0) {
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen if (consistency_parse(value, &db->write_fallback_consistency) < 0)
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen i_fatal("cassandra: Unknown write_fallback_consistency: %s", value);
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen } else if (strcmp(key, "delete_consistency") == 0) {
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen if (consistency_parse(value, &db->delete_consistency) < 0)
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen i_fatal("cassandra: Unknown delete_consistency: %s", value);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen } else if (strcmp(key, "delete_fallback_consistency") == 0) {
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen if (consistency_parse(value, &db->delete_fallback_consistency) < 0)
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen i_fatal("cassandra: Unknown delete_fallback_consistency: %s", value);
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainen if (log_level_parse(value, &db->log_level) < 0)
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainen i_fatal("cassandra: Unknown log_level: %s", value);
f9eee365367f37b1692c07db6c23d30243844aaaTimo Sirainen if (str_to_uint(value, &db->protocol_version) < 0)
f9eee365367f37b1692c07db6c23d30243844aaaTimo Sirainen i_fatal("cassandra: Invalid version: %s", value);
f7d018e7e0980044e3d537958126e44ef4c45056Timo Sirainen i_fatal("cassandra: Invalid num_threads: %s", value);
f7d018e7e0980044e3d537958126e44ef4c45056Timo Sirainen } else if (strcmp(key, "connect_timeout") == 0) {
7cd055a212d44067e2d94452c05691d696c9f699Timo Sirainen if (settings_get_time_msecs(value, &db->connect_timeout_msecs, &error) < 0)
f7d018e7e0980044e3d537958126e44ef4c45056Timo Sirainen i_fatal("cassandra: Invalid connect_timeout '%s': %s", value, error);
f7d018e7e0980044e3d537958126e44ef4c45056Timo Sirainen } else if (strcmp(key, "request_timeout") == 0) {
7cd055a212d44067e2d94452c05691d696c9f699Timo Sirainen if (settings_get_time_msecs(value, &db->request_timeout_msecs, &error) < 0)
f7d018e7e0980044e3d537958126e44ef4c45056Timo Sirainen i_fatal("cassandra: Invalid request_timeout '%s': %s", value, error);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_fatal("cassandra: Unknown connect string: %s", key);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen db->read_fallback_consistency = db->read_consistency;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen db->write_fallback_consistency = db->write_consistency;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen db->delete_fallback_consistency = db->delete_consistency;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_fatal("cassandra: No hosts given in connect string");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_fatal("cassandra: No dbname given in connect string");
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainendriver_cassandra_get_metrics_json(struct cassandra_db *db, string_t *dest)
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainen str_printfa(dest, "\""#_field"\": %llu,", (unsigned long long)metrics._struct._field);
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainen str_printfa(dest, "\""#_field"\": %02lf,", metrics._struct._field);
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainen cass_session_get_metrics(db->session, &metrics);
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainen ADD_UINT64(stats, exceeded_pending_requests_water_mark);
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainen ADD_UINT64(stats, exceeded_write_bytes_water_mark);
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainenstatic void driver_cassandra_metrics_write(struct cassandra_db *db)
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainen fd = open(str_c(path), O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, 0600);
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainen if (write_full(fd, str_data(data), str_len(data)) < 0)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic struct sql_db *driver_cassandra_init_v(const char *connect_string)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_parse_connect_string(db, connect_string);
be59f9ae981dbe4bdd264053e9febd4ea5dad75bTimo Sirainen db->timestamp_gen = cass_timestamp_gen_monotonic_new();
f0e416aa42058e7ccc0dc6deec0d4f4a19ee6ebeTimo Sirainen cass_cluster_set_timestamp_gen(db->cluster, db->timestamp_gen);
7cd055a212d44067e2d94452c05691d696c9f699Timo Sirainen cass_cluster_set_connect_timeout(db->cluster, db->connect_timeout_msecs);
7cd055a212d44067e2d94452c05691d696c9f699Timo Sirainen cass_cluster_set_request_timeout(db->cluster, db->request_timeout_msecs);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen cass_cluster_set_contact_points(db->cluster, db->hosts);
c72cfe4a2bda39fff3b8a8bd64b31a7cc14d7d11Timo Sirainen cass_cluster_set_credentials(db->cluster, db->user, db->password);
e8434aad92ea6ff1c915b708294dbd0c7ff5908dMichael M Slusarz cass_cluster_set_port(db->cluster, db->port);
f9eee365367f37b1692c07db6c23d30243844aaaTimo Sirainen cass_cluster_set_protocol_version(db->cluster, db->protocol_version);
f7d018e7e0980044e3d537958126e44ef4c45056Timo Sirainen cass_cluster_set_num_threads_io(db->cluster, db->num_threads);
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainen db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_deinit_v(struct sql_db *_db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainenstatic void driver_cassandra_result_unlink(struct cassandra_db *db,
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen unsigned int i, count;
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen for (i = 0; i < count; i++) {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_result_free(struct sql_result *_result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_result->db;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
37e8420b32a0fa3442c405616980e45beb494104Timo Sirainen i_debug("cassandra: Finished query '%s' (%u rows, %lld+%lld us): %s", result->query,
2ccb478c35972517721ce415d81fcbd11a73fad3Timo Sirainen timeval_diff_usecs(&result->finish_time, &result->start_time),
2ccb478c35972517721ce415d81fcbd11a73fad3Timo Sirainen timeval_diff_usecs(&now, &result->finish_time),
2ccb478c35972517721ce415d81fcbd11a73fad3Timo Sirainen result->error != NULL ? result->error : "success");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void result_finish(struct cassandra_result *result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
b650f04c3b2e7dea2295bdbe3239eb82ec03ada0Timo Sirainen i_assert((result->error != NULL) == (result->iterator == NULL));
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result->callback(&result->api, result->context);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen free_result = db->sync_result != &result->api;
1856c361aad526948d56d8aafd576bca94516b92Timo Sirainen i_assert(!free_result || result->api.refcount > 0);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainenstatic void query_resend_with_fallback(struct cassandra_result *result)
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen ioloop_time - db->last_fallback_warning[result->query_type];
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen if (last_warning >= CASSANDRA_FALLBACK_WARN_INTERVAL_SECS) {
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen i_warning("%s - retrying future %s queries with consistency %s (instead of %s)",
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen result->error, cassandra_query_type_names[result->query_type],
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen cass_consistency_string(result->fallback_consistency),
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen cass_consistency_string(result->consistency));
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen db->last_fallback_warning[result->query_type] = ioloop_time;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen if (db->fallback_failures[result->query_type]++ == 0)
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen db->first_fallback_sent[result->query_type] = ioloop_timeval;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen result->consistency = result->fallback_consistency;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void query_callback(CassFuture *future, void *context)
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen CassError error = cass_future_error_code(future);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen cass_future_error_message(future, &errmsg, &errsize);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result->error = i_strdup_printf("Query '%s' failed: %.*s",
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen result->fallback_consistency != result->consistency) {
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen /* retry with fallback consistency */
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen if (result->fallback_consistency != result->consistency) {
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen /* non-fallback query finished successfully. if there had been
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen any fallbacks, reset them. */
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen db->fallback_failures[result->query_type] = 0;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result->result = cass_future_get_result(future);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result->iterator = cass_iterator_from_result(result->result);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainenstatic void driver_cassandra_result_send_query(struct cassandra_result *result)
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen result->statement = cass_statement_new(result->query, 0);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen cass_statement_set_consistency(result->statement, result->consistency);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen future = cass_session_execute(db->session, result->statement);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen driver_cassandra_set_callback(future, db, query_callback, result);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainendriver_cassandra_want_fallback_query(struct cassandra_result *result)
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen unsigned int failure_count = db->fallback_failures[result->query_type];
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen unsigned int i, msecs = CASSANDRA_FALLBACK_FIRST_RETRY_MSECS;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen tv = db->first_fallback_sent[result->query_type];
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen if (msecs >= CASSANDRA_FALLBACK_MAX_RETRY_MSECS) {
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainenstatic int driver_cassandra_send_query(struct cassandra_result *result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen driver_cassandra_close(db, "Couldn't connect to Cassandra");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result->row_pool = pool_alloconly_create("cassandra result", 512);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen result->fallback_consistency = db->read_fallback_consistency;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen result->fallback_consistency = db->write_fallback_consistency;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen result->fallback_consistency = db->delete_fallback_consistency;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen if (driver_cassandra_want_fallback_query(result))
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen result->consistency = result->fallback_consistency;
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainenstatic void driver_cassandra_send_queries(struct cassandra_db *db)
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen unsigned int i, count;
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen for (i = 0; i < count; i++) {
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen if (driver_cassandra_send_query(results[i]) <= 0)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void exec_callback(struct sql_result *_result ATTR_UNUSED,
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainendriver_cassandra_query_full(struct sql_db *_db, const char *query,
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen sql_query_callback_t *callback, void *context)
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
ce74395e2a932342e04fb682395bcce111574969Timo Sirainenstatic void driver_cassandra_exec(struct sql_db *db, const char *query)
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_WRITE, exec_callback, NULL);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_query(struct sql_db *db, const char *query,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen sql_query_callback_t *callback, void *context)
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_READ, callback, context);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void cassandra_query_s_callback(struct sql_result *result, void *context)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_sync_init(struct cassandra_db *db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_assert(db->api.state == SQL_DB_STATE_CONNECTING);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* wait for connecting to finish */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_sync_deinit(struct cassandra_db *db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic struct sql_result *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_sync_query(struct cassandra_db *db, const char *query)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_query(&db->api, query, cassandra_query_s_callback, db);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* we don't end up in cassandra's free function, so sync_result
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen won't be set to NULL if we don't do it here. */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic struct sql_result *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_query_s(struct sql_db *_db, const char *query)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result = driver_cassandra_sync_query(db, query);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_get_value(struct cassandra_result *result,
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen const unsigned char *output;
b096ecf3188cdb9162460ed7ae885c03f3161462Timo Sirainen switch (cass_data_type_type(cass_value_data_type(value))) {
b096ecf3188cdb9162460ed7ae885c03f3161462Timo Sirainen rc = cass_value_get_bytes(value, &output, &output_size);
b096ecf3188cdb9162460ed7ae885c03f3161462Timo Sirainen result->error = i_strdup_printf("Couldn't get value as %s: %s",
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen output_dup = p_malloc(result->row_pool, output_size + 1);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic int driver_cassandra_result_next_row(struct sql_result *_result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen const char *str;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen unsigned int i;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen p_array_init(&result->fields, result->row_pool, 8);
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen p_array_init(&result->field_sizes, result->row_pool, 8);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen row = cass_iterator_get_row(result->iterator);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen for (i = 0; (value = cass_row_get_column(row, i)) != NULL; i++) {
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen if (driver_cassandra_get_value(result, value, &str, &size) < 0) {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic unsigned int
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_get_fields_count(struct sql_result *_result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const char *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_get_field_name(struct sql_result *_result ATTR_UNUSED,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_find_field(struct sql_result *_result ATTR_UNUSED,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const char *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_get_field_value(struct sql_result *_result,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen unsigned int idx)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen const char *const *strp;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const unsigned char *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_get_field_value_binary(struct sql_result *_result ATTR_UNUSED,
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen const char *const *strp;
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen return (const void *)*strp;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const char *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_find_field_value(struct sql_result *result ATTR_UNUSED,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const char *const *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_get_values(struct sql_result *_result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const char *driver_cassandra_result_get_error(struct sql_result *_result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen return "FIXME";
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_transaction_begin(struct sql_db *db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen ctx = i_new(struct cassandra_transaction_context, 1);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* we need to be able to handle multiple open transactions, so at least
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen for now just keep them in memory until commit time. */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen ctx->query_pool = pool_alloconly_create("cassandra transaction", 1024);
6b4b3e5fe8d9e84f4b1356ee898ca76996a11fe1Timo Sirainendriver_cassandra_transaction_unref(struct cassandra_transaction_context **_ctx)
6b4b3e5fe8d9e84f4b1356ee898ca76996a11fe1Timo Sirainen struct cassandra_transaction_context *ctx = *_ctx;
e22ec7998afd426c53c658483ce66b6e404e27c6Timo Sirainentransaction_set_failed(struct cassandra_transaction_context *ctx,
ce74395e2a932342e04fb682395bcce111574969Timo Sirainentransaction_commit_callback(struct sql_result *result, void *context)
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen struct cassandra_transaction_context *ctx = context;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen ctx->callback(sql_result_get_error(result), ctx->context);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen sql_commit_callback_t *callback, void *context)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen callback(ctx->failed ? ctx->error : NULL, context);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* just a single query, send it */
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen if (strncasecmp(_ctx->head->query, "DELETE ", 7) == 0)
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen driver_cassandra_query_full(_ctx->db, _ctx->head->query, query_type,
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen /* multiple queries - we don't actually have a transaction though */
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen callback("Multiple changes in transaction not supported", context);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainencommit_multi_fail(struct cassandra_transaction_context *ctx,
e22ec7998afd426c53c658483ce66b6e404e27c6Timo Sirainen "%s (query: %s)", sql_result_get_error(result), query));
b650f04c3b2e7dea2295bdbe3239eb82ec03ada0Timo Sirainendriver_cassandra_transaction_commit_multi(struct cassandra_transaction_context *ctx,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)ctx->ctx.db;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result = driver_cassandra_sync_query(db, "BEGIN");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* send queries */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen for (query = ctx->ctx.head; query != NULL; query = query->next) {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result = driver_cassandra_sync_query(db, query->query);
b650f04c3b2e7dea2295bdbe3239eb82ec03ada0Timo Sirainen *result_r = driver_cassandra_sync_query(db, ctx->failed ?
e22ec7998afd426c53c658483ce66b6e404e27c6Timo Sirainendriver_cassandra_try_commit_s(struct cassandra_transaction_context *ctx)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct sql_transaction_context *_ctx = &ctx->ctx;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_ctx->db;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct sql_transaction_query *single_query = NULL;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* just a single query, send it */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result = sql_query_s(_ctx->db, single_query->query);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* multiple queries, use a transaction */
b650f04c3b2e7dea2295bdbe3239eb82ec03ada0Timo Sirainen ret = driver_cassandra_transaction_commit_multi(ctx, &result);
e22ec7998afd426c53c658483ce66b6e404e27c6Timo Sirainen transaction_set_failed(ctx, sql_result_get_error(result));
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_transaction_commit_s(struct sql_transaction_context *_ctx,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen const char **error_r)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_transaction_rollback(struct sql_transaction_context *_ctx)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_update(struct sql_transaction_context *_ctx, const char *query,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen sql_transaction_add_query(_ctx, ctx->query_pool, query, affected_rows);
b87761f9bbef949f31dae297e619ac3f5e9c2b2eTimo Sirainenstatic const char *
b87761f9bbef949f31dae297e619ac3f5e9c2b2eTimo Sirainendriver_cassandra_escape_blob(struct sql_db *_db ATTR_UNUSED,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenconst struct sql_result driver_cassandra_result = {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_result_get_field_value_binary,