mod_serf.c revision ee55ac2cd2fd8c48de97c754758004a7a0176336
c6d33447e28403a90ad817dba4df75fae785be28pquerna/* Licensed to the Apache Software Foundation (ASF) under one or more
c6d33447e28403a90ad817dba4df75fae785be28pquerna * contributor license agreements. See the NOTICE file distributed with
c6d33447e28403a90ad817dba4df75fae785be28pquerna * this work for additional information regarding copyright ownership.
c6d33447e28403a90ad817dba4df75fae785be28pquerna * The ASF licenses this file to You under the Apache License, Version 2.0
c6d33447e28403a90ad817dba4df75fae785be28pquerna * (the "License"); you may not use this file except in compliance with
c6d33447e28403a90ad817dba4df75fae785be28pquerna * the License. You may obtain a copy of the License at
c6d33447e28403a90ad817dba4df75fae785be28pquerna * Unless required by applicable law or agreed to in writing, software
c6d33447e28403a90ad817dba4df75fae785be28pquerna * distributed under the License is distributed on an "AS IS" BASIS,
c6d33447e28403a90ad817dba4df75fae785be28pquerna * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
c6d33447e28403a90ad817dba4df75fae785be28pquerna * See the License for the specific language governing permissions and
c6d33447e28403a90ad817dba4df75fae785be28pquerna * limitations under the License.
82d8a5c340e2d50ebadc542a6422bacf3c244432pquernastatic int mpm_supprts_serf = 0;
c6d33447e28403a90ad817dba4df75fae785be28pquernatypedef struct {
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquernatypedef struct {
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna const char *name;
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna const char *provider;
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquernatypedef struct {
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna /* name -> serf_cluster_t* */
c6d33447e28403a90ad817dba4df75fae785be28pquernatypedef struct {
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna * This works right now because all timers are invoked in the single listener
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna * thread in the Event MPM -- the same thread that serf callbacks are made
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna * from, so we don't technically need a mutex yet, but with the Simple MPM,
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna * invocations are made from worker threads, and we need to figure out locking
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: timed_cleanup_callback");
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna /* Causes all serf connections to unregister from the event mpm */
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna ap_log_rerror(APLOG_MARK, APLOG_ERR, ctx->rstatus, ctx->r,
c6d33447e28403a90ad817dba4df75fae785be28pquernastatic void closed_connection(serf_connection_t *conn,
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: closed_connection");
c6d33447e28403a90ad817dba4df75fae785be28pquerna /* justin says that error handling isn't done yet. hah. */
c6d33447e28403a90ad817dba4df75fae785be28pquerna /* XXXXXX: review */
c6d33447e28403a90ad817dba4df75fae785be28pquerna ap_log_rerror(APLOG_MARK, APLOG_ERR, why, ctx->r, "Closed Connection Error");
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna ap_mpm_register_timed_callback(apr_time_from_msec(1),
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: conn_setup ");
c6d33447e28403a90ad817dba4df75fae785be28pquerna c = serf_bucket_socket_create(sock, ctx->bkt_alloc);
c6d33447e28403a90ad817dba4df75fae785be28pquerna c = serf_bucket_ssl_decrypt_create(c, ctx->ssl_ctx, ctx->bkt_alloc);
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquernastatic int copy_headers_in(void *vbaton, const char *key, const char *value)
c6d33447e28403a90ad817dba4df75fae785be28pquerna /* XXXXX: List of headers not to copy to serf. serf's serf_bucket_headers_setn
c6d33447e28403a90ad817dba4df75fae785be28pquerna * doesn't actually overwrite a header if we set it once, so we need to ignore anything
c6d33447e28403a90ad817dba4df75fae785be28pquerna * we might want to toggle or combine.
c6d33447e28403a90ad817dba4df75fae785be28pquerna switch (key[0]) {
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquernastatic int copy_headers_out(void *vbaton, const char *key, const char *value)
c6d33447e28403a90ad817dba4df75fae785be28pquerna /* XXXXX: Special Treatment required for MANY other headers. fixme.*/
c6d33447e28403a90ad817dba4df75fae785be28pquerna switch (key[0]) {
c6d33447e28403a90ad817dba4df75fae785be28pquerna else if (strcasecmp("Content-Encoding", key) == 0) {
c6d33447e28403a90ad817dba4df75fae785be28pquernastatic serf_bucket_t* accept_response(serf_request_t *request,
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: accept_response");
c6d33447e28403a90ad817dba4df75fae785be28pquerna /* get the per-request bucket allocator */
c6d33447e28403a90ad817dba4df75fae785be28pquerna /* Create a barrier so the response doesn't eat us! */
c6d33447e28403a90ad817dba4df75fae785be28pquernastatic apr_status_t handle_response(serf_request_t *request,
c6d33447e28403a90ad817dba4df75fae785be28pquerna const char *data;
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: handle_response");
c6d33447e28403a90ad817dba4df75fae785be28pquerna /* XXXXXXX: Create better error message. */
c6d33447e28403a90ad817dba4df75fae785be28pquerna ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, ctx->r, "serf_bucket_response_status...");
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna ap_mpm_register_timed_callback(apr_time_from_msec(1),
c6d33447e28403a90ad817dba4df75fae785be28pquerna * XXXXX: If I understood serf buckets better, it might be possible to not
c6d33447e28403a90ad817dba4df75fae785be28pquerna * copy all of the data here, and better stream it to the client.
c6d33447e28403a90ad817dba4df75fae785be28pquerna rv = serf_bucket_read(response, AP_IOBUFSIZE, &data, &len);
c6d33447e28403a90ad817dba4df75fae785be28pquerna ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, ctx->r, "serf_bucket_read(response)");
c6d33447e28403a90ad817dba4df75fae785be28pquerna serf_bucket_headers_do(hdrs, copy_headers_out, ctx);
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna /* TODO: make APR bucket <-> serf bucket stuff more magical. */
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna e = apr_bucket_immortal_create(data, len, ctx->r->connection->bucket_alloc);
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: writing %"APR_SIZE_T_FMT" bytes", len);
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna e = apr_bucket_flush_create(ctx->r->connection->bucket_alloc);
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna ctx->rstatus = ap_pass_brigade(ctx->r->output_filters, ctx->tmpbb);
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna ap_mpm_register_timed_callback(apr_time_from_msec(1),
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna ctx->rstatus = ap_pass_brigade(ctx->r->output_filters, ctx->tmpbb);
c6d33447e28403a90ad817dba4df75fae785be28pquerna /* XXXX: Should we send a flush now? */
c6d33447e28403a90ad817dba4df75fae785be28pquerna } while (1);
c6d33447e28403a90ad817dba4df75fae785be28pquernastatic apr_status_t setup_request(serf_request_t *request,
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: setup_request");
c6d33447e28403a90ad817dba4df75fae785be28pquerna /* XXXXX: handle incoming request bodies */
c6d33447e28403a90ad817dba4df75fae785be28pquerna *req_bkt = serf_bucket_request_create(ctx->r->method, ctx->r->unparsed_uri, body_bkt,
c6d33447e28403a90ad817dba4df75fae785be28pquerna hdrs_bkt = serf_bucket_request_get_headers(*req_bkt);
c6d33447e28403a90ad817dba4df75fae785be28pquerna apr_table_do(copy_headers_in, hdrs_bkt, ctx->r->headers_in, NULL);
fa9496078a83e18311b90b33574fdeb9c115ed7dpquerna serf_bucket_headers_setn(hdrs_bkt, "Host", ctx->conf->url.hostname);
c6d33447e28403a90ad817dba4df75fae785be28pquerna serf_bucket_headers_setn(hdrs_bkt, "Accept-Encoding", "gzip");
c6d33447e28403a90ad817dba4df75fae785be28pquerna *req_bkt = serf_bucket_ssl_encrypt_create(*req_bkt, NULL,
c6d33447e28403a90ad817dba4df75fae785be28pquerna ctx->ssl_ctx = serf_bucket_ssl_encrypt_context_get(*req_bkt);
c6d33447e28403a90ad817dba4df75fae785be28pquerna *req_bkt = serf_bucket_ssl_encrypt_create(*req_bkt, ctx->ssl_ctx,
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna/* TODO: rewrite drive_serf to make it async */
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquernastatic int drive_serf(request_rec *r, serf_config_t *conf)
82d8a5c340e2d50ebadc542a6422bacf3c244432pquerna s_baton_t *baton = apr_palloc(r->pool, sizeof(s_baton_t));
c6d33447e28403a90ad817dba4df75fae785be28pquerna /* XXXXX: make persistent/per-process or something.*/
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna (serf_server_config_t *)ap_get_module_config(r->server->module_config,
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna /* Allocate everything out of a subpool, with a shorter lifetime than
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna * the main request, so that we can cleanup safely and remove our events
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna * from the main serf context in the async mpm mode.
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna /* TODO: could this be optimized in post-config to pre-setup the
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna * pointers to the right cluster inside the conf structure?
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna "SerfCluster: unable to find cluster %s", conf->url.hostname);
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna cp = ap_lookup_provider(AP_SERF_CLUSTER_PROVIDER, cluster->provider, "0");
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna "SerfCluster: unable to find provider %s", cluster->provider);
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna "SerfCluster: %s is missing list servers provider.", cluster->provider);
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna "SerfCluster: %s list servers returned failure", cluster->provider);
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna if (servers == NULL || apr_is_empty_array(servers)) {
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna "SerfCluster: %s failed to provide a list of servers", cluster->provider);
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna /* TODO: restructure to try all servers in the array !! */
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna choice = APR_ARRAY_IDX(servers, 0, ap_serf_server_t *);
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna /* XXXXX: cache dns? */
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna rv = apr_sockaddr_info_get(&address, conf->url.hostname,
c6d33447e28403a90ad817dba4df75fae785be28pquerna ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, "Unable to resolve: %s", conf->url.hostname);
82d8a5c340e2d50ebadc542a6422bacf3c244432pquerna serfme = ap_lookup_provider("mpm_serf", "instance", "0");
82d8a5c340e2d50ebadc542a6422bacf3c244432pquerna ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, "mpm lied to us about supporting serf.");
82d8a5c340e2d50ebadc542a6422bacf3c244432pquerna baton->bkt_alloc = serf_bucket_allocator_create(pool, NULL, NULL);
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna baton->tmpbb = apr_brigade_create(r->pool, r->connection->bucket_alloc);
c6d33447e28403a90ad817dba4df75fae785be28pquerna srequest = serf_connection_request_create(conn, setup_request,
ee55ac2cd2fd8c48de97c754758004a7a0176336pquerna ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, baton->r, "handing off serf request to mpm");
82d8a5c340e2d50ebadc542a6422bacf3c244432pquerna rv = serf_context_run(serfme, SERF_DURATION_FOREVER, pool);
82d8a5c340e2d50ebadc542a6422bacf3c244432pquerna /* XXXX: Handle timeouts */
82d8a5c340e2d50ebadc542a6422bacf3c244432pquerna ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, "serf_context_run()");
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna serf_config_t *conf = ap_get_module_config(r->per_dir_config,
fa9496078a83e18311b90b33574fdeb9c115ed7dpquernastatic int is_true(const char *w)
c6d33447e28403a90ad817dba4df75fae785be28pquernastatic const char *add_pass(cmd_parms *cmd, void *vconf,
fa9496078a83e18311b90b33574fdeb9c115ed7dpquerna return "SerfPass must have at least a URI.";
c6d33447e28403a90ad817dba4df75fae785be28pquerna return "mod_serf: Unable to parse SerfPass url.";
c6d33447e28403a90ad817dba4df75fae785be28pquerna /* XXXX: These are bugs in apr_uri_parse. Fixme. */
c6d33447e28403a90ad817dba4df75fae785be28pquerna conf->url.port = apr_uri_port_of_scheme(conf->url.scheme);
fa9496078a83e18311b90b33574fdeb9c115ed7dpquerna const char *p = argv[i];
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna/* SerfCluster <name> <provider> <key=value_params_to_provider> ... */
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquernastatic const char *add_cluster(cmd_parms *cmd, void *d,
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna const char *rv;
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna (serf_server_config_t *)ap_get_module_config(cmd->server->module_config,
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna return "SerfCluster must have at least a name and provider.";
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna cluster = apr_palloc(cmd->pool, sizeof(serf_cluster_t));
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna cluster->provider = apr_pstrdup(cmd->pool, argv[1]);
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna backend = ap_lookup_provider(AP_SERF_CLUSTER_PROVIDER, cluster->provider, "0");
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna return apr_psprintf(cmd->pool, "SerfCluster: unable to find "
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna const char *p = argv[i];
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna return apr_psprintf(cmd->pool, "SerfCluster: Provider '%s' failed to "
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna "provider a configuration checker",
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna rv = backend->check_config(backend->baton, cmd, cluster->params);
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna apr_hash_set(ctx->clusters, cluster->name, APR_HASH_KEY_STRING, cluster);
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquernastatic void *create_dir_config(apr_pool_t *p, char *dummy)
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna serf_config_t *new = (serf_config_t *) apr_pcalloc(p, sizeof(serf_config_t));
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquernastatic void *create_server_config(apr_pool_t *p, server_rec *s)
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna (serf_server_config_t *) apr_pcalloc(p, sizeof(serf_server_config_t));
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquernastatic void * merge_server_config(apr_pool_t *p, void *basev, void *overridesv)
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna serf_server_config_t *ctx = apr_pcalloc(p, sizeof(serf_server_config_t));
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna serf_server_config_t *base = (serf_server_config_t *) basev;
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna serf_server_config_t *overrides = (serf_server_config_t *) overridesv;
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna ctx->clusters = apr_hash_overlay(p, base->clusters, overrides->clusters);
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna AP_INIT_TAKE_ARGV("SerfCluster", add_cluster, NULL, RSRC_CONF,
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna "Configure a cluster backend"),
fa9496078a83e18311b90b33574fdeb9c115ed7dpquerna AP_INIT_TAKE_ARGV("SerfPass", add_pass, NULL, OR_INDEXES,
fa9496078a83e18311b90b33574fdeb9c115ed7dpquerna "URL to reverse proxy to"),
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquernatypedef struct hb_table_baton_t {
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna const char *msg;
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquernastatic int hb_table_check(void *rec, const char *key, const char *value)
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna "SerfCluster Heartbeat Invalid parameter '%s'",
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna return "SerfCluster Heartbeat requires a path to the heartbat information.";
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquernatypedef struct hb_server_t {
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna const char *ip;
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquernaargstr_to_table(apr_pool_t *p, char *str, apr_table_t *parms)
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquernastatic apr_status_t read_heartbeats(const char *path,
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna rv = apr_file_open(&fp, path, APR_READ|APR_BINARY|APR_BUFFERED,
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna while (apr_file_gets(buf, sizeof(buf), fp) == APR_SUCCESS) {
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna const char *ip;
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna /* comment */
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna /* line format: <IP> <query_string>\n */
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna server->seen = atoi(apr_table_get(hbt, "lastseen"));
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna /* Server has zero threads active, but lots of them ready,
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna * it likely just started up, so let's divide the number ready by 4,
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna * to prevent us from completely flooding it with all new
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna * requests.
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquernastatic int hb_server_sort(const void *a_, const void *b_)
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna tmpservers = apr_array_make(tpool, 32, sizeof(hb_server_t *));
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna "SerfCluster: Heartbeat unable to read '%s'", path);
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna qsort(tmpservers->elts, tmpservers->nelts, sizeof(hb_server_t *),
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna servers = apr_array_make(r->pool, tmpservers->nelts, sizeof(ap_serf_server_t *));
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna for (i = 0;
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna /* TODO: expand multicast format to support ports? */
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquernastatic const ap_serf_cluster_provider_t builtin_heartbeat =
546d45814f6de2976187f3eaad9ba7a4b01c8b77pquerna "heartbeat",
82d8a5c340e2d50ebadc542a6422bacf3c244432pquerna rv = ap_mpm_query(AP_MPMQ_HAS_SERF, &mpm_supprts_serf);