Lines Matching refs:db
20 #define IS_CONNECTED(db) \
21 ((db)->api.state != SQL_DB_STATE_DISCONNECTED && \
22 (db)->api.state != SQL_DB_STATE_CONNECTING)
73 struct cassandra_db *db;
233 static void driver_cassandra_prepare_pending(struct cassandra_db *db);
237 static void driver_cassandra_send_queries(struct cassandra_db *db);
266 static void driver_cassandra_set_state(struct cassandra_db *db, enum sql_db_state state)
270 if (db->ioloop != NULL)
271 io_loop_set_current(db->orig_ioloop);
272 sql_db_set_state(&db->api, state);
273 if (db->ioloop != NULL)
274 io_loop_set_current(db->ioloop);
277 static void driver_cassandra_close(struct cassandra_db *db, const char *error)
282 io_remove(&db->io_pipe);
283 if (db->fd_pipe[0] != -1) {
284 i_close_fd(&db->fd_pipe[0]);
285 i_close_fd(&db->fd_pipe[1]);
287 driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED);
289 array_foreach(&db->pending_prepares, prep_stmtp) {
294 array_clear(&db->pending_prepares);
296 while (array_count(&db->results) > 0) {
297 resultp = array_idx(&db->results, 0);
303 if (db->ioloop != NULL) {
305 io_loop_stop(db->ioloop);
328 if (write_full(cb->db->fd_pipe[1], &cb->id, sizeof(cb->id)) < 0) {
343 static void driver_cassandra_input_id(struct cassandra_db *db, unsigned int id)
349 array_foreach(&db->callbacks, cbp) {
352 array_delete(&db->callbacks,
353 array_foreach_idx(&db->callbacks, cbp), 1);
361 static void driver_cassandra_input(struct cassandra_db *db)
366 ret = read(db->fd_pipe[0], ids, sizeof(ids));
377 for (i = 0; i < count && db->api.state != SQL_DB_STATE_DISCONNECTED; i++)
378 driver_cassandra_input_id(db, ids[i]);
381 driver_cassandra_close(db, "IPC pipe closed");
385 driver_cassandra_set_callback(CassFuture *future, struct cassandra_db *db,
392 cb->id = ++db->callback_ids;
396 cb->db = db;
397 array_append(&db->callbacks, &cb, 1);
404 struct cassandra_db *db = context;
410 driver_cassandra_close(db, "Couldn't connect to Cassandra");
413 driver_cassandra_set_state(db, SQL_DB_STATE_IDLE);
414 if (db->ioloop != NULL) {
417 io_loop_stop(db->ioloop);
419 driver_cassandra_prepare_pending(db);
420 driver_cassandra_send_queries(db);
425 struct cassandra_db *db = (struct cassandra_db *)_db;
428 i_assert(db->api.state == SQL_DB_STATE_DISCONNECTED);
430 if (pipe(db->fd_pipe) < 0) {
434 db->io_pipe = io_add(db->fd_pipe[0], IO_READ,
435 driver_cassandra_input, db);
436 driver_cassandra_set_state(db, SQL_DB_STATE_CONNECTING);
438 future = cass_session_connect_keyspace(db->session, db->cluster, db->keyspace);
439 driver_cassandra_set_callback(future, db, connect_callback, db);
445 struct cassandra_db *db = (struct cassandra_db *)_db;
447 driver_cassandra_close(db, "Disconnected");
451 driver_cassandra_escape_string(struct sql_db *db ATTR_UNUSED,
468 static void driver_cassandra_parse_connect_string(struct cassandra_db *db,
475 db->log_level = CASS_LOG_WARN;
476 db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
477 db->write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
478 db->delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
479 db->connect_timeout_msecs = SQL_CONNECT_TIMEOUT_SECS*1000;
480 db->request_timeout_msecs = SQL_QUERY_TIMEOUT_SECS*1000;
481 db->warn_timeout_msecs = CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS;
497 if (net_str2port(value, &db->port) < 0)
501 i_free(db->keyspace);
502 db->keyspace = i_strdup(value);
504 i_free(db->user);
505 db->user = i_strdup(value);
507 i_free(db->password);
508 db->password = i_strdup(value);
510 if (consistency_parse(value, &db->read_consistency) < 0)
513 if (consistency_parse(value, &db->read_fallback_consistency) < 0)
517 if (consistency_parse(value, &db->write_consistency) < 0)
520 if (consistency_parse(value, &db->write_fallback_consistency) < 0)
524 if (consistency_parse(value, &db->delete_consistency) < 0)
527 if (consistency_parse(value, &db->delete_fallback_consistency) < 0)
531 if (log_level_parse(value, &db->log_level) < 0)
534 db->debug_queries = TRUE;
536 db->latency_aware_routing = TRUE;
538 if (str_to_uint(value, &db->protocol_version) < 0)
541 if (str_to_uint(value, &db->num_threads) < 0)
544 if (settings_get_time(value, &db->heartbeat_interval_secs, &error) < 0)
547 if (settings_get_time(value, &db->idle_timeout_secs, &error) < 0)
550 if (settings_get_time_msecs(value, &db->connect_timeout_msecs, &error) < 0)
553 if (settings_get_time_msecs(value, &db->request_timeout_msecs, &error) < 0)
556 if (settings_get_time_msecs(value, &db->warn_timeout_msecs, &error) < 0)
559 i_free(db->metrics_path);
560 db->metrics_path = i_strdup(value);
562 if (settings_get_time_msecs(value, &db->execution_retry_interval_msecs, &error) < 0)
568 if (str_to_uint(value, &db->execution_retry_times) < 0)
574 if (str_to_uint(value, &db->page_size) < 0)
582 db->read_fallback_consistency = db->read_consistency;
584 db->write_fallback_consistency = db->write_consistency;
586 db->delete_fallback_consistency = db->delete_consistency;
590 if (db->keyspace == NULL)
592 db->hosts = i_strdup(str_c(hosts));
596 driver_cassandra_get_metrics_json(struct cassandra_db *db, string_t *dest)
604 cass_session_get_metrics(db->session, &metrics);
638 db->counters[i]);
644 static void driver_cassandra_metrics_write(struct cassandra_db *db)
654 if (var_expand(path, db->metrics_path, tab, &error) <= 0) {
656 db->metrics_path, error);
666 driver_cassandra_get_metrics_json(db, data);
674 struct cassandra_db *db;
676 db = i_new(struct cassandra_db, 1);
677 db->api = driver_cassandra_db;
678 db->fd_pipe[0] = db->fd_pipe[1] = -1;
681 driver_cassandra_parse_connect_string(db, connect_string);
683 cass_log_set_level(db->log_level);
685 if (db->protocol_version > 0 && db->protocol_version < 4) {
687 db->api.v.prepared_statement_init = NULL;
688 db->api.v.prepared_statement_deinit = NULL;
689 db->api.v.statement_init_prepared = NULL;
692 db->timestamp_gen = cass_timestamp_gen_monotonic_new();
693 db->cluster = cass_cluster_new();
694 cass_cluster_set_timestamp_gen(db->cluster, db->timestamp_gen);
695 cass_cluster_set_connect_timeout(db->cluster, db->connect_timeout_msecs);
696 cass_cluster_set_request_timeout(db->cluster, db->request_timeout_msecs);
697 cass_cluster_set_contact_points(db->cluster, db->hosts);
698 if (db->user != NULL && db->password != NULL)
699 cass_cluster_set_credentials(db->cluster, db->user, db->password);
700 if (db->port != 0)
701 cass_cluster_set_port(db->cluster, db->port);
702 if (db->protocol_version != 0)
703 cass_cluster_set_protocol_version(db->cluster, db->protocol_version);
704 if (db->num_threads != 0)
705 cass_cluster_set_num_threads_io(db->cluster, db->num_threads);
706 if (db->latency_aware_routing)
707 cass_cluster_set_latency_aware_routing(db->cluster, cass_true);
708 if (db->heartbeat_interval_secs != 0)
709 cass_cluster_set_connection_heartbeat_interval(db->cluster, db->heartbeat_interval_secs);
710 if (db->idle_timeout_secs != 0)
711 cass_cluster_set_connection_idle_timeout(db->cluster, db->idle_timeout_secs);
713 if (db->execution_retry_times > 0 && db->execution_retry_interval_msecs > 0)
714 cass_cluster_set_constant_speculative_execution_policy(db->cluster, db->execution_retry_interval_msecs, db->execution_retry_times);
716 db->session = cass_session_new();
717 if (db->metrics_path != NULL)
718 db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db);
719 i_array_init(&db->results, 16);
720 i_array_init(&db->callbacks, 16);
721 i_array_init(&db->pending_prepares, 16);
722 return &db->api;
727 struct cassandra_db *db = (struct cassandra_db *)_db;
729 driver_cassandra_close(db, "Deinitialized");
731 i_assert(array_count(&db->callbacks) == 0);
732 array_free(&db->callbacks);
733 i_assert(array_count(&db->results) == 0);
734 array_free(&db->results);
735 i_assert(array_count(&db->pending_prepares) == 0);
736 array_free(&db->pending_prepares);
738 cass_session_free(db->session);
739 cass_cluster_free(db->cluster);
740 cass_timestamp_gen_free(db->timestamp_gen);
741 timeout_remove(&db->to_metrics);
742 i_free(db->metrics_path);
743 i_free(db->hosts);
744 i_free(db->error);
745 i_free(db->keyspace);
746 i_free(db->user);
747 i_free(db->password);
749 i_free(db);
752 static void driver_cassandra_result_unlink(struct cassandra_db *db,
758 results = array_get(&db->results, &count);
761 array_delete(&db->results, i, 1);
771 struct cassandra_db *db = (struct cassandra_db *)result->api.db;
775 if (db->log_level < CASS_LOG_DEBUG && !db->debug_queries &&
776 reply_usecs/1000 < db->warn_timeout_msecs)
799 if (reply_usecs/1000 >= db->warn_timeout_msecs) {
800 db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SLOW]++;
809 struct cassandra_db *db = (struct cassandra_db *)_result->db;
816 if (_result == db->sync_result)
817 db->sync_result = NULL;
844 struct cassandra_db *db = (struct cassandra_db *)result->api.db;
849 driver_cassandra_result_unlink(db, result);
859 free_result = db->sync_result != &result->api;
860 if (db->ioloop != NULL)
861 io_loop_stop(db->ioloop);
871 struct cassandra_db *db = (struct cassandra_db *)result->api.db;
873 ioloop_time - db->last_fallback_warning[result->query_type];
880 db->last_fallback_warning[result->query_type] = ioloop_time;
883 db->fallback_failures[result->query_type]++;
889 static void counters_inc_error(struct cassandra_db *db, CassError error)
893 db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS]++;
896 db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL]++;
899 db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT]++;
902 db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT]++;
905 db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE]++;
908 db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_OTHER]++;
916 struct cassandra_db *db = (struct cassandra_db *)result->api.db;
928 counters_inc_error(db, error);
965 db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_OK]++;
970 db->fallback_failures[result->query_type] = 0;
980 struct cassandra_db *db = (struct cassandra_db *)result->api.db;
987 if (db->page_size > 0)
988 cass_statement_set_paging_size(result->statement, db->page_size);
993 struct cassandra_db *db = (struct cassandra_db *)result->api.db;
998 db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SENT]++;
1002 future = cass_session_execute(db->session, result->statement);
1003 driver_cassandra_set_callback(future, db, query_callback, result);
1009 struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1010 unsigned int failure_count = db->fallback_failures[result->query_type];
1036 tv = db->primary_query_last_sent[result->query_type];
1043 struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1046 if (!SQL_DB_IS_READY(&db->api)) {
1047 if ((ret = sql_connect(&db->api)) <= 0) {
1049 driver_cassandra_close(db, "Couldn't connect to Cassandra");
1060 result->consistency = db->read_consistency;
1061 result->fallback_consistency = db->read_fallback_consistency;
1069 result->consistency = db->write_consistency;
1070 result->fallback_consistency = db->write_fallback_consistency;
1073 result->consistency = db->delete_consistency;
1074 result->fallback_consistency = db->delete_fallback_consistency;
1083 db->primary_query_last_sent[result->query_type] = ioloop_timeval;
1090 static void driver_cassandra_send_queries(struct cassandra_db *db)
1095 results = array_get(&db->results, &count);
1110 driver_cassandra_query_init(struct cassandra_db *db, const char *query,
1119 result->api.db = &db->api;
1126 array_append(&db->results, &result, 1);
1135 struct cassandra_db *db = (struct cassandra_db *)_db;
1138 result = driver_cassandra_query_init(db, query, query_type, FALSE,
1144 static void driver_cassandra_exec(struct sql_db *db, const char *query)
1146 driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_WRITE, exec_callback, NULL);
1149 static void driver_cassandra_query(struct sql_db *db, const char *query,
1152 driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_READ, callback, context);
1157 struct cassandra_db *db = context;
1159 db->sync_result = result;
1162 static void driver_cassandra_sync_init(struct cassandra_db *db)
1164 if (sql_connect(&db->api) < 0)
1166 db->orig_ioloop = current_ioloop;
1167 db->ioloop = io_loop_create();
1168 if (IS_CONNECTED(db))
1170 i_assert(db->api.state == SQL_DB_STATE_CONNECTING);
1172 db->io_pipe = io_loop_move_io(&db->io_pipe);
1174 io_loop_run(db->ioloop);
1177 static void driver_cassandra_sync_deinit(struct cassandra_db *db)
1179 if (db->orig_ioloop == NULL)
1181 if (db->io_pipe != NULL) {
1182 io_loop_set_current(db->orig_ioloop);
1183 db->io_pipe = io_loop_move_io(&db->io_pipe);
1184 io_loop_set_current(db->ioloop);
1186 io_loop_destroy(&db->ioloop);
1190 driver_cassandra_sync_query(struct cassandra_db *db, const char *query,
1195 i_assert(db->sync_result == NULL);
1197 switch (db->api.state) {
1208 driver_cassandra_query_full(&db->api, query, query_type,
1209 cassandra_query_s_callback, db);
1210 if (db->sync_result == NULL) {
1211 db->io_pipe = io_loop_move_io(&db->io_pipe);
1212 io_loop_run(db->ioloop);
1215 result = db->sync_result;
1219 db->sync_result = NULL;
1230 struct cassandra_db *db = (struct cassandra_db *)_db;
1233 driver_cassandra_sync_init(db);
1234 result = driver_cassandra_sync_query(db, query,
1236 driver_cassandra_sync_deinit(db);
1303 struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1305 if (db->page_size == 0) {
1357 struct cassandra_db *db = (struct cassandra_db *)(*_result)->db;
1363 new_result = driver_cassandra_query_init(db, old_result->query,
1390 i_assert(db->api.state == SQL_DB_STATE_IDLE);
1391 driver_cassandra_sync_init(db);
1394 db->io_pipe = io_loop_move_io(&db->io_pipe);
1395 io_loop_run(db->ioloop);
1397 driver_cassandra_sync_deinit(db);
1476 driver_cassandra_transaction_begin(struct sql_db *db)
1481 ctx->ctx.db = db;
1535 struct cassandra_db *db = (struct cassandra_db *)_ctx->db;
1563 cass_result = driver_cassandra_query_init(db, query, query_type,
1574 driver_cassandra_query_init(db, query, query_type, TRUE,
1591 struct cassandra_db *db = (struct cassandra_db *)_ctx->db;
1600 driver_cassandra_sync_init(db);
1601 result = driver_cassandra_sync_query(db, ctx->query, query_type);
1602 driver_cassandra_sync_deinit(db);
1786 struct cassandra_db *db = (struct cassandra_db *)prep_stmt->prep_stmt.db;
1789 if (!SQL_DB_IS_READY(&db->api)) {
1792 array_append(&db->pending_prepares, &prep_stmt, 1);
1794 if (sql_connect(&db->api) < 0)
1803 future = cass_session_prepare(db->session, prep_stmt->query_template);
1804 driver_cassandra_set_callback(future, db, prepare_callback, prep_stmt);
1807 static void driver_cassandra_prepare_pending(struct cassandra_db *db)
1811 i_assert(SQL_DB_IS_READY(&db->api));
1813 array_foreach(&db->pending_prepares, prep_stmtp) {
1817 array_clear(&db->pending_prepares);
1821 driver_cassandra_prepared_statement_init(struct sql_db *db,
1826 prep_stmt->prep_stmt.db = db;
1849 driver_cassandra_statement_init(struct sql_db *db ATTR_UNUSED,
1982 struct cassandra_db *db = (struct cassandra_db *)_stmt->db;
1986 stmt->result = driver_cassandra_query_init(db, query,