bcb4e51a409d94ae670de96afb8483a4f7855294Stephan Bosch/* Copyright (c) 2015-2018 Dovecot authors, see the included COPYING file */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen ((db)->api.state != SQL_DB_STATE_DISCONNECTED && \
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen#define CASSANDRA_FALLBACK_WARN_INTERVAL_SECS 60
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen#define CASSANDRA_FALLBACK_FIRST_RETRY_MSECS 50
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen#define CASSANDRA_FALLBACK_MAX_RETRY_MSECS (1000*60)
8952d797eca36f997ec36569e783871b597a9216Timo Sirainen#define CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS (5*1000)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainentypedef void driver_cassandra_callback_t(CassFuture *future, void *context);
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS,
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL,
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT,
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT,
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE,
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainenstatic const char *counter_names[CASSANDRA_COUNTER_COUNT] = {
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen "recv_err_no_hosts",
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen "recv_err_queue_full",
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen "recv_err_client_timeout",
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen "recv_err_server_timeout",
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen "recv_err_server_unavailable",
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen "recv_err_other",
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainenstatic const char *cassandra_query_type_names[CASSANDRA_QUERY_TYPE_COUNT] = {
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen CassConsistency read_consistency, write_consistency, delete_consistency;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen CassConsistency read_fallback_consistency, write_fallback_consistency, delete_fallback_consistency;
7cd055a212d44067e2d94452c05691d696c9f699Timo Sirainen unsigned int connect_timeout_msecs, request_timeout_msecs;
2599a77a28bde0653fa090802424469904d518eeTimo Sirainen unsigned int heartbeat_interval_secs, idle_timeout_secs;
07038d3a12a915e98f794566f56a0ed12e0653ebAki Tuomi unsigned int execution_retry_interval_msecs, execution_retry_times;
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen ARRAY(struct cassandra_sql_prepared_statement *) pending_prepares;
2a24f3565c61cb429d1e428601f153ce53b8bae3Timo Sirainen struct timeval primary_query_last_sent[CASSANDRA_QUERY_TYPE_COUNT];
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen time_t last_fallback_warning[CASSANDRA_QUERY_TYPE_COUNT];
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen unsigned int fallback_failures[CASSANDRA_QUERY_TYPE_COUNT];
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen /* for synchronous queries: */
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen CassConsistency consistency, fallback_consistency;
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen struct timeval page0_start_time, start_time, finish_time;
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen unsigned int row_count, total_row_count, page_num;
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen struct cassandra_sql_prepared_statement *prep;
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen /* NULL, until the prepare is asynchronously finished */
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen /* statements waiting for prepare to finish */
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen ARRAY(struct cassandra_sql_statement *) pending_statements;
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen /* an error here will cause the prepare to be retried on the next
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen execution attempt. */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenextern const struct sql_db driver_cassandra_db;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenextern const struct sql_result driver_cassandra_result;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic struct {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen { CASS_CONSISTENCY_LOCAL_QUORUM, "local-quorum" },
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen { CASS_CONSISTENCY_EACH_QUORUM, "each-quorum" },
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen { CASS_CONSISTENCY_LOCAL_SERIAL, "local-serial" },
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainenstatic struct {
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainenstatic void driver_cassandra_prepare_pending(struct cassandra_db *db);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainenprepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainenstatic void driver_cassandra_result_send_query(struct cassandra_result *result);
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainenstatic void driver_cassandra_send_queries(struct cassandra_db *db);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void result_finish(struct cassandra_result *result);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic int consistency_parse(const char *str, CassConsistency *consistency_r)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen unsigned int i;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen for (i = 0; i < N_ELEMENTS(cass_consistency_names); i++) {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen if (strcmp(cass_consistency_names[i].name, str) == 0) {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen *consistency_r = cass_consistency_names[i].consistency;
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainenstatic int log_level_parse(const char *str, CassLogLevel *log_level_r)
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainen unsigned int i;
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainen for (i = 0; i < N_ELEMENTS(cass_log_level_names); i++) {
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainen if (strcmp(cass_log_level_names[i].name, str) == 0) {
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainen *log_level_r = cass_log_level_names[i].log_level;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_set_state(struct cassandra_db *db, enum sql_db_state state)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* switch back to original ioloop in case the caller wants to
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainenstatic void driver_cassandra_close(struct cassandra_db *db, const char *error)
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen struct cassandra_sql_prepared_statement *const *prep_stmtp;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen array_foreach(&db->pending_prepares, prep_stmtp) {
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen prepare_finish_pending_statements(*prep_stmtp);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* running a sync query, stop it */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_log_error(CassFuture *future, const char *str)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen cass_future_error_message(future, &message, &size);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_error("cassandra: %s: %.*s", str, (int)size, message);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_future_callback(CassFuture *future ATTR_UNUSED,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* this isn't the main thread - communicate with main thread by
7c04ede0da5749691624a1fb962ac29cd0167050Timo Sirainen writing the callback id to the pipe. note that we must not use
7c04ede0da5749691624a1fb962ac29cd0167050Timo Sirainen almost any dovecot functions here because most of them are using
7c04ede0da5749691624a1fb962ac29cd0167050Timo Sirainen data-stack, which isn't thread-safe. especially don't use
7c04ede0da5749691624a1fb962ac29cd0167050Timo Sirainen i_error() here. */
7c04ede0da5749691624a1fb962ac29cd0167050Timo Sirainen if (write_full(cb->db->fd_pipe[1], &cb->id, sizeof(cb->id)) < 0) {
7c04ede0da5749691624a1fb962ac29cd0167050Timo Sirainen "cassandra: write(pipe) failed: %s\n",
7c04ede0da5749691624a1fb962ac29cd0167050Timo Sirainen (void)write_full(STDERR_FILENO, str, strlen(str));
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void cassandra_callback_run(struct cassandra_callback *cb)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_input_id(struct cassandra_db *db, unsigned int id)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* usually there are only a few callbacks, so don't bother with using
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen a hash table */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_panic("cassandra: Received unknown ID %u", id);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_input(struct cassandra_db *db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen else if (ret == 0)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_error("cassandra: read(pipe) returned wrong amount of data");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* success */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen for (i = 0; i < count && db->api.state != SQL_DB_STATE_DISCONNECTED; i++)
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen driver_cassandra_close(db, "IPC pipe closed");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_set_callback(CassFuture *future, struct cassandra_db *db,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen cass_future_set_callback(future, driver_cassandra_future_callback, cb);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void connect_callback(CassFuture *future, void *context)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen if ((rc = cass_future_error_code(future)) != CASS_OK) {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen "Couldn't connect to Cassandra");
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen driver_cassandra_close(db, "Couldn't connect to Cassandra");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_set_state(db, SQL_DB_STATE_IDLE);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* driver_cassandra_sync_init() waiting for connection to
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic int driver_cassandra_connect(struct sql_db *_db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_assert(db->api.state == SQL_DB_STATE_DISCONNECTED);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_set_state(db, SQL_DB_STATE_CONNECTING);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen future = cass_session_connect_keyspace(db->session, db->cluster, db->keyspace);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_set_callback(future, db, connect_callback, db);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_disconnect(struct sql_db *_db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const char *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_escape_string(struct sql_db *db ATTR_UNUSED,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen unsigned int i;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_parse_connect_string(struct cassandra_db *db,
f7d018e7e0980044e3d537958126e44ef4c45056Timo Sirainen const char *const *args, *key, *value, *error;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen bool read_fallback_set = FALSE, write_fallback_set = FALSE, delete_fallback_set = FALSE;
6ea145a99eeee923602f04d3c9183bbdba6cd190Timo Sirainen db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
6ea145a99eeee923602f04d3c9183bbdba6cd190Timo Sirainen db->write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen db->delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
7cd055a212d44067e2d94452c05691d696c9f699Timo Sirainen db->connect_timeout_msecs = SQL_CONNECT_TIMEOUT_SECS*1000;
7cd055a212d44067e2d94452c05691d696c9f699Timo Sirainen db->request_timeout_msecs = SQL_QUERY_TIMEOUT_SECS*1000;
8952d797eca36f997ec36569e783871b597a9216Timo Sirainen db->warn_timeout_msecs = CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen args = t_strsplit_spaces(connect_string, " ");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_fatal("cassandra: Missing value in connect string: %s",
e8434aad92ea6ff1c915b708294dbd0c7ff5908dMichael M Slusarz i_fatal("cassandra: Invalid port: %s", value);
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen } else if (strcmp(key, "read_consistency") == 0) {
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen if (consistency_parse(value, &db->read_consistency) < 0)
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen i_fatal("cassandra: Unknown read_consistency: %s", value);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen } else if (strcmp(key, "read_fallback_consistency") == 0) {
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen if (consistency_parse(value, &db->read_fallback_consistency) < 0)
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen i_fatal("cassandra: Unknown read_fallback_consistency: %s", value);
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen } else if (strcmp(key, "write_consistency") == 0) {
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen if (consistency_parse(value, &db->write_consistency) < 0)
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen i_fatal("cassandra: Unknown write_consistency: %s", value);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen } else if (strcmp(key, "write_fallback_consistency") == 0) {
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen if (consistency_parse(value, &db->write_fallback_consistency) < 0)
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen i_fatal("cassandra: Unknown write_fallback_consistency: %s", value);
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen } else if (strcmp(key, "delete_consistency") == 0) {
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen if (consistency_parse(value, &db->delete_consistency) < 0)
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen i_fatal("cassandra: Unknown delete_consistency: %s", value);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen } else if (strcmp(key, "delete_fallback_consistency") == 0) {
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen if (consistency_parse(value, &db->delete_fallback_consistency) < 0)
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen i_fatal("cassandra: Unknown delete_fallback_consistency: %s", value);
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainen if (log_level_parse(value, &db->log_level) < 0)
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainen i_fatal("cassandra: Unknown log_level: %s", value);
14189e0d0af45ddcb888d026bd8d7e4609912ec5Timo Sirainen } else if (strcmp(key, "debug_queries") == 0) {
e564ff0581fc44b78badf8da36e68f9f7a27807eTimo Sirainen } else if (strcmp(key, "latency_aware_routing") == 0) {
f9eee365367f37b1692c07db6c23d30243844aaaTimo Sirainen if (str_to_uint(value, &db->protocol_version) < 0)
f9eee365367f37b1692c07db6c23d30243844aaaTimo Sirainen i_fatal("cassandra: Invalid version: %s", value);
f7d018e7e0980044e3d537958126e44ef4c45056Timo Sirainen i_fatal("cassandra: Invalid num_threads: %s", value);
2599a77a28bde0653fa090802424469904d518eeTimo Sirainen } else if (strcmp(key, "heartbeat_interval") == 0) {
2599a77a28bde0653fa090802424469904d518eeTimo Sirainen if (settings_get_time(value, &db->heartbeat_interval_secs, &error) < 0)
2599a77a28bde0653fa090802424469904d518eeTimo Sirainen i_fatal("cassandra: Invalid heartbeat_interval '%s': %s", value, error);
2599a77a28bde0653fa090802424469904d518eeTimo Sirainen } else if (strcmp(key, "idle_timeout") == 0) {
2599a77a28bde0653fa090802424469904d518eeTimo Sirainen if (settings_get_time(value, &db->idle_timeout_secs, &error) < 0)
2599a77a28bde0653fa090802424469904d518eeTimo Sirainen i_fatal("cassandra: Invalid idle_timeout '%s': %s", value, error);
f7d018e7e0980044e3d537958126e44ef4c45056Timo Sirainen } else if (strcmp(key, "connect_timeout") == 0) {
7cd055a212d44067e2d94452c05691d696c9f699Timo Sirainen if (settings_get_time_msecs(value, &db->connect_timeout_msecs, &error) < 0)
f7d018e7e0980044e3d537958126e44ef4c45056Timo Sirainen i_fatal("cassandra: Invalid connect_timeout '%s': %s", value, error);
f7d018e7e0980044e3d537958126e44ef4c45056Timo Sirainen } else if (strcmp(key, "request_timeout") == 0) {
7cd055a212d44067e2d94452c05691d696c9f699Timo Sirainen if (settings_get_time_msecs(value, &db->request_timeout_msecs, &error) < 0)
f7d018e7e0980044e3d537958126e44ef4c45056Timo Sirainen i_fatal("cassandra: Invalid request_timeout '%s': %s", value, error);
8952d797eca36f997ec36569e783871b597a9216Timo Sirainen } else if (strcmp(key, "warn_timeout") == 0) {
8952d797eca36f997ec36569e783871b597a9216Timo Sirainen if (settings_get_time_msecs(value, &db->warn_timeout_msecs, &error) < 0)
8952d797eca36f997ec36569e783871b597a9216Timo Sirainen i_fatal("cassandra: Invalid warn_timeout '%s': %s", value, error);
07038d3a12a915e98f794566f56a0ed12e0653ebAki Tuomi } else if (strcmp(key, "execution_retry_interval") == 0) {
07038d3a12a915e98f794566f56a0ed12e0653ebAki Tuomi if (settings_get_time_msecs(value, &db->execution_retry_interval_msecs, &error) < 0)
07038d3a12a915e98f794566f56a0ed12e0653ebAki Tuomi i_fatal("cassandra: Invalid execution_retry_interval '%s': %s", value, error);
07038d3a12a915e98f794566f56a0ed12e0653ebAki Tuomi i_fatal("cassandra: This cassandra version does not support execution_retry_interval");
07038d3a12a915e98f794566f56a0ed12e0653ebAki Tuomi } else if (strcmp(key, "execution_retry_times") == 0) {
07038d3a12a915e98f794566f56a0ed12e0653ebAki Tuomi if (str_to_uint(value, &db->execution_retry_times) < 0)
07038d3a12a915e98f794566f56a0ed12e0653ebAki Tuomi i_fatal("cassandra: Invalid execution_retry_times %s", value);
07038d3a12a915e98f794566f56a0ed12e0653ebAki Tuomi i_fatal("cassandra: This cassandra version does not support execution_retry_times");
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen i_fatal("cassandra: Invalid page_size: %s", value);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_fatal("cassandra: Unknown connect string: %s", key);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen db->read_fallback_consistency = db->read_consistency;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen db->write_fallback_consistency = db->write_consistency;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen db->delete_fallback_consistency = db->delete_consistency;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_fatal("cassandra: No hosts given in connect string");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_fatal("cassandra: No dbname given in connect string");
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainendriver_cassandra_get_metrics_json(struct cassandra_db *db, string_t *dest)
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainen str_printfa(dest, "\""#_field"\": %llu,", (unsigned long long)metrics._struct._field);
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainen str_printfa(dest, "\""#_field"\": %02lf,", metrics._struct._field);
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainen cass_session_get_metrics(db->session, &metrics);
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainen ADD_UINT64(stats, exceeded_pending_requests_water_mark);
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainen ADD_UINT64(stats, exceeded_write_bytes_water_mark);
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen for (unsigned int i = 0; i < CASSANDRA_COUNTER_COUNT; i++) {
47a5a7e8296f3b8f2fac9a0659d4de3f2723ba4aMartti Rannanjärvi str_printfa(dest, "\"%s\": %"PRIu64",", counter_names[i],
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainenstatic void driver_cassandra_metrics_write(struct cassandra_db *db)
0f5dc4da3982053036be65190e44bf28a67b1ca2Timo Sirainen if (var_expand(path, db->metrics_path, tab, &error) <= 0) {
0f5dc4da3982053036be65190e44bf28a67b1ca2Timo Sirainen i_error("cassandra: Failed to expand metrics_path=%s: %s",
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainen fd = open(str_c(path), O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, 0600);
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainen if (write_full(fd, str_data(data), str_len(data)) < 0)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic struct sql_db *driver_cassandra_init_v(const char *connect_string)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_parse_connect_string(db, connect_string);
4d8f538565145fd90eae48df5c4f2ed76e51ca78Timo Sirainen if (db->protocol_version > 0 && db->protocol_version < 4) {
4d8f538565145fd90eae48df5c4f2ed76e51ca78Timo Sirainen /* binding with column indexes requires v4 */
be59f9ae981dbe4bdd264053e9febd4ea5dad75bTimo Sirainen db->timestamp_gen = cass_timestamp_gen_monotonic_new();
f0e416aa42058e7ccc0dc6deec0d4f4a19ee6ebeTimo Sirainen cass_cluster_set_timestamp_gen(db->cluster, db->timestamp_gen);
7cd055a212d44067e2d94452c05691d696c9f699Timo Sirainen cass_cluster_set_connect_timeout(db->cluster, db->connect_timeout_msecs);
7cd055a212d44067e2d94452c05691d696c9f699Timo Sirainen cass_cluster_set_request_timeout(db->cluster, db->request_timeout_msecs);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen cass_cluster_set_contact_points(db->cluster, db->hosts);
c72cfe4a2bda39fff3b8a8bd64b31a7cc14d7d11Timo Sirainen cass_cluster_set_credentials(db->cluster, db->user, db->password);
e8434aad92ea6ff1c915b708294dbd0c7ff5908dMichael M Slusarz cass_cluster_set_port(db->cluster, db->port);
f9eee365367f37b1692c07db6c23d30243844aaaTimo Sirainen cass_cluster_set_protocol_version(db->cluster, db->protocol_version);
f7d018e7e0980044e3d537958126e44ef4c45056Timo Sirainen cass_cluster_set_num_threads_io(db->cluster, db->num_threads);
00bcc83b18793b9ec5e5d264480a88bf78b10b33Timo Sirainen cass_cluster_set_latency_aware_routing(db->cluster, cass_true);
2599a77a28bde0653fa090802424469904d518eeTimo Sirainen cass_cluster_set_connection_heartbeat_interval(db->cluster, db->heartbeat_interval_secs);
2599a77a28bde0653fa090802424469904d518eeTimo Sirainen cass_cluster_set_connection_idle_timeout(db->cluster, db->idle_timeout_secs);
07038d3a12a915e98f794566f56a0ed12e0653ebAki Tuomi if (db->execution_retry_times > 0 && db->execution_retry_interval_msecs > 0)
07038d3a12a915e98f794566f56a0ed12e0653ebAki Tuomi cass_cluster_set_constant_speculative_execution_policy(db->cluster, db->execution_retry_interval_msecs, db->execution_retry_times);
ba4626cd5be3d225a7a89aa338d92b8fb411fd1cTimo Sirainen db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_deinit_v(struct sql_db *_db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen i_assert(array_count(&db->pending_prepares) == 0);
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainenstatic void driver_cassandra_result_unlink(struct cassandra_db *db,
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen for (i = 0; i < count; i++) {
a5f2707224b10f26e3d478a2b11e8d01f1b8f609Timo Sirainenstatic void driver_cassandra_log_result(struct cassandra_result *result,
a5f2707224b10f26e3d478a2b11e8d01f1b8f609Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
a5f2707224b10f26e3d478a2b11e8d01f1b8f609Timo Sirainen if (db->log_level < CASS_LOG_DEBUG && !db->debug_queries &&
b6c9cc2bf7517adcc0b9f98696c61bde321900f6Timo Sirainen str_printfa(str, "cassandra: Finished %squery '%s' (",
b6c9cc2bf7517adcc0b9f98696c61bde321900f6Timo Sirainen result->is_prepared ? "prepared " : "", result->query);
18da63ba64987f2157cf8c490b4c4d1efba28733Timo Sirainen str_printfa(str, "timestamp=%"PRId64", ", result->timestamp);
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen str_printfa(str, "%u pages in total, ", result->page_num);
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen if (result->page_num > 0 || result->paging_continues)
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen str_printfa(str, "page %u, ", result->page_num);
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen str_printfa(str, "%u rows, %lld+%lld us): %s", row_count, reply_usecs,
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen timeval_diff_usecs(&now, &result->finish_time),
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen result->error != NULL ? result->error : "success");
a5f2707224b10f26e3d478a2b11e8d01f1b8f609Timo Sirainen if (reply_usecs/1000 >= db->warn_timeout_msecs) {
a5f2707224b10f26e3d478a2b11e8d01f1b8f609Timo Sirainen db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SLOW]++;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_result_free(struct sql_result *_result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_result->db;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
8952d797eca36f997ec36569e783871b597a9216Timo Sirainen reply_usecs = timeval_diff_usecs(&result->finish_time, &result->start_time);
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen driver_cassandra_log_result(result, FALSE, reply_usecs);
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen if (result->page_num > 0 && !result->paging_continues) {
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen /* Multi-page query finishes now. Log a debug/warning summary
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen message about it separate from the per-page messages. */
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen reply_usecs = timeval_diff_usecs(&result->finish_time,
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen driver_cassandra_log_result(result, TRUE, reply_usecs);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void result_finish(struct cassandra_result *result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
b650f04c3b2e7dea2295bdbe3239eb82ec03ada0Timo Sirainen i_assert((result->error != NULL) == (result->iterator == NULL));
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result->callback(&result->api, result->context);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen free_result = db->sync_result != &result->api;
1856c361aad526948d56d8aafd576bca94516b92Timo Sirainen i_assert(!free_result || result->api.refcount > 0);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainenstatic void query_resend_with_fallback(struct cassandra_result *result)
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen ioloop_time - db->last_fallback_warning[result->query_type];
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen if (last_warning >= CASSANDRA_FALLBACK_WARN_INTERVAL_SECS) {
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen i_warning("%s - retrying future %s queries with consistency %s (instead of %s)",
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen result->error, cassandra_query_type_names[result->query_type],
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen cass_consistency_string(result->fallback_consistency),
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen cass_consistency_string(result->consistency));
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen db->last_fallback_warning[result->query_type] = ioloop_time;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen result->consistency = result->fallback_consistency;
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainenstatic void counters_inc_error(struct cassandra_db *db, CassError error)
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS]++;
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL]++;
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT]++;
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT]++;
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE]++;
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_OTHER]++;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void query_callback(CassFuture *future, void *context)
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen CassError error = cass_future_error_code(future);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen cass_future_error_message(future, &errmsg, &errsize);
e401fa68eb1e7761ffd0b747919d44568555efeeTimo Sirainen msecs = timeval_diff_msecs(&ioloop_timeval, &result->start_time);
0098ac3b6dcd8ef6ac20f87a8285da201db75a01Timo Sirainen /* Timeouts bring uncertainty whether the query succeeded or
0098ac3b6dcd8ef6ac20f87a8285da201db75a01Timo Sirainen not. Also _SERVER_UNAVAILABLE could have actually written
0098ac3b6dcd8ef6ac20f87a8285da201db75a01Timo Sirainen enough copies of the data for the query to succeed. */
b8eb3211af2987d6e8f0d416156171fbd74f0737Timo Sirainen result->api.error_type = error == CASS_ERROR_SERVER_WRITE_TIMEOUT ||
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen result->error = i_strdup_printf("Query '%s' failed: %.*s (in %u.%03u secs%s)",
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen result->query, (int)errsize, errmsg, msecs/1000, msecs%1000,
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen result->page_num == 0 ? "" : t_strdup_printf(", page %u", result->page_num));
2522acb523343dd37bf788747d86d9470fc08025Timo Sirainen /* unavailable = cassandra server knows that there aren't
0098ac3b6dcd8ef6ac20f87a8285da201db75a01Timo Sirainen enough nodes available. "All hosts in current policy
0098ac3b6dcd8ef6ac20f87a8285da201db75a01Timo Sirainen attempted and were either unavailable or failed"
b394d41ad4da0e2e7b8bfafccf3b4f3e9ac26ad1Timo Sirainen no hosts available = The client library couldn't connect to
b394d41ad4da0e2e7b8bfafccf3b4f3e9ac26ad1Timo Sirainen enough cassandra nodes. Error message is the same as for
b394d41ad4da0e2e7b8bfafccf3b4f3e9ac26ad1Timo Sirainen "unavailable".
2522acb523343dd37bf788747d86d9470fc08025Timo Sirainen write timeout = cassandra server couldn't reach all the
2522acb523343dd37bf788747d86d9470fc08025Timo Sirainen needed nodes. this may be because it hasn't yet detected
2522acb523343dd37bf788747d86d9470fc08025Timo Sirainen that the servers are down, or because the servers are just
2522acb523343dd37bf788747d86d9470fc08025Timo Sirainen too busy. we'll try the fallback consistency to avoid
2522acb523343dd37bf788747d86d9470fc08025Timo Sirainen unnecessary temporary errors. */
2522acb523343dd37bf788747d86d9470fc08025Timo Sirainen if ((error == CASS_ERROR_SERVER_UNAVAILABLE ||
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen result->fallback_consistency != result->consistency) {
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen /* retry with fallback consistency */
f9cf9852b0338910f1a710297374943d66fea480Timo Sirainen db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_OK]++;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen if (result->fallback_consistency != result->consistency) {
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen /* non-fallback query finished successfully. if there had been
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen any fallbacks, reset them. */
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen db->fallback_failures[result->query_type] = 0;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result->result = cass_future_get_result(future);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result->iterator = cass_iterator_from_result(result->result);
a5f2707224b10f26e3d478a2b11e8d01f1b8f609Timo Sirainenstatic void driver_cassandra_init_statement(struct cassandra_result *result)
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen cass_statement_set_consistency(result->statement, result->consistency);
07038d3a12a915e98f794566f56a0ed12e0653ebAki Tuomi cass_statement_set_is_idempotent(result->statement, cass_true);
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen cass_statement_set_paging_size(result->statement, db->page_size);
a5f2707224b10f26e3d478a2b11e8d01f1b8f609Timo Sirainenstatic void driver_cassandra_result_send_query(struct cassandra_result *result)
a5f2707224b10f26e3d478a2b11e8d01f1b8f609Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
a5f2707224b10f26e3d478a2b11e8d01f1b8f609Timo Sirainen db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SENT]++;
214aff73cd9809446bef169b216d6eb5a81079d8Timo Sirainen if (result->query_type != CASSANDRA_QUERY_TYPE_READ_MORE)
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen future = cass_session_execute(db->session, result->statement);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen driver_cassandra_set_callback(future, db, query_callback, result);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainendriver_cassandra_want_fallback_query(struct cassandra_result *result)
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen unsigned int failure_count = db->fallback_failures[result->query_type];
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen unsigned int i, msecs = CASSANDRA_FALLBACK_FIRST_RETRY_MSECS;
2a24f3565c61cb429d1e428601f153ce53b8bae3Timo Sirainen /* double the retry delay every time. */
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen if (msecs >= CASSANDRA_FALLBACK_MAX_RETRY_MSECS) {
2a24f3565c61cb429d1e428601f153ce53b8bae3Timo Sirainen /* If last primary query sent timestamp + msecs is older than current
2a24f3565c61cb429d1e428601f153ce53b8bae3Timo Sirainen time, we need to retry the primary query. Note that this practically
2a24f3565c61cb429d1e428601f153ce53b8bae3Timo Sirainen prevents multiple primary queries from being attempted
2a24f3565c61cb429d1e428601f153ce53b8bae3Timo Sirainen simultaneously, because the caller updates primary_query_last_sent
2a24f3565c61cb429d1e428601f153ce53b8bae3Timo Sirainen immediately when returning.
2a24f3565c61cb429d1e428601f153ce53b8bae3Timo Sirainen The only time when multiple primary queries can be running in
2a24f3565c61cb429d1e428601f153ce53b8bae3Timo Sirainen parallel is when the earlier query is being slow and hasn't finished
2a24f3565c61cb429d1e428601f153ce53b8bae3Timo Sirainen early enough. This could even be a wanted feature, since while the
2a24f3565c61cb429d1e428601f153ce53b8bae3Timo Sirainen first query might have to wait for a timeout, Cassandra could have
2a24f3565c61cb429d1e428601f153ce53b8bae3Timo Sirainen been fixed in the meantime and the second query finishes
2a24f3565c61cb429d1e428601f153ce53b8bae3Timo Sirainen successfully. */
2a24f3565c61cb429d1e428601f153ce53b8bae3Timo Sirainen tv = db->primary_query_last_sent[result->query_type];
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainenstatic int driver_cassandra_send_query(struct cassandra_result *result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen driver_cassandra_close(db, "Couldn't connect to Cassandra");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result->row_pool = pool_alloconly_create("cassandra result", 512);
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen result->fallback_consistency = db->read_fallback_consistency;
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen /* consistency is already set and we don't want to fallback
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen at this point anymore. */
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen result->fallback_consistency = result->consistency;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen result->fallback_consistency = db->write_fallback_consistency;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen result->fallback_consistency = db->delete_fallback_consistency;
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen if (driver_cassandra_want_fallback_query(result))
1128c114416bdc4df0b41d3e15429a1522e5cfe4Timo Sirainen result->consistency = result->fallback_consistency;
2a24f3565c61cb429d1e428601f153ce53b8bae3Timo Sirainen db->primary_query_last_sent[result->query_type] = ioloop_timeval;
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainenstatic void driver_cassandra_send_queries(struct cassandra_db *db)
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen for (i = 0; i < count; i++) {
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen if (!results[i]->query_sent && results[i]->statement != NULL) {
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen if (driver_cassandra_send_query(results[i]) <= 0)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void exec_callback(struct sql_result *_result ATTR_UNUSED,
a5f2707224b10f26e3d478a2b11e8d01f1b8f609Timo Sirainendriver_cassandra_query_init(struct cassandra_db *db, const char *query,
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen sql_query_callback_t *callback, void *context)
a5f2707224b10f26e3d478a2b11e8d01f1b8f609Timo Sirainendriver_cassandra_query_full(struct sql_db *_db, const char *query,
a5f2707224b10f26e3d478a2b11e8d01f1b8f609Timo Sirainen sql_query_callback_t *callback, void *context)
a5f2707224b10f26e3d478a2b11e8d01f1b8f609Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
b6c9cc2bf7517adcc0b9f98696c61bde321900f6Timo Sirainen result = driver_cassandra_query_init(db, query, query_type, FALSE,
a2c4998f6e1fe5ea9a2c9bafd678cd4b6b064a0bTimo Sirainen result->statement = cass_statement_new(query, 0);
ce74395e2a932342e04fb682395bcce111574969Timo Sirainenstatic void driver_cassandra_exec(struct sql_db *db, const char *query)
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_WRITE, exec_callback, NULL);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_query(struct sql_db *db, const char *query,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen sql_query_callback_t *callback, void *context)
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_READ, callback, context);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void cassandra_query_s_callback(struct sql_result *result, void *context)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_sync_init(struct cassandra_db *db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_assert(db->api.state == SQL_DB_STATE_CONNECTING);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* wait for connecting to finish */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_sync_deinit(struct cassandra_db *db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic struct sql_result *
8759c5d294e762fe9c5b7b19f3842b23aaaaf4ebTimo Sirainendriver_cassandra_sync_query(struct cassandra_db *db, const char *query,
8759c5d294e762fe9c5b7b19f3842b23aaaaf4ebTimo Sirainen driver_cassandra_query_full(&db->api, query, query_type,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* we don't end up in cassandra's free function, so sync_result
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen won't be set to NULL if we don't do it here. */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic struct sql_result *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_query_s(struct sql_db *_db, const char *query)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
8759c5d294e762fe9c5b7b19f3842b23aaaaf4ebTimo Sirainen result = driver_cassandra_sync_query(db, query,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_get_value(struct cassandra_result *result,
b096ecf3188cdb9162460ed7ae885c03f3161462Timo Sirainen switch (cass_data_type_type(cass_value_data_type(value))) {
769cbb608e9ed620063708aff49fc1b6e924394aTimo Sirainen const char *str = t_strdup_printf("%lld", (long long)num);
b096ecf3188cdb9162460ed7ae885c03f3161462Timo Sirainen rc = cass_value_get_bytes(value, &output, &output_size);
b096ecf3188cdb9162460ed7ae885c03f3161462Timo Sirainen result->error = i_strdup_printf("Couldn't get value as %s: %s",
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen output_dup = p_malloc(result->row_pool, output_size + 1);
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainenstatic int driver_cassandra_result_next_page(struct cassandra_result *result)
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen /* no paging */
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen if (cass_result_has_more_pages(result->result) == cass_false)
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen /* callers that don't support sql_query_more() will still get a useful
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen error message. */
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen result->error = i_strdup("Paged query has more results, but not supported by the caller");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic int driver_cassandra_result_next_row(struct sql_result *_result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen unsigned int i;
23bdbb7b1831785c6ba6df190f6369da882d2b9dTimo Sirainen if (cass_iterator_next(result->iterator) == 0)
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen return driver_cassandra_result_next_page(result);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen p_array_init(&result->fields, result->row_pool, 8);
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen p_array_init(&result->field_sizes, result->row_pool, 8);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen row = cass_iterator_get_row(result->iterator);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen for (i = 0; (value = cass_row_get_column(row, i)) != NULL; i++) {
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen if (driver_cassandra_get_value(result, value, &str, &size) < 0) {
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainendriver_cassandra_result_more(struct sql_result **_result, bool async,
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen sql_query_callback_t *callback, void *context)
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen struct cassandra_db *db = (struct cassandra_db *)(*_result)->db;
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen /* Initialize the next page as a new sql_result */
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen new_result = driver_cassandra_query_init(db, old_result->query,
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen /* Preserve the statement and update its paging state */
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen new_result->statement = old_result->statement;
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen cass_statement_set_paging_state(new_result->statement,
b07a3abc4cc692661f5afd2fb654acb687884613Timo Sirainen /* The caller did support paging. Clear out the "...not supported by
b07a3abc4cc692661f5afd2fb654acb687884613Timo Sirainen the caller" error text, so it won't be in the debug log output. */
fa649b1fa91fa7aed260fe5d6da8d4d7b42ed3fbTimo Sirainen new_result->timestamp = old_result->timestamp;
214aff73cd9809446bef169b216d6eb5a81079d8Timo Sirainen new_result->consistency = old_result->consistency;
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen new_result->page_num = old_result->page_num + 1;
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen new_result->page0_start_time = old_result->page0_start_time;
caf029d36a826106e48b8682f15ea0fc01fdd8f4Timo Sirainen new_result->total_row_count = old_result->total_row_count;
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen (void)driver_cassandra_send_query(new_result);
dd3d20d9b5821077164183a260af9bde0db3ff3fTimo Sirainen (void)driver_cassandra_send_query(new_result);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic unsigned int
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_get_fields_count(struct sql_result *_result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const char *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_get_field_name(struct sql_result *_result ATTR_UNUSED,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_find_field(struct sql_result *_result ATTR_UNUSED,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const char *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_get_field_value(struct sql_result *_result,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen unsigned int idx)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const unsigned char *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_get_field_value_binary(struct sql_result *_result ATTR_UNUSED,
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen return (const void *)*strp;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const char *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_find_field_value(struct sql_result *result ATTR_UNUSED,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const char *const *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_get_values(struct sql_result *_result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const char *driver_cassandra_result_get_error(struct sql_result *_result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen return "FIXME";
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_transaction_begin(struct sql_db *db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen ctx = i_new(struct cassandra_transaction_context, 1);
6b4b3e5fe8d9e84f4b1356ee898ca76996a11fe1Timo Sirainendriver_cassandra_transaction_unref(struct cassandra_transaction_context **_ctx)
6b4b3e5fe8d9e84f4b1356ee898ca76996a11fe1Timo Sirainen struct cassandra_transaction_context *ctx = *_ctx;
e22ec7998afd426c53c658483ce66b6e404e27c6Timo Sirainentransaction_set_failed(struct cassandra_transaction_context *ctx,
ce74395e2a932342e04fb682395bcce111574969Timo Sirainentransaction_commit_callback(struct sql_result *result, void *context)
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen struct cassandra_transaction_context *ctx = context;
bb2b3656ef7635acc374f7fc19b25aeeb454ae95Timo Sirainen commit_result.error = sql_result_get_error(result);
bb2b3656ef7635acc374f7fc19b25aeeb454ae95Timo Sirainen commit_result.error_type = sql_result_get_error_type(result);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen sql_commit_callback_t *callback, void *context)
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_ctx->db;
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen if (ctx->failed || (ctx->query == NULL && ctx->stmt == NULL)) {
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen /* just a single query, send it */
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen ctx->query : sql_statement_get_query(&ctx->stmt->stmt);
af619a25952f5ba550800daf69a119247b1fcda3Timo Sirainen cass_result = driver_cassandra_query_init(db, query, query_type,
af619a25952f5ba550800daf69a119247b1fcda3Timo Sirainen cass_result->statement = cass_statement_new(query, 0);
af619a25952f5ba550800daf69a119247b1fcda3Timo Sirainen cass_result->timestamp = ctx->query_timestamp;
af619a25952f5ba550800daf69a119247b1fcda3Timo Sirainen cass_statement_set_timestamp(cass_result->statement,
af619a25952f5ba550800daf69a119247b1fcda3Timo Sirainen (void)driver_cassandra_send_query(cass_result);
b6c9cc2bf7517adcc0b9f98696c61bde321900f6Timo Sirainen driver_cassandra_query_init(db, query, query_type, TRUE,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen /* wait for prepare to finish */
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen ctx->stmt->result->statement = ctx->stmt->cass_stmt;
fa649b1fa91fa7aed260fe5d6da8d4d7b42ed3fbTimo Sirainen ctx->stmt->result->timestamp = ctx->stmt->timestamp;
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen (void)driver_cassandra_send_query(ctx->stmt->result);
e22ec7998afd426c53c658483ce66b6e404e27c6Timo Sirainendriver_cassandra_try_commit_s(struct cassandra_transaction_context *ctx)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct sql_transaction_context *_ctx = &ctx->ctx;
8759c5d294e762fe9c5b7b19f3842b23aaaaf4ebTimo Sirainen struct cassandra_db *db = (struct cassandra_db *)_ctx->db;
92a7f5f9bf20c0bd1b1ac309d100f9c144e2b127Timo Sirainen /* just a single query, send it */
92a7f5f9bf20c0bd1b1ac309d100f9c144e2b127Timo Sirainen if (strncasecmp(ctx->query, "DELETE ", 7) == 0)
92a7f5f9bf20c0bd1b1ac309d100f9c144e2b127Timo Sirainen result = driver_cassandra_sync_query(db, ctx->query, query_type);
92a7f5f9bf20c0bd1b1ac309d100f9c144e2b127Timo Sirainen transaction_set_failed(ctx, sql_result_get_error(result));
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_transaction_commit_s(struct sql_transaction_context *_ctx,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen const char **error_r)
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen /* nothing should be using this - don't bother implementing */
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen i_panic("cassandra: sql_transaction_commit_s() not supported for prepared statements");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_transaction_rollback(struct sql_transaction_context *_ctx)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_update(struct sql_transaction_context *_ctx, const char *query,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen if (ctx->query != NULL || ctx->stmt != NULL) {
92a7f5f9bf20c0bd1b1ac309d100f9c144e2b127Timo Sirainen transaction_set_failed(ctx, "Multiple changes in transaction not supported");
b87761f9bbef949f31dae297e619ac3f5e9c2b2eTimo Sirainenstatic const char *
b87761f9bbef949f31dae297e619ac3f5e9c2b2eTimo Sirainendriver_cassandra_escape_blob(struct sql_db *_db ATTR_UNUSED,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainendriver_cassandra_bind_int(struct cassandra_sql_statement *stmt,
27688ec19adf1af07f9a996620caba05bb80acfbTimo Sirainen /* statements require exactly correct value type */
27688ec19adf1af07f9a996620caba05bb80acfbTimo Sirainen data_type = cass_prepared_parameter_data_type(stmt->prep->prepared, column_idx);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen if (value < -2147483648 || value > 2147483647)
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen return cass_statement_bind_int32(stmt->cass_stmt, column_idx, value);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen return cass_statement_bind_int64(stmt->cass_stmt, column_idx, value);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen return cass_statement_bind_int16(stmt->cass_stmt, column_idx, value);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen return cass_statement_bind_int8(stmt->cass_stmt, column_idx, value);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainenstatic void prepare_finish_arg(struct cassandra_sql_statement *stmt,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen rc = cass_statement_bind_string(stmt->cass_stmt, arg->column_idx,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen rc = cass_statement_bind_bytes(stmt->cass_stmt, arg->column_idx,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen rc = driver_cassandra_bind_int(stmt, arg->column_idx,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen i_error("cassandra: Statement '%s': Failed to bind column %u: %s",
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainenstatic void prepare_finish_statement(struct cassandra_sql_statement *stmt)
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen stmt->result->error = i_strdup(stmt->prep->error);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen stmt->cass_stmt = cass_prepared_bind(stmt->prep->prepared);
125081bc1cac5f9fe006c36c88ca0535377c461cTimo Sirainen cass_statement_set_timestamp(stmt->cass_stmt, stmt->timestamp);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen (void)driver_cassandra_send_query(stmt->result);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainenprepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt)
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen array_foreach(&prep_stmt->pending_statements, stmtp)
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainenstatic void prepare_callback(CassFuture *future, void *context)
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen struct cassandra_sql_prepared_statement *prep_stmt = context;
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen CassError error = cass_future_error_code(future);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen cass_future_error_message(future, &errmsg, &errsize);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen prep_stmt->error = i_strndup(errmsg, errsize);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen prep_stmt->prepared = cass_future_get_prepared(future);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainenstatic void prepare_start(struct cassandra_sql_prepared_statement *prep_stmt)
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)prep_stmt->prep_stmt.db;
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen array_append(&db->pending_prepares, &prep_stmt, 1);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen /* clear the current error in case we're retrying */
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen future = cass_session_prepare(db->session, prep_stmt->query_template);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen driver_cassandra_set_callback(future, db, prepare_callback, prep_stmt);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainenstatic void driver_cassandra_prepare_pending(struct cassandra_db *db)
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen struct cassandra_sql_prepared_statement *const *prep_stmtp;
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen array_foreach(&db->pending_prepares, prep_stmtp) {
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainendriver_cassandra_prepared_statement_init(struct sql_db *db,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen struct cassandra_sql_prepared_statement *prep_stmt =
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen i_new(struct cassandra_sql_prepared_statement, 1);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen prep_stmt->query_template = i_strdup(query_template);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen i_array_init(&prep_stmt->pending_statements, 4);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainendriver_cassandra_prepared_statement_deinit(struct sql_prepared_statement *_prep_stmt)
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen struct cassandra_sql_prepared_statement *prep_stmt =
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen (struct cassandra_sql_prepared_statement *)_prep_stmt;
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen i_assert(array_count(&prep_stmt->pending_statements) == 0);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainendriver_cassandra_statement_init(struct sql_db *db ATTR_UNUSED,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen pool_t pool = pool_alloconly_create("cassandra sql statement", 1024);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen p_new(pool, struct cassandra_sql_statement, 1);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainendriver_cassandra_statement_init_prepared(struct sql_prepared_statement *_prep_stmt)
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen struct cassandra_sql_prepared_statement *prep_stmt =
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen (struct cassandra_sql_prepared_statement *)_prep_stmt;
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen pool_t pool = pool_alloconly_create("cassandra prepared sql statement", 1024);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen p_new(pool, struct cassandra_sql_statement, 1);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen p_strdup(stmt->stmt.pool, prep_stmt->query_template);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen /* statement is already prepared. we can use it immediately. */
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen stmt->cass_stmt = cass_prepared_bind(prep_stmt->prepared);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen /* need to wait until prepare is finished */
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen array_append(&prep_stmt->pending_statements, &stmt, 1);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainendriver_cassandra_statement_abort(struct sql_statement *_stmt)
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainendriver_cassandra_statement_set_timestamp(struct sql_statement *_stmt,
186c97450c84494ba98e2905ca275f47ad334f4cTimo Sirainen cass_statement_set_timestamp(stmt->cass_stmt, ts_usecs);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainendriver_cassandra_add_pending_arg(struct cassandra_sql_statement *stmt,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen p_array_init(&stmt->pending_args, stmt->stmt.pool, 8);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen arg = array_append_space(&stmt->pending_args);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainendriver_cassandra_statement_bind_str(struct sql_statement *_stmt,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen cass_statement_bind_string(stmt->cass_stmt, column_idx, value);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen driver_cassandra_add_pending_arg(stmt, column_idx);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen arg->value_str = p_strdup(_stmt->pool, value);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainendriver_cassandra_statement_bind_binary(struct sql_statement *_stmt,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen cass_statement_bind_bytes(stmt->cass_stmt, column_idx,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen driver_cassandra_add_pending_arg(stmt, column_idx);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen arg->value_binary = p_memdup(_stmt->pool, value, value_size);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainendriver_cassandra_statement_bind_int64(struct sql_statement *_stmt,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen driver_cassandra_bind_int(stmt, column_idx, value);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen driver_cassandra_add_pending_arg(stmt, column_idx);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainendriver_cassandra_statement_query(struct sql_statement *_stmt,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen sql_query_callback_t *callback, void *context)
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_stmt->db;
27688ec19adf1af07f9a996620caba05bb80acfbTimo Sirainen const char *query = sql_statement_get_query(_stmt);
b6c9cc2bf7517adcc0b9f98696c61bde321900f6Timo Sirainen bool is_prepared = stmt->cass_stmt != NULL || stmt->prep != NULL;
27688ec19adf1af07f9a996620caba05bb80acfbTimo Sirainen stmt->result = driver_cassandra_query_init(db, query,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen /* wait for prepare to finish */
27688ec19adf1af07f9a996620caba05bb80acfbTimo Sirainen stmt->result->statement = cass_statement_new(query, 0);
27688ec19adf1af07f9a996620caba05bb80acfbTimo Sirainen cass_statement_set_timestamp(stmt->result->statement,
27688ec19adf1af07f9a996620caba05bb80acfbTimo Sirainen (void)driver_cassandra_send_query(stmt->result);
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainenstatic struct sql_result *
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainendriver_cassandra_statement_query_s(struct sql_statement *_stmt ATTR_UNUSED)
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen i_panic("cassandra: sql_statement_query_s() not supported");
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainendriver_cassandra_update_stmt(struct sql_transaction_context *_ctx,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen if (ctx->query != NULL || ctx->stmt != NULL) {
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen transaction_set_failed(ctx, "Multiple changes in transaction not supported");
27688ec19adf1af07f9a996620caba05bb80acfbTimo Sirainen ctx->query = i_strdup(sql_statement_get_query(_stmt));
ef597c4619eb021563f659b886c67762fce7a817Timo Sirainen .escape_string = driver_cassandra_escape_string,
ef597c4619eb021563f659b886c67762fce7a817Timo Sirainen .transaction_begin = driver_cassandra_transaction_begin,
ef597c4619eb021563f659b886c67762fce7a817Timo Sirainen .transaction_commit = driver_cassandra_transaction_commit,
ef597c4619eb021563f659b886c67762fce7a817Timo Sirainen .transaction_commit_s = driver_cassandra_transaction_commit_s,
ef597c4619eb021563f659b886c67762fce7a817Timo Sirainen .transaction_rollback = driver_cassandra_transaction_rollback,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen .prepared_statement_init = driver_cassandra_prepared_statement_init,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen .prepared_statement_deinit = driver_cassandra_prepared_statement_deinit,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen .statement_init = driver_cassandra_statement_init,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen .statement_init_prepared = driver_cassandra_statement_init_prepared,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen .statement_abort = driver_cassandra_statement_abort,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen .statement_set_timestamp = driver_cassandra_statement_set_timestamp,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen .statement_bind_str = driver_cassandra_statement_bind_str,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen .statement_bind_binary = driver_cassandra_statement_bind_binary,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen .statement_bind_int64 = driver_cassandra_statement_bind_int64,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen .statement_query = driver_cassandra_statement_query,
709ee5a909d482f31611f9e6cc10d893a272e061Timo Sirainen .statement_query_s = driver_cassandra_statement_query_s,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenconst struct sql_result driver_cassandra_result = {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_result_get_field_value_binary,