/* driver-cassandra.c revision af619a25952f5ba550800daf69a119247b1fcda3 */
/* Copyright (c) 2015-2018 Dovecot authors, see the included COPYING file */
a1e4113a5388e34c08459c5b69679c82ac2bddc9Pavel Březina
#include "lib.h"
#include "array.h"
#include "hex-binary.h"
#include "str.h"
#include "ioloop.h"
#include "net.h"
#include "write-full.h"
#include "time-util.h"
#include "var-expand.h"
#include "settings-parser.h"
#include "sql-api-private.h"

#ifdef BUILD_CASSANDRA
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
#include <cassandra.h>
a1e4113a5388e34c08459c5b69679c82ac2bddc9Pavel Březina
/* TRUE when the db state is neither "disconnected" nor "connecting".
   Note that db is evaluated twice. */
#define IS_CONNECTED(db) \
	((db)->api.state != SQL_DB_STATE_DISCONNECTED && \
	 (db)->api.state != SQL_DB_STATE_CONNECTING)

/* Fallback-consistency tuning constants; the units are given by the name
   suffixes (seconds / milliseconds). */
#define CASSANDRA_FALLBACK_WARN_INTERVAL_SECS 60
#define CASSANDRA_FALLBACK_FIRST_RETRY_MSECS 50
#define CASSANDRA_FALLBACK_MAX_RETRY_MSECS (1000*60)

/* Default threshold (5 seconds, in milliseconds) for the query warn
   timeout. */
#define CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS (5*1000)
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
/* Callback type invoked with a finished CassFuture and the context pointer
   that was given when the callback was registered. */
typedef void driver_cassandra_callback_t(CassFuture *future, void *context);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
/* Indexes into the per-db counters[] array. CASSANDRA_COUNTER_COUNT must
   stay last; it is the number of counters. */
enum cassandra_counter_type {
	CASSANDRA_COUNTER_TYPE_QUERY_SENT,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_OK,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_OTHER,
	CASSANDRA_COUNTER_TYPE_QUERY_SLOW,

	CASSANDRA_COUNTER_COUNT
};
/* Name of each counter, indexed by enum cassandra_counter_type. Designated
   initializers keep every name attached to its enum value even if the enum
   is reordered later. */
static const char *counter_names[CASSANDRA_COUNTER_COUNT] = {
	[CASSANDRA_COUNTER_TYPE_QUERY_SENT] = "sent",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_OK] = "recv_ok",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS] = "recv_err_no_hosts",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL] = "recv_err_queue_full",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT] = "recv_err_client_timeout",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT] = "recv_err_server_timeout",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE] = "recv_err_server_unavailable",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_OTHER] = "recv_err_other",
	[CASSANDRA_COUNTER_TYPE_QUERY_SLOW] = "slow",
};
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
/* Kind of query being executed. Used as an index into the per-type arrays
   sized by CASSANDRA_QUERY_TYPE_COUNT, which must stay last. */
enum cassandra_query_type {
	CASSANDRA_QUERY_TYPE_READ,
	CASSANDRA_QUERY_TYPE_READ_MORE,
	CASSANDRA_QUERY_TYPE_WRITE,
	CASSANDRA_QUERY_TYPE_DELETE,

	CASSANDRA_QUERY_TYPE_COUNT
};

/* Name of each query type, indexed by enum cassandra_query_type. */
static const char *cassandra_query_type_names[CASSANDRA_QUERY_TYPE_COUNT] = {
	[CASSANDRA_QUERY_TYPE_READ] = "read",
	[CASSANDRA_QUERY_TYPE_READ_MORE] = "read-more",
	[CASSANDRA_QUERY_TYPE_WRITE] = "write",
	[CASSANDRA_QUERY_TYPE_DELETE] = "delete",
};
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastruct cassandra_callback {
afdc0179af0ad8ddbedd67422193ef02dcd2bf84Lukas Slebodnik unsigned int id;
afdc0179af0ad8ddbedd67422193ef02dcd2bf84Lukas Slebodnik CassFuture *future;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_db *db;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina driver_cassandra_callback_t *callback;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina void *context;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina};
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastruct cassandra_db {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct sql_db api;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina char *hosts, *keyspace, *user, *password;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina CassConsistency read_consistency, write_consistency, delete_consistency;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina CassConsistency read_fallback_consistency, write_fallback_consistency, delete_fallback_consistency;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek CassLogLevel log_level;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek bool debug_queries;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek bool latency_aware_routing;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek unsigned int protocol_version;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek unsigned int num_threads;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek unsigned int connect_timeout_msecs, request_timeout_msecs;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek unsigned int warn_timeout_msecs;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek unsigned int heartbeat_interval_secs, idle_timeout_secs;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek unsigned int execution_retry_interval_msecs, execution_retry_times;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek unsigned int page_size;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek in_port_t port;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek CassCluster *cluster;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek CassSession *session;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek CassTimestampGen *timestamp_gen;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek int fd_pipe[2];
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek struct io *io_pipe;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek ARRAY(struct cassandra_sql_prepared_statement *) pending_prepares;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek ARRAY(struct cassandra_callback *) callbacks;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek ARRAY(struct cassandra_result *) results;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina unsigned int callback_ids;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina char *metrics_path;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina struct timeout *to_metrics;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina uint64_t counters[CASSANDRA_COUNTER_COUNT];
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina struct timeval primary_query_last_sent[CASSANDRA_QUERY_TYPE_COUNT];
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina time_t last_fallback_warning[CASSANDRA_QUERY_TYPE_COUNT];
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina unsigned int fallback_failures[CASSANDRA_QUERY_TYPE_COUNT];
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina /* for synchronous queries: */
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina struct ioloop *ioloop, *orig_ioloop;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina struct sql_result *sync_result;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina char *error;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina};
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastruct cassandra_result {
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina struct sql_result api;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina CassStatement *statement;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina const CassResult *result;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina CassIterator *iterator;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina char *query;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina char *error;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina CassConsistency consistency, fallback_consistency;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina enum cassandra_query_type query_type;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina struct timeval page0_start_time, start_time, finish_time;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina unsigned int row_count, total_row_count, page_num;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina cass_int64_t timestamp;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina pool_t row_pool;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina ARRAY_TYPE(const_string) fields;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina ARRAY(size_t) field_sizes;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina sql_query_callback_t *callback;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina void *context;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina bool is_prepared:1;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina bool query_sent:1;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina bool finished:1;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina bool paging_continues:1;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina};
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastruct cassandra_transaction_context {
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina struct sql_transaction_context ctx;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina int refcount;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina sql_commit_callback_t *callback;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina void *context;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina struct cassandra_sql_statement *stmt;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina char *query;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina cass_int64_t query_timestamp;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina char *error;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina bool begin_succeeded:1;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina bool begin_failed:1;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina bool failed:1;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina};
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastruct cassandra_sql_arg {
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina unsigned int column_idx;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina char *value_str;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina unsigned char *value_binary;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina size_t value_binary_size;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina int64_t value_int64;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina};
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastruct cassandra_sql_statement {
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina struct sql_statement stmt;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina struct cassandra_sql_prepared_statement *prep;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina CassStatement *cass_stmt;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina ARRAY(struct cassandra_sql_arg) pending_args;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina cass_int64_t timestamp;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina struct cassandra_result *result;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina};
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastruct cassandra_sql_prepared_statement {
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina struct sql_prepared_statement prep_stmt;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina char *query_template;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina /* NULL, until the prepare is asynchronously finished */
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina const CassPrepared *prepared;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina /* statements waiting for prepare to finish */
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina ARRAY(struct cassandra_sql_statement *) pending_statements;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina /* an error here will cause the prepare to be retried on the next
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina execution attempt. */
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina char *error;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina bool pending;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina};
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
/* Defined elsewhere; declared here so earlier code can refer to them. */
extern const struct sql_db driver_cassandra_db;
extern const struct sql_result driver_cassandra_result;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastatic struct {
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina CassConsistency consistency;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina const char *name;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina} cass_consistency_names[] = {
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_CONSISTENCY_ANY, "any" },
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_CONSISTENCY_ONE, "one" },
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_CONSISTENCY_TWO, "two" },
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_CONSISTENCY_THREE, "three" },
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_CONSISTENCY_QUORUM, "" },
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_CONSISTENCY_ALL, "all" },
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_CONSISTENCY_LOCAL_QUORUM, "local-quorum" },
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_CONSISTENCY_EACH_QUORUM, "each-quorum" },
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_CONSISTENCY_SERIAL, "serial" },
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_CONSISTENCY_LOCAL_SERIAL, "local-serial" },
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_CONSISTENCY_LOCAL_ONE, "local-one" }
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina};
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastatic struct {
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina CassLogLevel log_level;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina const char *name;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina} cass_log_level_names[] = {
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_LOG_CRITICAL, "critical" },
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_LOG_ERROR, "error" },
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_LOG_WARN, "warn" },
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_LOG_INFO, "info" },
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_LOG_DEBUG, "debug" },
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina { CASS_LOG_TRACE, "trace" }
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina};
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
/* Forward declarations for functions that are used before their
   definitions. */
static void driver_cassandra_prepare_pending(struct cassandra_db *db);
static void
prepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt);
static void driver_cassandra_result_send_query(struct cassandra_result *result);
static void driver_cassandra_send_queries(struct cassandra_db *db);
static void result_finish(struct cassandra_result *result);
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastatic int consistency_parse(const char *str, CassConsistency *consistency_r)
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina{
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina unsigned int i;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina for (i = 0; i < N_ELEMENTS(cass_consistency_names); i++) {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek if (strcmp(cass_consistency_names[i].name, str) == 0) {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek *consistency_r = cass_consistency_names[i].consistency;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek return 0;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek }
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina }
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina return -1;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina}
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastatic int log_level_parse(const char *str, CassLogLevel *log_level_r)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek{
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek unsigned int i;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek for (i = 0; i < N_ELEMENTS(cass_log_level_names); i++) {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek if (strcmp(cass_log_level_names[i].name, str) == 0) {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek *log_level_r = cass_log_level_names[i].log_level;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek return 0;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek }
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek }
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek return -1;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek}
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozekstatic void driver_cassandra_set_state(struct cassandra_db *db, enum sql_db_state state)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek{
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek /* switch back to original ioloop in case the caller wants to
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek add/remove timeouts */
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek if (db->ioloop != NULL)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek io_loop_set_current(db->orig_ioloop);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek sql_db_set_state(&db->api, state);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek if (db->ioloop != NULL)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek io_loop_set_current(db->ioloop);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek}
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozekstatic void driver_cassandra_close(struct cassandra_db *db, const char *error)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek{
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek struct cassandra_sql_prepared_statement *const *prep_stmtp;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek struct cassandra_result *const *resultp;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek io_remove(&db->io_pipe);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek if (db->fd_pipe[0] != -1) {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek i_close_fd(&db->fd_pipe[0]);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek i_close_fd(&db->fd_pipe[1]);
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina }
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED);
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek array_foreach(&db->pending_prepares, prep_stmtp) {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek (*prep_stmtp)->pending = FALSE;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek (*prep_stmtp)->error = i_strdup(error);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek prepare_finish_pending_statements(*prep_stmtp);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek }
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek array_clear(&db->pending_prepares);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek while (array_count(&db->results) > 0) {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek resultp = array_idx(&db->results, 0);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek if ((*resultp)->error == NULL)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek (*resultp)->error = i_strdup(error);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek result_finish(*resultp);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek }
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek if (db->ioloop != NULL) {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek /* running a sync query, stop it */
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek io_loop_stop(db->ioloop);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek }
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek}
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
/* Log the error message of a failed future, prefixed with str. */
static void driver_cassandra_log_error(CassFuture *future, const char *str)
{
	const char *message;
	size_t size;

	cass_future_error_message(future, &message, &size);
	/* the message comes with an explicit length, so print it with %.*s
	   instead of relying on NUL-termination */
	i_error("cassandra: %s: %.*s", str, (int)size, message);
}
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
877b92e80bde510d5cd9f03dbf01e2bcf73ab072Michal Židekstatic void driver_cassandra_future_callback(CassFuture *future ATTR_UNUSED,
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek void *context)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek{
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek struct cassandra_callback *cb = context;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek /* this isn't the main thread - communicate with main thread by
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek writing the callback id to the pipe. note that we must not use
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek almost any dovecot functions here because most of them are using
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek data-stack, which isn't thread-safe. especially don't use
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek i_error() here. */
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek if (write_full(cb->db->fd_pipe[1], &cb->id, sizeof(cb->id)) < 0) {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek const char *str = t_strdup_printf(
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek "cassandra: write(pipe) failed: %s\n",
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek strerror(errno));
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek (void)write_full(STDERR_FILENO, str, strlen(str));
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek }
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek}
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozekstatic void cassandra_callback_run(struct cassandra_callback *cb)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek{
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek cb->callback(cb->future, cb->context);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek cass_future_free(cb->future);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek i_free(cb);
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina}
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březinastatic void driver_cassandra_input_id(struct cassandra_db *db, unsigned int id)
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina{
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina struct cassandra_callback *cb, *const *cbp;
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek /* usually there are only a few callbacks, so don't bother with using
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek a hash table */
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek array_foreach(&db->callbacks, cbp) {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek cb = *cbp;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek if (cb->id == id) {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek array_delete(&db->callbacks,
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek array_foreach_idx(&db->callbacks, cbp), 1);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek cassandra_callback_run(cb);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek return;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek }
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek }
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek i_panic("cassandra: Received unknown ID %u", id);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek}
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozekstatic void driver_cassandra_input(struct cassandra_db *db)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek{
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek unsigned int ids[1024];
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek ssize_t ret;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek ret = read(db->fd_pipe[0], ids, sizeof(ids));
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek if (ret < 0)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek i_error("cassandra: read(pipe) failed: %m");
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek else if (ret == 0)
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina i_error("cassandra: read(pipe) failed: EOF");
132e477d69e07e02fe6e4d668c0bb6226206474aPavel Březina else if (ret % sizeof(ids[0]) != 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_error("cassandra: read(pipe) returned wrong amount of data");
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek else {
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek /* success */
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek unsigned int i, count = ret / sizeof(ids[0]);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek for (i = 0; i < count && db->api.state != SQL_DB_STATE_DISCONNECTED; i++)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek driver_cassandra_input_id(db, ids[i]);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek return;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek }
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek driver_cassandra_close(db, "IPC pipe closed");
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek}
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozekstatic void
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozekdriver_cassandra_set_callback(CassFuture *future, struct cassandra_db *db,
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek driver_cassandra_callback_t *callback,
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek void *context)
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek{
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek struct cassandra_callback *cb;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek cb = i_new(struct cassandra_callback, 1);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek cb->id = ++db->callback_ids;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek cb->future = future;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek cb->callback = callback;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek cb->context = context;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek cb->db = db;
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek array_append(&db->callbacks, &cb, 1);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek cass_future_set_callback(future, driver_cassandra_future_callback, cb);
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek}
bdf32fbb3c947dd1b2c54d1c21d8028a1ddc80e6Jakub Hrozek
/* Called when the connect future has finished. On failure the error is
   logged and the connection is closed. On success the db is set idle, a
   waiting private ioloop is stopped, and the pending prepares and queries
   are started. */
static void connect_callback(CassFuture *future, void *context)
{
	struct cassandra_db *db = context;

	if (cass_future_error_code(future) != CASS_OK) {
		driver_cassandra_log_error(future,
					   "Couldn't connect to Cassandra");
		driver_cassandra_close(db, "Couldn't connect to Cassandra");
		return;
	}
	driver_cassandra_set_state(db, SQL_DB_STATE_IDLE);
	if (db->ioloop != NULL) {
		/* driver_cassandra_sync_init() waiting for connection to
		   finish */
		io_loop_stop(db->ioloop);
	}
	driver_cassandra_prepare_pending(db);
	driver_cassandra_send_queries(db);
}
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic int driver_cassandra_connect(struct sql_db *_db)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina{
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_db *db = (struct cassandra_db *)_db;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina CassFuture *future;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_assert(db->api.state == SQL_DB_STATE_DISCONNECTED);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (pipe(db->fd_pipe) < 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_error("pipe() failed: %m");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina return -1;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina }
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->io_pipe = io_add(db->fd_pipe[0], IO_READ,
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina driver_cassandra_input, db);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina driver_cassandra_set_state(db, SQL_DB_STATE_CONNECTING);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina future = cass_session_connect_keyspace(db->session, db->cluster, db->keyspace);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina driver_cassandra_set_callback(future, db, connect_callback, db);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina return 0;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina}
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
/* sql_db disconnect entry point: closes the connection with the reason
   "Disconnected". */
static void driver_cassandra_disconnect(struct sql_db *_db)
{
	driver_cassandra_close((struct cassandra_db *)_db, "Disconnected");
}
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic const char *
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinadriver_cassandra_escape_string(struct sql_db *db ATTR_UNUSED,
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina const char *string)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina{
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina string_t *escaped;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina unsigned int i;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (strchr(string, '\'') == NULL)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina return string;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina escaped = t_str_new(strlen(string)+10);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina for (i = 0; string[i] != '\0'; i++) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (string[i] == '\'')
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_append_c(escaped, '\'');
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_append_c(escaped, string[i]);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina }
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina return str_c(escaped);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina}
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void driver_cassandra_parse_connect_string(struct cassandra_db *db,
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina const char *connect_string)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina{
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina const char *const *args, *key, *value, *error;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina string_t *hosts = t_str_new(64);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina bool read_fallback_set = FALSE, write_fallback_set = FALSE, delete_fallback_set = FALSE;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->log_level = CASS_LOG_WARN;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->connect_timeout_msecs = SQL_CONNECT_TIMEOUT_SECS*1000;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->request_timeout_msecs = SQL_QUERY_TIMEOUT_SECS*1000;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->warn_timeout_msecs = CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina args = t_strsplit_spaces(connect_string, " ");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina for (; *args != NULL; args++) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina value = strchr(*args, '=');
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (value == NULL) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Missing value in connect string: %s",
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina *args);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina }
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina key = t_strdup_until(*args, value++);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (strcmp(key, "host") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (str_len(hosts) > 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_append_c(hosts, ',');
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_append(hosts, value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "port") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (net_str2port(value, &db->port) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid port: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "dbname") == 0 ||
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina strcmp(key, "keyspace") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_free(db->keyspace);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->keyspace = i_strdup(value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "user") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_free(db->user);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->user = i_strdup(value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "password") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_free(db->password);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->password = i_strdup(value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "read_consistency") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (consistency_parse(value, &db->read_consistency) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Unknown read_consistency: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "read_fallback_consistency") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (consistency_parse(value, &db->read_fallback_consistency) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Unknown read_fallback_consistency: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina read_fallback_set = TRUE;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "write_consistency") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (consistency_parse(value, &db->write_consistency) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Unknown write_consistency: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "write_fallback_consistency") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (consistency_parse(value, &db->write_fallback_consistency) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Unknown write_fallback_consistency: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina write_fallback_set = TRUE;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "delete_consistency") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (consistency_parse(value, &db->delete_consistency) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Unknown delete_consistency: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "delete_fallback_consistency") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (consistency_parse(value, &db->delete_fallback_consistency) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Unknown delete_fallback_consistency: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina delete_fallback_set = TRUE;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "log_level") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (log_level_parse(value, &db->log_level) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Unknown log_level: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "debug_queries") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->debug_queries = TRUE;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "latency_aware_routing") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->latency_aware_routing = TRUE;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "version") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (str_to_uint(value, &db->protocol_version) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid version: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "num_threads") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (str_to_uint(value, &db->num_threads) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid num_threads: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "heartbeat_interval") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (settings_get_time(value, &db->heartbeat_interval_secs, &error) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid heartbeat_interval '%s': %s", value, error);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "idle_timeout") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (settings_get_time(value, &db->idle_timeout_secs, &error) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid idle_timeout '%s': %s", value, error);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "connect_timeout") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (settings_get_time_msecs(value, &db->connect_timeout_msecs, &error) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid connect_timeout '%s': %s", value, error);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "request_timeout") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (settings_get_time_msecs(value, &db->request_timeout_msecs, &error) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid request_timeout '%s': %s", value, error);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "warn_timeout") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (settings_get_time_msecs(value, &db->warn_timeout_msecs, &error) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid warn_timeout '%s': %s", value, error);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "metrics") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_free(db->metrics_path);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->metrics_path = i_strdup(value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "execution_retry_interval") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (settings_get_time_msecs(value, &db->execution_retry_interval_msecs, &error) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid execution_retry_interval '%s': %s", value, error);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: This cassandra version does not support execution_retry_interval");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina#endif
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "execution_retry_times") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (str_to_uint(value, &db->execution_retry_times) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid execution_retry_times %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: This cassandra version does not support execution_retry_times");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina#endif
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else if (strcmp(key, "page_size") == 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (str_to_uint(value, &db->page_size) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Invalid page_size: %s", value);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: Unknown connect string: %s", key);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina }
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina }
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (!read_fallback_set)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->read_fallback_consistency = db->read_consistency;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (!write_fallback_set)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->write_fallback_consistency = db->write_consistency;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (!delete_fallback_set)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->delete_fallback_consistency = db->delete_consistency;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (str_len(hosts) == 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: No hosts given in connect string");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (db->keyspace == NULL)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("cassandra: No dbname given in connect string");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->hosts = i_strdup(str_c(hosts));
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina}
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinadriver_cassandra_get_metrics_json(struct cassandra_db *db, string_t *dest)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina{
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina#define ADD_UINT64(_struct, _field) \
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_printfa(dest, "\""#_field"\": %llu,", (unsigned long long)metrics._struct._field);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina#define ADD_DOUBLE(_struct, _field) \
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_printfa(dest, "\""#_field"\": %02lf,", metrics._struct._field);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina CassMetrics metrics;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_session_get_metrics(db->session, &metrics);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_append(dest, "{ \"requests\": {");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(requests, min);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(requests, max);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(requests, mean);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(requests, stddev);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(requests, median);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(requests, percentile_75th);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(requests, percentile_95th);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(requests, percentile_98th);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(requests, percentile_99th);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(requests, percentile_999th);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_DOUBLE(requests, mean_rate);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_DOUBLE(requests, one_minute_rate);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_DOUBLE(requests, five_minute_rate);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_DOUBLE(requests, fifteen_minute_rate);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_truncate(dest, str_len(dest)-1);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_append(dest, "}, \"stats\": {");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(stats, total_connections);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(stats, available_connections);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(stats, exceeded_pending_requests_water_mark);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(stats, exceeded_write_bytes_water_mark);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_truncate(dest, str_len(dest)-1);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_append(dest, "}, \"errors\": {");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(errors, connection_timeouts);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(errors, pending_request_timeouts);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ADD_UINT64(errors, request_timeouts);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_truncate(dest, str_len(dest)-1);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_append(dest, "}, \"queries\": {");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina for (unsigned int i = 0; i < CASSANDRA_COUNTER_COUNT; i++) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_printfa(dest, "\"%s\": %"PRIu64",", counter_names[i],
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->counters[i]);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina }
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_truncate(dest, str_len(dest)-1);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_append(dest, "}}");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina}
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void driver_cassandra_metrics_write(struct cassandra_db *db)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina{
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct var_expand_table tab[] = {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina { '\0', NULL, NULL }
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina };
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina string_t *path = t_str_new(64);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina string_t *data;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina const char *error;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina int fd;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (var_expand(path, db->metrics_path, tab, &error) <= 0) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_error("cassandra: Failed to expand metrics_path=%s: %s",
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->metrics_path, error);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina return;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina }
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina fd = open(str_c(path), O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, 0600);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (fd == -1) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_error("creat(%s) failed: %m", str_c(path));
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina return;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina }
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina data = t_str_new(1024);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina driver_cassandra_get_metrics_json(db, data);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (write_full(fd, str_data(data), str_len(data)) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_error("write(%s) failed: %m", str_c(path));
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_close_fd(&fd);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina}
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic struct sql_db *driver_cassandra_init_v(const char *connect_string)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina{
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_db *db;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db = i_new(struct cassandra_db, 1);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->api = driver_cassandra_db;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->fd_pipe[0] = db->fd_pipe[1] = -1;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina T_BEGIN {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina driver_cassandra_parse_connect_string(db, connect_string);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } T_END;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_log_set_level(db->log_level);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (db->protocol_version > 0 && db->protocol_version < 4) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina /* binding with column indexes requires v4 */
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->api.v.prepared_statement_init = NULL;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->api.v.prepared_statement_deinit = NULL;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->api.v.statement_init_prepared = NULL;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina }
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->timestamp_gen = cass_timestamp_gen_monotonic_new();
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->cluster = cass_cluster_new();
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_timestamp_gen(db->cluster, db->timestamp_gen);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_connect_timeout(db->cluster, db->connect_timeout_msecs);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_request_timeout(db->cluster, db->request_timeout_msecs);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_contact_points(db->cluster, db->hosts);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (db->user != NULL && db->password != NULL)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_credentials(db->cluster, db->user, db->password);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (db->port != 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_port(db->cluster, db->port);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (db->protocol_version != 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_protocol_version(db->cluster, db->protocol_version);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (db->num_threads != 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_num_threads_io(db->cluster, db->num_threads);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (db->latency_aware_routing)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_latency_aware_routing(db->cluster, cass_true);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (db->heartbeat_interval_secs != 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_connection_heartbeat_interval(db->cluster, db->heartbeat_interval_secs);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (db->idle_timeout_secs != 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_connection_idle_timeout(db->cluster, db->idle_timeout_secs);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (db->execution_retry_times > 0 && db->execution_retry_interval_msecs > 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_set_constant_speculative_execution_policy(db->cluster, db->execution_retry_interval_msecs, db->execution_retry_times);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina#endif
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->session = cass_session_new();
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (db->metrics_path != NULL)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_array_init(&db->results, 16);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_array_init(&db->callbacks, 16);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_array_init(&db->pending_prepares, 16);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina return &db->api;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina}
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void driver_cassandra_deinit_v(struct sql_db *_db)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina{
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_db *db = (struct cassandra_db *)_db;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina driver_cassandra_close(db, "Deinitialized");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_assert(array_count(&db->callbacks) == 0);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina array_free(&db->callbacks);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_assert(array_count(&db->results) == 0);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina array_free(&db->results);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_assert(array_count(&db->pending_prepares) == 0);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina array_free(&db->pending_prepares);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_session_free(db->session);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_cluster_free(db->cluster);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_timestamp_gen_free(db->timestamp_gen);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina timeout_remove(&db->to_metrics);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_free(db->metrics_path);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_free(db->hosts);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_free(db->error);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_free(db->keyspace);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_free(db->user);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_free(db->password);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina array_free(&_db->module_contexts);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_free(db);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina}
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void driver_cassandra_result_unlink(struct cassandra_db *db,
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_result *result)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina{
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_result *const *results;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina unsigned int i, count;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina results = array_get(&db->results, &count);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina for (i = 0; i < count; i++) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (results[i] == result) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina array_delete(&db->results, i, 1);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina return;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina }
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina }
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_unreached();
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina}
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void driver_cassandra_log_result(struct cassandra_result *result,
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina bool all_pages, long long reply_usecs)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina{
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_db *db = (struct cassandra_db *)result->api.db;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct timeval now;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina unsigned int row_count;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (db->log_level < CASS_LOG_DEBUG && !db->debug_queries &&
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina reply_usecs/1000 < db->warn_timeout_msecs)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina return;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (gettimeofday(&now, NULL) < 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_fatal("gettimeofday() failed: %m");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina string_t *str = t_str_new(128);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_printfa(str, "cassandra: Finished %squery '%s' (",
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina result->is_prepared ? "prepared " : "", result->query);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (result->timestamp != 0)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_printfa(str, "timestamp=%"PRId64", ", result->timestamp);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (all_pages) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_printfa(str, "%u pages in total, ", result->page_num);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina row_count = result->total_row_count;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (result->page_num > 0 || result->paging_continues)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_printfa(str, "page %u, ", result->page_num);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina row_count = result->row_count;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina }
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina str_printfa(str, "%u rows, %lld+%lld us): %s", row_count, reply_usecs,
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina timeval_diff_usecs(&now, &result->finish_time),
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina result->error != NULL ? result->error : "success");
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (reply_usecs/1000 >= db->warn_timeout_msecs) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SLOW]++;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_warning("%s", str_c(str));
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } else {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_debug("%s", str_c(str));
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina }
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina}
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void driver_cassandra_result_free(struct sql_result *_result)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina{
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_db *db = (struct cassandra_db *)_result->db;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_result *result = (struct cassandra_result *)_result;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina long long reply_usecs;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_assert(!result->api.callback);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_assert(result->callback == NULL);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (_result == db->sync_result)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->sync_result = NULL;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina reply_usecs = timeval_diff_usecs(&result->finish_time, &result->start_time);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina driver_cassandra_log_result(result, FALSE, reply_usecs);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (result->page_num > 0 && !result->paging_continues) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina /* Multi-page query finishes now. Log a debug/warning summary
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina message about it separate from the per-page messages. */
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina reply_usecs = timeval_diff_usecs(&result->finish_time,
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina &result->page0_start_time);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina driver_cassandra_log_result(result, TRUE, reply_usecs);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina }
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (result->result != NULL)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_result_free(result->result);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (result->iterator != NULL)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_iterator_free(result->iterator);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (result->statement != NULL)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_statement_free(result->statement);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina pool_unref(&result->row_pool);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_free(result->query);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_free(result->error);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_free(result);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina}
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void result_finish(struct cassandra_result *result)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina{
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_db *db = (struct cassandra_db *)result->api.db;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina bool free_result = TRUE;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina result->finished = TRUE;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina result->finish_time = ioloop_timeval;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina driver_cassandra_result_unlink(db, result);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_assert((result->error != NULL) == (result->iterator == NULL));
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina result->api.callback = TRUE;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina T_BEGIN {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina result->callback(&result->api, result->context);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina } T_END;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina result->api.callback = FALSE;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina free_result = db->sync_result != &result->api;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (db->ioloop != NULL)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina io_loop_stop(db->ioloop);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_assert(!free_result || result->api.refcount > 0);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina result->callback = NULL;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (free_result)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina sql_result_unref(&result->api);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina}
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void query_resend_with_fallback(struct cassandra_result *result)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina{
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina struct cassandra_db *db = (struct cassandra_db *)result->api.db;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina time_t last_warning =
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina ioloop_time - db->last_fallback_warning[result->query_type];
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina if (last_warning >= CASSANDRA_FALLBACK_WARN_INTERVAL_SECS) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_warning("%s - retrying future %s queries with consistency %s (instead of %s)",
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina result->error, cassandra_query_type_names[result->query_type],
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_consistency_string(result->fallback_consistency),
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina cass_consistency_string(result->consistency));
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->last_fallback_warning[result->query_type] = ioloop_time;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina }
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina i_free_and_null(result->error);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->fallback_failures[result->query_type]++;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina result->consistency = result->fallback_consistency;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina driver_cassandra_result_send_query(result);
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina}
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březinastatic void counters_inc_error(struct cassandra_db *db, CassError error)
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina{
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina switch (error) {
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE:
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS]++;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina break;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina case CASS_ERROR_LIB_REQUEST_QUEUE_FULL:
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL]++;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina break;
8fe171bf5a7a570591418e6548105f1d5a0097b3Pavel Březina case CASS_ERROR_LIB_REQUEST_TIMED_OUT:
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT]++;
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina break;
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina case CASS_ERROR_SERVER_WRITE_TIMEOUT:
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT]++;
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina break;
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina case CASS_ERROR_SERVER_UNAVAILABLE:
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE]++;
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina break;
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina default:
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_OTHER]++;
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina break;
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina }
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina}
d3c82d0170d6d7407549afdadd08aa7e11aeb9a2Pavel Březina
/* Completion callback for a query future; `context` is the
   struct cassandra_result the query was sent for.

   On success the result set and its iterator are stored in the result and
   the result is finished. On failure the error counters are updated, an
   error message is stored in the result and the query is either re-sent
   with the fallback consistency (for certain errors, when a different
   fallback consistency is configured) or finished with the error. */
static void query_callback(CassFuture *future, void *context)
{
	struct cassandra_result *result = context;
	struct cassandra_db *db = (struct cassandra_db *)result->api.db;
	CassError error = cass_future_error_code(future);
	const char *errmsg, *page_suffix;
	size_t errsize;
	int msecs;
	bool fallback_error;

	if (error == CASS_OK) {
		db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_OK]++;
		if (result->fallback_consistency != result->consistency) {
			/* non-fallback query finished successfully. if there
			   had been any fallbacks, reset them. */
			db->fallback_failures[result->query_type] = 0;
		}
		result->result = cass_future_get_result(future);
		result->iterator = cass_iterator_from_result(result->result);
		result_finish(result);
		return;
	}

	cass_future_error_message(future, &errmsg, &errsize);
	i_free(result->error);

	msecs = timeval_diff_msecs(&ioloop_timeval, &result->start_time);
	counters_inc_error(db, error);

	/* Timeouts bring uncertainty whether the query succeeded or not.
	   Also _SERVER_UNAVAILABLE could have actually written enough
	   copies of the data for the query to succeed. */
	switch (error) {
	case CASS_ERROR_SERVER_WRITE_TIMEOUT:
	case CASS_ERROR_SERVER_UNAVAILABLE:
	case CASS_ERROR_LIB_REQUEST_TIMED_OUT:
		result->api.error_type = SQL_RESULT_ERROR_TYPE_WRITE_UNCERTAIN;
		break;
	default:
		result->api.error_type = SQL_RESULT_ERROR_TYPE_UNKNOWN;
		break;
	}

	/* the page number is mentioned only for follow-up pages */
	page_suffix = result->page_num == 0 ? "" :
		t_strdup_printf(", page %u", result->page_num);
	result->error = i_strdup_printf("Query '%s' failed: %.*s (in %u.%03u secs%s)",
					result->query, (int)errsize, errmsg,
					msecs/1000, msecs%1000, page_suffix);

	/* unavailable = cassandra server knows that there aren't enough
	   nodes available. "All hosts in current policy attempted and were
	   either unavailable or failed"

	   no hosts available = The client library couldn't connect to
	   enough cassandra nodes. Error message is the same as for
	   "unavailable".

	   write timeout = cassandra server couldn't reach all the needed
	   nodes. this may be because it hasn't yet detected that the
	   servers are down, or because the servers are just too busy.
	   we'll try the fallback consistency to avoid unnecessary temporary
	   errors. */
	fallback_error = error == CASS_ERROR_SERVER_UNAVAILABLE ||
		error == CASS_ERROR_LIB_NO_HOSTS_AVAILABLE ||
		error == CASS_ERROR_SERVER_WRITE_TIMEOUT;
	if (fallback_error &&
	    result->fallback_consistency != result->consistency) {
		/* retry with fallback consistency */
		query_resend_with_fallback(result);
	} else {
		result_finish(result);
	}
}
/* Apply the per-query settings to result->statement: the consistency
   currently selected for the result, the idempotency flag (only when built
   with HAVE_CASSANDRA_SPECULATIVE_POLICY) and the configured page size,
   if it is larger than zero. */
static void driver_cassandra_init_statement(struct cassandra_result *result)
{
	struct cassandra_db *cdb = (struct cassandra_db *)result->api.db;

	cass_statement_set_consistency(result->statement, result->consistency);

#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
	cass_statement_set_is_idempotent(result->statement, cass_true);
#endif

	if (cdb->page_size > 0) {
		cass_statement_set_paging_size(result->statement,
					       cdb->page_size);
	}
}
/* Execute the result's statement in the session and register
   query_callback() to be called with the result once the future completes.
   The statement settings are (re)applied first, except for READ_MORE
   queries, which are sent with the statement as it is. */
static void driver_cassandra_result_send_query(struct cassandra_result *result)
{
	struct cassandra_db *cdb = (struct cassandra_db *)result->api.db;
	CassFuture *pending;

	i_assert(result->statement != NULL);

	cdb->counters[CASSANDRA_COUNTER_TYPE_QUERY_SENT]++;

	switch (result->query_type) {
	case CASSANDRA_QUERY_TYPE_READ_MORE:
		/* continuing a paged query - leave the statement alone */
		break;
	default:
		driver_cassandra_init_statement(result);
		break;
	}

	pending = cass_session_execute(cdb->session, result->statement);
	driver_cassandra_set_callback(pending, cdb, query_callback, result);
}
static bool
driver_cassandra_want_fallback_query(struct cassandra_result *result)
{
struct cassandra_db *db = (struct cassandra_db *)result->api.db;
unsigned int failure_count = db->fallback_failures[result->query_type];
unsigned int i, msecs = CASSANDRA_FALLBACK_FIRST_RETRY_MSECS;
struct timeval tv;
if (failure_count == 0)
return FALSE;
/* double the retries every time. */
for (i = 1; i < failure_count; i++) {
msecs *= 2;
if (msecs >= CASSANDRA_FALLBACK_MAX_RETRY_MSECS) {
msecs = CASSANDRA_FALLBACK_MAX_RETRY_MSECS;
break;
}
}
/* If last primary query sent timestamp + msecs is older than current
time, we need to retry the primary query. Note that this practically
prevents multiple primary queries from being attempted
simultaneously, because the caller updates primary_query_last_sent
immediately when returning.
The only time when multiple primary queries can be running in
parallel is when the earlier query is being slow and hasn't finished
early enough. This could even be a wanted feature, since while the
first query might have to wait for a timeout, Cassandra could have
been fixed in the meantime and the second query finishes
successfully. */
tv = db->primary_query_last_sent[result->query_type];
timeval_add_msecs(&tv, msecs);
return timeval_cmp(&ioloop_timeval, &tv) < 0;
}
/* Start executing a query whose statement has already been attached to the
   result. Returns 1 once the query has been handed to the session. If the
   database isn't ready, sql_connect() is called first and its non-positive
   return value is returned without sending anything; on a negative return
   the connection is also closed. */
static int driver_cassandra_send_query(struct cassandra_result *result)
{
struct cassandra_db *db = (struct cassandra_db *)result->api.db;
int ret;
if (!SQL_DB_IS_READY(&db->api)) {
if ((ret = sql_connect(&db->api)) <= 0) {
if (ret < 0)
driver_cassandra_close(db, "Couldn't connect to Cassandra");
return ret;
}
}
/* page0_start_time is set only once (while still zero), so follow-up pages
   that copied it from the previous page keep the original start time */
if (result->page0_start_time.tv_sec == 0)
result->page0_start_time = ioloop_timeval;
result->start_time = ioloop_timeval;
result->row_pool = pool_alloconly_create("cassandra result", 512);
/* pick the primary and fallback consistency based on the query type */
switch (result->query_type) {
case CASSANDRA_QUERY_TYPE_READ:
result->consistency = db->read_consistency;
result->fallback_consistency = db->read_fallback_consistency;
break;
case CASSANDRA_QUERY_TYPE_READ_MORE:
/* consistency is already set and we don't want to fallback
at this point anymore. */
result->fallback_consistency = result->consistency;
break;
case CASSANDRA_QUERY_TYPE_WRITE:
result->consistency = db->write_consistency;
result->fallback_consistency = db->write_fallback_consistency;
break;
case CASSANDRA_QUERY_TYPE_DELETE:
result->consistency = db->delete_consistency;
result->fallback_consistency = db->delete_fallback_consistency;
break;
case CASSANDRA_QUERY_TYPE_COUNT:
i_unreached();
}
/* either go straight to the fallback consistency, or remember when this
   primary-consistency attempt was made */
if (driver_cassandra_want_fallback_query(result))
result->consistency = result->fallback_consistency;
else
db->primary_query_last_sent[result->query_type] = ioloop_timeval;
driver_cassandra_result_send_query(result);
result->query_sent = TRUE;
return 1;
}
/* Send all queued results that have a statement but haven't been sent yet.
   Stops at the first query that couldn't be sent. */
static void driver_cassandra_send_queries(struct cassandra_db *db)
{
	struct cassandra_result *const *queued;
	unsigned int idx, queued_count;

	queued = array_get(&db->results, &queued_count);
	for (idx = 0; idx < queued_count; idx++) {
		/* skip queries already sent and ones still waiting for
		   their statement */
		if (queued[idx]->query_sent || queued[idx]->statement == NULL)
			continue;
		if (driver_cassandra_send_query(queued[idx]) <= 0)
			break;
	}
}
/* Query callback that intentionally does nothing. Used by
   driver_cassandra_exec(), which has no caller-supplied callback. */
static void exec_callback(struct sql_result *_result ATTR_UNUSED,
			  void *context ATTR_UNUSED)
{
	/* nothing to do */
}
/* Allocate a new result for the given query and add it to db->results.
   The result starts with one reference and a private copy of the query
   text. No statement is attached and nothing is sent yet; that is left
   to the caller. */
static struct cassandra_result *
driver_cassandra_query_init(struct cassandra_db *db, const char *query,
			    enum cassandra_query_type query_type,
			    bool is_prepared,
			    sql_query_callback_t *callback, void *context)
{
	struct cassandra_result *res = i_new(struct cassandra_result, 1);

	/* generic sql_result part */
	res->api = driver_cassandra_result;
	res->api.db = &db->api;
	res->api.refcount = 1;

	/* what is being run */
	res->query = i_strdup(query);
	res->query_type = query_type;
	res->is_prepared = is_prepared;

	/* who to notify when it finishes */
	res->callback = callback;
	res->context = context;

	array_append(&db->results, &res, 1);
	return res;
}
/* Create a result for a non-prepared query of the given type, build its
   statement from the query text and try to send it. The return value of
   the send attempt is deliberately ignored. */
static void
driver_cassandra_query_full(struct sql_db *_db, const char *query,
			    enum cassandra_query_type query_type,
			    sql_query_callback_t *callback, void *context)
{
	struct cassandra_db *cdb = (struct cassandra_db *)_db;
	struct cassandra_result *res =
		driver_cassandra_query_init(cdb, query, query_type, FALSE,
					    callback, context);

	res->statement = cass_statement_new(query, 0);
	(void)driver_cassandra_send_query(res);
}
/* Run a query as a WRITE without reporting the outcome to the caller. */
static void driver_cassandra_exec(struct sql_db *db, const char *query)
{
	driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_WRITE,
				    exec_callback, NULL);
}
/* Run a query as a READ and deliver its result to `callback`. */
static void driver_cassandra_query(struct sql_db *db, const char *query,
				   sql_query_callback_t *callback, void *context)
{
	driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_READ,
				    callback, context);
}
/* Callback for synchronous queries: remembers the finished result in
   db->sync_result. `context` is the struct cassandra_db. */
static void cassandra_query_s_callback(struct sql_result *result, void *context)
{
	struct cassandra_db *cdb = context;

	cdb->sync_result = result;
}
/* Prepare for running a query synchronously: remember the current ioloop
   in db->orig_ioloop and create a new one in db->ioloop. If the connection
   is still being established, io_pipe is moved with io_loop_move_io() and
   the new ioloop is run until it is stopped. Does nothing if sql_connect()
   fails. */
static void driver_cassandra_sync_init(struct cassandra_db *db)
{
	if (sql_connect(&db->api) < 0)
		return;

	db->orig_ioloop = current_ioloop;
	db->ioloop = io_loop_create();

	if (!IS_CONNECTED(db)) {
		i_assert(db->api.state == SQL_DB_STATE_CONNECTING);

		db->io_pipe = io_loop_move_io(&db->io_pipe);
		/* wait for connecting to finish */
		io_loop_run(db->ioloop);
	}
}
/* Undo driver_cassandra_sync_init(): move io_pipe (if any) back to the
   original ioloop and destroy the temporary one. Does nothing when
   sync_init() didn't set up an ioloop. */
static void driver_cassandra_sync_deinit(struct cassandra_db *db)
{
	if (db->orig_ioloop == NULL)
		return;
	if (db->io_pipe != NULL) {
		/* io_loop_move_io() is called with the original ioloop set
		   as current, after which the temporary one is made current
		   again so that it can be destroyed */
		io_loop_set_current(db->orig_ioloop);
		db->io_pipe = io_loop_move_io(&db->io_pipe);
		io_loop_set_current(db->ioloop);
	}
	io_loop_destroy(&db->ioloop);
	/* Forget the saved ioloop. Otherwise a later sync_init() that returns
	   early (connect failure) would leave a stale orig_ioloop behind and
	   the guard above would no longer protect against running this
	   teardown with db->ioloop == NULL. */
	db->orig_ioloop = NULL;
}
/* Run a query and wait for its result. Expected to be called between
   driver_cassandra_sync_init() and driver_cassandra_sync_deinit(), as the
   callers in this file do. The returned result is the one stored by
   cassandra_query_s_callback(), or sql_not_connected_result (with an added
   reference) when the database is disconnected or no result was produced. */
static struct sql_result *
driver_cassandra_sync_query(struct cassandra_db *db, const char *query,
enum cassandra_query_type query_type)
{
struct sql_result *result;
i_assert(db->sync_result == NULL);
switch (db->api.state) {
case SQL_DB_STATE_CONNECTING:
case SQL_DB_STATE_BUSY:
i_unreached();
case SQL_DB_STATE_DISCONNECTED:
/* can't send anything - hand out the shared "not connected" result */
sql_not_connected_result.refcount++;
return &sql_not_connected_result;
case SQL_DB_STATE_IDLE:
break;
}
driver_cassandra_query_full(&db->api, query, query_type,
cassandra_query_s_callback, db);
/* if the callback hasn't been called yet, run db->ioloop until it is
   stopped */
if (db->sync_result == NULL) {
db->io_pipe = io_loop_move_io(&db->io_pipe);
io_loop_run(db->ioloop);
}
result = db->sync_result;
if (result == &sql_not_connected_result) {
/* we don't end up in cassandra's free function, so sync_result
won't be set to NULL if we don't do it here. */
db->sync_result = NULL;
} else if (result == NULL) {
/* the ioloop finished without any result being delivered */
result = &sql_not_connected_result;
result->refcount++;
}
return result;
}
/* Synchronous query entry point: runs `query` as a READ and returns its
   result after waiting for it. */
static struct sql_result *
driver_cassandra_query_s(struct sql_db *_db, const char *query)
{
	struct cassandra_db *cdb = (struct cassandra_db *)_db;
	struct sql_result *sync_res;

	driver_cassandra_sync_init(cdb);
	sync_res = driver_cassandra_sync_query(cdb, query,
					       CASSANDRA_QUERY_TYPE_READ);
	driver_cassandra_sync_deinit(cdb);

	return sync_res;
}
/* Convert a Cassandra column value into a buffer allocated from
   result->row_pool.

   value: column value to convert
   str_r: set to the converted data, or NULL for a null column
   len_r: set to the data length in bytes (0 for a null column)

   INT, BIGINT and TIMESTAMP values are formatted as decimal text; every
   other type is copied as raw bytes. Returns 0 on success. Returns -1 and
   stores a message in result->error if the value can't be read. */
static int
driver_cassandra_get_value(struct cassandra_result *result,
			   const CassValue *value, const char **str_r,
			   size_t *len_r)
{
	const unsigned char *output;
	void *output_dup;
	size_t output_size;
	CassError rc;
	const char *type;

	if (cass_value_is_null(value) != 0) {
		*str_r = NULL;
		*len_r = 0;
		return 0;
	}

	switch (cass_data_type_type(cass_value_data_type(value))) {
	case CASS_VALUE_TYPE_INT: {
		cass_int32_t num;

		rc = cass_value_get_int32(value, &num);
		if (rc == CASS_OK) {
			const char *str = t_strdup_printf("%d", num);
			output_size = strlen(str);
			output = (const void *)str;
		}
		type = "int32";
		break;
	}
	case CASS_VALUE_TYPE_TIMESTAMP:
	case CASS_VALUE_TYPE_BIGINT: {
		cass_int64_t num;

		rc = cass_value_get_int64(value, &num);
		if (rc == CASS_OK) {
			const char *str = t_strdup_printf("%lld", (long long)num);
			output_size = strlen(str);
			output = (const void *)str;
		}
		type = "int64";
		break;
	}
	default:
		rc = cass_value_get_bytes(value, &output, &output_size);
		type = "bytes";
		break;
	}
	if (rc != CASS_OK) {
		/* output and output_size are not valid on this path */
		i_free(result->error);
		result->error = i_strdup_printf("Couldn't get value as %s: %s",
						type, cass_error_desc(rc));
		return -1;
	}
	/* One extra byte is allocated after the data; presumably p_malloc()
	   returns zeroed memory so that it acts as a NUL terminator -
	   TODO confirm. */
	output_dup = p_malloc(result->row_pool, output_size + 1);
	/* memcpy() must not be given an invalid (e.g. NULL) source pointer
	   even for a zero-length copy, so skip it for empty values. */
	if (output_size > 0)
		memcpy(output_dup, output, output_size);
	*str_r = output_dup;
	*len_r = output_size;
	return 0;
}
/* Called when the current page has no more rows. Returns 0 if paging is
   disabled or the result has no further pages. Otherwise returns
   SQL_RESULT_NEXT_MORE and stores an error text in result->error. */
static int driver_cassandra_result_next_page(struct cassandra_result *result)
{
	struct cassandra_db *cdb = (struct cassandra_db *)result->api.db;

	/* no paging, or this was the last page */
	if (cdb->page_size == 0 ||
	    cass_result_has_more_pages(result->result) == cass_false)
		return 0;

	/* callers that don't support sql_query_more() will still get a useful
	   error message. */
	i_free(result->error);
	result->error = i_strdup("Paged query has more results, but not supported by the caller");
	return SQL_RESULT_NEXT_MORE;
}
/* Advance to the next row and load its columns into result->fields and
   result->field_sizes. The row pool is cleared first.

   Returns 1 when a row was loaded and -1 when the result has no iterator
   or a column value couldn't be read. When the iterator has no more rows,
   returns whatever driver_cassandra_result_next_page() returns. */
static int driver_cassandra_result_next_row(struct sql_result *_result)
{
	struct cassandra_result *result = (struct cassandra_result *)_result;
	const CassRow *row;
	const CassValue *column;
	const char *field;
	size_t field_size;
	unsigned int col;

	if (result->iterator == NULL)
		return -1;
	if (cass_iterator_next(result->iterator) == 0)
		return driver_cassandra_result_next_page(result);

	result->row_count++;
	result->total_row_count++;

	/* start the new row with an empty pool and fresh field arrays */
	p_clear(result->row_pool);
	p_array_init(&result->fields, result->row_pool, 8);
	p_array_init(&result->field_sizes, result->row_pool, 8);

	row = cass_iterator_get_row(result->iterator);
	for (col = 0; ; col++) {
		column = cass_row_get_column(row, col);
		if (column == NULL) {
			/* no more columns in this row */
			break;
		}
		if (driver_cassandra_get_value(result, column,
					       &field, &field_size) < 0)
			return -1;
		array_append(&result->fields, &field, 1);
		array_append(&result->field_sizes, &field_size, 1);
	}
	return 1;
}
/* Continue a paged query. A new result is created for the next page; it
   takes over the old result's statement, with the paging state updated
   from the old result set. The old result is unreferenced and *_result is
   set to NULL. With async=TRUE the query is only sent; otherwise the
   function waits for the page in a temporary ioloop and then calls
   `callback` itself. */
static void
driver_cassandra_result_more(struct sql_result **_result, bool async,
sql_query_callback_t *callback, void *context)
{
struct cassandra_db *db = (struct cassandra_db *)(*_result)->db;
struct cassandra_result *new_result;
struct cassandra_result *old_result =
(struct cassandra_result *)*_result;
/* Initialize the next page as a new sql_result */
new_result = driver_cassandra_query_init(db, old_result->query,
CASSANDRA_QUERY_TYPE_READ_MORE,
old_result->is_prepared,
callback, context);
/* Preserve the statement and update its paging state */
new_result->statement = old_result->statement;
/* ownership moved to new_result; freeing old_result must not free it */
old_result->statement = NULL;
cass_statement_set_paging_state(new_result->statement,
old_result->result);
old_result->paging_continues = TRUE;
/* The caller did support paging. Clear out the "...not supported by
the caller" error text, so it won't be in the debug log output. */
i_free_and_null(old_result->error);
/* carry the per-query state over to the next page */
new_result->consistency = old_result->consistency;
new_result->page_num = old_result->page_num + 1;
new_result->page0_start_time = old_result->page0_start_time;
new_result->total_row_count = old_result->total_row_count;
sql_result_unref(*_result);
*_result = NULL;
if (async)
(void)driver_cassandra_send_query(new_result);
else {
i_assert(db->api.state == SQL_DB_STATE_IDLE);
driver_cassandra_sync_init(db);
(void)driver_cassandra_send_query(new_result);
/* wait in db->ioloop unless a result set is already available */
if (new_result->result == NULL) {
db->io_pipe = io_loop_move_io(&db->io_pipe);
io_loop_run(db->ioloop);
}
driver_cassandra_sync_deinit(db);
/* NOTE(review): `callback` was also registered on new_result by
   driver_cassandra_query_init(), and result_finish() invokes the registered
   callback - confirm that calling it again here is intended. */
callback(&new_result->api, context);
}
}
/* Returns the number of entries in result->fields, i.e. the number of
   columns loaded for the current row. */
static unsigned int
driver_cassandra_result_get_fields_count(struct sql_result *_result)
{
	struct cassandra_result *cass_result =
		(struct cassandra_result *)_result;

	return array_count(&cass_result->fields);
}
/* Not implemented by this driver: calling it hits i_unreached(). */
static const char *
driver_cassandra_result_get_field_name(struct sql_result *_result ATTR_UNUSED,
				       unsigned int idx ATTR_UNUSED)
{
	/* field names aren't available here */
	i_unreached();
}
/* Not implemented by this driver: calling it hits i_unreached(). */
static int
driver_cassandra_result_find_field(struct sql_result *_result ATTR_UNUSED,
				   const char *field_name ATTR_UNUSED)
{
	/* lookups by field name aren't available here */
	i_unreached();
}
/* Returns the value stored for column `idx` of the current row, as it was
   put into result->fields (NULL for a null column). */
static const char *
driver_cassandra_result_get_field_value(struct sql_result *_result,
					unsigned int idx)
{
	struct cassandra_result *cass_result =
		(struct cassandra_result *)_result;
	const char *const *valuep = array_idx(&cass_result->fields, idx);

	return *valuep;
}
static const unsigned char *
driver_cassandra_result_get_field_value_binary(struct sql_result *_result ATTR_UNUSED,
unsigned int idx ATTR_UNUSED,
size_t *size_r ATTR_UNUSED)
{
struct cassandra_result *result = (struct cassandra_result *)_result;
const char *const *strp;
const size_t *sizep;
strp = array_idx(&result->fields, idx);
sizep = array_idx(&result->field_sizes, idx);
*size_r = *sizep;
return (const void *)*strp;
}
/* Not implemented by this driver: calling it hits i_unreached(). */
static const char *
driver_cassandra_result_find_field_value(struct sql_result *result ATTR_UNUSED,
					 const char *field_name ATTR_UNUSED)
{
	/* lookups by field name aren't available here */
	i_unreached();
}
/* Returns a pointer to the first element of result->fields, i.e. the
   values of the current row as an array. */
static const char *const *
driver_cassandra_result_get_values(struct sql_result *_result)
{
	struct cassandra_result *cass_result =
		(struct cassandra_result *)_result;

	return array_idx(&cass_result->fields, 0);
}
static const char *driver_cassandra_result_get_error(struct sql_result *_result)
{
struct cassandra_result *result = (struct cassandra_result *)_result;
if (result->error != NULL)
return result->error;
return "FIXME";
}
/* Allocate a new transaction context for `db`, holding one reference. */
static struct sql_transaction_context *
driver_cassandra_transaction_begin(struct sql_db *db)
{
	struct cassandra_transaction_context *txn =
		i_new(struct cassandra_transaction_context, 1);

	txn->refcount = 1;
	txn->ctx.db = db;
	return &txn->ctx;
}
static void
driver_cassandra_transaction_unref(struct cassandra_transaction_context **_ctx)
{
struct cassandra_transaction_context *ctx = *_ctx;
*_ctx = NULL;
i_assert(ctx->refcount > 0);
if (--ctx->refcount > 0)
return;
i_free(ctx->query);
i_free(ctx->error);
i_free(ctx);
}
static void
transaction_set_failed(struct cassandra_transaction_context *ctx,
const char *error)
{
if (ctx->failed) {
i_assert(ctx->error != NULL);
} else {
i_assert(ctx->error == NULL);
ctx->failed = TRUE;
ctx->error = i_strdup(error);
}
}
static void
transaction_commit_callback(struct sql_result *result, void *context)
{
struct cassandra_transaction_context *ctx = context;
struct sql_commit_result commit_result;
i_zero(&commit_result);
if (sql_result_next_row(result) < 0) {
commit_result.error = sql_result_get_error(result);
commit_result.error_type = sql_result_get_error_type(result);
}
ctx->callback(&commit_result, ctx->context);
driver_cassandra_transaction_unref(&ctx);
}
/* Commit a transaction asynchronously. The transaction holds at most one
   query, given either as plain text (ctx->query) or as a prepared statement
   (ctx->stmt). `callback` is called with the outcome: immediately when the
   transaction has already failed or has nothing to send, otherwise from
   transaction_commit_callback() once the query finishes. */
static void
driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
sql_commit_callback_t *callback, void *context)
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
struct cassandra_db *db = (struct cassandra_db *)_ctx->db;
enum cassandra_query_type query_type;
struct sql_commit_result result;
i_zero(&result);
ctx->callback = callback;
ctx->context = context;
/* nothing to send: report right away (with the stored error if the
   transaction failed) and drop the reference */
if (ctx->failed || (ctx->query == NULL && ctx->stmt == NULL)) {
if (ctx->failed)
result.error = ctx->error;
callback(&result, context);
driver_cassandra_transaction_unref(&ctx);
return;
}
/* just a single query, send it */
const char *query = ctx->query != NULL ?
ctx->query : sql_statement_get_query(&ctx->stmt->stmt);
/* queries starting with "DELETE " (case-insensitively) are classified as
   deletes, everything else as writes */
if (strncasecmp(query, "DELETE ", 7) == 0)
query_type = CASSANDRA_QUERY_TYPE_DELETE;
else
query_type = CASSANDRA_QUERY_TYPE_WRITE;
if (ctx->query != NULL) {
/* plain text query: build the statement here */
struct cassandra_result *cass_result;
cass_result = driver_cassandra_query_init(db, query, query_type,
FALSE, transaction_commit_callback, ctx);
cass_result->statement = cass_statement_new(query, 0);
/* a zero query_timestamp means no explicit timestamp is set */
if (ctx->query_timestamp != 0) {
cass_result->timestamp = ctx->query_timestamp;
cass_statement_set_timestamp(cass_result->statement,
ctx->query_timestamp);
}
(void)driver_cassandra_send_query(cass_result);
} else {
/* prepared statement: the result is created now, but it can be sent only
   once the bound statement (cass_stmt) exists */
ctx->stmt->result =
driver_cassandra_query_init(db, query, query_type, TRUE,
transaction_commit_callback, ctx);
if (ctx->stmt->cass_stmt == NULL) {
/* wait for prepare to finish */
} else {
ctx->stmt->result->statement = ctx->stmt->cass_stmt;
(void)driver_cassandra_send_query(ctx->stmt->result);
pool_unref(&ctx->stmt->stmt.pool);
}
}
}
static void
driver_cassandra_try_commit_s(struct cassandra_transaction_context *ctx)
{
struct sql_transaction_context *_ctx = &ctx->ctx;
struct cassandra_db *db = (struct cassandra_db *)_ctx->db;
struct sql_result *result = NULL;
enum cassandra_query_type query_type;
/* just a single query, send it */
if (strncasecmp(ctx->query, "DELETE ", 7) == 0)
query_type = CASSANDRA_QUERY_TYPE_DELETE;
else
query_type = CASSANDRA_QUERY_TYPE_WRITE;
driver_cassandra_sync_init(db);
result = driver_cassandra_sync_query(db, ctx->query, query_type);
driver_cassandra_sync_deinit(db);
if (sql_result_next_row(result) < 0)
transaction_set_failed(ctx, sql_result_get_error(result));
sql_result_unref(result);
}
static int
driver_cassandra_transaction_commit_s(struct sql_transaction_context *_ctx,
const char **error_r)
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
if (ctx->stmt != NULL) {
/* nothing should be using this - don't bother implementing */
i_panic("cassandra: sql_transaction_commit_s() not supported for prepared statements");
}
if (ctx->query != NULL && !ctx->failed)
driver_cassandra_try_commit_s(ctx);
*error_r = t_strdup(ctx->error);
i_assert(ctx->refcount == 1);
i_assert((*error_r != NULL) == ctx->failed);
driver_cassandra_transaction_unref(&ctx);
return *error_r == NULL ? 0 : -1;
}
/* Abandon the transaction: nothing has been sent yet, so only the (sole)
   reference to the context is dropped. */
static void
driver_cassandra_transaction_rollback(struct sql_transaction_context *_ctx)
{
	struct cassandra_transaction_context *trans =
		(struct cassandra_transaction_context *)_ctx;

	i_assert(trans->refcount == 1);
	driver_cassandra_transaction_unref(&trans);
}
static void
driver_cassandra_update(struct sql_transaction_context *_ctx, const char *query,
unsigned int *affected_rows)
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
i_assert(affected_rows == NULL);
if (ctx->query != NULL || ctx->stmt != NULL) {
transaction_set_failed(ctx, "Multiple changes in transaction not supported");
return;
}
ctx->query = i_strdup(query);
}
/* Format binary data as a "0x"-prefixed hex string. The returned string is
   built in a string allocated with t_str_new(). */
static const char *
driver_cassandra_escape_blob(struct sql_db *_db ATTR_UNUSED,
			     const unsigned char *data, size_t size)
{
	string_t *hex = t_str_new(128);

	str_append(hex, "0x");
	binary_to_hex_append(hex, data, size);

	return str_c(hex);
}
static CassError
driver_cassandra_bind_int(struct cassandra_sql_statement *stmt,
unsigned int column_idx, int64_t value)
{
const CassDataType *data_type;
CassValueType value_type;
i_assert(stmt->prep != NULL);
/* statements require exactly correct value type */
data_type = cass_prepared_parameter_data_type(stmt->prep->prepared, column_idx);
value_type = cass_data_type_type(data_type);
switch (value_type) {
case CASS_VALUE_TYPE_INT:
if (value < -2147483648 || value > 2147483647)
return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
return cass_statement_bind_int32(stmt->cass_stmt, column_idx, value);
case CASS_VALUE_TYPE_TIMESTAMP:
case CASS_VALUE_TYPE_BIGINT:
return cass_statement_bind_int64(stmt->cass_stmt, column_idx, value);
case CASS_VALUE_TYPE_SMALL_INT:
if (value < -32768 || value > 32767)
return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
return cass_statement_bind_int16(stmt->cass_stmt, column_idx, value);
case CASS_VALUE_TYPE_TINY_INT:
if (value < -128 || value > 127)
return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
return cass_statement_bind_int8(stmt->cass_stmt, column_idx, value);
default:
return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
}
}
/* Bind one deferred argument to the statement's now-available Cassandra
   statement. The argument is bound as a string if value_str is set, as
   bytes if value_binary is set, and as an integer otherwise. Bind failures
   are logged. */
static void prepare_finish_arg(struct cassandra_sql_statement *stmt,
			       const struct cassandra_sql_arg *arg)
{
	CassError rc;

	if (arg->value_str != NULL) {
		rc = cass_statement_bind_string(stmt->cass_stmt,
						arg->column_idx,
						arg->value_str);
	} else if (arg->value_binary != NULL) {
		rc = cass_statement_bind_bytes(stmt->cass_stmt,
					       arg->column_idx,
					       arg->value_binary,
					       arg->value_binary_size);
	} else {
		rc = driver_cassandra_bind_int(stmt, arg->column_idx,
					       arg->value_int64);
	}

	if (rc == CASS_OK)
		return;

	i_error("cassandra: Statement '%s': Failed to bind column %u: %s",
		stmt->stmt.query_template, arg->column_idx,
		cass_error_desc(rc));
}
/* Finish a statement that was waiting for its prepared statement.

   If preparing failed, a result that is already attached to the statement
   is finished with a copy of the prepare error. Otherwise the Cassandra
   statement is created from the prepared statement, the timestamp and the
   deferred arguments are applied, and - if a query was already requested -
   the query is sent and the statement's pool is unreferenced. */
static void prepare_finish_statement(struct cassandra_sql_statement *stmt)
{
	const struct cassandra_sql_arg *arg;

	if (stmt->prep->prepared == NULL) {
		/* prepare failed */
		i_assert(stmt->prep->error != NULL);

		if (stmt->result == NULL)
			return;
		stmt->result->error = i_strdup(stmt->prep->error);
		result_finish(stmt->result);
		return;
	}

	stmt->cass_stmt = cass_prepared_bind(stmt->prep->prepared);

	if (stmt->timestamp != 0)
		cass_statement_set_timestamp(stmt->cass_stmt, stmt->timestamp);

	/* apply the arguments that were bound before prepare finished */
	if (array_is_created(&stmt->pending_args)) {
		array_foreach(&stmt->pending_args, arg)
			prepare_finish_arg(stmt, arg);
	}

	if (stmt->result == NULL) {
		/* query not requested yet */
		return;
	}

	stmt->result->statement = stmt->cass_stmt;
	stmt->result->timestamp = stmt->timestamp;
	(void)driver_cassandra_send_query(stmt->result);
	pool_unref(&stmt->stmt.pool);
}
static void
prepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt)
{
struct cassandra_sql_statement *const *stmtp;
array_foreach(&prep_stmt->pending_statements, stmtp)
prepare_finish_statement(*stmtp);
array_clear(&prep_stmt->pending_statements);
}
/* Future callback for a prepare request. Stores either the prepared
   statement or a copy of the error message in the prepared statement
   (context), then finishes all statements that were waiting for it. */
static void prepare_callback(CassFuture *future, void *context)
{
	struct cassandra_sql_prepared_statement *prep_stmt = context;
	CassError rc = cass_future_error_code(future);

	if (rc == CASS_OK) {
		prep_stmt->prepared = cass_future_get_prepared(future);
	} else {
		const char *msg;
		size_t msg_len;

		cass_future_error_message(future, &msg, &msg_len);
		/* replace any previously stored error */
		i_free(prep_stmt->error);
		prep_stmt->error = i_strndup(msg, msg_len);
	}

	prepare_finish_pending_statements(prep_stmt);
}
/* Start preparing the statement on the server.

   If the database is not ready yet, the prepared statement is queued in
   db->pending_prepares (once) and a connect is initiated; otherwise the
   prepare request is sent and prepare_callback() is set as its callback. */
static void prepare_start(struct cassandra_sql_prepared_statement *prep_stmt)
{
	struct cassandra_db *db = (struct cassandra_db *)prep_stmt->prep_stmt.db;
	CassFuture *future;

	if (!SQL_DB_IS_READY(&db->api)) {
		if (prep_stmt->pending) {
			/* already queued */
			return;
		}
		prep_stmt->pending = TRUE;
		array_append(&db->pending_prepares, &prep_stmt, 1);

		if (sql_connect(&db->api) < 0)
			i_unreached();
		return;
	}

	/* clear the current error in case we're retrying */
	i_free_and_null(prep_stmt->error);

	future = cass_session_prepare(db->session, prep_stmt->query_template);
	driver_cassandra_set_callback(future, db, prepare_callback, prep_stmt);
}
/* Start all prepares that were queued while the database was not ready.
   Must only be called when the database is ready. */
static void driver_cassandra_prepare_pending(struct cassandra_db *db)
{
	struct cassandra_sql_prepared_statement *const *queued;

	i_assert(SQL_DB_IS_READY(&db->api));

	array_foreach(&db->pending_prepares, queued) {
		struct cassandra_sql_prepared_statement *prep_stmt = *queued;

		prep_stmt->pending = FALSE;
		prepare_start(prep_stmt);
	}
	array_clear(&db->pending_prepares);
}
/* Create a prepared statement for query_template and immediately start
   preparing it (or queue it, see prepare_start()). */
static struct sql_prepared_statement *
driver_cassandra_prepared_statement_init(struct sql_db *db,
					 const char *query_template)
{
	struct cassandra_sql_prepared_statement *prepared;

	prepared = i_new(struct cassandra_sql_prepared_statement, 1);
	prepared->prep_stmt.db = db;
	prepared->query_template = i_strdup(query_template);
	i_array_init(&prepared->pending_statements, 4);

	prepare_start(prepared);
	return &prepared->prep_stmt;
}
static void
driver_cassandra_prepared_statement_deinit(struct sql_prepared_statement *_prep_stmt)
{
struct cassandra_sql_prepared_statement *prep_stmt =
(struct cassandra_sql_prepared_statement *)_prep_stmt;
i_assert(array_count(&prep_stmt->pending_statements) == 0);
if (prep_stmt->prepared != NULL)
cass_prepared_free(prep_stmt->prepared);
array_free(&prep_stmt->pending_statements);
i_free(prep_stmt->query_template);
i_free(prep_stmt->error);
i_free(prep_stmt);
}
/* Create a non-prepared statement, allocated from its own memory pool.
   Neither parameter is used here. */
static struct sql_statement *
driver_cassandra_statement_init(struct sql_db *db ATTR_UNUSED,
				const char *query_template ATTR_UNUSED)
{
	pool_t stmt_pool =
		pool_alloconly_create("cassandra sql statement", 1024);
	struct cassandra_sql_statement *cass_stmt;

	cass_stmt = p_new(stmt_pool, struct cassandra_sql_statement, 1);
	cass_stmt->stmt.pool = stmt_pool;
	return &cass_stmt->stmt;
}
/* Create a statement that uses the given prepared statement.

   If the prepare has already succeeded, the Cassandra statement is bound
   right away. Otherwise the statement is added to the prepared statement's
   pending list; if the earlier prepare attempt failed, a new attempt is
   started first. */
static struct sql_statement *
driver_cassandra_statement_init_prepared(struct sql_prepared_statement *_prep_stmt)
{
	struct cassandra_sql_prepared_statement *prep_stmt =
		(struct cassandra_sql_prepared_statement *)_prep_stmt;
	pool_t pool = pool_alloconly_create("cassandra prepared sql statement", 1024);
	struct cassandra_sql_statement *stmt;

	stmt = p_new(pool, struct cassandra_sql_statement, 1);
	stmt->stmt.pool = pool;
	stmt->stmt.query_template =
		p_strdup(stmt->stmt.pool, prep_stmt->query_template);
	stmt->prep = prep_stmt;

	if (prep_stmt->prepared != NULL) {
		/* statement is already prepared. we can use it immediately. */
		stmt->cass_stmt = cass_prepared_bind(prep_stmt->prepared);
		return &stmt->stmt;
	}

	/* previous prepare attempt failed - retry */
	if (prep_stmt->error != NULL)
		prepare_start(prep_stmt);

	/* need to wait until prepare is finished */
	array_append(&prep_stmt->pending_statements, &stmt, 1);
	return &stmt->stmt;
}
/* Abort a statement: free its Cassandra statement, if one was created. */
static void
driver_cassandra_statement_abort(struct sql_statement *_stmt)
{
	struct cassandra_sql_statement *stmt =
		(struct cassandra_sql_statement *)_stmt;

	if (stmt->cass_stmt == NULL)
		return;
	cass_statement_free(stmt->cass_stmt);
}
/* Set the statement's timestamp. ts is converted to microseconds; the
   value is applied to the Cassandra statement right away if it exists and
   is also stored in the statement. Must be called before the statement has
   a result attached. */
static void
driver_cassandra_statement_set_timestamp(struct sql_statement *_stmt,
					 const struct timespec *ts)
{
	struct cassandra_sql_statement *stmt =
		(struct cassandra_sql_statement *)_stmt;
	/* seconds and nanoseconds -> microseconds */
	cass_int64_t usecs =
		(cass_int64_t)ts->tv_sec * 1000000ULL +
		ts->tv_nsec / 1000;

	i_assert(stmt->result == NULL);

	stmt->timestamp = usecs;
	if (stmt->cass_stmt != NULL)
		cass_statement_set_timestamp(stmt->cass_stmt, usecs);
}
/* Append a new deferred argument for column_idx to the statement's
   pending_args array (created on first use from the statement's pool) and
   return it so the caller can fill in the value. */
static struct cassandra_sql_arg *
driver_cassandra_add_pending_arg(struct cassandra_sql_statement *stmt,
				 unsigned int column_idx)
{
	struct cassandra_sql_arg *new_arg;

	if (!array_is_created(&stmt->pending_args)) {
		/* first deferred argument for this statement */
		p_array_init(&stmt->pending_args, stmt->stmt.pool, 8);
	}

	new_arg = array_append_space(&stmt->pending_args);
	new_arg->column_idx = column_idx;
	return new_arg;
}
static void
driver_cassandra_statement_bind_str(struct sql_statement *_stmt,
unsigned int column_idx,
const char *value)
{
struct cassandra_sql_statement *stmt =
(struct cassandra_sql_statement *)_stmt;
if (stmt->cass_stmt != NULL)
cass_statement_bind_string(stmt->cass_stmt, column_idx, value);
else if (stmt->prep != NULL) {
struct cassandra_sql_arg *arg =
driver_cassandra_add_pending_arg(stmt, column_idx);
arg->value_str = p_strdup(_stmt->pool, value);
}
}
static void
driver_cassandra_statement_bind_binary(struct sql_statement *_stmt,
unsigned int column_idx,
const void *value, size_t value_size)
{
struct cassandra_sql_statement *stmt =
(struct cassandra_sql_statement *)_stmt;
if (stmt->cass_stmt != NULL) {
cass_statement_bind_bytes(stmt->cass_stmt, column_idx,
value, value_size);
} else if (stmt->prep != NULL) {
struct cassandra_sql_arg *arg =
driver_cassandra_add_pending_arg(stmt, column_idx);
arg->value_binary = p_memdup(_stmt->pool, value, value_size);
arg->value_binary_size = value_size;
}
}
static void
driver_cassandra_statement_bind_int64(struct sql_statement *_stmt,
unsigned int column_idx, int64_t value)
{
struct cassandra_sql_statement *stmt =
(struct cassandra_sql_statement *)_stmt;
if (stmt->cass_stmt != NULL)
driver_cassandra_bind_int(stmt, column_idx, value);
else if (stmt->prep != NULL) {
struct cassandra_sql_arg *arg =
driver_cassandra_add_pending_arg(stmt, column_idx);
arg->value_int64 = value;
}
}
/* Run the statement as an asynchronous read query; callback is called with
   the result.

   - Already-prepared statement: the existing Cassandra statement is sent.
   - Prepared statement whose prepare hasn't finished: only the result is
     attached here; prepare_finish_statement() sends the query later.
   - Non-prepared statement: a new Cassandra statement is created from the
     query text.
   When the query is sent here, the statement's pool is unreferenced. */
static void
driver_cassandra_statement_query(struct sql_statement *_stmt,
				 sql_query_callback_t *callback, void *context)
{
	struct cassandra_sql_statement *stmt =
		(struct cassandra_sql_statement *)_stmt;
	struct cassandra_db *db = (struct cassandra_db *)_stmt->db;
	const char *query = sql_statement_get_query(_stmt);
	bool is_prepared = stmt->cass_stmt != NULL || stmt->prep != NULL;

	stmt->result = driver_cassandra_query_init(db, query,
						   CASSANDRA_QUERY_TYPE_READ,
						   is_prepared,
						   callback, context);
	if (stmt->cass_stmt != NULL) {
		stmt->result->statement = stmt->cass_stmt;
		/* record the statement's timestamp in the result, the same
		   way the non-prepared branch below and
		   prepare_finish_statement() do */
		stmt->result->timestamp = stmt->timestamp;
	} else if (stmt->prep != NULL) {
		/* wait for prepare to finish */
		return;
	} else {
		stmt->result->statement = cass_statement_new(query, 0);
		stmt->result->timestamp = stmt->timestamp;
		if (stmt->timestamp != 0) {
			cass_statement_set_timestamp(stmt->result->statement,
						     stmt->timestamp);
		}
	}
	(void)driver_cassandra_send_query(stmt->result);
	pool_unref(&_stmt->pool);
}
/* Synchronous statement queries are not implemented for this driver;
   calling this panics. */
static struct sql_result *
driver_cassandra_statement_query_s(struct sql_statement *_stmt ATTR_UNUSED)
{
	i_panic("cassandra: sql_statement_query_s() not supported");
}
/* Add a statement as the transaction's change. Only one change (query or
   statement) per transaction is accepted; a second one marks the
   transaction as failed. A prepared statement is kept in the transaction
   as-is; a non-prepared one is converted to its query text and timestamp,
   and its pool is unreferenced. affected_rows must be NULL. */
static void
driver_cassandra_update_stmt(struct sql_transaction_context *_ctx,
			     struct sql_statement *_stmt,
			     unsigned int *affected_rows)
{
	struct cassandra_transaction_context *trans =
		(struct cassandra_transaction_context *)_ctx;
	struct cassandra_sql_statement *cass_stmt =
		(struct cassandra_sql_statement *)_stmt;

	i_assert(affected_rows == NULL);

	if (trans->query != NULL || trans->stmt != NULL) {
		transaction_set_failed(trans, "Multiple changes in transaction not supported");
		return;
	}

	if (cass_stmt->prep != NULL) {
		/* prepared: sent at commit time */
		trans->stmt = cass_stmt;
		return;
	}

	trans->query = i_strdup(sql_statement_get_query(_stmt));
	trans->query_timestamp = cass_stmt->timestamp;
	pool_unref(&_stmt->pool);
}
/* Driver definition registered with the SQL API: name, flags and the table
   of functions implementing the driver. */
const struct sql_db driver_cassandra_db = {
	.name = "cassandra",
	.flags = SQL_DB_FLAG_PREP_STATEMENTS,

	.v = {
		/* lifecycle and connection */
		.init = driver_cassandra_init_v,
		.deinit = driver_cassandra_deinit_v,
		.connect = driver_cassandra_connect,
		.disconnect = driver_cassandra_disconnect,

		/* escaping */
		.escape_string = driver_cassandra_escape_string,
		.escape_blob = driver_cassandra_escape_blob,

		/* plain queries */
		.exec = driver_cassandra_exec,
		.query = driver_cassandra_query,
		.query_s = driver_cassandra_query_s,

		/* transactions */
		.transaction_begin = driver_cassandra_transaction_begin,
		.transaction_commit = driver_cassandra_transaction_commit,
		.transaction_commit_s = driver_cassandra_transaction_commit_s,
		.transaction_rollback = driver_cassandra_transaction_rollback,
		.update = driver_cassandra_update,
		.update_stmt = driver_cassandra_update_stmt,

		/* prepared statements */
		.prepared_statement_init = driver_cassandra_prepared_statement_init,
		.prepared_statement_deinit = driver_cassandra_prepared_statement_deinit,

		/* statements */
		.statement_init = driver_cassandra_statement_init,
		.statement_init_prepared = driver_cassandra_statement_init_prepared,
		.statement_abort = driver_cassandra_statement_abort,
		.statement_set_timestamp = driver_cassandra_statement_set_timestamp,
		.statement_bind_str = driver_cassandra_statement_bind_str,
		.statement_bind_binary = driver_cassandra_statement_bind_binary,
		.statement_bind_int64 = driver_cassandra_statement_bind_int64,
		.statement_query = driver_cassandra_statement_query,
		.statement_query_s = driver_cassandra_statement_query_s,
	}
};
/* Function table for Cassandra query results. The entries are positional
   initializers, so their order has to match the member order of the
   function table declared in struct sql_result (declared outside this
   file section - NOTE(review): verify the order when that struct changes). */
const struct sql_result driver_cassandra_result = {
.v = {
driver_cassandra_result_free,
driver_cassandra_result_next_row,
driver_cassandra_result_get_fields_count,
driver_cassandra_result_get_field_name,
driver_cassandra_result_find_field,
driver_cassandra_result_get_field_value,
driver_cassandra_result_get_field_value_binary,
driver_cassandra_result_find_field_value,
driver_cassandra_result_get_values,
driver_cassandra_result_get_error,
driver_cassandra_result_more,
}
};
/* Version string of this driver, set to DOVECOT_ABI_VERSION at build time. */
const char *driver_cassandra_version = DOVECOT_ABI_VERSION;
/* Prototypes for the driver's registration entry points defined below. */
void driver_cassandra_init(void);
void driver_cassandra_deinit(void);
/* Register the Cassandra driver with the SQL API. */
void driver_cassandra_init(void)
{
	sql_driver_register(&driver_cassandra_db);
}
/* Unregister the Cassandra driver from the SQL API. */
void driver_cassandra_deinit(void)
{
	sql_driver_unregister(&driver_cassandra_db);
}
#endif