driver-cassandra.c revision ef597c4619eb021563f659b886c67762fce7a817
5a580c3a38ced62d4bcc95b8ac7c4f2935b5d294Timo Sirainen/* Copyright (c) 2015-2017 Dovecot authors, see the included COPYING file */
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen ((db)->api.state != SQL_DB_STATE_DISCONNECTED && \
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen#define CASSANDRA_FALLBACK_WARN_INTERVAL_SECS 60
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen#define CASSANDRA_FALLBACK_FIRST_RETRY_MSECS 50
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen#define CASSANDRA_FALLBACK_MAX_RETRY_MSECS (1000*60)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen#define CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS (5*1000)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainentypedef void driver_cassandra_callback_t(CassFuture *future, void *context);
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS,
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL,
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT,
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT,
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE,
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic const char *counter_names[CASSANDRA_COUNTER_COUNT] = {
9eef11df882f9c14d164f42cb438f32fe724041cTimo Sirainen "recv_err_no_hosts",
9eef11df882f9c14d164f42cb438f32fe724041cTimo Sirainen "recv_err_queue_full",
9eef11df882f9c14d164f42cb438f32fe724041cTimo Sirainen "recv_err_client_timeout",
9eef11df882f9c14d164f42cb438f32fe724041cTimo Sirainen "recv_err_server_timeout",
678d0463849ba777106eb7875f27db07a5d8e3dfTimo Sirainen "recv_err_server_unavailable",
a75d470c9223a75801418fcdda258885c36317e0Timo Sirainen "recv_err_other",
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic const char *cassandra_query_type_names[CASSANDRA_QUERY_TYPE_COUNT] = {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen unsigned int id;
678d0463849ba777106eb7875f27db07a5d8e3dfTimo Sirainen CassConsistency read_consistency, write_consistency, delete_consistency;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen CassConsistency read_fallback_consistency, write_fallback_consistency, delete_fallback_consistency;
4ee00532a265bdfb38539d811fcd12d51210ac35Timo Sirainen unsigned int connect_timeout_msecs, request_timeout_msecs;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen unsigned int heartbeat_interval_secs, idle_timeout_secs;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen unsigned int execution_retry_interval_msecs, execution_retry_times;
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen struct timeval primary_query_last_sent[CASSANDRA_QUERY_TYPE_COUNT];
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen time_t last_fallback_warning[CASSANDRA_QUERY_TYPE_COUNT];
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen unsigned int fallback_failures[CASSANDRA_QUERY_TYPE_COUNT];
ec0cc8fa647794e44a1afaa448f495a713048dc4Timo Sirainen /* for synchronous queries: */
31d32d39dd09be0625a6d92ee715155f5d679515Timo Sirainen CassConsistency consistency, fallback_consistency;
31d32d39dd09be0625a6d92ee715155f5d679515Timo Sirainen struct timeval page0_start_time, start_time, finish_time;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen unsigned int row_count, total_row_count, page_num;
ec0cc8fa647794e44a1afaa448f495a713048dc4Timo Sirainenextern const struct sql_db driver_cassandra_db;
ec0cc8fa647794e44a1afaa448f495a713048dc4Timo Sirainenextern const struct sql_result driver_cassandra_result;
ec0cc8fa647794e44a1afaa448f495a713048dc4Timo Sirainenstatic struct {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen { CASS_CONSISTENCY_LOCAL_QUORUM, "local-quorum" },
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen { CASS_CONSISTENCY_EACH_QUORUM, "each-quorum" },
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen { CASS_CONSISTENCY_LOCAL_SERIAL, "local-serial" },
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic struct {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic void driver_cassandra_result_send_query(struct cassandra_result *result);
6abf66a3731d52889517bd644595c540e3a9b3ecTimo Sirainenstatic void driver_cassandra_send_queries(struct cassandra_db *db);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic void result_finish(struct cassandra_result *result);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic int consistency_parse(const char *str, CassConsistency *consistency_r)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen unsigned int i;
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen for (i = 0; i < N_ELEMENTS(cass_consistency_names); i++) {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen if (strcmp(cass_consistency_names[i].name, str) == 0) {
678d0463849ba777106eb7875f27db07a5d8e3dfTimo Sirainen *consistency_r = cass_consistency_names[i].consistency;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic int log_level_parse(const char *str, CassLogLevel *log_level_r)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen unsigned int i;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen for (i = 0; i < N_ELEMENTS(cass_log_level_names); i++) {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen if (strcmp(cass_log_level_names[i].name, str) == 0) {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen *log_level_r = cass_log_level_names[i].log_level;
975a784c2e02ecdcb56efb7a1db5e4769c7756d8Timo Sirainenstatic void driver_cassandra_set_state(struct cassandra_db *db, enum sql_db_state state)
6abf66a3731d52889517bd644595c540e3a9b3ecTimo Sirainen /* switch back to original ioloop in case the caller wants to
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainenstatic void driver_cassandra_close(struct cassandra_db *db, const char *error)
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED);
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen /* running a sync query, stop it */
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainenstatic void driver_cassandra_log_error(CassFuture *future, const char *str)
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen cass_future_error_message(future, &message, &size);
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen i_error("cassandra: %s: %.*s", str, (int)size, message);
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainenstatic void driver_cassandra_future_callback(CassFuture *future ATTR_UNUSED,
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen /* this isn't the main thread - communicate with main thread by
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen writing the callback id to the pipe. note that we must not use
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen almost any dovecot functions here because most of them are using
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen data-stack, which isn't thread-safe. especially don't use
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen i_error() here. */
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen if (write_full(cb->db->fd_pipe[1], &cb->id, sizeof(cb->id)) < 0) {
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen "cassandra: write(pipe) failed: %s\n",
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen (void)write_full(STDERR_FILENO, str, strlen(str));
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainenstatic void cassandra_callback_run(struct cassandra_callback *cb)
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainenstatic void driver_cassandra_input_id(struct cassandra_db *db, unsigned int id)
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen /* usually there are only a few callbacks, so don't bother with using
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen a hash table */
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen i_panic("cassandra: Received unknown ID %u", id);
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainenstatic void driver_cassandra_input(struct cassandra_db *db)
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen else if (ret == 0)
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen i_error("cassandra: read(pipe) returned wrong amount of data");
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen /* success */
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen for (i = 0; i < count && db->api.state != SQL_DB_STATE_DISCONNECTED; i++)
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen driver_cassandra_close(db, "IPC pipe closed");
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainendriver_cassandra_set_callback(CassFuture *future, struct cassandra_db *db,
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen cass_future_set_callback(future, driver_cassandra_future_callback, cb);
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainenstatic void connect_callback(CassFuture *future, void *context)
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen if ((rc = cass_future_error_code(future)) != CASS_OK) {
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen "Couldn't connect to Cassandra");
ab3c52cff40218f248fac2bd7c93125cc2ae4c9dTimo Sirainen driver_cassandra_close(db, "Couldn't connect to Cassandra");
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen driver_cassandra_set_state(db, SQL_DB_STATE_IDLE);
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen /* driver_cassandra_sync_init() waiting for connection to
719123a3ec5aeb45ef1c50c265039666c71981d7Timo Sirainenstatic int driver_cassandra_connect(struct sql_db *_db)
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen i_assert(db->api.state == SQL_DB_STATE_DISCONNECTED);
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen driver_cassandra_set_state(db, SQL_DB_STATE_CONNECTING);
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen future = cass_session_connect_keyspace(db->session, db->cluster, db->keyspace);
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen driver_cassandra_set_callback(future, db, connect_callback, db);
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainenstatic void driver_cassandra_disconnect(struct sql_db *_db)
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainenstatic const char *
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainendriver_cassandra_escape_string(struct sql_db *db ATTR_UNUSED,
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen unsigned int i;
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainenstatic void driver_cassandra_parse_connect_string(struct cassandra_db *db,
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen const char *const *args, *key, *value, *error;
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen bool read_fallback_set = FALSE, write_fallback_set = FALSE, delete_fallback_set = FALSE;
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen db->write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
55d33f807765482eb47374aaaced1fe714e0b256Timo Sirainen db->delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen db->connect_timeout_msecs = SQL_CONNECT_TIMEOUT_SECS*1000;
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen db->request_timeout_msecs = SQL_QUERY_TIMEOUT_SECS*1000;
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen db->warn_timeout_msecs = CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS;
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen args = t_strsplit_spaces(connect_string, " ");
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen i_fatal("cassandra: Missing value in connect string: %s",
41e51b972f02e8b16c19fab9160294ea0a07c343Timo Sirainen i_fatal("cassandra: Invalid port: %s", value);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen } else if (strcmp(key, "read_consistency") == 0) {
e34d170f8f0e084bd94bfbc1a7085ece67e508dfTimo Sirainen if (consistency_parse(value, &db->read_consistency) < 0)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen i_fatal("cassandra: Unknown read_consistency: %s", value);
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen } else if (strcmp(key, "read_fallback_consistency") == 0) {
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen if (consistency_parse(value, &db->read_fallback_consistency) < 0)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen i_fatal("cassandra: Unknown read_fallback_consistency: %s", value);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen } else if (strcmp(key, "write_consistency") == 0) {
03936179f87aebde358dbe1ca8c34e5b5551db45Timo Sirainen if (consistency_parse(value, &db->write_consistency) < 0)
03936179f87aebde358dbe1ca8c34e5b5551db45Timo Sirainen i_fatal("cassandra: Unknown write_consistency: %s", value);
9bbfe7f5ff821cac11d1d2550a91b148f389d82cTimo Sirainen } else if (strcmp(key, "write_fallback_consistency") == 0) {
03936179f87aebde358dbe1ca8c34e5b5551db45Timo Sirainen if (consistency_parse(value, &db->write_fallback_consistency) < 0)
03936179f87aebde358dbe1ca8c34e5b5551db45Timo Sirainen i_fatal("cassandra: Unknown write_fallback_consistency: %s", value);
9bbfe7f5ff821cac11d1d2550a91b148f389d82cTimo Sirainen } else if (strcmp(key, "delete_consistency") == 0) {
9bbfe7f5ff821cac11d1d2550a91b148f389d82cTimo Sirainen if (consistency_parse(value, &db->delete_consistency) < 0)
9bbfe7f5ff821cac11d1d2550a91b148f389d82cTimo Sirainen i_fatal("cassandra: Unknown delete_consistency: %s", value);
03936179f87aebde358dbe1ca8c34e5b5551db45Timo Sirainen } else if (strcmp(key, "delete_fallback_consistency") == 0) {
9bbfe7f5ff821cac11d1d2550a91b148f389d82cTimo Sirainen if (consistency_parse(value, &db->delete_fallback_consistency) < 0)
03936179f87aebde358dbe1ca8c34e5b5551db45Timo Sirainen i_fatal("cassandra: Unknown delete_fallback_consistency: %s", value);
03936179f87aebde358dbe1ca8c34e5b5551db45Timo Sirainen if (log_level_parse(value, &db->log_level) < 0)
03936179f87aebde358dbe1ca8c34e5b5551db45Timo Sirainen i_fatal("cassandra: Unknown log_level: %s", value);
9bbfe7f5ff821cac11d1d2550a91b148f389d82cTimo Sirainen } else if (strcmp(key, "debug_queries") == 0) {
9bbfe7f5ff821cac11d1d2550a91b148f389d82cTimo Sirainen } else if (strcmp(key, "latency_aware_routing") == 0) {
9bbfe7f5ff821cac11d1d2550a91b148f389d82cTimo Sirainen if (str_to_uint(value, &db->protocol_version) < 0)
9bbfe7f5ff821cac11d1d2550a91b148f389d82cTimo Sirainen i_fatal("cassandra: Invalid version: %s", value);
03936179f87aebde358dbe1ca8c34e5b5551db45Timo Sirainen i_fatal("cassandra: Invalid num_threads: %s", value);
03936179f87aebde358dbe1ca8c34e5b5551db45Timo Sirainen } else if (strcmp(key, "heartbeat_interval") == 0) {
d52f5dcb05092e126058874772f2c367499e650aTimo Sirainen if (settings_get_time(value, &db->heartbeat_interval_secs, &error) < 0)
d52f5dcb05092e126058874772f2c367499e650aTimo Sirainen i_fatal("cassandra: Invalid heartbeat_interval '%s': %s", value, error);
d52f5dcb05092e126058874772f2c367499e650aTimo Sirainen } else if (strcmp(key, "idle_timeout") == 0) {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen if (settings_get_time(value, &db->idle_timeout_secs, &error) < 0)
d52f5dcb05092e126058874772f2c367499e650aTimo Sirainen i_fatal("cassandra: Invalid idle_timeout '%s': %s", value, error);
975a784c2e02ecdcb56efb7a1db5e4769c7756d8Timo Sirainen } else if (strcmp(key, "connect_timeout") == 0) {
975a784c2e02ecdcb56efb7a1db5e4769c7756d8Timo Sirainen if (settings_get_time_msecs(value, &db->connect_timeout_msecs, &error) < 0)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen i_fatal("cassandra: Invalid connect_timeout '%s': %s", value, error);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen } else if (strcmp(key, "request_timeout") == 0) {
d52f5dcb05092e126058874772f2c367499e650aTimo Sirainen if (settings_get_time_msecs(value, &db->request_timeout_msecs, &error) < 0)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen i_fatal("cassandra: Invalid request_timeout '%s': %s", value, error);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen } else if (strcmp(key, "warn_timeout") == 0) {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen if (settings_get_time_msecs(value, &db->warn_timeout_msecs, &error) < 0)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen i_fatal("cassandra: Invalid warn_timeout '%s': %s", value, error);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen } else if (strcmp(key, "execution_retry_interval") == 0) {
86bde2c1838d1ce967fa2b394bb952004a4adcb7Timo Sirainen if (settings_get_time_msecs(value, &db->execution_retry_interval_msecs, &error) < 0)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen i_fatal("cassandra: Invalid execution_retry_interval '%s': %s", value, error);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen i_fatal("cassandra: This cassandra version does not support execution_retry_interval");
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen } else if (strcmp(key, "execution_retry_times") == 0) {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen if (str_to_uint(value, &db->execution_retry_times) < 0)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen i_fatal("cassandra: Invalid execution_retry_times %s", value);
975a784c2e02ecdcb56efb7a1db5e4769c7756d8Timo Sirainen i_fatal("cassandra: This cassandra version does not support execution_retry_times");
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen i_fatal("cassandra: Invalid page_size: %s", value);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen i_fatal("cassandra: Unknown connect string: %s", key);
d52f5dcb05092e126058874772f2c367499e650aTimo Sirainen db->read_fallback_consistency = db->read_consistency;
975a784c2e02ecdcb56efb7a1db5e4769c7756d8Timo Sirainen db->write_fallback_consistency = db->write_consistency;
975a784c2e02ecdcb56efb7a1db5e4769c7756d8Timo Sirainen db->delete_fallback_consistency = db->delete_consistency;
d52f5dcb05092e126058874772f2c367499e650aTimo Sirainen i_fatal("cassandra: No hosts given in connect string");
d52f5dcb05092e126058874772f2c367499e650aTimo Sirainen i_fatal("cassandra: No dbname given in connect string");
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainendriver_cassandra_get_metrics_json(struct cassandra_db *db, string_t *dest)
d52f5dcb05092e126058874772f2c367499e650aTimo Sirainen str_printfa(dest, "\""#_field"\": %llu,", (unsigned long long)metrics._struct._field);
d52f5dcb05092e126058874772f2c367499e650aTimo Sirainen str_printfa(dest, "\""#_field"\": %02lf,", metrics._struct._field);
d52f5dcb05092e126058874772f2c367499e650aTimo Sirainen cass_session_get_metrics(db->session, &metrics);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen ADD_UINT64(stats, exceeded_pending_requests_water_mark);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen ADD_UINT64(stats, exceeded_write_bytes_water_mark);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen for (unsigned int i = 0; i < CASSANDRA_COUNTER_COUNT; i++) {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen str_printfa(dest, "\"%s\": %llu,", counter_names[i],
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic void driver_cassandra_metrics_write(struct cassandra_db *db)
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen if (var_expand(path, db->metrics_path, tab, &error) <= 0) {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen i_error("cassandra: Failed to expand metrics_path=%s: %s",
975a784c2e02ecdcb56efb7a1db5e4769c7756d8Timo Sirainen fd = open(str_c(path), O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, 0600);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen if (write_full(fd, str_data(data), str_len(data)) < 0)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic struct sql_db *driver_cassandra_init_v(const char *connect_string)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen driver_cassandra_parse_connect_string(db, connect_string);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen db->timestamp_gen = cass_timestamp_gen_monotonic_new();
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen cass_cluster_set_timestamp_gen(db->cluster, db->timestamp_gen);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen cass_cluster_set_connect_timeout(db->cluster, db->connect_timeout_msecs);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen cass_cluster_set_request_timeout(db->cluster, db->request_timeout_msecs);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen cass_cluster_set_contact_points(db->cluster, db->hosts);
8576eb5abb66178f251c00209e564c7673c0e4cfTimo Sirainen cass_cluster_set_credentials(db->cluster, db->user, db->password);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen cass_cluster_set_protocol_version(db->cluster, db->protocol_version);
8576eb5abb66178f251c00209e564c7673c0e4cfTimo Sirainen cass_cluster_set_num_threads_io(db->cluster, db->num_threads);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen cass_cluster_set_latency_aware_routing(db->cluster, cass_true);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen cass_cluster_set_connection_heartbeat_interval(db->cluster, db->heartbeat_interval_secs);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen cass_cluster_set_connection_idle_timeout(db->cluster, db->idle_timeout_secs);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen if (db->execution_retry_times > 0 && db->execution_retry_interval_msecs > 0)
3ea86ed7cf06ba04e4aa6cd1c4df9be336c06cd3Timo Sirainen cass_cluster_set_constant_speculative_execution_policy(db->cluster, db->execution_retry_interval_msecs, db->execution_retry_times);
471a6b2b4e64eca5d5779ae20a477312b32c89eeTimo Sirainen db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db);
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainenstatic void driver_cassandra_deinit_v(struct sql_db *_db)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic void driver_cassandra_result_unlink(struct cassandra_db *db,
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen unsigned int i, count;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen for (i = 0; i < count; i++) {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic void driver_cassandra_log_result(struct cassandra_result *result,
a10ed8c47534b4c6b6bf2711ccfe577e720a47b4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen if (db->log_level < CASS_LOG_DEBUG && !db->debug_queries &&
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen str_printfa(str, "cassandra: Finished query '%s' (", result->query);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen str_printfa(str, "%u pages in total, ", result->page_num);
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen if (result->page_num > 0 || result->paging_continues)
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen str_printfa(str, "page %u, ", result->page_num);
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen str_printfa(str, "%u rows, %lld+%lld us): %s", row_count, reply_usecs,
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen timeval_diff_usecs(&now, &result->finish_time),
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen result->error != NULL ? result->error : "success");
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen if (reply_usecs/1000 >= db->warn_timeout_msecs) {
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SLOW]++;
9bbfe7f5ff821cac11d1d2550a91b148f389d82cTimo Sirainenstatic void driver_cassandra_result_free(struct sql_result *_result)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_result->db;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen reply_usecs = timeval_diff_usecs(&result->finish_time, &result->start_time);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen driver_cassandra_log_result(result, FALSE, reply_usecs);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen if (result->page_num > 0 && !result->paging_continues) {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen /* Multi-page query finishes now. Log a debug/warning summary
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen message about it separate from the per-page messages. */
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen reply_usecs = timeval_diff_usecs(&result->finish_time,
9bbfe7f5ff821cac11d1d2550a91b148f389d82cTimo Sirainen driver_cassandra_log_result(result, TRUE, reply_usecs);
34b8b14ce06c0939932b60f22f61aea124198438Timo Sirainenstatic void result_finish(struct cassandra_result *result)
9bbfe7f5ff821cac11d1d2550a91b148f389d82cTimo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
cd70f7aec3bf49147fa80b77dd7ede7d7697202eTimo Sirainen i_assert((result->error != NULL) == (result->iterator == NULL));
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result->callback(&result->api, result->context);
5446576156fbe26e07a5cb964a900281d283f387Timo Sirainen free_result = db->sync_result != &result->api;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen i_assert(!free_result || result->api.refcount > 0);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic void query_resend_with_fallback(struct cassandra_result *result)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
6abf66a3731d52889517bd644595c540e3a9b3ecTimo Sirainen ioloop_time - db->last_fallback_warning[result->query_type];
6abf66a3731d52889517bd644595c540e3a9b3ecTimo Sirainen if (last_warning >= CASSANDRA_FALLBACK_WARN_INTERVAL_SECS) {
6abf66a3731d52889517bd644595c540e3a9b3ecTimo Sirainen i_warning("%s - retrying future %s queries with consistency %s (instead of %s)",
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result->error, cassandra_query_type_names[result->query_type],
2e86ce0239b4bc88c94d7edb5813d38ab080627eTimo Sirainen cass_consistency_string(result->fallback_consistency),
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen cass_consistency_string(result->consistency));
6abf66a3731d52889517bd644595c540e3a9b3ecTimo Sirainen db->last_fallback_warning[result->query_type] = ioloop_time;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result->consistency = result->fallback_consistency;
6abf66a3731d52889517bd644595c540e3a9b3ecTimo Sirainenstatic void counters_inc_error(struct cassandra_db *db, CassError error)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS]++;
cf848255bf65a5e2382c59c093da72f877f7535aTimo Sirainen db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL]++;
cf848255bf65a5e2382c59c093da72f877f7535aTimo Sirainen db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT]++;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT]++;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE]++;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_OTHER]++;
6abf66a3731d52889517bd644595c540e3a9b3ecTimo Sirainenstatic void query_callback(CassFuture *future, void *context)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen CassError error = cass_future_error_code(future);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen cass_future_error_message(future, &errmsg, &errsize);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen msecs = timeval_diff_msecs(&ioloop_timeval, &result->start_time);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen /* Timeouts bring uncertainty whether the query succeeded or
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen not. Also _SERVER_UNAVAILABLE could have actually written
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen enough copies of the data for the query to succeed. */
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result->api.error_type = error == CASS_ERROR_SERVER_WRITE_TIMEOUT ||
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result->error = i_strdup_printf("Query '%s' failed: %.*s (in %u.%03u secs%s)",
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result->query, (int)errsize, errmsg, msecs/1000, msecs%1000,
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result->page_num == 0 ? "" : t_strdup_printf(", page %u", result->page_num));
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen /* unavailable = cassandra server knows that there aren't
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen enough nodes available. "All hosts in current policy
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen attempted and were either unavailable or failed"
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen no hosts available = The client library couldn't connect to
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen enough cassandra nodes. Error message is the same as for
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen "unavailable".
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen write timeout = cassandra server couldn't reach all the
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen needed nodes. this may be because it hasn't yet detected
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen that the servers are down, or because the servers are just
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen too busy. we'll try the fallback consistency to avoid
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen unnecessary temporary errors. */
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen if ((error == CASS_ERROR_SERVER_UNAVAILABLE ||
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result->fallback_consistency != result->consistency) {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen /* retry with fallback consistency */
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_OK]++;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen if (result->fallback_consistency != result->consistency) {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen /* non-fallback query finished successfully. if there had been
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen any fallbacks, reset them. */
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen db->fallback_failures[result->query_type] = 0;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result->result = cass_future_get_result(future);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result->iterator = cass_iterator_from_result(result->result);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic void driver_cassandra_init_statement(struct cassandra_result *result)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen /* continuing a paged result */
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result->statement = cass_statement_new(result->query, 0);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen cass_statement_set_consistency(result->statement, result->consistency);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen cass_statement_set_is_idempotent(result->statement, cass_true);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen cass_statement_set_paging_size(result->statement, db->page_size);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic void driver_cassandra_result_send_query(struct cassandra_result *result)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SENT]++;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen future = cass_session_execute(db->session, result->statement);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen driver_cassandra_set_callback(future, db, query_callback, result);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainendriver_cassandra_want_fallback_query(struct cassandra_result *result)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen unsigned int failure_count = db->fallback_failures[result->query_type];
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen unsigned int i, msecs = CASSANDRA_FALLBACK_FIRST_RETRY_MSECS;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen /* double the retries every time. */
252f5c6a63878e7a8a7ffb5847eecbad7f8737e8Timo Sirainen if (msecs >= CASSANDRA_FALLBACK_MAX_RETRY_MSECS) {
252f5c6a63878e7a8a7ffb5847eecbad7f8737e8Timo Sirainen /* If last primary query sent timestamp + msecs is older than current
252f5c6a63878e7a8a7ffb5847eecbad7f8737e8Timo Sirainen time, we need to retry the primary query. Note that this practically
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen prevents multiple primary queries from being attempted
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen simultaneously, because the caller updates primary_query_last_sent
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen immediately when returning.
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen The only time when multiple primary queries can be running in
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen parallel is when the earlier query is being slow and hasn't finished
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen early enough. This could even be a wanted feature, since while the
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen first query might have to wait for a timeout, Cassandra could have
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen been fixed in the meantime and the second query finishes
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen successfully. */
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen tv = db->primary_query_last_sent[result->query_type];
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic int driver_cassandra_send_query(struct cassandra_result *result)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen driver_cassandra_close(db, "Couldn't connect to Cassandra");
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result->row_pool = pool_alloconly_create("cassandra result", 512);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result->fallback_consistency = db->read_fallback_consistency;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen /* consistency is already set and we don't want to fallback
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen at this point anymore. */
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result->fallback_consistency = result->consistency;
6abf66a3731d52889517bd644595c540e3a9b3ecTimo Sirainen result->fallback_consistency = db->write_fallback_consistency;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result->fallback_consistency = db->delete_fallback_consistency;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen if (driver_cassandra_want_fallback_query(result))
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result->consistency = result->fallback_consistency;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen db->primary_query_last_sent[result->query_type] = ioloop_timeval;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic void driver_cassandra_send_queries(struct cassandra_db *db)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen unsigned int i, count;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen for (i = 0; i < count; i++) {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen if (driver_cassandra_send_query(results[i]) <= 0)
efeb13303798b47d2c4295468d233c1bcfd79c94Timo Sirainenstatic void exec_callback(struct sql_result *_result ATTR_UNUSED,
efeb13303798b47d2c4295468d233c1bcfd79c94Timo Sirainendriver_cassandra_query_init(struct cassandra_db *db, const char *query,
efeb13303798b47d2c4295468d233c1bcfd79c94Timo Sirainen sql_query_callback_t *callback, void *context)
efeb13303798b47d2c4295468d233c1bcfd79c94Timo Sirainendriver_cassandra_query_full(struct sql_db *_db, const char *query,
efeb13303798b47d2c4295468d233c1bcfd79c94Timo Sirainen sql_query_callback_t *callback, void *context)
efeb13303798b47d2c4295468d233c1bcfd79c94Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
ccd7b4e0a5f09058a59cc4b3f878254e93e7cb0aTimo Sirainen result = driver_cassandra_query_init(db, query, query_type,
efeb13303798b47d2c4295468d233c1bcfd79c94Timo Sirainenstatic void driver_cassandra_exec(struct sql_db *db, const char *query)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_WRITE, exec_callback, NULL);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic void driver_cassandra_query(struct sql_db *db, const char *query,
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen sql_query_callback_t *callback, void *context)
53c384be5f8f27945fd61b8a0d731a93a261628fTimo Sirainen driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_READ, callback, context);
5446576156fbe26e07a5cb964a900281d283f387Timo Sirainenstatic void cassandra_query_s_callback(struct sql_result *result, void *context)
cd70f7aec3bf49147fa80b77dd7ede7d7697202eTimo Sirainenstatic void driver_cassandra_sync_init(struct cassandra_db *db)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen i_assert(db->api.state == SQL_DB_STATE_CONNECTING);
efeb13303798b47d2c4295468d233c1bcfd79c94Timo Sirainen /* wait for connecting to finish */
a75d470c9223a75801418fcdda258885c36317e0Timo Sirainenstatic void driver_cassandra_sync_deinit(struct cassandra_db *db)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic struct sql_result *
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainendriver_cassandra_sync_query(struct cassandra_db *db, const char *query)
6abf66a3731d52889517bd644595c540e3a9b3ecTimo Sirainen driver_cassandra_query(&db->api, query, cassandra_query_s_callback, db);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen /* we don't end up in cassandra's free function, so sync_result
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen won't be set to NULL if we don't do it here. */
53c384be5f8f27945fd61b8a0d731a93a261628fTimo Sirainenstatic struct sql_result *
53c384be5f8f27945fd61b8a0d731a93a261628fTimo Sirainendriver_cassandra_query_s(struct sql_db *_db, const char *query)
53c384be5f8f27945fd61b8a0d731a93a261628fTimo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
53c384be5f8f27945fd61b8a0d731a93a261628fTimo Sirainen result = driver_cassandra_sync_query(db, query);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainendriver_cassandra_get_value(struct cassandra_result *result,
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen const unsigned char *output;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen switch (cass_data_type_type(cass_value_data_type(value))) {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen const char *str = t_strdup_printf("%lld", (long long)num);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen rc = cass_value_get_bytes(value, &output, &output_size);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result->error = i_strdup_printf("Couldn't get value as %s: %s",
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen output_dup = p_malloc(result->row_pool, output_size + 1);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic int driver_cassandra_result_next_page(struct cassandra_result *result)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen /* no paging */
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen if (cass_result_has_more_pages(result->result) == cass_false)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen /* callers that don't support sql_query_more() will still get a useful
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen error message. */
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result->error = i_strdup("Paged query has more results, but not supported by the caller");
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic int driver_cassandra_result_next_row(struct sql_result *_result)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen const char *str;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen unsigned int i;
31d32d39dd09be0625a6d92ee715155f5d679515Timo Sirainen if (cass_iterator_next(result->iterator) == 0)
31d32d39dd09be0625a6d92ee715155f5d679515Timo Sirainen return driver_cassandra_result_next_page(result);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen p_array_init(&result->fields, result->row_pool, 8);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen p_array_init(&result->field_sizes, result->row_pool, 8);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen row = cass_iterator_get_row(result->iterator);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen for (i = 0; (value = cass_row_get_column(row, i)) != NULL; i++) {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen if (driver_cassandra_get_value(result, value, &str, &size) < 0) {
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainendriver_cassandra_result_more(struct sql_result **_result, bool async,
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen sql_query_callback_t *callback, void *context)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)(*_result)->db;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen /* Initialize the next page as a new sql_result */
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen new_result = driver_cassandra_query_init(db, old_result->query,
9bbfe7f5ff821cac11d1d2550a91b148f389d82cTimo Sirainen /* Preserve the statement and update its paging state */
9bbfe7f5ff821cac11d1d2550a91b148f389d82cTimo Sirainen new_result->statement = old_result->statement;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen cass_statement_set_paging_state(new_result->statement,
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen /* The caller did support paging. Clear out the "...not supported by
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen the caller" error text, so it won't be in the debug log output. */
173fc9ed37637e4609b1ecc9415a9b92067eeeb2Timo Sirainen new_result->page_num = old_result->page_num + 1;
173fc9ed37637e4609b1ecc9415a9b92067eeeb2Timo Sirainen new_result->page0_start_time = old_result->page0_start_time;
173fc9ed37637e4609b1ecc9415a9b92067eeeb2Timo Sirainen new_result->total_row_count = old_result->total_row_count;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen (void)driver_cassandra_send_query(new_result);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen (void)driver_cassandra_send_query(new_result);
f3053c3637f64633c8ef642e1b9b689b333ebf73Timo Sirainenstatic unsigned int
f3053c3637f64633c8ef642e1b9b689b333ebf73Timo Sirainendriver_cassandra_result_get_fields_count(struct sql_result *_result)
f3053c3637f64633c8ef642e1b9b689b333ebf73Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
f3053c3637f64633c8ef642e1b9b689b333ebf73Timo Sirainenstatic const char *
f3053c3637f64633c8ef642e1b9b689b333ebf73Timo Sirainendriver_cassandra_result_get_field_name(struct sql_result *_result ATTR_UNUSED,
c3aebe3bd173bb339947d4fb6fa4f0e090c38e69Timo Sirainendriver_cassandra_result_find_field(struct sql_result *_result ATTR_UNUSED,
d097ad375f7fc532ab5cb91020c206c0def55179Timo Sirainenstatic const char *
d097ad375f7fc532ab5cb91020c206c0def55179Timo Sirainendriver_cassandra_result_get_field_value(struct sql_result *_result,
f3053c3637f64633c8ef642e1b9b689b333ebf73Timo Sirainen unsigned int idx)
f3053c3637f64633c8ef642e1b9b689b333ebf73Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
f3053c3637f64633c8ef642e1b9b689b333ebf73Timo Sirainen const char *const *strp;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic const unsigned char *
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainendriver_cassandra_result_get_field_value_binary(struct sql_result *_result ATTR_UNUSED,
a5f297ac8cb8fb168edabf9fe93a7061539a0afaTimo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
a5f297ac8cb8fb168edabf9fe93a7061539a0afaTimo Sirainen const char *const *strp;
f3053c3637f64633c8ef642e1b9b689b333ebf73Timo Sirainen return (const void *)*strp;
f3053c3637f64633c8ef642e1b9b689b333ebf73Timo Sirainenstatic const char *
f3053c3637f64633c8ef642e1b9b689b333ebf73Timo Sirainendriver_cassandra_result_find_field_value(struct sql_result *result ATTR_UNUSED,
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic const char *const *
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainendriver_cassandra_result_get_values(struct sql_result *_result)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainenstatic const char *driver_cassandra_result_get_error(struct sql_result *_result)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen return "FIXME";
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainendriver_cassandra_transaction_begin(struct sql_db *db)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen ctx = i_new(struct cassandra_transaction_context, 1);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen /* we need to be able to handle multiple open transactions, so at least
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen for now just keep them in memory until commit time. */
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen ctx->query_pool = pool_alloconly_create("cassandra transaction", 1024);
31d32d39dd09be0625a6d92ee715155f5d679515Timo Sirainendriver_cassandra_transaction_unref(struct cassandra_transaction_context **_ctx)
31d32d39dd09be0625a6d92ee715155f5d679515Timo Sirainen struct cassandra_transaction_context *ctx = *_ctx;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainentransaction_set_failed(struct cassandra_transaction_context *ctx,
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainentransaction_commit_callback(struct sql_result *result, void *context)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen struct cassandra_transaction_context *ctx = context;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen commit_result.error = sql_result_get_error(result);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen commit_result.error_type = sql_result_get_error_type(result);
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainendriver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen sql_commit_callback_t *callback, void *context)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen /* just a single query, send it */
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen if (strncasecmp(_ctx->head->query, "DELETE ", 7) == 0)
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen driver_cassandra_query_full(_ctx->db, _ctx->head->query, query_type,
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen /* multiple queries - we don't actually have a transaction though */
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result.error = "Multiple changes in transaction not supported";
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainencommit_multi_fail(struct cassandra_transaction_context *ctx,
c45baa023828a14f440eb184c24d186ffe8666edTimo Sirainen "%s (query: %s)", sql_result_get_error(result), query));
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainendriver_cassandra_transaction_commit_multi(struct cassandra_transaction_context *ctx,
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen struct cassandra_db *db = (struct cassandra_db *)ctx->ctx.db;
7bd5b1c64cc987715bdaf8cc4907c3c37d5d7b29Timo Sirainen result = driver_cassandra_sync_query(db, "BEGIN");
0c47c2096714b50880d48d00ce0bf28349eb4aceTimo Sirainen /* send queries */
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen for (query = ctx->ctx.head; query != NULL; query = query->next) {
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen result = driver_cassandra_sync_query(db, query->query);
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen *result_r = driver_cassandra_sync_query(db, ctx->failed ?
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainendriver_cassandra_try_commit_s(struct cassandra_transaction_context *ctx)
0c47c2096714b50880d48d00ce0bf28349eb4aceTimo Sirainen struct sql_transaction_context *_ctx = &ctx->ctx;
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen struct cassandra_db *db = (struct cassandra_db *)_ctx->db;
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen struct sql_transaction_query *single_query = NULL;
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen /* just a single query, send it */
8b3a4836da0b032673918941cb49c956d3b89b25Timo Sirainen result = sql_query_s(_ctx->db, single_query->query);
ec0cc8fa647794e44a1afaa448f495a713048dc4Timo Sirainen /* multiple queries, use a transaction */
ec0cc8fa647794e44a1afaa448f495a713048dc4Timo Sirainen ret = driver_cassandra_transaction_commit_multi(ctx, &result);
ec0cc8fa647794e44a1afaa448f495a713048dc4Timo Sirainen transaction_set_failed(ctx, sql_result_get_error(result));
0c47c2096714b50880d48d00ce0bf28349eb4aceTimo Sirainendriver_cassandra_transaction_commit_s(struct sql_transaction_context *_ctx,
0c47c2096714b50880d48d00ce0bf28349eb4aceTimo Sirainen const char **error_r)
0c47c2096714b50880d48d00ce0bf28349eb4aceTimo Sirainendriver_cassandra_transaction_rollback(struct sql_transaction_context *_ctx)
add74aa767e4b6aaa08e3a389022883f0dd3f43dTimo Sirainendriver_cassandra_update(struct sql_transaction_context *_ctx, const char *query,
add74aa767e4b6aaa08e3a389022883f0dd3f43dTimo Sirainen sql_transaction_add_query(_ctx, ctx->query_pool, query, affected_rows);
add74aa767e4b6aaa08e3a389022883f0dd3f43dTimo Sirainenstatic const char *
add74aa767e4b6aaa08e3a389022883f0dd3f43dTimo Sirainendriver_cassandra_escape_blob(struct sql_db *_db ATTR_UNUSED,
0c47c2096714b50880d48d00ce0bf28349eb4aceTimo Sirainen .escape_string = driver_cassandra_escape_string,
0c47c2096714b50880d48d00ce0bf28349eb4aceTimo Sirainen .transaction_begin = driver_cassandra_transaction_begin,
0c47c2096714b50880d48d00ce0bf28349eb4aceTimo Sirainen .transaction_commit = driver_cassandra_transaction_commit,
0c47c2096714b50880d48d00ce0bf28349eb4aceTimo Sirainen .transaction_commit_s = driver_cassandra_transaction_commit_s,
d9b9687bf8cae9cfb070b1b7aadefa683220269fTimo Sirainen .transaction_rollback = driver_cassandra_transaction_rollback,
0c47c2096714b50880d48d00ce0bf28349eb4aceTimo Sirainenconst struct sql_result driver_cassandra_result = {
0c47c2096714b50880d48d00ce0bf28349eb4aceTimo Sirainen driver_cassandra_result_get_field_value_binary,