driver-cassandra.c revision 769cbb608e9ed620063708aff49fc1b6e924394a
/* Copyright (c) 2015-2017 Dovecot authors, see the included COPYING file */
#include "lib.h"
#include "array.h"
#include "hex-binary.h"
#include "str.h"
#include "ioloop.h"
#include "net.h"
#include "write-full.h"
#include "time-util.h"
#include "var-expand.h"
#include "settings-parser.h"
#include "sql-api-private.h"
#ifdef BUILD_CASSANDRA
#include <fcntl.h>
#include <unistd.h>
#include <cassandra.h>
#define IS_CONNECTED(db) \
#define CASSANDRA_FALLBACK_WARN_INTERVAL_SECS 60
#define CASSANDRA_FALLBACK_FIRST_RETRY_MSECS 50
enum cassandra_query_type {
};
#define CASSANDRA_QUERY_TYPE_COUNT 3
static const char *cassandra_query_type_names[CASSANDRA_QUERY_TYPE_COUNT] = {
"read", "write", "delete"
};
struct cassandra_callback {
unsigned int id;
struct cassandra_db *db;
void *context;
};
struct cassandra_db {
bool debug_queries;
bool latency_aware_routing;
unsigned int protocol_version;
unsigned int num_threads;
unsigned int connect_timeout_msecs, request_timeout_msecs;
unsigned int warn_timeout_msecs;
int fd_pipe[2];
unsigned int callback_ids;
char *metrics_path;
struct timeout *to_metrics;
unsigned int fallback_failures[CASSANDRA_QUERY_TYPE_COUNT];
/* for synchronous queries: */
struct sql_result *sync_result;
char *error;
};
struct cassandra_result {
struct sql_result api;
const CassResult *result;
char *query;
char *error;
unsigned int row_count;
void *context;
bool query_sent:1;
bool finished:1;
};
struct cassandra_transaction_context {
struct sql_transaction_context ctx;
int refcount;
void *context;
char *error;
bool begin_succeeded:1;
bool begin_failed:1;
bool failed:1;
};
extern const struct sql_db driver_cassandra_db;
extern const struct sql_result driver_cassandra_result;
static struct {
const char *name;
} cass_consistency_names[] = {
{ CASS_CONSISTENCY_ANY, "any" },
{ CASS_CONSISTENCY_ONE, "one" },
{ CASS_CONSISTENCY_TWO, "two" },
{ CASS_CONSISTENCY_THREE, "three" },
{ CASS_CONSISTENCY_QUORUM, "" },
{ CASS_CONSISTENCY_ALL, "all" },
{ CASS_CONSISTENCY_QUORUM, "" },
{ CASS_CONSISTENCY_ALL, "all" },
{ CASS_CONSISTENCY_LOCAL_QUORUM, "local-quorum" },
{ CASS_CONSISTENCY_EACH_QUORUM, "each-quorum" },
{ CASS_CONSISTENCY_SERIAL, "serial" },
{ CASS_CONSISTENCY_LOCAL_SERIAL, "local-serial" },
{ CASS_CONSISTENCY_LOCAL_ONE, "local-one" }
};
static struct {
const char *name;
} cass_log_level_names[] = {
{ CASS_LOG_CRITICAL, "critical" },
{ CASS_LOG_ERROR, "error" },
{ CASS_LOG_WARN, "warn" },
{ CASS_LOG_INFO, "info" },
{ CASS_LOG_DEBUG, "debug" },
{ CASS_LOG_TRACE, "trace" }
};
{
unsigned int i;
for (i = 0; i < N_ELEMENTS(cass_consistency_names); i++) {
return 0;
}
}
return -1;
}
{
unsigned int i;
for (i = 0; i < N_ELEMENTS(cass_log_level_names); i++) {
return 0;
}
}
return -1;
}
{
/* switch back to original ioloop in case the caller wants to
}
{
struct cassandra_result *const *resultp;
}
}
/* running a sync query, stop it */
}
}
{
const char *message;
}
void *context)
{
/* this isn't the main thread - communicate with main thread by
writing the callback id to the pipe */
i_error("cassandra: write(pipe) failed: %m");
}
{
}
{
/* usually there are only a few callbacks, so don't bother with using
a hash table */
return;
}
}
}
{
unsigned int ids[1024];
if (ret < 0)
i_error("cassandra: read(pipe) failed: %m");
else if (ret == 0)
i_error("cassandra: read(pipe) failed: EOF");
i_error("cassandra: read(pipe) returned wrong amount of data");
else {
/* success */
return;
}
}
static void
void *context)
{
struct cassandra_callback *cb;
}
{
"Couldn't connect to Cassandra");
return;
}
/* driver_cassandra_sync_init() waiting for connection to
finish */
}
}
{
i_error("pipe() failed: %m");
return -1;
}
return 0;
}
{
}
static const char *
const char *string)
{
unsigned int i;
return string;
for (i = 0; string[i] != '\0'; i++) {
if (string[i] == '\'')
}
}
const char *connect_string)
{
i_fatal("cassandra: Missing value in connect string: %s",
*args);
}
} else {
}
}
if (!read_fallback_set)
if (!write_fallback_set)
if (!delete_fallback_set)
i_fatal("cassandra: No hosts given in connect string");
i_fatal("cassandra: No dbname given in connect string");
}
static void
{
}
{
struct var_expand_table tab[] = {
};
const char *error;
int fd;
i_error("cassandra: Failed to expand metrics_path=%s: %s",
return;
}
if (fd == -1) {
return;
}
i_close_fd(&fd);
}
{
struct cassandra_db *db;
T_BEGIN {
} T_END;
if (db->protocol_version != 0)
if (db->num_threads != 0)
if (db->latency_aware_routing)
}
{
}
struct cassandra_result *result)
{
struct cassandra_result *const *results;
unsigned int i, count;
for (i = 0; i < count; i++) {
return;
}
}
i_unreached();
}
{
const char *str;
long long reply_usecs;
i_fatal("gettimeofday() failed: %m");
"cassandra: Finished query '%s' (%u rows, %lld+%lld us): %s",
else
}
}
{
bool free_result = TRUE;
T_BEGIN {
} T_END;
if (free_result)
}
{
i_warning("%s - retrying future %s queries with consistency %s (instead of %s)",
}
}
{
const char *errmsg;
int msecs;
/* unavailable = cassandra server knows that there aren't
enough nodes available.
write timeout = cassandra server couldn't reach all the
needed nodes. this may be because it hasn't yet detected
that the servers are down, or because the servers are just
too busy. we'll try the fallback consistency to avoid
unnecessary temporary errors. */
if ((error == CASS_ERROR_SERVER_UNAVAILABLE ||
/* retry with fallback consistency */
return;
}
return;
}
/* non-fallback query finished successfully. if there had been
any fallbacks, reset them. */
}
}
{
}
static bool
{
unsigned int i, msecs = CASSANDRA_FALLBACK_FIRST_RETRY_MSECS;
if (failure_count == 0)
return FALSE;
for (i = 1; i < failure_count; i++) {
msecs *= 2;
if (msecs >= CASSANDRA_FALLBACK_MAX_RETRY_MSECS) {
break;
}
}
}
{
int ret;
if (ret < 0)
return ret;
}
}
switch (result->query_type) {
break;
break;
break;
}
return 1;
}
{
struct cassandra_result *const *results;
unsigned int i, count;
for (i = 0; i < count; i++) {
if (!results[i]->query_sent) {
if (driver_cassandra_send_query(results[i]) <= 0)
break;
}
}
}
void *context ATTR_UNUSED)
{
}
static void
{
struct cassandra_result *result;
(void)driver_cassandra_send_query(result);
}
{
}
{
}
{
}
{
return;
if (IS_CONNECTED(db))
return;
/* wait for connecting to finish */
}
{
return;
}
}
static struct sql_result *
{
struct sql_result *result;
case SQL_DB_STATE_CONNECTING:
case SQL_DB_STATE_BUSY:
i_unreached();
return &sql_not_connected_result;
case SQL_DB_STATE_IDLE:
break;
}
}
if (result == &sql_not_connected_result) {
/* we don't end up in cassandra's free function, so sync_result
won't be set to NULL if we don't do it here. */
}
return result;
}
static struct sql_result *
{
struct sql_result *result;
return result;
}
static int
{
const unsigned char *output;
void *output_dup;
const char *type;
if (cass_value_is_null(value) != 0) {
return 0;
}
case CASS_VALUE_TYPE_INT: {
}
type = "int32";
break;
}
case CASS_VALUE_TYPE_BIGINT: {
}
type = "int64";
break;
}
default:
type = "bytes";
break;
}
return -1;
}
*str_r = output_dup;
*len_r = output_size;
return 0;
}
{
const char *str;
unsigned int i;
int ret = 1;
return -1;
return 0;
ret = -1;
break;
}
}
return ret;
}
static unsigned int
{
}
static const char *
unsigned int idx ATTR_UNUSED)
{
i_unreached();
}
static int
const char *field_name ATTR_UNUSED)
{
i_unreached();
}
static const char *
unsigned int idx)
{
const char *const *strp;
return *strp;
}
static const unsigned char *
unsigned int idx ATTR_UNUSED,
{
const char *const *strp;
return (const void *)*strp;
}
static const char *
const char *field_name ATTR_UNUSED)
{
i_unreached();
}
static const char *const *
{
}
{
return "FIXME";
}
static struct sql_transaction_context *
{
struct cassandra_transaction_context *ctx;
/* we need to be able to handle multiple open transactions, so at least
for now just keep them in memory until commit time. */
}
static void
{
return;
}
static void
const char *error)
{
} else {
}
}
static void
{
struct sql_commit_result commit_result;
if (sql_result_next_row(result) < 0) {
}
}
static void
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
struct sql_commit_result result;
/* just a single query, send it */
else
} else {
/* multiple queries - we don't actually have a transaction though */
}
}
static void
{
}
static int
struct sql_result **result_r)
{
struct sql_result *result;
struct sql_transaction_query *query;
int ret = 0;
if (sql_result_next_row(result) < 0) {
return -1;
}
/* send queries */
if (sql_result_next_row(result) < 0) {
ret = -1;
break;
}
}
"ROLLBACK" : "COMMIT");
return ret;
}
static void
{
int ret = 0;
/* just a single query, send it */
} else {
/* multiple queries, use a transaction */
}
if (sql_result_next_row(result) < 0)
}
}
static int
const char **error_r)
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
}
static void
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
}
static void
unsigned int *affected_rows)
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
}
static const char *
{
}
const struct sql_db driver_cassandra_db = {
.name = "cassandra",
.flags = 0,
.v = {
}
};
const struct sql_result driver_cassandra_result = {
.v = {
}
};
const char *driver_cassandra_version = DOVECOT_ABI_VERSION;
void driver_cassandra_init(void);
void driver_cassandra_deinit(void);
void driver_cassandra_init(void)
{
}
void driver_cassandra_deinit(void)
{
}
#endif