driver-cassandra.c revision 23bdbb7b1831785c6ba6df190f6369da882d2b9d
/* Copyright (c) 2015-2016 Dovecot authors, see the included COPYING file */
#include "lib.h"
#include "array.h"
#include "hex-binary.h"
#include "str.h"
#include "ioloop.h"
#include "net.h"
#include "write-full.h"
#include "time-util.h"
#include "var-expand.h"
#include "settings-parser.h"
#include "sql-api-private.h"
#ifdef BUILD_CASSANDRA
#include <fcntl.h>
#include <unistd.h>
#include <cassandra.h>
#define IS_CONNECTED(db) \
#define CASSANDRA_FALLBACK_WARN_INTERVAL_SECS 60
#define CASSANDRA_FALLBACK_FIRST_RETRY_MSECS 50
/* Kind of query being executed; a result's query_type is switched on when
   a query is sent.
   NOTE(review): no enumerators are visible in this declaration. Presumably
   they line up one-to-one with cassandra_query_type_names[] - confirm. */
enum cassandra_query_type {
};
/* Number of query types; sizes cassandra_query_type_names[] and the
   per-type fallback_failures[] counters in struct cassandra_db. */
#define CASSANDRA_QUERY_TYPE_COUNT 3
/* Human-readable name for each query type, used in log messages. */
static const char *cassandra_query_type_names[CASSANDRA_QUERY_TYPE_COUNT] = {
"read", "write", "delete"
};
/* A pending callback. Non-main threads notify the main thread by writing a
   callback id to the pipe; the main thread then looks the callback up by
   that id. */
struct cassandra_callback {
/* identifier that is passed through the pipe */
unsigned int id;
/* database this callback belongs to */
struct cassandra_db *db;
/* opaque caller data; presumably handed back when the callback runs -
   confirm */
void *context;
};
/* Driver-specific database state. */
struct cassandra_db {
/* applied only when non-zero */
unsigned int protocol_version;
/* applied only when non-zero */
unsigned int num_threads;
/* timeouts, in milliseconds */
unsigned int connect_timeout_msecs, request_timeout_msecs;
/* pipe created with pipe(); presumably the one used to pass callback ids
   to the main thread - confirm */
int fd_pipe[2];
/* presumably the counter from which cassandra_callback.id values are
   taken - confirm */
unsigned int callback_ids;
/* NOTE(review): looks like the file path metrics are written to, driven
   by the to_metrics timeout - confirm */
char *metrics_path;
struct timeout *to_metrics;
/* one counter per query type */
unsigned int fallback_failures[CASSANDRA_QUERY_TYPE_COUNT];
/* for synchronous queries: */
struct sql_result *sync_result;
/* error string; NOTE(review): ownership is not visible here */
char *error;
};
/* Driver-specific query result; embeds the generic sql_result as "api". */
struct cassandra_result {
/* generic result part */
struct sql_result api;
/* result object from the Cassandra client library */
const CassResult *result;
/* query text this result belongs to */
char *query;
/* error string for this result */
char *error;
unsigned int row_count;
/* opaque caller data */
void *context;
/* results without this flag are handed to the query sender when pending
   results are walked; presumably set once sending has been attempted -
   confirm */
bool query_sent:1;
/* NOTE(review): presumably set when the query has completed - confirm */
bool finished:1;
};
/* Driver-specific transaction state. The generic sql_transaction_context
   is the first member, and the transaction functions cast the generic
   pointer back to this struct. */
struct cassandra_transaction_context {
/* generic transaction part; must stay first because of the casts */
struct sql_transaction_context ctx;
/* reference count */
int refcount;
/* opaque caller data */
void *context;
/* error string for the transaction */
char *error;
/* NOTE(review): the exact points where these flags are set are not
   visible here - confirm against the begin/commit code */
bool begin_succeeded:1;
bool begin_failed:1;
bool failed:1;
};
/* Forward declarations; both objects are defined near the end of this
   file. */
extern const struct sql_db driver_cassandra_db;
extern const struct sql_result driver_cassandra_result;
/* Maps each Cassandra consistency level to its textual name. Every level
   is listed exactly once and every name is non-empty, so a by-name search
   over this table can find each level ("quorum" included).
   NOTE(review): each initializer supplies two values (level, name) while
   only the `name` member is visible in this declaration - confirm that the
   struct also declares the consistency-level member. */
static struct {
const char *name;
} cass_consistency_names[] = {
{ CASS_CONSISTENCY_ANY, "any" },
{ CASS_CONSISTENCY_ONE, "one" },
{ CASS_CONSISTENCY_TWO, "two" },
{ CASS_CONSISTENCY_THREE, "three" },
/* the name must be "quorum"; an empty name can never be matched by a
   setting value */
{ CASS_CONSISTENCY_QUORUM, "quorum" },
{ CASS_CONSISTENCY_ALL, "all" },
{ CASS_CONSISTENCY_LOCAL_QUORUM, "local-quorum" },
{ CASS_CONSISTENCY_EACH_QUORUM, "each-quorum" },
{ CASS_CONSISTENCY_SERIAL, "serial" },
{ CASS_CONSISTENCY_LOCAL_SERIAL, "local-serial" },
{ CASS_CONSISTENCY_LOCAL_ONE, "local-one" }
};
/* Maps each Cassandra client library log level to its textual name.
   NOTE(review): each initializer supplies two values (level, name) while
   only the `name` member is visible in this declaration - confirm that the
   struct also declares the log-level member. */
static struct {
const char *name;
} cass_log_level_names[] = {
{ CASS_LOG_CRITICAL, "critical" },
{ CASS_LOG_ERROR, "error" },
{ CASS_LOG_WARN, "warn" },
{ CASS_LOG_INFO, "info" },
{ CASS_LOG_DEBUG, "debug" },
{ CASS_LOG_TRACE, "trace" }
};
{
unsigned int i;
for (i = 0; i < N_ELEMENTS(cass_consistency_names); i++) {
return 0;
}
}
return -1;
}
{
unsigned int i;
for (i = 0; i < N_ELEMENTS(cass_log_level_names); i++) {
return 0;
}
}
return -1;
}
{
/* switch back to original ioloop in case the caller wants to
}
{
struct cassandra_result *const *resultp;
}
}
/* running a sync query, stop it */
}
}
{
const char *message;
}
void *context)
{
/* this isn't the main thread - communicate with main thread by
writing the callback id to the pipe */
i_error("cassandra: write(pipe) failed: %m");
}
{
}
{
/* usually there are only a few callbacks, so don't bother with using
a hash table */
return;
}
}
}
{
unsigned int ids[1024];
if (ret < 0)
i_error("cassandra: read(pipe) failed: %m");
else if (ret == 0)
i_error("cassandra: read(pipe) failed: EOF");
i_error("cassandra: read(pipe) returned wrong amount of data");
else {
/* success */
return;
}
}
static void
void *context)
{
struct cassandra_callback *cb;
}
{
"Couldn't connect to Cassandra");
return;
}
/* driver_cassandra_sync_init() waiting for connection to
finish */
}
}
{
i_error("pipe() failed: %m");
return -1;
}
return 0;
}
{
}
static const char *
const char *string)
{
unsigned int i;
return string;
for (i = 0; string[i] != '\0'; i++) {
if (string[i] == '\'')
}
}
const char *connect_string)
{
i_fatal("cassandra: Missing value in connect string: %s",
*args);
}
} else {
}
}
if (!read_fallback_set)
if (!write_fallback_set)
if (!delete_fallback_set)
i_fatal("cassandra: No hosts given in connect string");
i_fatal("cassandra: No dbname given in connect string");
}
static void
{
}
{
struct var_expand_table tab[] = {
};
int fd;
if (fd == -1) {
return;
}
i_close_fd(&fd);
}
{
struct cassandra_db *db;
T_BEGIN {
} T_END;
if (db->protocol_version != 0)
if (db->num_threads != 0)
}
{
}
struct cassandra_result *result)
{
struct cassandra_result *const *results;
unsigned int i, count;
for (i = 0; i < count; i++) {
return;
}
}
i_unreached();
}
{
i_fatal("gettimeofday() failed: %m");
}
}
{
bool free_result = TRUE;
T_BEGIN {
} T_END;
if (free_result)
}
{
i_warning("%s - retrying future %s queries with consistency %s (instead of %s)",
}
}
{
const char *errmsg;
if (error == CASS_ERROR_SERVER_UNAVAILABLE &&
/* retry with fallback consistency */
return;
}
return;
}
/* non-fallback query finished successfully. if there had been
any fallbacks, reset them. */
}
}
{
}
static bool
{
unsigned int i, msecs = CASSANDRA_FALLBACK_FIRST_RETRY_MSECS;
if (failure_count == 0)
return FALSE;
for (i = 1; i < failure_count; i++) {
msecs *= 2;
if (msecs >= CASSANDRA_FALLBACK_MAX_RETRY_MSECS) {
break;
}
}
}
{
int ret;
if (ret < 0)
return ret;
}
}
switch (result->query_type) {
break;
break;
break;
}
return 1;
}
{
struct cassandra_result *const *results;
unsigned int i, count;
for (i = 0; i < count; i++) {
if (!results[i]->query_sent) {
if (driver_cassandra_send_query(results[i]) <= 0)
break;
}
}
}
void *context ATTR_UNUSED)
{
}
static void
{
struct cassandra_result *result;
(void)driver_cassandra_send_query(result);
}
{
}
{
}
{
}
{
return;
if (IS_CONNECTED(db))
return;
/* wait for connecting to finish */
}
{
return;
}
}
static struct sql_result *
{
struct sql_result *result;
case SQL_DB_STATE_CONNECTING:
case SQL_DB_STATE_BUSY:
i_unreached();
return &sql_not_connected_result;
case SQL_DB_STATE_IDLE:
break;
}
}
if (result == &sql_not_connected_result) {
/* we don't end up in cassandra's free function, so sync_result
won't be set to NULL if we don't do it here. */
}
return result;
}
static struct sql_result *
{
struct sql_result *result;
return result;
}
static int
{
const unsigned char *output;
void *output_dup;
const char *type;
if (cass_value_is_null(value) != 0) {
return 0;
}
case CASS_VALUE_TYPE_INT: {
}
type = "int32";
break;
}
default:
type = "bytes";
break;
}
return -1;
}
*str_r = output_dup;
*len_r = output_size;
return 0;
}
{
const char *str;
unsigned int i;
int ret = 1;
return -1;
return 0;
ret = -1;
break;
}
}
return ret;
}
static unsigned int
{
}
static const char *
unsigned int idx ATTR_UNUSED)
{
i_unreached();
}
static int
const char *field_name ATTR_UNUSED)
{
i_unreached();
}
static const char *
unsigned int idx)
{
const char *const *strp;
return *strp;
}
static const unsigned char *
unsigned int idx ATTR_UNUSED,
{
const char *const *strp;
return (const void *)*strp;
}
static const char *
const char *field_name ATTR_UNUSED)
{
i_unreached();
}
static const char *const *
{
}
{
return "FIXME";
}
static struct sql_transaction_context *
{
struct cassandra_transaction_context *ctx;
/* we need to be able to handle multiple open transactions, so at least
for now just keep them in memory until commit time. */
}
static void
{
return;
}
static void
const char *error)
{
} else {
}
}
static void
{
if (sql_result_next_row(result) < 0)
else
}
static void
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
/* just a single query, send it */
else
} else {
/* multiple queries - we don't actually have a transaction though */
}
}
static void
{
}
static int
struct sql_result **result_r)
{
struct sql_result *result;
struct sql_transaction_query *query;
int ret = 0;
if (sql_result_next_row(result) < 0) {
return -1;
}
/* send queries */
if (sql_result_next_row(result) < 0) {
ret = -1;
break;
}
}
"ROLLBACK" : "COMMIT");
return ret;
}
static void
{
int ret = 0;
/* just a single query, send it */
} else {
/* multiple queries, use a transaction */
}
if (sql_result_next_row(result) < 0)
}
}
static int
const char **error_r)
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
}
static void
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
}
static void
unsigned int *affected_rows)
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
}
static const char *
{
}
/* Driver descriptor for the "cassandra" SQL driver.
   NOTE(review): no entries are visible in the .v function table here -
   confirm it is populated. */
const struct sql_db driver_cassandra_db = {
.name = "cassandra",
.flags = 0,
.v = {
}
};
/* Template for result objects of this driver.
   NOTE(review): no entries are visible in the .v function table here -
   confirm it is populated. */
const struct sql_result driver_cassandra_result = {
.v = {
}
};
/* ABI version string this driver was built against. */
const char *driver_cassandra_version = DOVECOT_ABI_VERSION;
/* Prototypes for the init/deinit entry points defined just below. */
void driver_cassandra_init(void);
void driver_cassandra_deinit(void);
/* Init entry point of the Cassandra driver.
   NOTE(review): no statements are visible in the body here - confirm what
   it is expected to set up. */
void driver_cassandra_init(void)
{
}
/* Deinit entry point of the Cassandra driver.
   NOTE(review): no statements are visible in the body here - confirm what
   it is expected to tear down. */
void driver_cassandra_deinit(void)
{
}
#endif