/* Copyright (c) 2015-2018 Dovecot authors, see the included COPYING file */
#include "lib.h"
#include "array.h"
#include "hex-binary.h"
#include "str.h"
#include "ioloop.h"
#include "net.h"
#include "write-full.h"
#include "time-util.h"
#include "var-expand.h"
#include "settings-parser.h"
#include "sql-api-private.h"
#ifdef BUILD_CASSANDRA
#include <fcntl.h>
#include <unistd.h>
#include <cassandra.h>
enum cassandra_counter_type {
};
"sent",
"recv_ok",
"recv_err_no_hosts",
"recv_err_queue_full",
"recv_err_client_timeout",
"recv_err_server_timeout",
"recv_err_server_unavailable",
"recv_err_other",
"slow",
};
enum cassandra_query_type {
};
"read", "read-more", "write", "delete"
};
struct cassandra_callback {
unsigned int id;
void *context;
};
struct cassandra_db {
bool debug_queries;
bool latency_aware_routing;
unsigned int protocol_version;
unsigned int num_threads;
unsigned int warn_timeout_msecs;
unsigned int page_size;
unsigned int callback_ids;
char *metrics_path;
/* for synchronous queries: */
char *error;
};
struct cassandra_result {
char *query;
char *error;
void *context;
};
struct cassandra_transaction_context {
int refcount;
void *context;
char *query;
char *error;
};
struct cassandra_sql_arg {
unsigned int column_idx;
char *value_str;
unsigned char *value_binary;
};
struct cassandra_sql_statement {
};
struct cassandra_sql_prepared_statement {
char *query_template;
/* NULL, until the prepare is asynchronously finished */
/* statements waiting for prepare to finish */
/* an error here will cause the prepare to be retried on the next
execution attempt. */
char *error;
bool pending;
};
extern const struct sql_db driver_cassandra_db;
extern const struct sql_result driver_cassandra_result;
static struct {
const char *name;
} cass_consistency_names[] = {
{ CASS_CONSISTENCY_ANY, "any" },
{ CASS_CONSISTENCY_ONE, "one" },
{ CASS_CONSISTENCY_TWO, "two" },
{ CASS_CONSISTENCY_THREE, "three" },
{ CASS_CONSISTENCY_QUORUM, "" },
{ CASS_CONSISTENCY_ALL, "all" },
{ CASS_CONSISTENCY_LOCAL_QUORUM, "local-quorum" },
{ CASS_CONSISTENCY_EACH_QUORUM, "each-quorum" },
{ CASS_CONSISTENCY_SERIAL, "serial" },
{ CASS_CONSISTENCY_LOCAL_SERIAL, "local-serial" },
{ CASS_CONSISTENCY_LOCAL_ONE, "local-one" }
};
static struct {
const char *name;
} cass_log_level_names[] = {
{ CASS_LOG_CRITICAL, "critical" },
{ CASS_LOG_ERROR, "error" },
{ CASS_LOG_WARN, "warn" },
{ CASS_LOG_INFO, "info" },
{ CASS_LOG_DEBUG, "debug" },
{ CASS_LOG_TRACE, "trace" }
};
static void
{
unsigned int i;
for (i = 0; i < N_ELEMENTS(cass_consistency_names); i++) {
return 0;
}
}
return -1;
}
{
unsigned int i;
for (i = 0; i < N_ELEMENTS(cass_log_level_names); i++) {
return 0;
}
}
return -1;
}
{
/* switch back to original ioloop in case the caller wants to
}
{
}
}
}
/* running a sync query, stop it */
}
}
{
const char *message;
}
void *context)
{
/* this isn't the main thread - communicate with main thread by
writing the callback id to the pipe. note that we must not use
almost any dovecot functions here because most of them are using
data-stack, which isn't thread-safe. especially don't use
i_error() here. */
"cassandra: write(pipe) failed: %s\n",
}
}
{
}
{
/* usually there are only a few callbacks, so don't bother with using
a hash table */
return;
}
}
}
{
if (ret < 0)
i_error("cassandra: read(pipe) failed: %m");
else if (ret == 0)
i_error("cassandra: read(pipe) failed: EOF");
i_error("cassandra: read(pipe) returned wrong amount of data");
else {
/* success */
return;
}
}
static void
void *context)
{
}
{
"Couldn't connect to Cassandra");
return;
}
/* driver_cassandra_sync_init() waiting for connection to
finish */
}
}
{
i_error("pipe() failed: %m");
return -1;
}
return 0;
}
{
}
static const char *
const char *string)
{
unsigned int i;
return string;
for (i = 0; string[i] != '\0'; i++) {
if (string[i] == '\'')
}
}
const char *connect_string)
{
i_fatal("cassandra: Missing value in connect string: %s",
*args);
}
i_fatal("cassandra: This cassandra version does not support execution_retry_interval");
#endif
i_fatal("cassandra: This cassandra version does not support execution_retry_times");
#endif
} else {
}
}
if (!read_fallback_set)
if (!write_fallback_set)
if (!delete_fallback_set)
i_fatal("cassandra: No hosts given in connect string");
i_fatal("cassandra: No dbname given in connect string");
}
static void
{
for (unsigned int i = 0; i < CASSANDRA_COUNTER_COUNT; i++) {
}
}
{
};
const char *error;
int fd;
i_error("cassandra: Failed to expand metrics_path=%s: %s",
return;
}
if (fd == -1) {
return;
}
i_close_fd(&fd);
}
{
T_BEGIN {
} T_END;
/* binding with column indexes requires v4 */
}
if (db->protocol_version != 0)
if (db->num_threads != 0)
if (db->latency_aware_routing)
if (db->heartbeat_interval_secs != 0)
if (db->idle_timeout_secs != 0)
cass_cluster_set_constant_speculative_execution_policy(db->cluster, db->execution_retry_interval_msecs, db->execution_retry_times);
#endif
}
{
}
struct cassandra_result *result)
{
unsigned int i, count;
for (i = 0; i < count; i++) {
return;
}
}
i_unreached();
}
bool all_pages, long long reply_usecs)
{
unsigned int row_count;
return;
i_fatal("gettimeofday() failed: %m");
if (all_pages) {
} else {
}
} else {
}
}
{
long long reply_usecs;
message about it separate from the per-page messages. */
}
}
{
T_BEGIN {
} T_END;
if (free_result)
}
{
i_warning("%s - retrying future %s queries with consistency %s (instead of %s)",
}
}
{
switch (error) {
break;
break;
break;
break;
break;
default:
break;
}
}
{
const char *errmsg;
int msecs;
/* Timeouts bring uncertainty whether the query succeeded or
not. Also _SERVER_UNAVAILABLE could have actually written
enough copies of the data for the query to succeed. */
/* unavailable = cassandra server knows that there aren't
enough nodes available. "All hosts in current policy
attempted and were either unavailable or failed"
no hosts available = The client library couldn't connect to
enough cassanra nodes. Error message is the same as for
"unavailable".
write timeout = cassandra server couldn't reach all the
needed nodes. this may be because it hasn't yet detected
that the servers are down, or because the servers are just
too busy. we'll try the fallback consistency to avoid
unnecessary temporary errors. */
if ((error == CASS_ERROR_SERVER_UNAVAILABLE ||
/* retry with fallback consistency */
return;
}
return;
}
/* non-fallback query finished successfully. if there had been
any fallbacks, reset them. */
}
}
{
#endif
}
{
}
static bool
{
if (failure_count == 0)
return FALSE;
/* double the retries every time. */
for (i = 1; i < failure_count; i++) {
msecs *= 2;
if (msecs >= CASSANDRA_FALLBACK_MAX_RETRY_MSECS) {
break;
}
}
/* If last primary query sent timestamp + msecs is older than current
time, we need to retry the primary query. Note that this practically
prevents multiple primary queries from being attempted
simultaneously, because the caller updates primary_query_last_sent
immediately when returning.
The only time when multiple primary queries can be running in
parallel is when the earlier query is being slow and hasn't finished
early enough. This could even be a wanted feature, since while the
first query might have to wait for a timeout, Cassandra could have
been fixed in the meantime and the second query finishes
successfully. */
}
{
int ret;
if (ret < 0)
return ret;
}
}
switch (result->query_type) {
break;
/* consistency is already set and we don't want to fallback
at this point anymore. */
break;
break;
break;
i_unreached();
}
else
return 1;
}
{
unsigned int i, count;
for (i = 0; i < count; i++) {
if (driver_cassandra_send_query(results[i]) <= 0)
break;
}
}
}
void *context ATTR_UNUSED)
{
}
static struct cassandra_result *
bool is_prepared,
{
return result;
}
static void
{
(void)driver_cassandra_send_query(result);
}
{
}
{
}
{
}
{
return;
if (IS_CONNECTED(db))
return;
/* wait for connecting to finish */
}
{
return;
}
}
static struct sql_result *
{
case SQL_DB_STATE_CONNECTING:
case SQL_DB_STATE_BUSY:
i_unreached();
return &sql_not_connected_result;
case SQL_DB_STATE_IDLE:
break;
}
}
if (result == &sql_not_connected_result) {
/* we don't end up in cassandra's free function, so sync_result
won't be set to NULL if we don't do it here. */
}
return result;
}
static struct sql_result *
{
return result;
}
static int
{
const unsigned char *output;
void *output_dup;
const char *type;
if (cass_value_is_null(value) != 0) {
*len_r = 0;
return 0;
}
case CASS_VALUE_TYPE_INT: {
}
type = "int32";
break;
}
case CASS_VALUE_TYPE_BIGINT: {
}
type = "int64";
break;
}
default:
type = "bytes";
break;
}
return -1;
}
*str_r = output_dup;
*len_r = output_size;
return 0;
}
{
/* no paging */
return 0;
}
return 0;
/* callers that don't support sql_query_more() will still get a useful
error message. */
return SQL_RESULT_NEXT_MORE;
}
{
const char *str;
unsigned int i;
return -1;
return driver_cassandra_result_next_page(result);
ret = -1;
break;
}
}
return ret;
}
static void
{
(struct cassandra_result *)*_result;
/* Initialize the next page as a new sql_result */
/* Preserve the statement and update its paging state */
old_result->result);
/* The caller did support paging. Clear out the "...not supported by
the caller" error text, so it won't be in the debug log output. */
if (async)
else {
}
}
}
static unsigned int
{
}
static const char *
unsigned int idx ATTR_UNUSED)
{
i_unreached();
}
static int
const char *field_name ATTR_UNUSED)
{
i_unreached();
}
static const char *
unsigned int idx)
{
const char *const *strp;
return *strp;
}
static const unsigned char *
unsigned int idx ATTR_UNUSED,
{
const char *const *strp;
return (const void *)*strp;
}
static const char *
const char *field_name ATTR_UNUSED)
{
i_unreached();
}
static const char *const *
{
}
{
return "FIXME";
}
static struct sql_transaction_context *
{
}
static void
{
return;
}
static void
const char *error)
{
} else {
}
}
static void
{
if (sql_result_next_row(result) < 0) {
}
}
static void
{
(struct cassandra_transaction_context *)_ctx;
return;
}
/* just a single query, send it */
else
if (ctx->query_timestamp != 0) {
}
} else {
/* wait for prepare to finish */
} else {
}
}
}
static void
{
/* just a single query, send it */
else
if (sql_result_next_row(result) < 0)
}
static int
const char **error_r)
{
(struct cassandra_transaction_context *)_ctx;
/* nothing should be using this - don't bother implementing */
i_panic("cassandra: sql_transaction_commit_s() not supported for prepared statements");
}
}
static void
{
(struct cassandra_transaction_context *)_ctx;
}
static void
unsigned int *affected_rows)
{
(struct cassandra_transaction_context *)_ctx;
return;
}
}
static const char *
{
}
static CassError
{
/* statements require exactly correct value type */
switch (value_type) {
case CASS_VALUE_TYPE_INT:
case CASS_VALUE_TYPE_BIGINT:
case CASS_VALUE_TYPE_TINY_INT:
default:
}
}
const struct cassandra_sql_arg *arg)
{
} else {
arg->value_int64);
}
i_error("cassandra: Statement '%s': Failed to bind column %u: %s",
}
}
{
}
return;
}
}
}
}
static void
{
}
{
const char *errmsg;
} else {
}
}
{
i_unreached();
}
return;
}
/* clear the current error in case we're retrying */
}
{
}
}
static struct sql_prepared_statement *
const char *query_template)
{
}
static void
{
(struct cassandra_sql_prepared_statement *)_prep_stmt;
}
static struct sql_statement *
const char *query_template ATTR_UNUSED)
{
}
static struct sql_statement *
{
(struct cassandra_sql_prepared_statement *)_prep_stmt;
/* statement is already prepared. we can use it immediately. */
} else {
/* need to wait until prepare is finished */
}
}
static void
{
(struct cassandra_sql_statement *)_stmt;
}
static void
{
(struct cassandra_sql_statement *)_stmt;
}
static struct cassandra_sql_arg *
unsigned int column_idx)
{
return arg;
}
static void
unsigned int column_idx,
const char *value)
{
(struct cassandra_sql_statement *)_stmt;
}
}
static void
unsigned int column_idx,
{
(struct cassandra_sql_statement *)_stmt;
value, value_size);
}
}
static void
{
(struct cassandra_sql_statement *)_stmt;
}
}
static void
{
(struct cassandra_sql_statement *)_stmt;
/* wait for prepare to finish */
return;
} else {
}
}
}
static struct sql_result *
{
i_panic("cassandra: sql_statement_query_s() not supported");
}
static void
struct sql_statement *_stmt,
unsigned int *affected_rows)
{
(struct cassandra_transaction_context *)_ctx;
(struct cassandra_sql_statement *)_stmt;
return;
}
else {
}
}
.name = "cassandra",
.v = {
}
};
.v = {
}
};
void driver_cassandra_init(void);
void driver_cassandra_deinit(void);
void driver_cassandra_init(void)
{
}
void driver_cassandra_deinit(void)
{
}
#endif