driver-cassandra.c revision af619a25952f5ba550800daf69a119247b1fcda3
a1e4113a5388e34c08459c5b69679c82ac2bddc9Pavel Březina/* Copyright (c) 2015-2018 Dovecot authors, see the included COPYING file */
a1e4113a5388e34c08459c5b69679c82ac2bddc9Pavel Březina ((db)->api.state != SQL_DB_STATE_DISCONNECTED && \
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina#define CASSANDRA_FALLBACK_WARN_INTERVAL_SECS 60
a1e4113a5388e34c08459c5b69679c82ac2bddc9Pavel Březina#define CASSANDRA_FALLBACK_FIRST_RETRY_MSECS 50
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina#define CASSANDRA_FALLBACK_MAX_RETRY_MSECS (1000*60)
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina#define CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS (5*1000)
a1e4113a5388e34c08459c5b69679c82ac2bddc9Pavel Březinatypedef void driver_cassandra_callback_t(CassFuture *future, void *context);
a1e4113a5388e34c08459c5b69679c82ac2bddc9Pavel Březina CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS,
a1e4113a5388e34c08459c5b69679c82ac2bddc9Pavel Březina CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL,
a1e4113a5388e34c08459c5b69679c82ac2bddc9Pavel Březina CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT,
a1e4113a5388e34c08459c5b69679c82ac2bddc9Pavel Březina CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT,
a1e4113a5388e34c08459c5b69679c82ac2bddc9Pavel Březina CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE,
a1e4113a5388e34c08459c5b69679c82ac2bddc9Pavel Březinastatic const char *counter_names[CASSANDRA_COUNTER_COUNT] = {
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina "recv_err_no_hosts",
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina "recv_err_queue_full",
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina "recv_err_client_timeout",
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina "recv_err_server_timeout",
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina "recv_err_server_unavailable",
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina "recv_err_other",
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic const char *cassandra_query_type_names[CASSANDRA_QUERY_TYPE_COUNT] = {
afdc0179af0ad8ddbedd67422193ef02dcd2bf84Lukas Slebodnik unsigned int id;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina CassConsistency read_consistency, write_consistency, delete_consistency;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina CassConsistency read_fallback_consistency, write_fallback_consistency, delete_fallback_consistency;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek unsigned int connect_timeout_msecs, request_timeout_msecs;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek unsigned int heartbeat_interval_secs, idle_timeout_secs;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek unsigned int execution_retry_interval_msecs, execution_retry_times;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek ARRAY(struct cassandra_sql_prepared_statement *) pending_prepares;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina struct timeval primary_query_last_sent[CASSANDRA_QUERY_TYPE_COUNT];
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina time_t last_fallback_warning[CASSANDRA_QUERY_TYPE_COUNT];
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina unsigned int fallback_failures[CASSANDRA_QUERY_TYPE_COUNT];
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina /* for synchronous queries: */
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina CassConsistency consistency, fallback_consistency;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina struct timeval page0_start_time, start_time, finish_time;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina unsigned int row_count, total_row_count, page_num;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina unsigned char *value_binary;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina struct cassandra_sql_prepared_statement *prep;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina /* NULL, until the prepare is asynchronously finished */
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina /* statements waiting for prepare to finish */
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina ARRAY(struct cassandra_sql_statement *) pending_statements;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina /* an error here will cause the prepare to be retried on the next
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina execution attempt. */
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinaextern const struct sql_db driver_cassandra_db;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinaextern const struct sql_result driver_cassandra_result;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastatic struct {
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_CONSISTENCY_LOCAL_QUORUM, "local-quorum" },
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_CONSISTENCY_EACH_QUORUM, "each-quorum" },
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_CONSISTENCY_LOCAL_SERIAL, "local-serial" },
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastatic struct {
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastatic void driver_cassandra_prepare_pending(struct cassandra_db *db);
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinaprepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt);
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastatic void driver_cassandra_result_send_query(struct cassandra_result *result);
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastatic void driver_cassandra_send_queries(struct cassandra_db *db);
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastatic void result_finish(struct cassandra_result *result);
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastatic int consistency_parse(const char *str, CassConsistency *consistency_r)
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina unsigned int i;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina for (i = 0; i < N_ELEMENTS(cass_consistency_names); i++) {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek if (strcmp(cass_consistency_names[i].name, str) == 0) {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek *consistency_r = cass_consistency_names[i].consistency;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastatic int log_level_parse(const char *str, CassLogLevel *log_level_r)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek unsigned int i;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek for (i = 0; i < N_ELEMENTS(cass_log_level_names); i++) {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek if (strcmp(cass_log_level_names[i].name, str) == 0) {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek *log_level_r = cass_log_level_names[i].log_level;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozekstatic void driver_cassandra_set_state(struct cassandra_db *db, enum sql_db_state state)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek /* switch back to original ioloop in case the caller wants to
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozekstatic void driver_cassandra_close(struct cassandra_db *db, const char *error)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek struct cassandra_sql_prepared_statement *const *prep_stmtp;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek array_foreach(&db->pending_prepares, prep_stmtp) {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek prepare_finish_pending_statements(*prep_stmtp);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek /* running a sync query, stop it */
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozekstatic void driver_cassandra_log_error(CassFuture *future, const char *str)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek cass_future_error_message(future, &message, &size);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek i_error("cassandra: %s: %.*s", str, (int)size, message);
877b92e80bde510d5cd9f03dbf01e2bcf73ab072Michal Židekstatic void driver_cassandra_future_callback(CassFuture *future ATTR_UNUSED,
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek /* this isn't the main thread - communicate with main thread by
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek writing the callback id to the pipe. note that we must not use
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek almost any dovecot functions here because most of them are using
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek data-stack, which isn't thread-safe. especially don't use
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek i_error() here. */
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek if (write_full(cb->db->fd_pipe[1], &cb->id, sizeof(cb->id)) < 0) {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek "cassandra: write(pipe) failed: %s\n",
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek (void)write_full(STDERR_FILENO, str, strlen(str));
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozekstatic void cassandra_callback_run(struct cassandra_callback *cb)
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastatic void driver_cassandra_input_id(struct cassandra_db *db, unsigned int id)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek /* usually there are only a few callbacks, so don't bother with using
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek a hash table */
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek i_panic("cassandra: Received unknown ID %u", id);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozekstatic void driver_cassandra_input(struct cassandra_db *db)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek else if (ret == 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_error("cassandra: read(pipe) returned wrong amount of data");
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek /* success */
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek for (i = 0; i < count && db->api.state != SQL_DB_STATE_DISCONNECTED; i++)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozekdriver_cassandra_set_callback(CassFuture *future, struct cassandra_db *db,
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek cass_future_set_callback(future, driver_cassandra_future_callback, cb);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozekstatic void connect_callback(CassFuture *future, void *context)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek if ((rc = cass_future_error_code(future)) != CASS_OK) {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek "Couldn't connect to Cassandra");
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek driver_cassandra_close(db, "Couldn't connect to Cassandra");
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek driver_cassandra_set_state(db, SQL_DB_STATE_IDLE);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek /* driver_cassandra_sync_init() waiting for connection to
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic int driver_cassandra_connect(struct sql_db *_db)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_db *db = (struct cassandra_db *)_db;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_assert(db->api.state == SQL_DB_STATE_DISCONNECTED);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina driver_cassandra_set_state(db, SQL_DB_STATE_CONNECTING);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina future = cass_session_connect_keyspace(db->session, db->cluster, db->keyspace);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina driver_cassandra_set_callback(future, db, connect_callback, db);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void driver_cassandra_disconnect(struct sql_db *_db)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_db *db = (struct cassandra_db *)_db;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic const char *
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinadriver_cassandra_escape_string(struct sql_db *db ATTR_UNUSED,
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina unsigned int i;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void driver_cassandra_parse_connect_string(struct cassandra_db *db,
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina const char *const *args, *key, *value, *error;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina bool read_fallback_set = FALSE, write_fallback_set = FALSE, delete_fallback_set = FALSE;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->connect_timeout_msecs = SQL_CONNECT_TIMEOUT_SECS*1000;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->request_timeout_msecs = SQL_QUERY_TIMEOUT_SECS*1000;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->warn_timeout_msecs = CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina args = t_strsplit_spaces(connect_string, " ");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Missing value in connect string: %s",
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid port: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "read_consistency") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (consistency_parse(value, &db->read_consistency) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Unknown read_consistency: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "read_fallback_consistency") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (consistency_parse(value, &db->read_fallback_consistency) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Unknown read_fallback_consistency: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "write_consistency") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (consistency_parse(value, &db->write_consistency) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Unknown write_consistency: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "write_fallback_consistency") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (consistency_parse(value, &db->write_fallback_consistency) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Unknown write_fallback_consistency: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "delete_consistency") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (consistency_parse(value, &db->delete_consistency) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Unknown delete_consistency: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "delete_fallback_consistency") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (consistency_parse(value, &db->delete_fallback_consistency) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Unknown delete_fallback_consistency: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (log_level_parse(value, &db->log_level) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Unknown log_level: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "debug_queries") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "latency_aware_routing") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (str_to_uint(value, &db->protocol_version) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid version: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid num_threads: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "heartbeat_interval") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (settings_get_time(value, &db->heartbeat_interval_secs, &error) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid heartbeat_interval '%s': %s", value, error);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "idle_timeout") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (settings_get_time(value, &db->idle_timeout_secs, &error) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid idle_timeout '%s': %s", value, error);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "connect_timeout") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (settings_get_time_msecs(value, &db->connect_timeout_msecs, &error) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid connect_timeout '%s': %s", value, error);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "request_timeout") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (settings_get_time_msecs(value, &db->request_timeout_msecs, &error) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid request_timeout '%s': %s", value, error);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "warn_timeout") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (settings_get_time_msecs(value, &db->warn_timeout_msecs, &error) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid warn_timeout '%s': %s", value, error);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "execution_retry_interval") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (settings_get_time_msecs(value, &db->execution_retry_interval_msecs, &error) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid execution_retry_interval '%s': %s", value, error);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: This cassandra version does not support execution_retry_interval");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "execution_retry_times") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (str_to_uint(value, &db->execution_retry_times) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid execution_retry_times %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: This cassandra version does not support execution_retry_times");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid page_size: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Unknown connect string: %s", key);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->read_fallback_consistency = db->read_consistency;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->write_fallback_consistency = db->write_consistency;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->delete_fallback_consistency = db->delete_consistency;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: No hosts given in connect string");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: No dbname given in connect string");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinadriver_cassandra_get_metrics_json(struct cassandra_db *db, string_t *dest)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_printfa(dest, "\""#_field"\": %llu,", (unsigned long long)metrics._struct._field);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_printfa(dest, "\""#_field"\": %02lf,", metrics._struct._field);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_session_get_metrics(db->session, &metrics);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(stats, exceeded_pending_requests_water_mark);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(stats, exceeded_write_bytes_water_mark);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina for (unsigned int i = 0; i < CASSANDRA_COUNTER_COUNT; i++) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_printfa(dest, "\"%s\": %"PRIu64",", counter_names[i],
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void driver_cassandra_metrics_write(struct cassandra_db *db)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (var_expand(path, db->metrics_path, tab, &error) <= 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_error("cassandra: Failed to expand metrics_path=%s: %s",
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina fd = open(str_c(path), O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, 0600);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (write_full(fd, str_data(data), str_len(data)) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic struct sql_db *driver_cassandra_init_v(const char *connect_string)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina driver_cassandra_parse_connect_string(db, connect_string);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (db->protocol_version > 0 && db->protocol_version < 4) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina /* binding with column indexes requires v4 */
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->timestamp_gen = cass_timestamp_gen_monotonic_new();
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_timestamp_gen(db->cluster, db->timestamp_gen);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_connect_timeout(db->cluster, db->connect_timeout_msecs);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_request_timeout(db->cluster, db->request_timeout_msecs);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_contact_points(db->cluster, db->hosts);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_credentials(db->cluster, db->user, db->password);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_protocol_version(db->cluster, db->protocol_version);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_num_threads_io(db->cluster, db->num_threads);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_latency_aware_routing(db->cluster, cass_true);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_connection_heartbeat_interval(db->cluster, db->heartbeat_interval_secs);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_connection_idle_timeout(db->cluster, db->idle_timeout_secs);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (db->execution_retry_times > 0 && db->execution_retry_interval_msecs > 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_constant_speculative_execution_policy(db->cluster, db->execution_retry_interval_msecs, db->execution_retry_times);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void driver_cassandra_deinit_v(struct sql_db *_db)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_db *db = (struct cassandra_db *)_db;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_assert(array_count(&db->pending_prepares) == 0);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void driver_cassandra_result_unlink(struct cassandra_db *db,
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina unsigned int i, count;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina for (i = 0; i < count; i++) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void driver_cassandra_log_result(struct cassandra_result *result,
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_db *db = (struct cassandra_db *)result->api.db;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (db->log_level < CASS_LOG_DEBUG && !db->debug_queries &&
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_printfa(str, "cassandra: Finished %squery '%s' (",
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina result->is_prepared ? "prepared " : "", result->query);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_printfa(str, "timestamp=%"PRId64", ", result->timestamp);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_printfa(str, "%u pages in total, ", result->page_num);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (result->page_num > 0 || result->paging_continues)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_printfa(str, "page %u, ", result->page_num);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_printfa(str, "%u rows, %lld+%lld us): %s", row_count, reply_usecs,
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina timeval_diff_usecs(&now, &result->finish_time),
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina result->error != NULL ? result->error : "success");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (reply_usecs/1000 >= db->warn_timeout_msecs) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SLOW]++;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void driver_cassandra_result_free(struct sql_result *_result)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_db *db = (struct cassandra_db *)_result->db;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_result *result = (struct cassandra_result *)_result;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina reply_usecs = timeval_diff_usecs(&result->finish_time, &result->start_time);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina driver_cassandra_log_result(result, FALSE, reply_usecs);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (result->page_num > 0 && !result->paging_continues) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina /* Multi-page query finishes now. Log a debug/warning summary
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina message about it separate from the per-page messages. */
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina reply_usecs = timeval_diff_usecs(&result->finish_time,
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina driver_cassandra_log_result(result, TRUE, reply_usecs);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void result_finish(struct cassandra_result *result)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_db *db = (struct cassandra_db *)result->api.db;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_assert((result->error != NULL) == (result->iterator == NULL));
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina result->callback(&result->api, result->context);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina free_result = db->sync_result != &result->api;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_assert(!free_result || result->api.refcount > 0);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void query_resend_with_fallback(struct cassandra_result *result)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_db *db = (struct cassandra_db *)result->api.db;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ioloop_time - db->last_fallback_warning[result->query_type];
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (last_warning >= CASSANDRA_FALLBACK_WARN_INTERVAL_SECS) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_warning("%s - retrying future %s queries with consistency %s (instead of %s)",
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina result->error, cassandra_query_type_names[result->query_type],
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_consistency_string(result->fallback_consistency),
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_consistency_string(result->consistency));
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->last_fallback_warning[result->query_type] = ioloop_time;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina result->consistency = result->fallback_consistency;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void counters_inc_error(struct cassandra_db *db, CassError error)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS]++;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL]++;
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT]++;
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT]++;
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE]++;
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_OTHER]++;
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březinastatic void query_callback(CassFuture *future, void *context)
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina struct cassandra_db *db = (struct cassandra_db *)result->api.db;
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina CassError error = cass_future_error_code(future);
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina cass_future_error_message(future, &errmsg, &errsize);
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina msecs = timeval_diff_msecs(&ioloop_timeval, &result->start_time);
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina /* Timeouts bring uncertainty whether the query succeeded or
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina not. Also _SERVER_UNAVAILABLE could have actually written
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina enough copies of the data for the query to succeed. */
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina result->api.error_type = error == CASS_ERROR_SERVER_WRITE_TIMEOUT ||
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina result->error = i_strdup_printf("Query '%s' failed: %.*s (in %u.%03u secs%s)",
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina result->query, (int)errsize, errmsg, msecs/1000, msecs%1000,
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina result->page_num == 0 ? "" : t_strdup_printf(", page %u", result->page_num));
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina /* unavailable = cassandra server knows that there aren't
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina enough nodes available. "All hosts in current policy
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina attempted and were either unavailable or failed"
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina no hosts available = The client library couldn't connect to
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina enough cassanra nodes. Error message is the same as for
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina "unavailable".
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina write timeout = cassandra server couldn't reach all the
if (failure_count == 0)
return FALSE;
int ret;
if (ret < 0)
return ret;
i_unreached();
unsigned int i, count;
for (i = 0; i < count; i++) {
static struct cassandra_result *
bool is_prepared,
return result;
static struct sql_result *
case SQL_DB_STATE_CONNECTING:
case SQL_DB_STATE_BUSY:
i_unreached();
return &sql_not_connected_result;
case SQL_DB_STATE_IDLE:
return result;
static struct sql_result *
return result;
const unsigned char *output;
void *output_dup;
const char *type;
*len_r = 0;
case CASS_VALUE_TYPE_INT: {
case CASS_VALUE_TYPE_BIGINT: {
return SQL_RESULT_NEXT_MORE;
const char *str;
return ret;
if (async)
i_unreached();
i_unreached();
unsigned int idx)
const char *const *strp;
return *strp;
const char *const *strp;
return (const void *)*strp;
i_unreached();
static struct sql_transaction_context *
const char *error)
const char **error_r)
unsigned int *affected_rows)
static CassError
switch (value_type) {
case CASS_VALUE_TYPE_INT:
case CASS_VALUE_TYPE_BIGINT:
case CASS_VALUE_TYPE_TINY_INT:
const char *errmsg;
i_unreached();
static struct sql_prepared_statement *
const char *query_template)
static struct sql_statement *
static struct sql_statement *
static struct cassandra_sql_arg *
unsigned int column_idx)
return arg;
unsigned int column_idx,
const char *value)
unsigned int column_idx,
static struct sql_result *
unsigned int *affected_rows)
void driver_cassandra_init(void);
void driver_cassandra_deinit(void);
void driver_cassandra_init(void)
void driver_cassandra_deinit(void)