driver-cassandra.c revision a5f2707224b10f26e3d478a2b11e8d01f1b8f609
/* Copyright (c) 2015-2017 Dovecot authors, see the included COPYING file */
#include "lib.h"
#include "array.h"
#include "hex-binary.h"
#include "str.h"
#include "ioloop.h"
#include "net.h"
#include "write-full.h"
#include "time-util.h"
#include "var-expand.h"
#include "settings-parser.h"
#include "sql-api-private.h"
#ifdef BUILD_CASSANDRA
#include <fcntl.h>
#include <unistd.h>
#include <cassandra.h>
#define IS_CONNECTED(db) \
#define CASSANDRA_FALLBACK_WARN_INTERVAL_SECS 60
#define CASSANDRA_FALLBACK_FIRST_RETRY_MSECS 50
enum cassandra_counter_type {
};
/* Human-readable names for the driver's counters. The array is sized by
   CASSANDRA_COUNTER_COUNT, so there is expected to be one name per counter.
   NOTE(review): presumably indexed by enum cassandra_counter_type, so the
   order here has to match the enum order - confirm against the enum. */
static const char *counter_names[CASSANDRA_COUNTER_COUNT] = {
"sent",
"recv_ok",
"recv_err_no_hosts",
"recv_err_queue_full",
"recv_err_client_timeout",
"recv_err_server_timeout",
"recv_err_server_unavailable",
"recv_err_other",
"slow",
};
enum cassandra_query_type {
};
/* Number of distinct query types; also the size of every per-query-type
   array in this file. */
#define CASSANDRA_QUERY_TYPE_COUNT 3
/* Printable name of each query type.
   NOTE(review): presumably indexed by enum cassandra_query_type, so the
   order (read, write, delete) has to match the enum - confirm. */
static const char *cassandra_query_type_names[CASSANDRA_QUERY_TYPE_COUNT] = {
"read", "write", "delete"
};
/* Bookkeeping for one pending callback. */
struct cassandra_callback {
/* identifier of this callback. NOTE(review): presumably the "callback id"
   that the non-main thread writes to the pipe - confirm */
unsigned int id;
/* database connection this callback belongs to */
struct cassandra_db *db;
/* opaque pointer stored together with the callback */
void *context;
};
/* Per-connection state and settings of the Cassandra driver. */
struct cassandra_db {
/* NOTE(review): presumably enables debug logging of queries - confirm */
bool debug_queries;
/* checked when the cluster is being set up */
bool latency_aware_routing;
/* The setup code only acts on the next two when they are non-zero, so 0
   appears to mean "not set". */
unsigned int protocol_version;
unsigned int num_threads;
/* timeouts in milliseconds */
unsigned int connect_timeout_msecs, request_timeout_msecs;
unsigned int warn_timeout_msecs;
/* in seconds; the setup code only acts on these when they are non-zero */
unsigned int heartbeat_interval_secs, idle_timeout_secs;
/* passed to cass_cluster_set_constant_speculative_execution_policy() */
unsigned int execution_retry_interval_msecs, execution_retry_times;
/* NOTE(review): presumably the pipe that non-main threads write callback
   ids to for the main thread to read - confirm */
int fd_pipe[2];
/* NOTE(review): looks like a counter used for handing out callback ids -
   confirm */
unsigned int callback_ids;
/* path template for the metrics output; it is variable-expanded before
   use (see the "Failed to expand metrics_path" error) */
char *metrics_path;
/* NOTE(review): presumably the timer driving the metrics output - confirm */
struct timeout *to_metrics;
/* one failure counter per query type. NOTE(review): presumably counts
   failed primary-consistency queries that caused a fallback - confirm */
unsigned int fallback_failures[CASSANDRA_QUERY_TYPE_COUNT];
/* for synchronous queries: */
struct sql_result *sync_result;
char *error;
};
/* State of one query and its result. */
struct cassandra_result {
/* generic SQL result this driver-specific result is built around */
struct sql_result api;
/* result object from the Cassandra client library */
const CassResult *result;
/* query text; it is included in the "Finished query" log line */
char *query;
/* NOTE(review): presumably the error message of a failed query - confirm */
char *error;
/* row counter; reported as "%u rows" in the "Finished query" log line */
unsigned int row_count;
/* opaque pointer stored together with the result */
void *context;
/* results whose query_sent is still unset are the ones picked up by the
   loop that calls driver_cassandra_send_query() */
bool query_sent:1;
/* NOTE(review): presumably set once the query has completed - confirm */
bool finished:1;
};
/* Driver-specific transaction state. */
struct cassandra_transaction_context {
/* Generic transaction context. This has to stay the first member: the
   transaction functions cast their struct sql_transaction_context pointer
   directly to struct cassandra_transaction_context. */
struct sql_transaction_context ctx;
/* reference count of this context */
int refcount;
/* opaque pointer stored together with the transaction */
void *context;
/* NOTE(review): presumably the message of the first failure - confirm */
char *error;
/* outcome flags. NOTE(review): exact meaning is not visible here; the
   begin_* pair looks like it records the result of starting the
   transaction - confirm */
bool begin_succeeded:1;
bool begin_failed:1;
bool failed:1;
};
extern const struct sql_db driver_cassandra_db;
extern const struct sql_result driver_cassandra_result;
/* Names of the consistency levels, paired with the Cassandra client
   library's CASS_CONSISTENCY_* constants. Each level is listed exactly once
   under its lowercase, hyphenated name.

   The quorum level used to be listed under an empty name (and, like "all",
   was listed twice), which made it impossible to match by the name "quorum".

   NOTE(review): every row supplies a constant and a string, but only the
   `name` member is visible in this declaration - confirm that the member
   holding the constant is declared as well. */
static struct {
	const char *name;
} cass_consistency_names[] = {
	{ CASS_CONSISTENCY_ANY, "any" },
	{ CASS_CONSISTENCY_ONE, "one" },
	{ CASS_CONSISTENCY_TWO, "two" },
	{ CASS_CONSISTENCY_THREE, "three" },
	{ CASS_CONSISTENCY_QUORUM, "quorum" },
	{ CASS_CONSISTENCY_ALL, "all" },
	{ CASS_CONSISTENCY_LOCAL_QUORUM, "local-quorum" },
	{ CASS_CONSISTENCY_EACH_QUORUM, "each-quorum" },
	{ CASS_CONSISTENCY_SERIAL, "serial" },
	{ CASS_CONSISTENCY_LOCAL_SERIAL, "local-serial" },
	{ CASS_CONSISTENCY_LOCAL_ONE, "local-one" }
};
/* Names of the Cassandra client library's log levels, paired with its
   CASS_LOG_* constants.
   NOTE(review): every row supplies a constant and a string, but only the
   `name` member is visible in this declaration - confirm that the member
   holding the constant is declared as well. */
static struct {
const char *name;
} cass_log_level_names[] = {
{ CASS_LOG_CRITICAL, "critical" },
{ CASS_LOG_ERROR, "error" },
{ CASS_LOG_WARN, "warn" },
{ CASS_LOG_INFO, "info" },
{ CASS_LOG_DEBUG, "debug" },
{ CASS_LOG_TRACE, "trace" }
};
{
unsigned int i;
for (i = 0; i < N_ELEMENTS(cass_consistency_names); i++) {
return 0;
}
}
return -1;
}
{
unsigned int i;
for (i = 0; i < N_ELEMENTS(cass_log_level_names); i++) {
return 0;
}
}
return -1;
}
{
/* switch back to original ioloop in case the caller wants to [...]
   NOTE(review): the remainder of this comment is missing - restore it */
}
{
struct cassandra_result *const *resultp;
}
}
/* running a sync query, stop it */
}
}
{
const char *message;
}
void *context)
{
/* this isn't the main thread - communicate with the main thread by
writing the callback id to the pipe. note that almost no dovecot
functions may be used here, because most of them use the data stack,
which isn't thread-safe. in particular, don't use i_error() here. */
const char *str = t_strdup_printf(
"cassandra: write(pipe) failed: %s\n",
}
}
{
}
{
/* usually there are only a few callbacks, so don't bother with using
a hash table */
return;
}
}
}
{
unsigned int ids[1024];
if (ret < 0)
i_error("cassandra: read(pipe) failed: %m");
else if (ret == 0)
i_error("cassandra: read(pipe) failed: EOF");
i_error("cassandra: read(pipe) returned wrong amount of data");
else {
/* success */
return;
}
}
static void
void *context)
{
struct cassandra_callback *cb;
}
{
"Couldn't connect to Cassandra");
return;
}
/* driver_cassandra_sync_init() waiting for connection to
finish */
}
}
{
i_error("pipe() failed: %m");
return -1;
}
return 0;
}
{
}
static const char *
const char *string)
{
unsigned int i;
return string;
for (i = 0; string[i] != '\0'; i++) {
if (string[i] == '\'')
}
}
const char *connect_string)
{
i_fatal("cassandra: Missing value in connect string: %s",
*args);
}
i_fatal("cassandra: This cassandra version does not support execution_retry_interval");
#endif
i_fatal("cassandra: This cassandra version does not support execution_retry_times");
#endif
} else {
}
}
if (!read_fallback_set)
if (!write_fallback_set)
if (!delete_fallback_set)
i_fatal("cassandra: No hosts given in connect string");
i_fatal("cassandra: No dbname given in connect string");
}
static void
{
for (unsigned int i = 0; i < CASSANDRA_COUNTER_COUNT; i++) {
}
}
{
struct var_expand_table tab[] = {
};
const char *error;
int fd;
i_error("cassandra: Failed to expand metrics_path=%s: %s",
return;
}
if (fd == -1) {
return;
}
i_close_fd(&fd);
}
{
struct cassandra_db *db;
T_BEGIN {
} T_END;
if (db->protocol_version != 0)
if (db->num_threads != 0)
if (db->latency_aware_routing)
if (db->heartbeat_interval_secs != 0)
if (db->idle_timeout_secs != 0)
cass_cluster_set_constant_speculative_execution_policy(db->cluster, db->execution_retry_interval_msecs, db->execution_retry_times);
#endif
}
{
}
struct cassandra_result *result)
{
struct cassandra_result *const *results;
unsigned int i, count;
for (i = 0; i < count; i++) {
return;
}
}
i_unreached();
}
long long reply_usecs)
{
const char *str;
return;
i_fatal("gettimeofday() failed: %m");
"cassandra: Finished query '%s' (%u rows, %lld+%lld us): %s",
} else {
}
}
{
long long reply_usecs;
}
{
bool free_result = TRUE;
T_BEGIN {
} T_END;
if (free_result)
}
{
i_warning("%s - retrying future %s queries with consistency %s (instead of %s)",
}
}
{
switch (error) {
break;
break;
break;
break;
break;
default:
break;
}
}
{
const char *errmsg;
int msecs;
/* Timeouts bring uncertainty whether the query succeeded or
not. Also _SERVER_UNAVAILABLE could have actually written
enough copies of the data for the query to succeed. */
/* unavailable = cassandra server knows that there aren't
enough nodes available. "All hosts in current policy
attempted and were either unavailable or failed"
no hosts available = The client library couldn't connect to
enough cassandra nodes. Error message is the same as for
"unavailable".
write timeout = cassandra server couldn't reach all the
needed nodes. this may be because it hasn't yet detected
that the servers are down, or because the servers are just
too busy. we'll try the fallback consistency to avoid
unnecessary temporary errors. */
if ((error == CASS_ERROR_SERVER_UNAVAILABLE ||
/* retry with fallback consistency */
return;
}
return;
}
/* non-fallback query finished successfully. if there had been
any fallbacks, reset them. */
}
}
{
#endif
}
{
}
static bool
{
unsigned int i, msecs = CASSANDRA_FALLBACK_FIRST_RETRY_MSECS;
if (failure_count == 0)
return FALSE;
/* double the retry delay for every failure after the first. */
for (i = 1; i < failure_count; i++) {
msecs *= 2;
if (msecs >= CASSANDRA_FALLBACK_MAX_RETRY_MSECS) {
break;
}
}
/* If last primary query sent timestamp + msecs is older than current
time, we need to retry the primary query. Note that this practically
prevents multiple primary queries from being attempted
simultaneously, because the caller updates primary_query_last_sent
immediately when returning.
The only time when multiple primary queries can be running in
parallel is when the earlier query is being slow and hasn't finished
early enough. This could even be a wanted feature, since while the
first query might have to wait for a timeout, Cassandra could have
been fixed in the meantime and the second query finishes
successfully. */
}
{
int ret;
if (ret < 0)
return ret;
}
}
switch (result->query_type) {
break;
break;
break;
}
else
return 1;
}
{
struct cassandra_result *const *results;
unsigned int i, count;
for (i = 0; i < count; i++) {
if (!results[i]->query_sent) {
if (driver_cassandra_send_query(results[i]) <= 0)
break;
}
}
}
void *context ATTR_UNUSED)
{
}
static struct cassandra_result *
{
struct cassandra_result *result;
return result;
}
static void
{
struct cassandra_result *result;
(void)driver_cassandra_send_query(result);
}
{
}
{
}
{
}
{
return;
if (IS_CONNECTED(db))
return;
/* wait for connecting to finish */
}
{
return;
}
}
static struct sql_result *
{
struct sql_result *result;
case SQL_DB_STATE_CONNECTING:
case SQL_DB_STATE_BUSY:
i_unreached();
return &sql_not_connected_result;
case SQL_DB_STATE_IDLE:
break;
}
}
if (result == &sql_not_connected_result) {
/* we don't end up in cassandra's free function, so sync_result
won't be set to NULL if we don't do it here. */
}
return result;
}
static struct sql_result *
{
struct sql_result *result;
return result;
}
static int
{
const unsigned char *output;
void *output_dup;
const char *type;
if (cass_value_is_null(value) != 0) {
return 0;
}
case CASS_VALUE_TYPE_INT: {
}
type = "int32";
break;
}
case CASS_VALUE_TYPE_BIGINT: {
}
type = "int64";
break;
}
default:
type = "bytes";
break;
}
return -1;
}
*str_r = output_dup;
*len_r = output_size;
return 0;
}
{
const char *str;
unsigned int i;
int ret = 1;
return -1;
return 0;
ret = -1;
break;
}
}
return ret;
}
static unsigned int
{
}
static const char *
unsigned int idx ATTR_UNUSED)
{
i_unreached();
}
static int
const char *field_name ATTR_UNUSED)
{
i_unreached();
}
static const char *
unsigned int idx)
{
const char *const *strp;
return *strp;
}
static const unsigned char *
unsigned int idx ATTR_UNUSED,
{
const char *const *strp;
return (const void *)*strp;
}
static const char *
const char *field_name ATTR_UNUSED)
{
i_unreached();
}
static const char *const *
{
}
{
return "FIXME";
}
static struct sql_transaction_context *
{
struct cassandra_transaction_context *ctx;
/* we need to be able to handle multiple open transactions, so at least
for now just keep them in memory until commit time. */
}
static void
{
return;
}
static void
const char *error)
{
} else {
}
}
static void
{
struct sql_commit_result commit_result;
if (sql_result_next_row(result) < 0) {
}
}
static void
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
struct sql_commit_result result;
/* just a single query, send it */
else
} else {
/* multiple queries - we don't actually have a transaction though */
}
}
static void
{
}
static int
struct sql_result **result_r)
{
struct sql_result *result;
struct sql_transaction_query *query;
int ret = 0;
if (sql_result_next_row(result) < 0) {
return -1;
}
/* send queries */
if (sql_result_next_row(result) < 0) {
ret = -1;
break;
}
}
"ROLLBACK" : "COMMIT");
return ret;
}
static void
{
int ret = 0;
/* just a single query, send it */
} else {
/* multiple queries, use a transaction */
}
if (sql_result_next_row(result) < 0)
}
}
static int
const char **error_r)
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
}
static void
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
}
static void
unsigned int *affected_rows)
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
}
static const char *
{
}
/* SQL driver descriptor of this driver: named "cassandra", with no flags
   set. No entries of its function table (.v) are visible here. */
const struct sql_db driver_cassandra_db = {
.name = "cassandra",
.flags = 0,
.v = {
}
};
/* SQL result descriptor of this driver. The only entry of its function
   table (.v) visible here is a NULL in the first slot. */
const struct sql_result driver_cassandra_result = {
.v = {
NULL,
}
};
/* Version string exported by this driver, taken from DOVECOT_ABI_VERSION. */
const char *driver_cassandra_version = DOVECOT_ABI_VERSION;
/* Prototypes for the two exported entry points defined right below. */
void driver_cassandra_init(void);
void driver_cassandra_deinit(void);
/* Driver initialization entry point. No statements are visible in its body
   here. */
void driver_cassandra_init(void)
{
}
/* Driver deinitialization entry point. No statements are visible in its
   body here. */
void driver_cassandra_deinit(void)
{
}
#endif