driver-cassandra.c revision 37e8420b32a0fa3442c405616980e45beb494104
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen/* Copyright (c) 2015 Dovecot authors, see the included COPYING file */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen ((db)->api.state != SQL_DB_STATE_DISCONNECTED && \
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainentypedef void driver_cassandra_callback_t(CassFuture *future, void *context);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen unsigned int id;
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen CassConsistency read_consistency, write_consistency, delete_consistency;
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen /* for synchronous queries: */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenextern const struct sql_db driver_cassandra_db;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenextern const struct sql_result driver_cassandra_result;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic struct {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen { CASS_CONSISTENCY_LOCAL_QUORUM, "local-quorum" },
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen { CASS_CONSISTENCY_EACH_QUORUM, "each-quorum" },
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen { CASS_CONSISTENCY_LOCAL_SERIAL, "local-serial" },
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainenstatic struct {
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainenstatic void driver_cassandra_send_queries(struct cassandra_db *db);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void result_finish(struct cassandra_result *result);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic int consistency_parse(const char *str, CassConsistency *consistency_r)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen unsigned int i;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen for (i = 0; i < N_ELEMENTS(cass_consistency_names); i++) {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen if (strcmp(cass_consistency_names[i].name, str) == 0) {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen *consistency_r = cass_consistency_names[i].consistency;
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainenstatic int log_level_parse(const char *str, CassLogLevel *log_level_r)
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainen unsigned int i;
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainen for (i = 0; i < N_ELEMENTS(cass_log_level_names); i++) {
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainen if (strcmp(cass_log_level_names[i].name, str) == 0) {
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainen *log_level_r = cass_log_level_names[i].log_level;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_set_state(struct cassandra_db *db, enum sql_db_state state)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* switch back to original ioloop in case the caller wants to
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainenstatic void driver_cassandra_close(struct cassandra_db *db, const char *error)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* running a sync query, stop it */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_log_error(CassFuture *future, const char *str)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen cass_future_error_message(future, &message, &size);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_error("cassandra: %s: %.*s", str, (int)size, message);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_future_callback(CassFuture *future ATTR_UNUSED,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* this isn't the main thread - communicate with main thread by
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen writing the callback id to the pipe */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen if (write_full(cb->db->fd_pipe[1], &cb->id, sizeof(cb->id)) < 0)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void cassandra_callback_run(struct cassandra_callback *cb)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_input_id(struct cassandra_db *db, unsigned int id)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* usually there are only a few callbacks, so don't bother with using
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen a hash table */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_panic("cassandra: Received unknown ID %u", id);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_input(struct cassandra_db *db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen else if (ret == 0)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_error("cassandra: read(pipe) returned wrong amount of data");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* success */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen for (i = 0; i < count && db->api.state != SQL_DB_STATE_DISCONNECTED; i++)
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen driver_cassandra_close(db, "IPC pipe closed");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_set_callback(CassFuture *future, struct cassandra_db *db,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen cass_future_set_callback(future, driver_cassandra_future_callback, cb);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void connect_callback(CassFuture *future, void *context)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen if ((rc = cass_future_error_code(future)) != CASS_OK) {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen "Couldn't connect to Cassandra");
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen driver_cassandra_close(db, "Couldn't connect to Cassandra");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_set_state(db, SQL_DB_STATE_IDLE);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* driver_cassandra_sync_init() waiting for connection to
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic int driver_cassandra_connect(struct sql_db *_db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_assert(db->api.state == SQL_DB_STATE_DISCONNECTED);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_set_state(db, SQL_DB_STATE_CONNECTING);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen future = cass_session_connect_keyspace(db->session, db->cluster, db->keyspace);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_set_callback(future, db, connect_callback, db);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_disconnect(struct sql_db *_db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const char *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_escape_string(struct sql_db *db ATTR_UNUSED,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen unsigned int i;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_parse_connect_string(struct cassandra_db *db,
6ea145a99eeee923602f04d3c9183bbdba6cd190Timo Sirainen db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
6ea145a99eeee923602f04d3c9183bbdba6cd190Timo Sirainen db->write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen db->delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen args = t_strsplit_spaces(connect_string, " ");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_fatal("cassandra: Missing value in connect string: %s",
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen } else if (strcmp(key, "read_consistency") == 0) {
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen if (consistency_parse(value, &db->read_consistency) < 0)
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen i_fatal("cassandra: Unknown read_consistency: %s", value);
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen } else if (strcmp(key, "write_consistency") == 0) {
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen if (consistency_parse(value, &db->write_consistency) < 0)
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen i_fatal("cassandra: Unknown write_consistency: %s", value);
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen } else if (strcmp(key, "delete_consistency") == 0) {
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen if (consistency_parse(value, &db->delete_consistency) < 0)
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen i_fatal("cassandra: Unknown delete_consistency: %s", value);
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainen if (log_level_parse(value, &db->log_level) < 0)
a1044a46a8f3512173f4ea2684ef1fc3e61645c7Timo Sirainen i_fatal("cassandra: Unknown log_level: %s", value);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_fatal("cassandra: Unknown connect string: %s", key);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_fatal("cassandra: No hosts given in connect string");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_fatal("cassandra: No dbname given in connect string");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic struct sql_db *driver_cassandra_init_v(const char *connect_string)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_parse_connect_string(db, connect_string);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen cass_cluster_set_connect_timeout(db->cluster, SQL_CONNECT_TIMEOUT_SECS * 1000);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen cass_cluster_set_request_timeout(db->cluster, SQL_QUERY_TIMEOUT_SECS * 1000);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen cass_cluster_set_contact_points(db->cluster, db->hosts);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_deinit_v(struct sql_db *_db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainenstatic void driver_cassandra_result_unlink(struct cassandra_db *db,
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen unsigned int i, count;
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen for (i = 0; i < count; i++) {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_result_free(struct sql_result *_result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_result->db;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
37e8420b32a0fa3442c405616980e45beb494104Timo Sirainen i_debug("cassandra: Finished query '%s' (%u rows, %lld+%lld us): %s", result->query,
2ccb478c35972517721ce415d81fcbd11a73fad3Timo Sirainen timeval_diff_usecs(&result->finish_time, &result->start_time),
2ccb478c35972517721ce415d81fcbd11a73fad3Timo Sirainen timeval_diff_usecs(&now, &result->finish_time),
2ccb478c35972517721ce415d81fcbd11a73fad3Timo Sirainen result->error != NULL ? result->error : "success");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void result_finish(struct cassandra_result *result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
b650f04c3b2e7dea2295bdbe3239eb82ec03ada0Timo Sirainen i_assert((result->error != NULL) == (result->iterator == NULL));
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result->callback(&result->api, result->context);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen free_result = db->sync_result != &result->api;
1856c361aad526948d56d8aafd576bca94516b92Timo Sirainen i_assert(!free_result || result->api.refcount > 0);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void query_callback(CassFuture *future, void *context)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen if (cass_future_error_code(future) != CASS_OK) {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen cass_future_error_message(future, &errmsg, &errsize);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result->error = i_strdup_printf("Query '%s' failed: %.*s",
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result->result = cass_future_get_result(future);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result->iterator = cass_iterator_from_result(result->result);
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainenstatic int driver_cassandra_send_query(struct cassandra_result *result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen driver_cassandra_close(db, "Couldn't connect to Cassandra");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result->row_pool = pool_alloconly_create("cassandra result", 512);
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen result->statement = cass_statement_new(result->query, 0);
6ea145a99eeee923602f04d3c9183bbdba6cd190Timo Sirainen cass_statement_set_consistency(result->statement, db->read_consistency);
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen cass_statement_set_consistency(result->statement, db->write_consistency);
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen cass_statement_set_consistency(result->statement, db->delete_consistency);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen future = cass_session_execute(db->session, result->statement);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_set_callback(future, db, query_callback, result);
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainenstatic void driver_cassandra_send_queries(struct cassandra_db *db)
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen unsigned int i, count;
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen for (i = 0; i < count; i++) {
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen if (driver_cassandra_send_query(results[i]) <= 0)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void exec_callback(struct sql_result *_result ATTR_UNUSED,
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainendriver_cassandra_query_full(struct sql_db *_db, const char *query,
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen sql_query_callback_t *callback, void *context)
1fb5e50695bbbc0da082e5a6f19f29d2bb2f6531Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
ce74395e2a932342e04fb682395bcce111574969Timo Sirainenstatic void driver_cassandra_exec(struct sql_db *db, const char *query)
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_WRITE, exec_callback, NULL);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_query(struct sql_db *db, const char *query,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen sql_query_callback_t *callback, void *context)
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_READ, callback, context);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void cassandra_query_s_callback(struct sql_result *result, void *context)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_sync_init(struct cassandra_db *db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen i_assert(db->api.state == SQL_DB_STATE_CONNECTING);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* wait for connecting to finish */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic void driver_cassandra_sync_deinit(struct cassandra_db *db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic struct sql_result *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_sync_query(struct cassandra_db *db, const char *query)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_query(&db->api, query, cassandra_query_s_callback, db);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* we don't end up in cassandra's free function, so sync_result
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen won't be set to NULL if we don't do it here. */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic struct sql_result *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_query_s(struct sql_db *_db, const char *query)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_db;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result = driver_cassandra_sync_query(db, query);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_get_value(struct cassandra_result *result,
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen const unsigned char *output;
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen rc = cass_value_get_bytes(value, &output, &output_size);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result->error = i_strdup_printf("Couldn't get value as string (code=%d)", rc);
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen output_dup = p_malloc(result->row_pool, output_size + 1);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic int driver_cassandra_result_next_row(struct sql_result *_result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen const char *str;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen unsigned int i;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen p_array_init(&result->fields, result->row_pool, 8);
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen p_array_init(&result->field_sizes, result->row_pool, 8);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen row = cass_iterator_get_row(result->iterator);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen for (i = 0; (value = cass_row_get_column(row, i)) != NULL; i++) {
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen if (driver_cassandra_get_value(result, value, &str, &size) < 0) {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic unsigned int
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_get_fields_count(struct sql_result *_result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const char *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_get_field_name(struct sql_result *_result ATTR_UNUSED,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_find_field(struct sql_result *_result ATTR_UNUSED,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const char *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_get_field_value(struct sql_result *_result,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen unsigned int idx)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen const char *const *strp;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const unsigned char *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_get_field_value_binary(struct sql_result *_result ATTR_UNUSED,
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen const char *const *strp;
61f39b0358a72ebc693d84ba5bac74489ee7df41Timo Sirainen return (const void *)*strp;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const char *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_find_field_value(struct sql_result *result ATTR_UNUSED,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const char *const *
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_result_get_values(struct sql_result *_result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenstatic const char *driver_cassandra_result_get_error(struct sql_result *_result)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_result *result = (struct cassandra_result *)_result;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen return "FIXME";
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_transaction_begin(struct sql_db *db)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen ctx = i_new(struct cassandra_transaction_context, 1);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* we need to be able to handle multiple open transactions, so at least
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen for now just keep them in memory until commit time. */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen ctx->query_pool = pool_alloconly_create("cassandra transaction", 1024);
6b4b3e5fe8d9e84f4b1356ee898ca76996a11fe1Timo Sirainendriver_cassandra_transaction_unref(struct cassandra_transaction_context **_ctx)
6b4b3e5fe8d9e84f4b1356ee898ca76996a11fe1Timo Sirainen struct cassandra_transaction_context *ctx = *_ctx;
e22ec7998afd426c53c658483ce66b6e404e27c6Timo Sirainentransaction_set_failed(struct cassandra_transaction_context *ctx,
ce74395e2a932342e04fb682395bcce111574969Timo Sirainentransaction_commit_callback(struct sql_result *result, void *context)
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen struct cassandra_transaction_context *ctx = context;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen ctx->callback(sql_result_get_error(result), ctx->context);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen sql_commit_callback_t *callback, void *context)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen callback(ctx->failed ? ctx->error : NULL, context);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* just a single query, send it */
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen if (strncasecmp(_ctx->head->query, "DELETE ", 7) == 0)
4db61af2cfe2b206113bcc4b6153521679702bb4Timo Sirainen driver_cassandra_query_full(_ctx->db, _ctx->head->query, query_type,
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen /* multiple queries - we don't actually have a transaction though */
ce74395e2a932342e04fb682395bcce111574969Timo Sirainen callback("Multiple changes in transaction not supported", context);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainencommit_multi_fail(struct cassandra_transaction_context *ctx,
e22ec7998afd426c53c658483ce66b6e404e27c6Timo Sirainen "%s (query: %s)", sql_result_get_error(result), query));
b650f04c3b2e7dea2295bdbe3239eb82ec03ada0Timo Sirainendriver_cassandra_transaction_commit_multi(struct cassandra_transaction_context *ctx,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)ctx->ctx.db;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result = driver_cassandra_sync_query(db, "BEGIN");
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* send queries */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen for (query = ctx->ctx.head; query != NULL; query = query->next) {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result = driver_cassandra_sync_query(db, query->query);
b650f04c3b2e7dea2295bdbe3239eb82ec03ada0Timo Sirainen *result_r = driver_cassandra_sync_query(db, ctx->failed ?
e22ec7998afd426c53c658483ce66b6e404e27c6Timo Sirainendriver_cassandra_try_commit_s(struct cassandra_transaction_context *ctx)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct sql_transaction_context *_ctx = &ctx->ctx;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct cassandra_db *db = (struct cassandra_db *)_ctx->db;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen struct sql_transaction_query *single_query = NULL;
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* just a single query, send it */
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen result = sql_query_s(_ctx->db, single_query->query);
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen /* multiple queries, use a transaction */
b650f04c3b2e7dea2295bdbe3239eb82ec03ada0Timo Sirainen ret = driver_cassandra_transaction_commit_multi(ctx, &result);
e22ec7998afd426c53c658483ce66b6e404e27c6Timo Sirainen transaction_set_failed(ctx, sql_result_get_error(result));
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_transaction_commit_s(struct sql_transaction_context *_ctx,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen const char **error_r)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_transaction_rollback(struct sql_transaction_context *_ctx)
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainendriver_cassandra_update(struct sql_transaction_context *_ctx, const char *query,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen sql_transaction_add_query(_ctx, ctx->query_pool, query, affected_rows);
b87761f9bbef949f31dae297e619ac3f5e9c2b2eTimo Sirainenstatic const char *
b87761f9bbef949f31dae297e619ac3f5e9c2b2eTimo Sirainendriver_cassandra_escape_blob(struct sql_db *_db ATTR_UNUSED,
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainenconst struct sql_result driver_cassandra_result = {
b772ddf3cfb606dddaa465b317a0dc01bf06c6e4Timo Sirainen driver_cassandra_result_get_field_value_binary,