driver-cassandra.c revision 2599a77a28bde0653fa090802424469904d518ee
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose/* Copyright (c) 2015-2017 Dovecot authors, see the included COPYING file */
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#include "lib.h"
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#include "array.h"
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#include "hex-binary.h"
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#include "str.h"
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#include "ioloop.h"
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#include "net.h"
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#include "write-full.h"
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#include "time-util.h"
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#include "var-expand.h"
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#include "settings-parser.h"
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#include "sql-api-private.h"
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#ifdef BUILD_CASSANDRA
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#include <fcntl.h>
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#include <unistd.h>
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#include <cassandra.h>
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#define IS_CONNECTED(db) \
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose ((db)->api.state != SQL_DB_STATE_DISCONNECTED && \
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose (db)->api.state != SQL_DB_STATE_CONNECTING)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#define CASSANDRA_FALLBACK_WARN_INTERVAL_SECS 60
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#define CASSANDRA_FALLBACK_FIRST_RETRY_MSECS 50
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#define CASSANDRA_FALLBACK_MAX_RETRY_MSECS (1000*60)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#define CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS (5*1000)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosetypedef void driver_cassandra_callback_t(CassFuture *future, void *context);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Boseenum cassandra_query_type {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CASSANDRA_QUERY_TYPE_READ,
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CASSANDRA_QUERY_TYPE_WRITE,
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CASSANDRA_QUERY_TYPE_DELETE
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose};
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose#define CASSANDRA_QUERY_TYPE_COUNT 3
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic const char *cassandra_query_type_names[CASSANDRA_QUERY_TYPE_COUNT] = {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose "read", "write", "delete"
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose};
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestruct cassandra_callback {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int id;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CassFuture *future;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct cassandra_db *db;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose driver_cassandra_callback_t *callback;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose void *context;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose};
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestruct cassandra_db {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct sql_db api;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose char *hosts, *keyspace, *user, *password;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CassConsistency read_consistency, write_consistency, delete_consistency;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CassConsistency read_fallback_consistency, write_fallback_consistency, delete_fallback_consistency;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CassLogLevel log_level;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose bool debug_queries;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose bool latency_aware_routing;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int protocol_version;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int num_threads;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int connect_timeout_msecs, request_timeout_msecs;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int warn_timeout_msecs;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int heartbeat_interval_secs, idle_timeout_secs;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose in_port_t port;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CassCluster *cluster;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CassSession *session;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CassTimestampGen *timestamp_gen;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose int fd_pipe[2];
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct io *io_pipe;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose ARRAY(struct cassandra_callback *) callbacks;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose ARRAY(struct cassandra_result *) results;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int callback_ids;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose char *metrics_path;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct timeout *to_metrics;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct timeval first_fallback_sent[CASSANDRA_QUERY_TYPE_COUNT];
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose time_t last_fallback_warning[CASSANDRA_QUERY_TYPE_COUNT];
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int fallback_failures[CASSANDRA_QUERY_TYPE_COUNT];
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose /* for synchronous queries: */
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct ioloop *ioloop, *orig_ioloop;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct sql_result *sync_result;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose char *error;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose};
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestruct cassandra_result {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct sql_result api;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CassStatement *statement;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose const CassResult *result;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CassIterator *iterator;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose char *query;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose char *error;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CassConsistency consistency, fallback_consistency;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose enum cassandra_query_type query_type;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct timeval start_time, finish_time;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int row_count;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose pool_t row_pool;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose ARRAY_TYPE(const_string) fields;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose ARRAY(size_t) field_sizes;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose sql_query_callback_t *callback;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose void *context;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose bool query_sent:1;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose bool finished:1;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose};
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestruct cassandra_transaction_context {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct sql_transaction_context ctx;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose int refcount;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose sql_commit_callback_t *callback;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose void *context;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose pool_t query_pool;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose char *error;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose bool begin_succeeded:1;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose bool begin_failed:1;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose bool failed:1;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose};
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Boseextern const struct sql_db driver_cassandra_db;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Boseextern const struct sql_result driver_cassandra_result;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic struct {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CassConsistency consistency;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose const char *name;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose} cass_consistency_names[] = {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_CONSISTENCY_ANY, "any" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_CONSISTENCY_ONE, "one" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_CONSISTENCY_TWO, "two" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_CONSISTENCY_THREE, "three" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_CONSISTENCY_QUORUM, "" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_CONSISTENCY_ALL, "all" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_CONSISTENCY_QUORUM, "" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_CONSISTENCY_ALL, "all" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_CONSISTENCY_LOCAL_QUORUM, "local-quorum" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_CONSISTENCY_EACH_QUORUM, "each-quorum" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_CONSISTENCY_SERIAL, "serial" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_CONSISTENCY_LOCAL_SERIAL, "local-serial" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_CONSISTENCY_LOCAL_ONE, "local-one" }
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose};
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic struct {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CassLogLevel log_level;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose const char *name;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose} cass_log_level_names[] = {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_LOG_CRITICAL, "critical" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_LOG_ERROR, "error" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_LOG_WARN, "warn" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_LOG_INFO, "info" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_LOG_DEBUG, "debug" },
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose { CASS_LOG_TRACE, "trace" }
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose};
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_result_send_query(struct cassandra_result *result);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_send_queries(struct cassandra_db *db);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void result_finish(struct cassandra_result *result);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic int consistency_parse(const char *str, CassConsistency *consistency_r)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose{
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int i;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose for (i = 0; i < N_ELEMENTS(cass_consistency_names); i++) {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if (strcmp(cass_consistency_names[i].name, str) == 0) {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose *consistency_r = cass_consistency_names[i].consistency;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose return 0;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose }
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose }
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose return -1;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose}
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic int log_level_parse(const char *str, CassLogLevel *log_level_r)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose{
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int i;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose for (i = 0; i < N_ELEMENTS(cass_log_level_names); i++) {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if (strcmp(cass_log_level_names[i].name, str) == 0) {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose *log_level_r = cass_log_level_names[i].log_level;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose return 0;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose }
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose }
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose return -1;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose}
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_set_state(struct cassandra_db *db, enum sql_db_state state)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose{
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose /* switch back to original ioloop in case the caller wants to
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose add/remove timeouts */
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if (db->ioloop != NULL)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose io_loop_set_current(db->orig_ioloop);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose sql_db_set_state(&db->api, state);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if (db->ioloop != NULL)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose io_loop_set_current(db->ioloop);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose}
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_close(struct cassandra_db *db, const char *error)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose{
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct cassandra_result *const *resultp;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if (db->io_pipe != NULL)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose io_remove(&db->io_pipe);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if (db->fd_pipe[0] != -1) {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose i_close_fd(&db->fd_pipe[0]);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose i_close_fd(&db->fd_pipe[1]);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose }
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose while (array_count(&db->results) > 0) {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose resultp = array_idx(&db->results, 0);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if ((*resultp)->error == NULL)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose (*resultp)->error = i_strdup(error);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose result_finish(*resultp);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose }
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if (db->ioloop != NULL) {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose /* running a sync query, stop it */
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose io_loop_stop(db->ioloop);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose }
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose}
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_log_error(CassFuture *future, const char *str)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose{
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose const char *message;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose size_t size;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose cass_future_error_message(future, &message, &size);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose i_error("cassandra: %s: %.*s", str, (int)size, message);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose}
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_future_callback(CassFuture *future ATTR_UNUSED,
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose void *context)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose{
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct cassandra_callback *cb = context;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose /* this isn't the main thread - communicate with main thread by
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose writing the callback id to the pipe */
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if (write_full(cb->db->fd_pipe[1], &cb->id, sizeof(cb->id)) < 0)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose i_error("cassandra: write(pipe) failed: %m");
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose}
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void cassandra_callback_run(struct cassandra_callback *cb)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose{
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose cb->callback(cb->future, cb->context);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose cass_future_free(cb->future);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose i_free(cb);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose}
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_input_id(struct cassandra_db *db, unsigned int id)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose{
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct cassandra_callback *cb, *const *cbp;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose /* usually there are only a few callbacks, so don't bother with using
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose a hash table */
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose array_foreach(&db->callbacks, cbp) {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose cb = *cbp;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if (cb->id == id) {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose array_delete(&db->callbacks,
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose array_foreach_idx(&db->callbacks, cbp), 1);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose cassandra_callback_run(cb);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose return;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose }
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose }
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose i_panic("cassandra: Received unknown ID %u", id);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose}
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_input(struct cassandra_db *db)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose{
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int ids[1024];
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose ssize_t ret;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose ret = read(db->fd_pipe[0], ids, sizeof(ids));
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if (ret < 0)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose i_error("cassandra: read(pipe) failed: %m");
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose else if (ret == 0)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose i_error("cassandra: read(pipe) failed: EOF");
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose else if (ret % sizeof(ids[0]) != 0)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose i_error("cassandra: read(pipe) returned wrong amount of data");
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose else {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose /* success */
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int i, count = ret / sizeof(ids[0]);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose for (i = 0; i < count && db->api.state != SQL_DB_STATE_DISCONNECTED; i++)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose driver_cassandra_input_id(db, ids[i]);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose return;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose }
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose driver_cassandra_close(db, "IPC pipe closed");
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose}
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosedriver_cassandra_set_callback(CassFuture *future, struct cassandra_db *db,
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose driver_cassandra_callback_t *callback,
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose void *context)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose{
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct cassandra_callback *cb;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose cb = i_new(struct cassandra_callback, 1);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose cb->id = ++db->callback_ids;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose cb->future = future;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose cb->callback = callback;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose cb->context = context;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose cb->db = db;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose array_append(&db->callbacks, &cb, 1);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose cass_future_set_callback(future, driver_cassandra_future_callback, cb);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose}
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void connect_callback(CassFuture *future, void *context)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose{
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct cassandra_db *db = context;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CassError rc;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if ((rc = cass_future_error_code(future)) != CASS_OK) {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose driver_cassandra_log_error(future,
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose "Couldn't connect to Cassandra");
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose driver_cassandra_close(db, "Couldn't connect to Cassandra");
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose return;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose }
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose driver_cassandra_set_state(db, SQL_DB_STATE_IDLE);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if (db->ioloop != NULL) {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose /* driver_cassandra_sync_init() waiting for connection to
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose finish */
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose io_loop_stop(db->ioloop);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose }
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose driver_cassandra_send_queries(db);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose}
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic int driver_cassandra_connect(struct sql_db *_db)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose{
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct cassandra_db *db = (struct cassandra_db *)_db;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose CassFuture *future;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose i_assert(db->api.state == SQL_DB_STATE_DISCONNECTED);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if (pipe(db->fd_pipe) < 0) {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose i_error("pipe() failed: %m");
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose return -1;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose }
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose db->io_pipe = io_add(db->fd_pipe[0], IO_READ,
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose driver_cassandra_input, db);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose driver_cassandra_set_state(db, SQL_DB_STATE_CONNECTING);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose future = cass_session_connect_keyspace(db->session, db->cluster, db->keyspace);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose driver_cassandra_set_callback(future, db, connect_callback, db);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose return 0;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose}
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_disconnect(struct sql_db *_db)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose{
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose struct cassandra_db *db = (struct cassandra_db *)_db;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose driver_cassandra_close(db, "Disconnected");
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose}
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic const char *
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosedriver_cassandra_escape_string(struct sql_db *db ATTR_UNUSED,
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose const char *string)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose{
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose string_t *escaped;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose unsigned int i;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if (strchr(string, '\'') == NULL)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose return string;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose escaped = t_str_new(strlen(string)+10);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose for (i = 0; string[i] != '\0'; i++) {
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose if (string[i] == '\'')
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose str_append_c(escaped, '\'');
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose str_append_c(escaped, string[i]);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose }
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose return str_c(escaped);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose}
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bosestatic void driver_cassandra_parse_connect_string(struct cassandra_db *db,
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose const char *connect_string)
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose{
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose const char *const *args, *key, *value, *error;
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose string_t *hosts = t_str_new(64);
b6dfbf81c61d4431aaa81687ec53e892f8b71edbSumit Bose bool read_fallback_set = FALSE, write_fallback_set = FALSE, delete_fallback_set = FALSE;
db->log_level = CASS_LOG_WARN;
db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
db->write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
db->delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
db->connect_timeout_msecs = SQL_CONNECT_TIMEOUT_SECS*1000;
db->request_timeout_msecs = SQL_QUERY_TIMEOUT_SECS*1000;
db->warn_timeout_msecs = CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS;
args = t_strsplit_spaces(connect_string, " ");
for (; *args != NULL; args++) {
value = strchr(*args, '=');
if (value == NULL) {
i_fatal("cassandra: Missing value in connect string: %s",
*args);
}
key = t_strdup_until(*args, value++);
if (strcmp(key, "host") == 0) {
if (str_len(hosts) > 0)
str_append_c(hosts, ',');
str_append(hosts, value);
} else if (strcmp(key, "port") == 0) {
if (net_str2port(value, &db->port) < 0)
i_fatal("cassandra: Invalid port: %s", value);
} else if (strcmp(key, "dbname") == 0 ||
strcmp(key, "keyspace") == 0) {
i_free(db->keyspace);
db->keyspace = i_strdup(value);
} else if (strcmp(key, "user") == 0) {
i_free(db->user);
db->user = i_strdup(value);
} else if (strcmp(key, "password") == 0) {
i_free(db->password);
db->password = i_strdup(value);
} else if (strcmp(key, "read_consistency") == 0) {
if (consistency_parse(value, &db->read_consistency) < 0)
i_fatal("cassandra: Unknown read_consistency: %s", value);
} else if (strcmp(key, "read_fallback_consistency") == 0) {
if (consistency_parse(value, &db->read_fallback_consistency) < 0)
i_fatal("cassandra: Unknown read_fallback_consistency: %s", value);
read_fallback_set = TRUE;
} else if (strcmp(key, "write_consistency") == 0) {
if (consistency_parse(value, &db->write_consistency) < 0)
i_fatal("cassandra: Unknown write_consistency: %s", value);
} else if (strcmp(key, "write_fallback_consistency") == 0) {
if (consistency_parse(value, &db->write_fallback_consistency) < 0)
i_fatal("cassandra: Unknown write_fallback_consistency: %s", value);
write_fallback_set = TRUE;
} else if (strcmp(key, "delete_consistency") == 0) {
if (consistency_parse(value, &db->delete_consistency) < 0)
i_fatal("cassandra: Unknown delete_consistency: %s", value);
} else if (strcmp(key, "delete_fallback_consistency") == 0) {
if (consistency_parse(value, &db->delete_fallback_consistency) < 0)
i_fatal("cassandra: Unknown delete_fallback_consistency: %s", value);
delete_fallback_set = TRUE;
} else if (strcmp(key, "log_level") == 0) {
if (log_level_parse(value, &db->log_level) < 0)
i_fatal("cassandra: Unknown log_level: %s", value);
} else if (strcmp(key, "debug_queries") == 0) {
db->debug_queries = TRUE;
} else if (strcmp(key, "latency_aware_routing") == 0) {
db->latency_aware_routing = TRUE;
} else if (strcmp(key, "version") == 0) {
if (str_to_uint(value, &db->protocol_version) < 0)
i_fatal("cassandra: Invalid version: %s", value);
} else if (strcmp(key, "num_threads") == 0) {
if (str_to_uint(value, &db->num_threads) < 0)
i_fatal("cassandra: Invalid num_threads: %s", value);
} else if (strcmp(key, "heartbeat_interval") == 0) {
if (settings_get_time(value, &db->heartbeat_interval_secs, &error) < 0)
i_fatal("cassandra: Invalid heartbeat_interval '%s': %s", value, error);
} else if (strcmp(key, "idle_timeout") == 0) {
if (settings_get_time(value, &db->idle_timeout_secs, &error) < 0)
i_fatal("cassandra: Invalid idle_timeout '%s': %s", value, error);
} else if (strcmp(key, "connect_timeout") == 0) {
if (settings_get_time_msecs(value, &db->connect_timeout_msecs, &error) < 0)
i_fatal("cassandra: Invalid connect_timeout '%s': %s", value, error);
} else if (strcmp(key, "request_timeout") == 0) {
if (settings_get_time_msecs(value, &db->request_timeout_msecs, &error) < 0)
i_fatal("cassandra: Invalid request_timeout '%s': %s", value, error);
} else if (strcmp(key, "warn_timeout") == 0) {
if (settings_get_time_msecs(value, &db->warn_timeout_msecs, &error) < 0)
i_fatal("cassandra: Invalid warn_timeout '%s': %s", value, error);
} else if (strcmp(key, "metrics") == 0) {
i_free(db->metrics_path);
db->metrics_path = i_strdup(value);
} else {
i_fatal("cassandra: Unknown connect string: %s", key);
}
}
if (!read_fallback_set)
db->read_fallback_consistency = db->read_consistency;
if (!write_fallback_set)
db->write_fallback_consistency = db->write_consistency;
if (!delete_fallback_set)
db->delete_fallback_consistency = db->delete_consistency;
if (str_len(hosts) == 0)
i_fatal("cassandra: No hosts given in connect string");
if (db->keyspace == NULL)
i_fatal("cassandra: No dbname given in connect string");
db->hosts = i_strdup(str_c(hosts));
}
static void
driver_cassandra_get_metrics_json(struct cassandra_db *db, string_t *dest)
{
#define ADD_UINT64(_struct, _field) \
str_printfa(dest, "\""#_field"\": %llu,", (unsigned long long)metrics._struct._field);
#define ADD_DOUBLE(_struct, _field) \
str_printfa(dest, "\""#_field"\": %02lf,", metrics._struct._field);
CassMetrics metrics;
cass_session_get_metrics(db->session, &metrics);
str_append(dest, "{ \"requests\": {");
ADD_UINT64(requests, min);
ADD_UINT64(requests, max);
ADD_UINT64(requests, mean);
ADD_UINT64(requests, stddev);
ADD_UINT64(requests, median);
ADD_UINT64(requests, percentile_75th);
ADD_UINT64(requests, percentile_95th);
ADD_UINT64(requests, percentile_98th);
ADD_UINT64(requests, percentile_99th);
ADD_UINT64(requests, percentile_999th);
ADD_DOUBLE(requests, mean_rate);
ADD_DOUBLE(requests, one_minute_rate);
ADD_DOUBLE(requests, five_minute_rate);
ADD_DOUBLE(requests, fifteen_minute_rate);
str_truncate(dest, str_len(dest)-1);
str_append(dest, "}, \"stats\": {");
ADD_UINT64(stats, total_connections);
ADD_UINT64(stats, available_connections);
ADD_UINT64(stats, exceeded_pending_requests_water_mark);
ADD_UINT64(stats, exceeded_write_bytes_water_mark);
str_truncate(dest, str_len(dest)-1);
str_append(dest, "}, \"errors\": {");
ADD_UINT64(errors, connection_timeouts);
ADD_UINT64(errors, pending_request_timeouts);
ADD_UINT64(errors, request_timeouts);
str_truncate(dest, str_len(dest)-1);
str_append(dest, "}}");
}
static void driver_cassandra_metrics_write(struct cassandra_db *db)
{
struct var_expand_table tab[] = {
{ '\0', NULL, NULL }
};
string_t *path = t_str_new(64);
string_t *data;
const char *error;
int fd;
if (var_expand(path, db->metrics_path, tab, &error) <= 0) {
i_error("cassandra: Failed to expand metrics_path=%s: %s",
db->metrics_path, error);
return;
}
fd = open(str_c(path), O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, 0600);
if (fd == -1) {
i_error("creat(%s) failed: %m", str_c(path));
return;
}
data = t_str_new(1024);
driver_cassandra_get_metrics_json(db, data);
if (write_full(fd, str_data(data), str_len(data)) < 0)
i_error("write(%s) failed: %m", str_c(path));
i_close_fd(&fd);
}
static struct sql_db *driver_cassandra_init_v(const char *connect_string)
{
struct cassandra_db *db;
db = i_new(struct cassandra_db, 1);
db->api = driver_cassandra_db;
db->fd_pipe[0] = db->fd_pipe[1] = -1;
T_BEGIN {
driver_cassandra_parse_connect_string(db, connect_string);
} T_END;
cass_log_set_level(db->log_level);
db->timestamp_gen = cass_timestamp_gen_monotonic_new();
db->cluster = cass_cluster_new();
cass_cluster_set_timestamp_gen(db->cluster, db->timestamp_gen);
cass_cluster_set_connect_timeout(db->cluster, db->connect_timeout_msecs);
cass_cluster_set_request_timeout(db->cluster, db->request_timeout_msecs);
cass_cluster_set_contact_points(db->cluster, db->hosts);
if (db->user != NULL && db->password != NULL)
cass_cluster_set_credentials(db->cluster, db->user, db->password);
if (db->port != 0)
cass_cluster_set_port(db->cluster, db->port);
if (db->protocol_version != 0)
cass_cluster_set_protocol_version(db->cluster, db->protocol_version);
if (db->num_threads != 0)
cass_cluster_set_num_threads_io(db->cluster, db->num_threads);
if (db->latency_aware_routing)
cass_cluster_set_latency_aware_routing(db->cluster, cass_true);
if (db->heartbeat_interval_secs != 0)
cass_cluster_set_connection_heartbeat_interval(db->cluster, db->heartbeat_interval_secs);
if (db->idle_timeout_secs != 0)
cass_cluster_set_connection_idle_timeout(db->cluster, db->idle_timeout_secs);
db->session = cass_session_new();
if (db->metrics_path != NULL)
db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db);
i_array_init(&db->results, 16);
i_array_init(&db->callbacks, 16);
return &db->api;
}
static void driver_cassandra_deinit_v(struct sql_db *_db)
{
struct cassandra_db *db = (struct cassandra_db *)_db;
driver_cassandra_close(db, "Deinitialized");
i_assert(array_count(&db->callbacks) == 0);
array_free(&db->callbacks);
i_assert(array_count(&db->results) == 0);
array_free(&db->results);
cass_session_free(db->session);
cass_cluster_free(db->cluster);
cass_timestamp_gen_free(db->timestamp_gen);
if (db->to_metrics != NULL)
timeout_remove(&db->to_metrics);
i_free(db->metrics_path);
i_free(db->hosts);
i_free(db->error);
i_free(db->keyspace);
i_free(db->user);
i_free(db->password);
array_free(&_db->module_contexts);
i_free(db);
}
static void driver_cassandra_result_unlink(struct cassandra_db *db,
struct cassandra_result *result)
{
struct cassandra_result *const *results;
unsigned int i, count;
results = array_get(&db->results, &count);
for (i = 0; i < count; i++) {
if (results[i] == result) {
array_delete(&db->results, i, 1);
return;
}
}
i_unreached();
}
static void driver_cassandra_result_free(struct sql_result *_result)
{
struct cassandra_db *db = (struct cassandra_db *)_result->db;
struct cassandra_result *result = (struct cassandra_result *)_result;
struct timeval now;
const char *str;
long long reply_usecs;
i_assert(!result->api.callback);
i_assert(result->callback == NULL);
if (_result == db->sync_result)
db->sync_result = NULL;
reply_usecs = timeval_diff_usecs(&result->finish_time, &result->start_time);
if (db->log_level >= CASS_LOG_DEBUG || db->debug_queries ||
reply_usecs/1000 >= db->warn_timeout_msecs) {
if (gettimeofday(&now, NULL) < 0)
i_fatal("gettimeofday() failed: %m");
str = t_strdup_printf(
"cassandra: Finished query '%s' (%u rows, %lld+%lld us): %s",
result->query, result->row_count, reply_usecs,
timeval_diff_usecs(&now, &result->finish_time),
result->error != NULL ? result->error : "success");
if (reply_usecs/1000 >= db->warn_timeout_msecs)
i_warning("%s", str);
else
i_debug("%s", str);
}
if (result->result != NULL)
cass_result_free(result->result);
if (result->iterator != NULL)
cass_iterator_free(result->iterator);
if (result->statement != NULL)
cass_statement_free(result->statement);
if (result->row_pool != NULL)
pool_unref(&result->row_pool);
i_free(result->query);
i_free(result->error);
i_free(result);
}
static void result_finish(struct cassandra_result *result)
{
struct cassandra_db *db = (struct cassandra_db *)result->api.db;
bool free_result = TRUE;
result->finished = TRUE;
result->finish_time = ioloop_timeval;
driver_cassandra_result_unlink(db, result);
i_assert((result->error != NULL) == (result->iterator == NULL));
result->api.callback = TRUE;
T_BEGIN {
result->callback(&result->api, result->context);
} T_END;
result->api.callback = FALSE;
free_result = db->sync_result != &result->api;
if (db->ioloop != NULL)
io_loop_stop(db->ioloop);
i_assert(!free_result || result->api.refcount > 0);
result->callback = NULL;
if (free_result)
sql_result_unref(&result->api);
}
static void query_resend_with_fallback(struct cassandra_result *result)
{
struct cassandra_db *db = (struct cassandra_db *)result->api.db;
time_t last_warning =
ioloop_time - db->last_fallback_warning[result->query_type];
if (last_warning >= CASSANDRA_FALLBACK_WARN_INTERVAL_SECS) {
i_warning("%s - retrying future %s queries with consistency %s (instead of %s)",
result->error, cassandra_query_type_names[result->query_type],
cass_consistency_string(result->fallback_consistency),
cass_consistency_string(result->consistency));
db->last_fallback_warning[result->query_type] = ioloop_time;
}
i_free_and_null(result->error);
if (db->fallback_failures[result->query_type]++ == 0)
db->first_fallback_sent[result->query_type] = ioloop_timeval;
result->consistency = result->fallback_consistency;
driver_cassandra_result_send_query(result);
}
static void query_callback(CassFuture *future, void *context)
{
struct cassandra_result *result = context;
struct cassandra_db *db = (struct cassandra_db *)result->api.db;
CassError error = cass_future_error_code(future);
if (error != CASS_OK) {
const char *errmsg;
size_t errsize;
int msecs;
cass_future_error_message(future, &errmsg, &errsize);
i_free(result->error);
msecs = timeval_diff_msecs(&ioloop_timeval, &result->start_time);
result->api.error_type = error == CASS_ERROR_SERVER_WRITE_TIMEOUT ||
error == CASS_ERROR_LIB_REQUEST_TIMED_OUT ?
SQL_RESULT_ERROR_TYPE_WRITE_UNCERTAIN :
SQL_RESULT_ERROR_TYPE_UNKNOWN;
result->error = i_strdup_printf("Query '%s' failed: %.*s (in %u.%03u secs)",
result->query, (int)errsize, errmsg, msecs/1000, msecs%1000);
/* unavailable = cassandra server knows that there aren't
enough nodes available.
write timeout = cassandra server couldn't reach all the
needed nodes. this may be because it hasn't yet detected
that the servers are down, or because the servers are just
too busy. we'll try the fallback consistency to avoid
unnecessary temporary errors. */
if ((error == CASS_ERROR_SERVER_UNAVAILABLE ||
error == CASS_ERROR_SERVER_WRITE_TIMEOUT) &&
result->fallback_consistency != result->consistency) {
/* retry with fallback consistency */
query_resend_with_fallback(result);
return;
}
result_finish(result);
return;
}
if (result->fallback_consistency != result->consistency) {
/* non-fallback query finished successfully. if there had been
any fallbacks, reset them. */
db->fallback_failures[result->query_type] = 0;
}
result->result = cass_future_get_result(future);
result->iterator = cass_iterator_from_result(result->result);
result_finish(result);
}
static void driver_cassandra_result_send_query(struct cassandra_result *result)
{
struct cassandra_db *db = (struct cassandra_db *)result->api.db;
CassFuture *future;
result->statement = cass_statement_new(result->query, 0);
cass_statement_set_consistency(result->statement, result->consistency);
future = cass_session_execute(db->session, result->statement);
driver_cassandra_set_callback(future, db, query_callback, result);
}
static bool
driver_cassandra_want_fallback_query(struct cassandra_result *result)
{
struct cassandra_db *db = (struct cassandra_db *)result->api.db;
unsigned int failure_count = db->fallback_failures[result->query_type];
unsigned int i, msecs = CASSANDRA_FALLBACK_FIRST_RETRY_MSECS;
struct timeval tv;
if (failure_count == 0)
return FALSE;
tv = db->first_fallback_sent[result->query_type];
for (i = 1; i < failure_count; i++) {
msecs *= 2;
if (msecs >= CASSANDRA_FALLBACK_MAX_RETRY_MSECS) {
msecs = CASSANDRA_FALLBACK_FIRST_RETRY_MSECS;
break;
}
}
timeval_add_msecs(&tv, msecs);
return timeval_cmp(&ioloop_timeval, &tv) < 0;
}
static int driver_cassandra_send_query(struct cassandra_result *result)
{
struct cassandra_db *db = (struct cassandra_db *)result->api.db;
int ret;
if (!SQL_DB_IS_READY(&db->api)) {
if ((ret = sql_connect(&db->api)) <= 0) {
if (ret < 0)
driver_cassandra_close(db, "Couldn't connect to Cassandra");
return ret;
}
}
result->start_time = ioloop_timeval;
result->row_pool = pool_alloconly_create("cassandra result", 512);
switch (result->query_type) {
case CASSANDRA_QUERY_TYPE_READ:
result->consistency = db->read_consistency;
result->fallback_consistency = db->read_fallback_consistency;
break;
case CASSANDRA_QUERY_TYPE_WRITE:
result->consistency = db->write_consistency;
result->fallback_consistency = db->write_fallback_consistency;
break;
case CASSANDRA_QUERY_TYPE_DELETE:
result->consistency = db->delete_consistency;
result->fallback_consistency = db->delete_fallback_consistency;
break;
}
if (driver_cassandra_want_fallback_query(result))
result->consistency = result->fallback_consistency;
driver_cassandra_result_send_query(result);
result->query_sent = TRUE;
return 1;
}
static void driver_cassandra_send_queries(struct cassandra_db *db)
{
struct cassandra_result *const *results;
unsigned int i, count;
results = array_get(&db->results, &count);
for (i = 0; i < count; i++) {
if (!results[i]->query_sent) {
if (driver_cassandra_send_query(results[i]) <= 0)
break;
}
}
}
static void exec_callback(struct sql_result *_result ATTR_UNUSED,
void *context ATTR_UNUSED)
{
}
static void
driver_cassandra_query_full(struct sql_db *_db, const char *query,
enum cassandra_query_type query_type,
sql_query_callback_t *callback, void *context)
{
struct cassandra_db *db = (struct cassandra_db *)_db;
struct cassandra_result *result;
result = i_new(struct cassandra_result, 1);
result->api = driver_cassandra_result;
result->api.db = _db;
result->api.refcount = 1;
result->callback = callback;
result->context = context;
result->query_type = query_type;
result->query = i_strdup(query);
array_append(&db->results, &result, 1);
(void)driver_cassandra_send_query(result);
}
static void driver_cassandra_exec(struct sql_db *db, const char *query)
{
driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_WRITE, exec_callback, NULL);
}
static void driver_cassandra_query(struct sql_db *db, const char *query,
sql_query_callback_t *callback, void *context)
{
driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_READ, callback, context);
}
static void cassandra_query_s_callback(struct sql_result *result, void *context)
{
struct cassandra_db *db = context;
db->sync_result = result;
}
static void driver_cassandra_sync_init(struct cassandra_db *db)
{
if (sql_connect(&db->api) < 0)
return;
db->orig_ioloop = current_ioloop;
db->ioloop = io_loop_create();
if (IS_CONNECTED(db))
return;
i_assert(db->api.state == SQL_DB_STATE_CONNECTING);
db->io_pipe = io_loop_move_io(&db->io_pipe);
/* wait for connecting to finish */
io_loop_run(db->ioloop);
}
static void driver_cassandra_sync_deinit(struct cassandra_db *db)
{
if (db->orig_ioloop == NULL)
return;
if (db->io_pipe != NULL) {
io_loop_set_current(db->orig_ioloop);
db->io_pipe = io_loop_move_io(&db->io_pipe);
io_loop_set_current(db->ioloop);
}
io_loop_destroy(&db->ioloop);
}
static struct sql_result *
driver_cassandra_sync_query(struct cassandra_db *db, const char *query)
{
struct sql_result *result;
i_assert(db->sync_result == NULL);
switch (db->api.state) {
case SQL_DB_STATE_CONNECTING:
case SQL_DB_STATE_BUSY:
i_unreached();
case SQL_DB_STATE_DISCONNECTED:
sql_not_connected_result.refcount++;
return &sql_not_connected_result;
case SQL_DB_STATE_IDLE:
break;
}
driver_cassandra_query(&db->api, query, cassandra_query_s_callback, db);
if (db->sync_result == NULL) {
db->io_pipe = io_loop_move_io(&db->io_pipe);
io_loop_run(db->ioloop);
}
result = db->sync_result;
if (result == &sql_not_connected_result) {
/* we don't end up in cassandra's free function, so sync_result
won't be set to NULL if we don't do it here. */
db->sync_result = NULL;
} else if (result == NULL) {
result = &sql_not_connected_result;
result->refcount++;
}
return result;
}
static struct sql_result *
driver_cassandra_query_s(struct sql_db *_db, const char *query)
{
struct cassandra_db *db = (struct cassandra_db *)_db;
struct sql_result *result;
driver_cassandra_sync_init(db);
result = driver_cassandra_sync_query(db, query);
driver_cassandra_sync_deinit(db);
return result;
}
static int
driver_cassandra_get_value(struct cassandra_result *result,
const CassValue *value, const char **str_r,
size_t *len_r)
{
const unsigned char *output;
void *output_dup;
size_t output_size;
CassError rc;
const char *type;
if (cass_value_is_null(value) != 0) {
*str_r = NULL;
return 0;
}
switch (cass_data_type_type(cass_value_data_type(value))) {
case CASS_VALUE_TYPE_INT: {
cass_int32_t num;
rc = cass_value_get_int32(value, &num);
if (rc == CASS_OK) {
const char *str = t_strdup_printf("%d", num);
output_size = strlen(str);
output = (const void *)str;
}
type = "int32";
break;
}
case CASS_VALUE_TYPE_BIGINT: {
cass_int64_t num;
rc = cass_value_get_int64(value, &num);
if (rc == CASS_OK) {
const char *str = t_strdup_printf("%lld", (long long)num);
output_size = strlen(str);
output = (const void *)str;
}
type = "int64";
break;
}
default:
rc = cass_value_get_bytes(value, &output, &output_size);
type = "bytes";
break;
}
if (rc != CASS_OK) {
i_free(result->error);
result->error = i_strdup_printf("Couldn't get value as %s: %s",
type, cass_error_desc(rc));
return -1;
}
output_dup = p_malloc(result->row_pool, output_size + 1);
memcpy(output_dup, output, output_size);
*str_r = output_dup;
*len_r = output_size;
return 0;
}
static int driver_cassandra_result_next_row(struct sql_result *_result)
{
struct cassandra_result *result = (struct cassandra_result *)_result;
const CassRow *row;
const CassValue *value;
const char *str;
size_t size;
unsigned int i;
int ret = 1;
if (result->iterator == NULL)
return -1;
if (cass_iterator_next(result->iterator) == 0)
return 0;
result->row_count++;
p_clear(result->row_pool);
p_array_init(&result->fields, result->row_pool, 8);
p_array_init(&result->field_sizes, result->row_pool, 8);
row = cass_iterator_get_row(result->iterator);
for (i = 0; (value = cass_row_get_column(row, i)) != NULL; i++) {
if (driver_cassandra_get_value(result, value, &str, &size) < 0) {
ret = -1;
break;
}
array_append(&result->fields, &str, 1);
array_append(&result->field_sizes, &size, 1);
}
return ret;
}
static unsigned int
driver_cassandra_result_get_fields_count(struct sql_result *_result)
{
struct cassandra_result *result = (struct cassandra_result *)_result;
return array_count(&result->fields);
}
static const char *
driver_cassandra_result_get_field_name(struct sql_result *_result ATTR_UNUSED,
unsigned int idx ATTR_UNUSED)
{
i_unreached();
}
static int
driver_cassandra_result_find_field(struct sql_result *_result ATTR_UNUSED,
const char *field_name ATTR_UNUSED)
{
i_unreached();
}
static const char *
driver_cassandra_result_get_field_value(struct sql_result *_result,
unsigned int idx)
{
struct cassandra_result *result = (struct cassandra_result *)_result;
const char *const *strp;
strp = array_idx(&result->fields, idx);
return *strp;
}
static const unsigned char *
driver_cassandra_result_get_field_value_binary(struct sql_result *_result ATTR_UNUSED,
unsigned int idx ATTR_UNUSED,
size_t *size_r ATTR_UNUSED)
{
struct cassandra_result *result = (struct cassandra_result *)_result;
const char *const *strp;
const size_t *sizep;
strp = array_idx(&result->fields, idx);
sizep = array_idx(&result->field_sizes, idx);
*size_r = *sizep;
return (const void *)*strp;
}
static const char *
driver_cassandra_result_find_field_value(struct sql_result *result ATTR_UNUSED,
const char *field_name ATTR_UNUSED)
{
i_unreached();
}
static const char *const *
driver_cassandra_result_get_values(struct sql_result *_result)
{
struct cassandra_result *result = (struct cassandra_result *)_result;
return array_idx(&result->fields, 0);
}
static const char *driver_cassandra_result_get_error(struct sql_result *_result)
{
struct cassandra_result *result = (struct cassandra_result *)_result;
if (result->error != NULL)
return result->error;
return "FIXME";
}
static struct sql_transaction_context *
driver_cassandra_transaction_begin(struct sql_db *db)
{
struct cassandra_transaction_context *ctx;
ctx = i_new(struct cassandra_transaction_context, 1);
ctx->ctx.db = db;
ctx->refcount = 1;
/* we need to be able to handle multiple open transactions, so at least
for now just keep them in memory until commit time. */
ctx->query_pool = pool_alloconly_create("cassandra transaction", 1024);
return &ctx->ctx;
}
static void
driver_cassandra_transaction_unref(struct cassandra_transaction_context **_ctx)
{
struct cassandra_transaction_context *ctx = *_ctx;
*_ctx = NULL;
i_assert(ctx->refcount > 0);
if (--ctx->refcount > 0)
return;
pool_unref(&ctx->query_pool);
i_free(ctx->error);
i_free(ctx);
}
static void
transaction_set_failed(struct cassandra_transaction_context *ctx,
const char *error)
{
if (ctx->failed) {
i_assert(ctx->error != NULL);
} else {
i_assert(ctx->error == NULL);
ctx->failed = TRUE;
ctx->error = i_strdup(error);
}
}
static void
transaction_commit_callback(struct sql_result *result, void *context)
{
struct cassandra_transaction_context *ctx = context;
struct sql_commit_result commit_result;
i_zero(&commit_result);
if (sql_result_next_row(result) < 0) {
commit_result.error = sql_result_get_error(result);
commit_result.error_type = sql_result_get_error_type(result);
}
ctx->callback(&commit_result, ctx->context);
driver_cassandra_transaction_unref(&ctx);
}
static void
driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
sql_commit_callback_t *callback, void *context)
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
enum cassandra_query_type query_type;
struct sql_commit_result result;
i_zero(&result);
ctx->callback = callback;
ctx->context = context;
if (ctx->failed || _ctx->head == NULL) {
if (ctx->failed)
result.error = ctx->error;
callback(&result, context);
driver_cassandra_transaction_unref(&ctx);
} else if (_ctx->head->next == NULL) {
/* just a single query, send it */
if (strncasecmp(_ctx->head->query, "DELETE ", 7) == 0)
query_type = CASSANDRA_QUERY_TYPE_DELETE;
else
query_type = CASSANDRA_QUERY_TYPE_WRITE;
driver_cassandra_query_full(_ctx->db, _ctx->head->query, query_type,
transaction_commit_callback, ctx);
} else {
/* multiple queries - we don't actually have a transaction though */
result.error = "Multiple changes in transaction not supported";
callback(&result, context);
}
}
static void
commit_multi_fail(struct cassandra_transaction_context *ctx,
struct sql_result *result, const char *query)
{
transaction_set_failed(ctx, t_strdup_printf(
"%s (query: %s)", sql_result_get_error(result), query));
sql_result_unref(result);
}
static int
driver_cassandra_transaction_commit_multi(struct cassandra_transaction_context *ctx,
struct sql_result **result_r)
{
struct cassandra_db *db = (struct cassandra_db *)ctx->ctx.db;
struct sql_result *result;
struct sql_transaction_query *query;
int ret = 0;
result = driver_cassandra_sync_query(db, "BEGIN");
if (sql_result_next_row(result) < 0) {
commit_multi_fail(ctx, result, "BEGIN");
return -1;
}
sql_result_unref(result);
/* send queries */
for (query = ctx->ctx.head; query != NULL; query = query->next) {
result = driver_cassandra_sync_query(db, query->query);
if (sql_result_next_row(result) < 0) {
commit_multi_fail(ctx, result, query->query);
ret = -1;
break;
}
sql_result_unref(result);
}
*result_r = driver_cassandra_sync_query(db, ctx->failed ?
"ROLLBACK" : "COMMIT");
return ret;
}
static void
driver_cassandra_try_commit_s(struct cassandra_transaction_context *ctx)
{
struct sql_transaction_context *_ctx = &ctx->ctx;
struct cassandra_db *db = (struct cassandra_db *)_ctx->db;
struct sql_transaction_query *single_query = NULL;
struct sql_result *result = NULL;
int ret = 0;
if (_ctx->head->next == NULL) {
/* just a single query, send it */
single_query = _ctx->head;
result = sql_query_s(_ctx->db, single_query->query);
} else {
/* multiple queries, use a transaction */
driver_cassandra_sync_init(db);
ret = driver_cassandra_transaction_commit_multi(ctx, &result);
i_assert(ret == 0 || ctx->failed);
driver_cassandra_sync_deinit(db);
}
if (!ctx->failed) {
if (sql_result_next_row(result) < 0)
transaction_set_failed(ctx, sql_result_get_error(result));
}
if (result != NULL)
sql_result_unref(result);
}
static int
driver_cassandra_transaction_commit_s(struct sql_transaction_context *_ctx,
const char **error_r)
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
if (_ctx->head != NULL)
driver_cassandra_try_commit_s(ctx);
*error_r = t_strdup(ctx->error);
i_assert(ctx->refcount == 1);
i_assert((*error_r != NULL) == ctx->failed);
driver_cassandra_transaction_unref(&ctx);
return *error_r == NULL ? 0 : -1;
}
static void
driver_cassandra_transaction_rollback(struct sql_transaction_context *_ctx)
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
i_assert(ctx->refcount == 1);
driver_cassandra_transaction_unref(&ctx);
}
static void
driver_cassandra_update(struct sql_transaction_context *_ctx, const char *query,
unsigned int *affected_rows)
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
i_assert(affected_rows == NULL);
sql_transaction_add_query(_ctx, ctx->query_pool, query, affected_rows);
}
static const char *
driver_cassandra_escape_blob(struct sql_db *_db ATTR_UNUSED,
const unsigned char *data, size_t size)
{
string_t *str = t_str_new(128);
str_append(str, "0x");
binary_to_hex_append(str, data, size);
return str_c(str);
}
const struct sql_db driver_cassandra_db = {
.name = "cassandra",
.flags = 0,
.v = {
driver_cassandra_init_v,
driver_cassandra_deinit_v,
driver_cassandra_connect,
driver_cassandra_disconnect,
driver_cassandra_escape_string,
driver_cassandra_exec,
driver_cassandra_query,
driver_cassandra_query_s,
driver_cassandra_transaction_begin,
driver_cassandra_transaction_commit,
driver_cassandra_transaction_commit_s,
driver_cassandra_transaction_rollback,
driver_cassandra_update,
driver_cassandra_escape_blob
}
};
const struct sql_result driver_cassandra_result = {
.v = {
driver_cassandra_result_free,
driver_cassandra_result_next_row,
driver_cassandra_result_get_fields_count,
driver_cassandra_result_get_field_name,
driver_cassandra_result_find_field,
driver_cassandra_result_get_field_value,
driver_cassandra_result_get_field_value_binary,
driver_cassandra_result_find_field_value,
driver_cassandra_result_get_values,
driver_cassandra_result_get_error
}
};
const char *driver_cassandra_version = DOVECOT_ABI_VERSION;
void driver_cassandra_init(void);
void driver_cassandra_deinit(void);
void driver_cassandra_init(void)
{
sql_driver_register(&driver_cassandra_db);
}
void driver_cassandra_deinit(void)
{
sql_driver_unregister(&driver_cassandra_db);
}
#endif