mod_serf.c revision 3c290fd0361d6d9d84d97725eaf299456bddd6cf
/* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "mod_serf.h"
#include "httpd.h"
#include "http_core.h"
#include "http_config.h"
#include "http_protocol.h"
#include "http_request.h"
#include "http_log.h"
#include "serf.h"
#include "apr_uri.h"
#include "apr_strings.h"
#include "apr_version.h"
#include "ap_mpm.h"
/* Flag consulted throughout this file to choose between two code paths:
 * when non-zero the request driver returns SUSPENDED, otherwise it loops
 * in place until the baton's keep_reading flag is cleared.
 * Starts at 0; the only write visible here resets it to 0 after a failed
 * status check.
 * NOTE(review): the code that sets it non-zero is not visible here --
 * presumably done after querying the MPM; confirm. */
static int mpm_supprts_serf = 0;
typedef struct {
int on;
int preservehost;
typedef struct {
const char *name;
const char *provider;
typedef struct {
/* name -> serf_cluster_t* */
/* Per-request state shared with the serf callbacks in this file.
 * NOTE(review): the callbacks appear to reach it through a local named
 * `ctx` and the request driver through `baton`; their declarations are not
 * visible here -- confirm. */
typedef struct {
/* NOTE(review): presumably the HTTP status to report for the request;
 * no read or write of it is visible here -- confirm. */
int rstatus;
/* NOTE(review): presumably non-zero when the backend connection should
 * use SSL; no use of it is visible here -- confirm. */
int want_ssl;
/* Reset to 0 when a request is set up; the response-reading loop tests it
 * (`if (!ctx->done_headers)`) on each pass. */
int done_headers;
/* Loop flag for the blocking path: the driver runs
 * `while (baton->keep_reading)`. Cleared to 0 when the response reaches
 * EOF and in the callback that receives `closed_baton`. */
int keep_reading;
/* The request being served; assigned from the driver's request_rec. */
request_rec *r;
} s_baton_t;
/* Convert a millisecond count to microseconds by multiplying by 1000.
 * The argument is parenthesized so that an expression argument such as
 * apr_time_from_msec(a + b) scales the whole expression rather than only
 * its last operand. */
#define apr_time_from_msec(x) ((x) * 1000)
#endif
/**
* This works right now because all timers are invoked in the single listener
* thread in the Event MPM -- the same thread that serf callbacks are made
* from, so we don't technically need a mutex yet, but with the Simple MPM,
* invocations are made from worker threads, and we need to figure out locking
*/
static void timed_cleanup_callback(void *baton)
{
/* Causes all serf connections to unregister from the event mpm */
}
else {
apr_bucket *e;
/* TODO: return code? bleh */
return;
}
}
void *closed_baton,
{
if (why) {
/* justin says that error handling isn't done yet. hah. */
/* XXXXXX: review */
}
if (mpm_supprts_serf) {
}
ctx->keep_reading = 0;
}
void *setup_baton,
{
serf_bucket_t *c;
}
return c;
}
{
/* XXXXX: List of headers not to copy to serf. serf's serf_bucket_headers_setn
* doesn't actually overwrite a header once it has been set, so we need to
* ignore anything we might want to toggle or combine.
*/
switch (key[0]) {
case 'a':
case 'A':
return 0;
}
break;
case 'c':
case 'C':
return 0;
}
break;
case 'h':
case 'H':
return 0;
}
break;
case 'k':
case 'K':
return 0;
}
break;
case 't':
case 'T':
return 0;
}
return 0;
}
break;
case 'u':
case 'U':
return 0;
}
break;
default:
break;
}
return 0;
}
{
int done = 0;
/* XXXXX: Special Treatment required for MANY other headers. fixme.*/
switch (key[0]) {
case 'c':
case 'C':
done = 1;
break;
}
done = 1;
break;
}
done = 1;
break;
}
done = 1;
break;
}
break;
case 't':
case 'T':
done = 1;
break;
}
break;
default:
break;
}
if (!done) {
}
return 0;
}
void *acceptor_baton,
{
serf_bucket_t *c;
/* get the per-request bucket allocator */
/* Create a barrier so the response doesn't eat us! */
return serf_bucket_response_create(c, bkt_alloc);
}
void *vbaton,
{
const char *data;
return APR_EGENERAL;
}
/* XXXXXXX: Create better error message. */
if (rv) {
if (APR_STATUS_IS_EAGAIN(rv)) {
return APR_SUCCESS;
}
if (mpm_supprts_serf) {
}
return rv;
}
/**
* XXXXX: If I understood serf buckets better, it might be possible to not
* copy all of the data here, and better stream it to the client.
**/
do {
if (SERF_BUCKET_READ_ERROR(rv)) {
return rv;
}
if (!ctx->done_headers) {
/* TODO: improve */
}
if (len > 0) {
/* TODO: make APR bucket <-> serf bucket stuff more magical. */
}
if (APR_STATUS_IS_EOF(rv)) {
ctx->keep_reading = 0;
if (mpm_supprts_serf) {
}
return APR_EOF;
}
/* XXXX: Should we send a flush now? */
if (APR_STATUS_IS_EAGAIN(rv)) {
return APR_SUCCESS;
}
} while (1);
}
void *vbaton,
void **acceptor_baton,
void **handler_baton,
{
}
else {
}
}
else {
}
}
*acceptor_baton = ctx;
*handler_baton = ctx;
return APR_SUCCESS;
}
/*
* Finding a random number in a range.
* n' = a + n(b-a+1)/(M+1)
* where:
* n' = random number in range
* a = low end of range
* b = high end of range
* n = random number of 0..M
* M = maxint
* Algorithm 'borrowed' from PHP's rand() function. (See mod_lbmethod_heartbeat.c).
*/
{
if (rv) {
return rv;
}
return APR_SUCCESS;
}
/* TODO: rewrite drive_serf to make it async */
{
apr_status_t rv = 0;
/* XXXXX: make persistent/per-process or something.*/
&serf_module);
/* Allocate everything out of a subpool, with a shorter lifetime than
* the main request, so that we can cleanup safely and remove our events
* from the main serf context in the async mpm mode.
*/
int rc;
apr_uint32_t pick = 0;
/* TODO: could this be optimized in post-config to pre-setup the
* pointers to the right cluster inside the conf structure?
*/
if (!cluster) {
return HTTP_INTERNAL_SERVER_ERROR;
}
return HTTP_INTERNAL_SERVER_ERROR;
}
return HTTP_INTERNAL_SERVER_ERROR;
}
r,
&servers);
return HTTP_INTERNAL_SERVER_ERROR;
}
return HTTP_INTERNAL_SERVER_ERROR;
}
/* TODO: restructure to try all servers in the array !! */
pick = 0;
pool);
}
else {
/* XXXXX: cache dns? */
pool);
}
if (rv != APR_SUCCESS) {
return HTTP_INTERNAL_SERVER_ERROR;
}
if (mpm_supprts_serf) {
if (!serfme) {
return HTTP_INTERNAL_SERVER_ERROR;
}
}
else {
}
baton->r = r;
baton->done_headers = 0;
}
else {
}
if (rv) {
return rv;
}
/* TODO: create custom serf bucket, which does async request body reads */
if (ap_should_client_block(r)) {
char buf[AP_IOBUFSIZE];
apr_file_t *fp;
if (rv != APR_SUCCESS) {
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, "mod_serf: Unable to create temp request body buffer file.");
return HTTP_INTERNAL_SERVER_ERROR;
}
do {
if (rv > 0) {
if (rv) {
return HTTP_INTERNAL_SERVER_ERROR;
}
}
} while(rv > 0);
if (rv < 0) {
return HTTP_INTERNAL_SERVER_ERROR;
}
}
pool);
/* XXX: Is it correct that we don't use the returned serf_request_t? */
if (mpm_supprts_serf) {
return SUSPENDED;
}
else {
do {
/* XXXX: Handle timeouts */
if (APR_STATUS_IS_TIMEUP(rv)) {
continue;
}
if (rv != APR_SUCCESS) {
return HTTP_INTERNAL_SERVER_ERROR;
}
} while (baton->keep_reading);
}
}
static int serf_handler(request_rec *r)
{
&serf_module);
return DECLINED;
}
return drive_serf(r, conf);
}
static int is_true(const char *w)
{
strcasecmp(w, "true") == 0)
{
return 1;
}
return 0;
}
{
int i;
if (argc < 1) {
return "SerfPass must have at least a URI.";
}
if (rv != APR_SUCCESS) {
return "mod_serf: Unable to parse SerfPass url.";
}
return "mod_serf: Need scheme part in url.";
}
/* XXXX: These are bugs in apr_uri_parse. Fixme. */
}
}
for (i = 1; i < argc; i++) {
const char *p = argv[i];
const char *x = ap_strchr_c(p, '=');
if (x) {
if (strncmp(p, "preservehost", x-p) == 0) {
}
}
}
return NULL;
}
/* SerfCluster <name> <provider> <key=value_params_to_provider> ... */
{
const char *rv;
int i;
&serf_module);
return err;
}
if (argc < 2) {
return "SerfCluster must have at least a name and provider.";
}
}
for (i = 2; i < argc; i++) {
const char *p = argv[i];
const char *x = ap_strchr_c(p, '=');
if (x && strlen(p) > 1) {
x+1);
}
else {
"");
}
}
"provider a configuration checker",
}
if (rv) {
return rv;
}
return NULL;
}
{
return new;
}
{
return ctx;
}
{
return ctx;
}
static const command_rec serf_cmds[] =
{
"Configure a cluster backend"),
"URL to reverse proxy to"),
{NULL}
};
typedef struct hb_table_baton_t {
apr_pool_t *p;
const char *msg;
{
b->msg = apr_psprintf(b->p,
"SerfCluster Heartbeat Invalid parameter '%s'",
key);
return 1;
}
return 0;
}
static const char* hb_config_check(void *baton,
{
if (apr_is_empty_table(params)) {
return "SerfCluster Heartbeat requires a path to the heartbat information.";
}
if (b.msg) {
return b.msg;
}
return NULL;
}
/* One server record used by the heartbeat cluster provider.
 * NOTE(review): the code that fills these fields in is not visible here;
 * the per-field notes are inferred from names and nearby comments and
 * should be confirmed against the parser. */
typedef struct hb_server_t {
/* Presumably the server's address as text; the heartbeat file's line
 * format is documented as "<IP> <query_string>". */
const char *ip;
/* Presumably the number of active threads reported by the server. */
int busy;
/* Presumably the number of idle threads reported by the server. */
int ready;
/* NOTE(review): meaning not determinable from the visible code --
 * possibly the time since the server was last heard from; confirm. */
int seen;
/* Presumably the port the server accepts connections on. */
unsigned int port;
} hb_server_t;
static void
{
char *key;
char *value;
char *strtok_state;
while (key) {
if (value) {
value++; /* Skip passed the = */
}
else {
value = "1";
}
}
}
{
apr_file_t *fp;
if (!path) {
return APR_SUCCESS;
}
if (rv) {
return rv;
}
if (rv) {
return rv;
}
{
char *t;
int lineno = 0;
char buf[4096];
const char *ip;
lineno++;
/* comment */
if (buf[0] == '#') {
continue;
}
/* line format: <IP> <query_string>\n */
if (!t) {
continue;
}
t++;
}
}
}
}
/* Server has zero threads active, but lots of them ready,
* it likely just started up, so let's divide the number ready by 4,
* to prevent us from completely flooding it with all new
* requests.
*/
}
}
}
return APR_SUCCESS;
}
{
return 0;
}
return -1;
}
else {
return 1;
}
}
static int hb_list_servers(void *baton,
request_rec *r,
{
int i;
if (rv) {
"SerfCluster: Heartbeat unable to read '%s'", path);
return HTTP_INTERNAL_SERVER_ERROR;
}
for (i = 0;
i < tmpservers->nelts;
i++)
{
ap_serf_server_t *x;
}
}
*out_servers = servers;
return OK;
}
static const ap_serf_cluster_provider_t builtin_heartbeat =
{
"heartbeat",
NULL,
NULL,
};
{
b->msg = apr_psprintf(b->p,
"SerfCluster Static Invalid parameter '%s'",
key);
return 1;
}
return 0;
}
static const char* static_config_check(void *baton,
{
if (apr_is_empty_table(params)) {
return "SerfCluster Static requires at least a host list.";
}
if (b.msg) {
return b.msg;
}
return "SerfCluster Static requires at least a hosts parameter";
}
return NULL;
}
static int static_list_servers(void *baton,
request_rec *r,
{
char *ip;
char *strtok_state;
while (ip) {
char *host_str;
char *scope_id;
apr_port_t port = 0;
if (!rv) {
}
}
/* TODO: support order=random */
}
*out_servers = servers;
return OK;
}
static const ap_serf_cluster_provider_t builtin_static =
{
"static",
NULL,
NULL,
};
{
if (rv != APR_SUCCESS) {
mpm_supprts_serf = 0;
}
return OK;
}
static void register_hooks(apr_pool_t *p)
{
}
{
NULL,
};