diff -Naur apr_memcache-0.7.0/memcache/apr_memcache.c apr_memcache-0.7.0.oden/memcache/apr_memcache.c --- apr_memcache-0.7.0/memcache/apr_memcache.c 2005-04-14 10:27:36.000000000 +0200 +++ apr_memcache-0.7.0.oden/memcache/apr_memcache.c 2007-07-15 16:35:14.000000000 +0200 @@ -1,8 +1,9 @@ -/* Copyright 2004 Paul Querna - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -14,27 +15,20 @@ */ #include "apr_memcache.h" +#include "apr_poll.h" #include "apr_version.h" #include <stdlib.h> -#if APR_MAJOR_VERSION < 1 -/* With 1.0 apr_socket_create uses the apr_socket_create_ex prototype.. - * And apr_socket_create_ex is no more. - * So lets remap apr_socket_create to apr_socket_create_ex - */ -#define apr_socket_create apr_socket_create_ex -#endif - #define BUFFER_SIZE 512 struct apr_memcache_conn_t { - char* buffer; - apr_uint32_t blen; + char *buffer; + apr_size_t blen; apr_pool_t *p; apr_socket_t *sock; - apr_bucket_alloc_t* balloc; - apr_bucket_brigade* bb; - apr_bucket_brigade* tb; + apr_bucket_alloc_t *balloc; + apr_bucket_brigade *bb; + apr_bucket_brigade *tb; apr_memcache_server_t *ms; }; @@ -43,6 +37,9 @@ #define MC_EOL "\r\n" #define MC_EOL_LEN (sizeof(MC_EOL)-1) +#define MC_WS " " +#define MC_WS_LEN (sizeof(MC_WS)-1) + #define MC_GET "get " #define MC_GET_LEN (sizeof(MC_GET)-1) @@ -70,6 +67,9 @@ #define MC_STATS "stats" #define MC_STATS_LEN (sizeof(MC_STATS)-1) +#define MC_QUIT "quit" +#define MC_QUIT_LEN (sizeof(MC_QUIT)-1) + /* Strings for Server Replies */ #define MS_STORED "STORED" @@ -99,17 +99,30 @@ #define MS_END "END" #define MS_END_LEN (sizeof(MS_END)-1) +/** Server and Query Structure for a multiple get */ +struct cache_server_query_t { + apr_memcache_server_t* ms; + apr_memcache_conn_t* conn; + struct iovec* query_vec; + unsigned int query_vec_count; +}; -static apr_status_t make_server_dead(apr_memcache_t * mc, apr_memcache_server_t * ms) +#define MULT_GET_TIMEOUT 50000 + +static apr_status_t make_server_dead(apr_memcache_t *mc, apr_memcache_server_t *ms) { +#if APR_HAS_THREADS apr_thread_mutex_lock(ms->lock); +#endif ms->status = APR_MC_SERVER_DEAD; ms->btime = apr_time_now(); +#if APR_HAS_THREADS apr_thread_mutex_unlock(ms->lock); +#endif return APR_SUCCESS; } -static apr_status_t make_server_live(apr_memcache_t * mc, apr_memcache_server_t * ms) +static apr_status_t make_server_live(apr_memcache_t *mc, apr_memcache_server_t *ms) { ms->status = APR_MC_SERVER_LIVE; return APR_SUCCESS; @@ -135,8 +148,20 @@ APR_DECLARE(apr_memcache_server_t *) apr_memcache_find_server_hash(apr_memcache_t *mc, const apr_uint32_t hash) { + if (mc->server_func) { + return mc->server_func(mc->server_baton, mc, hash); + } + else { + return apr_memcache_find_server_hash_default(NULL, mc, hash); + } +} + +APR_DECLARE(apr_memcache_server_t *) +apr_memcache_find_server_hash_default(void *baton, apr_memcache_t *mc, + const apr_uint32_t hash) +{ apr_memcache_server_t *ms = NULL; - apr_uint32_t h = hash; + apr_uint32_t h = hash ? hash : 1; apr_uint32_t i = 0; apr_time_t curtime = 0; @@ -153,30 +178,36 @@ if (curtime == 0) { curtime = apr_time_now(); } +#if APR_HAS_THREADS apr_thread_mutex_lock(ms->lock); +#endif /* Try the the dead server, every 5 seconds */ if (curtime - ms->btime > apr_time_from_sec(5)) { if (mc_version_ping(ms) == APR_SUCCESS) { ms->btime = curtime; make_server_live(mc, ms); +#if APR_HAS_THREADS apr_thread_mutex_unlock(ms->lock); +#endif break; } } +#if APR_HAS_THREADS apr_thread_mutex_unlock(ms->lock); +#endif } h++; i++; } while(i < mc->ntotal); - if(i == mc->ntotal) { + if (i == mc->ntotal) { ms = NULL; } return ms; } -APR_DECLARE(apr_memcache_server_t *) apr_memcache_find_server(apr_memcache_t *mc, const char* host, apr_port_t port) +APR_DECLARE(apr_memcache_server_t *) apr_memcache_find_server(apr_memcache_t *mc, const char *host, apr_port_t port) { int i; @@ -191,7 +222,7 @@ return NULL; } -static apr_status_t ms_find_conn(apr_memcache_server_t * ms, apr_memcache_conn_t** conn) +static apr_status_t ms_find_conn(apr_memcache_server_t *ms, apr_memcache_conn_t **conn) { #if APR_HAS_THREADS return apr_reslist_acquire(ms->conns, (void **)conn); @@ -201,7 +232,7 @@ #endif } -static apr_status_t ms_bad_conn(apr_memcache_server_t * ms, apr_memcache_conn_t* conn) +static apr_status_t ms_bad_conn(apr_memcache_server_t *ms, apr_memcache_conn_t *conn) { #if APR_HAS_THREADS return apr_reslist_invalidate(ms->conns, conn); @@ -210,7 +241,7 @@ #endif } -static apr_status_t ms_release_conn(apr_memcache_server_t * ms, apr_memcache_conn_t* conn) +static apr_status_t ms_release_conn(apr_memcache_server_t *ms, apr_memcache_conn_t *conn) { #if APR_HAS_THREADS return apr_reslist_release(ms->conns, conn); @@ -219,7 +250,7 @@ #endif } -APR_DECLARE(apr_status_t) apr_memcache_enable_server(apr_memcache_t * mc, apr_memcache_server_t * ms) +APR_DECLARE(apr_status_t) apr_memcache_enable_server(apr_memcache_t *mc, apr_memcache_server_t *ms) { apr_status_t rv = APR_SUCCESS; @@ -231,12 +262,12 @@ return rv; } -APR_DECLARE(apr_status_t) apr_memcache_disable_server(apr_memcache_t * mc, apr_memcache_server_t * ms) +APR_DECLARE(apr_status_t) apr_memcache_disable_server(apr_memcache_t *mc, apr_memcache_server_t *ms) { return make_server_dead(mc, ms); } -static apr_status_t conn_connect(apr_memcache_conn_t* conn) +static apr_status_t conn_connect(apr_memcache_conn_t *conn) { apr_status_t rv = APR_SUCCESS; apr_sockaddr_t *sa; @@ -246,12 +277,17 @@ return rv; } + rv = apr_socket_timeout_set(conn->sock, 1 * APR_USEC_PER_SEC); + if (rv != APR_SUCCESS) { + return rv; + } + rv = apr_socket_connect(conn->sock, sa); if (rv != APR_SUCCESS) { return rv; } - rv = apr_socket_timeout_set(conn->sock, 1 * APR_USEC_PER_SEC); + rv = apr_socket_timeout_set(conn->sock, -1); if (rv != APR_SUCCESS) { return rv; } @@ -265,8 +301,8 @@ { apr_status_t rv = APR_SUCCESS; apr_memcache_conn_t *conn; - apr_bucket* e; - apr_pool_t* np; + apr_bucket *e; + apr_pool_t *np; apr_memcache_server_t *ms = params; rv = apr_pool_create(&np, pool); @@ -281,6 +317,7 @@ rv = apr_socket_create(&conn->sock, APR_INET, SOCK_STREAM, 0, np); if (rv != APR_SUCCESS) { + apr_pool_destroy(np); return rv; } @@ -295,30 +332,46 @@ APR_BRIGADE_INSERT_TAIL(conn->bb, e); rv = conn_connect(conn); - *conn_ = conn; - + if (rv != APR_SUCCESS) { + apr_pool_destroy(np); + } + else { + *conn_ = conn; + } + return rv; } static apr_status_t -mc_conn_destruct(void *conn_, void *params, apr_pool_t * pool) +mc_conn_destruct(void *conn_, void *params, apr_pool_t *pool) { -/* apr_memcache_conn_t *conn = conn_;*/ - -/* apr_pool_destroy(conn->p); */ + apr_memcache_conn_t *conn = (apr_memcache_conn_t*)conn_; + struct iovec vec[2]; + apr_size_t written; + + /* send a quit message to the memcached server to be nice about it. */ + vec[0].iov_base = MC_QUIT; + vec[0].iov_len = MC_QUIT_LEN; + vec[1].iov_base = MC_EOL; + vec[1].iov_len = MC_EOL_LEN; + + /* Return values not checked, since we just want to make it go away. */ + apr_socket_sendv(conn->sock, vec, 2, &written); + apr_socket_close(conn->sock); + return APR_SUCCESS; } -APR_DECLARE(apr_status_t) apr_memcache_server_create(apr_pool_t* p, - const char* host, apr_port_t port, +APR_DECLARE(apr_status_t) apr_memcache_server_create(apr_pool_t *p, + const char *host, apr_port_t port, apr_uint32_t min, apr_uint32_t smax, apr_uint32_t max, apr_uint32_t ttl, - apr_memcache_server_t** ms) + apr_memcache_server_t **ms) { apr_status_t rv = APR_SUCCESS; apr_memcache_server_t *server; - apr_pool_t* np; + apr_pool_t *np; rv = apr_pool_create(&np, p); @@ -367,16 +420,20 @@ mc->nalloc = max_servers; mc->ntotal = 0; mc->live_servers = apr_palloc(p, mc->nalloc * sizeof(struct apr_memcache_server_t *)); + mc->hash_func = NULL; + mc->hash_baton = NULL; + mc->server_func = NULL; + mc->server_baton = NULL; *memcache = mc; return rv; } /* The crc32 functions and data was originally written by Spencer - * Garrett <srg@quick.com> and was cleaned from the PostgreSQL source - * tree via the files contrib/ltree/crc32.[ch]. No license was - * included, therefore it is assumed that this code is public - * domain. */ + * Garrett <srg@quick.com> and was gleaned from the PostgreSQL source + * tree via the files contrib/ltree/crc32.[ch] and from FreeBSD at + * src/usr.bin/cksum/crc32.c. + */ static const apr_uint32_t crc32tab[256] = { 0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, @@ -445,66 +502,85 @@ 0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d, }; -APR_DECLARE(apr_uint32_t) apr_memcache_hash(const char* data, const apr_uint32_t data_len) +APR_DECLARE(apr_uint32_t) apr_memcache_hash_crc32(void *baton, + const char *data, + const apr_size_t data_len) { apr_uint32_t i; apr_uint32_t crc; crc = ~0; - + for (i = 0; i < data_len; i++) crc = (crc >> 8) ^ crc32tab[(crc ^ (data[i])) & 0xff]; - - return ((~crc >> 16) & 0x7fff); + + return ~crc; +} + +APR_DECLARE(apr_uint32_t) apr_memcache_hash_default(void *baton, + const char *data, + const apr_size_t data_len) +{ + /* The default Perl Client doesn't actually use just crc32 -- it shifts it again + * like this.... + */ + return ((apr_memcache_hash_crc32(baton, data, data_len) >> 16) & 0x7fff); +} + +APR_DECLARE(apr_uint32_t) apr_memcache_hash(apr_memcache_t *mc, + const char *data, + const apr_size_t data_len) +{ + if (mc->hash_func) { + return mc->hash_func(mc->hash_baton, data, data_len); + } + else { + return apr_memcache_hash_default(NULL, data, data_len); + } } -static apr_status_t get_server_line(apr_memcache_conn_t* conn) +static apr_status_t get_server_line(apr_memcache_conn_t *conn) { - int bsize = BUFFER_SIZE; + apr_size_t bsize = BUFFER_SIZE; apr_status_t rv = APR_SUCCESS; rv = apr_brigade_split_line(conn->tb, conn->bb, APR_BLOCK_READ, BUFFER_SIZE); - if(rv != APR_SUCCESS) { + if (rv != APR_SUCCESS) { return rv; } rv = apr_brigade_flatten(conn->tb, conn->buffer, &bsize); - if(rv != APR_SUCCESS) { + if (rv != APR_SUCCESS) { return rv; } conn->blen = bsize; conn->buffer[bsize] = '\0'; - apr_brigade_cleanup(conn->tb); - - if(rv != APR_SUCCESS) { - return rv; - } - return rv; + return apr_brigade_cleanup(conn->tb); } static apr_status_t storage_cmd_write(apr_memcache_t *mc, - char* cmd, - const apr_uint32_t cmd_size, - const char* key, + char *cmd, + const apr_size_t cmd_size, + const char *key, char *data, - const apr_uint32_t data_size, + const apr_size_t data_size, apr_uint32_t timeout, apr_uint16_t flags) { apr_uint32_t hash; - apr_memcache_server_t* ms; - apr_memcache_conn_t* conn; + apr_memcache_server_t *ms; + apr_memcache_conn_t *conn; apr_status_t rv; apr_size_t written; struct iovec vec[5]; int klen; - int key_size = strlen(key); + apr_size_t key_size = strlen(key); - hash = apr_memcache_hash(key, key_size); + hash = apr_memcache_hash(mc, key, key_size); ms = apr_memcache_find_server_hash(mc, hash); @@ -526,7 +602,8 @@ vec[1].iov_base = (void*)key; vec[1].iov_len = key_size; - klen = snprintf(conn->buffer, BUFFER_SIZE, " %u %u %u" MC_EOL, flags, timeout, data_size); + klen = apr_snprintf(conn->buffer, BUFFER_SIZE, " %u %u %" APR_SIZE_T_FMT " " MC_EOL, + flags, timeout, data_size); vec[2].iov_base = conn->buffer; vec[2].iov_len = klen; @@ -570,9 +647,9 @@ APR_DECLARE(apr_status_t) apr_memcache_set(apr_memcache_t *mc, - const char* key, + const char *key, char *data, - const apr_uint32_t data_size, + const apr_size_t data_size, apr_uint32_t timeout, apr_uint16_t flags) { @@ -585,9 +662,9 @@ APR_DECLARE(apr_status_t) apr_memcache_add(apr_memcache_t *mc, - const char* key, + const char *key, char *data, - const apr_uint32_t data_size, + const apr_size_t data_size, apr_uint32_t timeout, apr_uint16_t flags) { @@ -600,9 +677,9 @@ APR_DECLARE(apr_status_t) apr_memcache_replace(apr_memcache_t *mc, - const char* key, + const char *key, char *data, - const apr_uint32_t data_size, + const apr_size_t data_size, apr_uint32_t timeout, apr_uint16_t flags) { @@ -617,20 +694,20 @@ APR_DECLARE(apr_status_t) apr_memcache_getp(apr_memcache_t *mc, apr_pool_t *p, - const char* key, + const char *key, char **baton, apr_size_t *new_length, apr_uint16_t *flags_) { apr_status_t rv; - apr_memcache_server_t* ms; - apr_memcache_conn_t* conn; + apr_memcache_server_t *ms; + apr_memcache_conn_t *conn; apr_uint32_t hash; apr_size_t written; int klen = strlen(key); struct iovec vec[3]; - hash = apr_memcache_hash(key, klen); + hash = apr_memcache_hash(mc, key, klen); ms = apr_memcache_find_server_hash(mc, hash); if (ms == NULL) return APR_NOTFOUND; @@ -672,7 +749,7 @@ char *length; char *start; char *last; - apr_uint32_t len; + apr_size_t len; start = conn->buffer; flags = apr_strtok(conn->buffer," ",&last); @@ -749,18 +826,18 @@ APR_DECLARE(apr_status_t) apr_memcache_delete(apr_memcache_t *mc, - const char* key, + const char *key, apr_uint32_t timeout) { apr_status_t rv; - apr_memcache_server_t* ms; - apr_memcache_conn_t* conn; + apr_memcache_server_t *ms; + apr_memcache_conn_t *conn; apr_uint32_t hash; apr_size_t written; struct iovec vec[3]; int klen = strlen(key); - hash = apr_memcache_hash(key, klen); + hash = apr_memcache_hash(mc, key, klen); ms = apr_memcache_find_server_hash(mc, hash); if (ms == NULL) return APR_NOTFOUND; @@ -779,7 +856,7 @@ vec[1].iov_base = (void*)key; vec[1].iov_len = klen; - klen = snprintf(conn->buffer, BUFFER_SIZE, " %u" MC_EOL, timeout); + klen = apr_snprintf(conn->buffer, BUFFER_SIZE, " %u" MC_EOL, timeout); vec[2].iov_base = conn->buffer; vec[2].iov_len = klen; @@ -815,21 +892,21 @@ } static apr_status_t num_cmd_write(apr_memcache_t *mc, - char* cmd, + char *cmd, const apr_uint32_t cmd_size, - const char* key, + const char *key, const apr_int32_t inc, apr_uint32_t *new_value) { apr_status_t rv; - apr_memcache_server_t* ms; - apr_memcache_conn_t* conn; + apr_memcache_server_t *ms; + apr_memcache_conn_t *conn; apr_uint32_t hash; apr_size_t written; struct iovec vec[3]; int klen = strlen(key); - hash = apr_memcache_hash(key, klen); + hash = apr_memcache_hash(mc, key, klen); ms = apr_memcache_find_server_hash(mc, hash); if (ms == NULL) return APR_NOTFOUND; @@ -848,7 +925,7 @@ vec[1].iov_base = (void*)key; vec[1].iov_len = klen; - klen = snprintf(conn->buffer, BUFFER_SIZE, " %u" MC_EOL, inc); + klen = apr_snprintf(conn->buffer, BUFFER_SIZE, " %u" MC_EOL, inc); vec[2].iov_base = conn->buffer; vec[2].iov_len = klen; @@ -875,8 +952,9 @@ rv = APR_NOTFOUND; } else { - if(new_value) + if (new_value) { *new_value = atoi(conn->buffer); + } rv = APR_SUCCESS; } @@ -887,7 +965,7 @@ APR_DECLARE(apr_status_t) apr_memcache_incr(apr_memcache_t *mc, - const char* key, + const char *key, apr_int32_t inc, apr_uint32_t *new_value) { @@ -902,7 +980,7 @@ APR_DECLARE(apr_status_t) apr_memcache_decr(apr_memcache_t *mc, - const char* key, + const char *key, apr_int32_t inc, apr_uint32_t *new_value) { @@ -922,7 +1000,7 @@ char **baton) { apr_status_t rv; - apr_memcache_conn_t* conn; + apr_memcache_conn_t *conn; apr_size_t written; struct iovec vec[2]; @@ -971,7 +1049,7 @@ apr_status_t rv; apr_size_t written; struct iovec vec[2]; - apr_memcache_conn_t* conn; + apr_memcache_conn_t *conn; rv = ms_find_conn(ms, &conn); @@ -999,6 +1077,341 @@ } +APR_DECLARE(void) +apr_memcache_add_multget_key(apr_pool_t *data_pool, + const char* key, + apr_hash_t **values) +{ + apr_memcache_value_t* value; + int klen = strlen(key); + + /* create the value hash if need be */ + if (!*values) { + *values = apr_hash_make(data_pool); + } + + /* init key and add it to the value hash */ + value = apr_pcalloc(data_pool, sizeof(apr_memcache_value_t)); + + value->status = APR_NOTFOUND; + value->key = apr_pstrdup(data_pool, key); + + apr_hash_set(*values, value->key, klen, value); +} + +static void mget_conn_result(int up, + apr_status_t rv, + apr_memcache_t *mc, + apr_memcache_server_t *ms, + apr_memcache_conn_t *conn, + struct cache_server_query_t *server_query, + apr_hash_t *values, + apr_hash_t *server_queries) +{ + int j; + apr_memcache_value_t* value; + + if (!up) { + ms_bad_conn(ms, conn); + apr_memcache_disable_server(mc, ms); + } + + for (j = 1; j < server_query->query_vec_count ; j+=2) { + if (server_query->query_vec[j].iov_base) { + value = apr_hash_get(values, server_query->query_vec[j].iov_base, + strlen(server_query->query_vec[j].iov_base)); + + if (value->status == APR_NOTFOUND) { + value->status = rv; + } + } + } + + ms_release_conn(ms, conn); + + apr_hash_set(server_queries, &ms, sizeof(ms), NULL); +} + +APR_DECLARE(apr_status_t) +apr_memcache_multgetp(apr_memcache_t *mc, + apr_pool_t *temp_pool, + apr_pool_t *data_pool, + apr_hash_t *values) +{ + apr_status_t rv; + apr_memcache_server_t* ms; + apr_memcache_conn_t* conn; + apr_uint32_t hash; + apr_size_t written; + int klen; + + apr_memcache_value_t* value; + apr_hash_index_t* value_hash_index; + + /* this is a little over aggresive, but beats multiple loops + * to figure out how long each vector needs to be per-server. + */ + unsigned int veclen = 2 + 2 * apr_hash_count(values) - 1; /* get <key>[<space><key>...]\r\n */ + unsigned int i, j; + unsigned int queries_sent; + apr_int32_t queries_recvd; + + apr_hash_t * server_queries = apr_hash_make(temp_pool); + struct cache_server_query_t* server_query; + apr_hash_index_t * query_hash_index; + + apr_pollset_t* pollset; + const apr_pollfd_t* activefds; + apr_pollfd_t* pollfds; + + + /* build all the queries */ + value_hash_index = apr_hash_first(temp_pool, values); + while (value_hash_index) { + void *v; + apr_hash_this(value_hash_index, NULL, NULL, &v); + value = v; + value_hash_index = apr_hash_next(value_hash_index); + klen = strlen(value->key); + + hash = apr_memcache_hash(mc, value->key, klen); + ms = apr_memcache_find_server_hash(mc, hash); + if (ms == NULL) { + continue; + } + + server_query = apr_hash_get(server_queries, &ms, sizeof(ms)); + + if (!server_query) { + rv = ms_find_conn(ms, &conn); + + if (rv != APR_SUCCESS) { + apr_memcache_disable_server(mc, ms); + value->status = rv; + continue; + } + + server_query = apr_pcalloc(temp_pool,sizeof(struct cache_server_query_t)); + + apr_hash_set(server_queries, &ms, sizeof(ms), server_query); + + server_query->ms = ms; + server_query->conn = conn; + server_query->query_vec = apr_pcalloc(temp_pool, sizeof(struct iovec)*veclen); + + /* set up the first key */ + server_query->query_vec[0].iov_base = MC_GET; + server_query->query_vec[0].iov_len = MC_GET_LEN; + + server_query->query_vec[1].iov_base = (void*)(value->key); + server_query->query_vec[1].iov_len = klen; + + server_query->query_vec[2].iov_base = MC_EOL; + server_query->query_vec[2].iov_len = MC_EOL_LEN; + + server_query->query_vec_count = 3; + } + else { + j = server_query->query_vec_count - 1; + + server_query->query_vec[j].iov_base = MC_WS; + server_query->query_vec[j].iov_len = MC_WS_LEN; + j++; + + server_query->query_vec[j].iov_base = (void*)(value->key); + server_query->query_vec[j].iov_len = klen; + j++; + + server_query->query_vec[j].iov_base = MC_EOL; + server_query->query_vec[j].iov_len = MC_EOL_LEN; + j++; + + server_query->query_vec_count = j; + } + } + + /* create polling structures */ + pollfds = apr_pcalloc(temp_pool, apr_hash_count(server_queries) * sizeof(apr_pollfd_t)); + + rv = apr_pollset_create(&pollset, apr_hash_count(server_queries), temp_pool, 0); + + if (rv != APR_SUCCESS) { + return rv; + } + + /* send all the queries */ + queries_sent = 0; + query_hash_index = apr_hash_first(temp_pool, server_queries); + + while (query_hash_index) { + void *v; + apr_hash_this(query_hash_index, NULL, NULL, &v); + server_query = v; + query_hash_index = apr_hash_next(query_hash_index); + + conn = server_query->conn; + ms = server_query->ms; + + for (i = 0, rv = APR_SUCCESS; i < veclen && rv == APR_SUCCESS; i += APR_MAX_IOVEC_SIZE) { + rv = apr_socket_sendv(conn->sock, &(server_query->query_vec[i]), + veclen-i>APR_MAX_IOVEC_SIZE ? APR_MAX_IOVEC_SIZE : veclen-i , &written); + } + + if (rv != APR_SUCCESS) { + mget_conn_result(FALSE, rv, mc, ms, conn, + server_query, values, server_queries); + continue; + } + + pollfds[queries_sent].desc_type = APR_POLL_SOCKET; + pollfds[queries_sent].reqevents = APR_POLLIN; + pollfds[queries_sent].p = temp_pool; + pollfds[queries_sent].desc.s = conn->sock; + pollfds[queries_sent].client_data = (void *)server_query; + apr_pollset_add (pollset, &pollfds[queries_sent]); + + queries_sent++; + } + + while (queries_sent) { + rv = apr_pollset_poll(pollset, MULT_GET_TIMEOUT, &queries_recvd, &activefds); + + if (rv != APR_SUCCESS) { + /* timeout */ + queries_sent = 0; + continue; + } + for (i = 0; i < queries_recvd; i++) { + server_query = activefds[i].client_data; + conn = server_query->conn; + ms = server_query->ms; + + rv = get_server_line(conn); + + if (rv != APR_SUCCESS) { + apr_pollset_remove (pollset, &activefds[i]); + mget_conn_result(FALSE, rv, mc, ms, conn, + server_query, values, server_queries); + queries_sent--; + continue; + } + + if (strncmp(MS_VALUE, conn->buffer, MS_VALUE_LEN) == 0) { + char *key; + char *flags; + char *length; + char *start; + char *last; + char *data; + apr_size_t len; + + start = conn->buffer; + key = apr_strtok(conn->buffer, " ", &last); /* just the VALUE, ignore */ + key = apr_strtok(NULL, " ", &last); + flags = apr_strtok(NULL, " ", &last); + + + length = apr_strtok(NULL, " ", &last); + len = atoi(length); + + value = apr_hash_get(values, key, strlen(key)); + + + if (value) { + if (len > 0) { + apr_bucket_brigade *bbb; + apr_bucket *e; + + /* eat the trailing \r\n */ + rv = apr_brigade_partition(conn->bb, len+2, &e); + + if (rv != APR_SUCCESS) { + apr_pollset_remove (pollset, &activefds[i]); + mget_conn_result(FALSE, rv, mc, ms, conn, + server_query, values, server_queries); + queries_sent--; + continue; + } + + bbb = apr_brigade_split(conn->bb, e); + + rv = apr_brigade_pflatten(conn->bb, &data, &len, data_pool); + + if (rv != APR_SUCCESS) { + apr_pollset_remove (pollset, &activefds[i]); + mget_conn_result(FALSE, rv, mc, ms, conn, + server_query, values, server_queries); + queries_sent--; + continue; + } + + rv = apr_brigade_destroy(conn->bb); + if (rv != APR_SUCCESS) { + apr_pollset_remove (pollset, &activefds[i]); + mget_conn_result(FALSE, rv, mc, ms, conn, + server_query, values, server_queries); + queries_sent--; + continue; + } + + conn->bb = bbb; + + value->len = len - 2; + data[value->len] = '\0'; + value->data = data; + } + + value->status = rv; + value->flags = atoi(flags); + + /* stay on the server */ + i--; + + } + else { + /* TODO: Server Sent back a key I didn't ask for or my + * hash is corrupt */ + } + } + else if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) { + /* this connection is done */ + apr_pollset_remove (pollset, &activefds[i]); + ms_release_conn(ms, conn); + apr_hash_set(server_queries, &ms, sizeof(ms), NULL); + + queries_sent--; + } + else { + /* unknown reply? */ + rv = APR_EGENERAL; + } + + } /* /for */ + } /* /while */ + + query_hash_index = apr_hash_first(temp_pool, server_queries); + while (query_hash_index) { + void *v; + apr_hash_this(query_hash_index, NULL, NULL, &v); + server_query = v; + query_hash_index = apr_hash_next(query_hash_index); + + conn = server_query->conn; + ms = server_query->ms; + + mget_conn_result(TRUE, rv, mc, ms, conn, + server_query, values, server_queries); + continue; + } + + apr_pool_clear(temp_pool); + apr_pollset_destroy(pollset); + return APR_SUCCESS; + +} + + + /** * Define all of the strings for stats */ @@ -1015,6 +1428,9 @@ #define STAT_version MS_STAT " version " #define STAT_version_LEN (sizeof(STAT_version)-1) +#define STAT_pointer_size MS_STAT " pointer_size " +#define STAT_pointer_size_LEN (sizeof(STAT_pointer_size)-1) + #define STAT_rusage_user MS_STAT " rusage_user " #define STAT_rusage_user_LEN (sizeof(STAT_rusage_user)-1) @@ -1051,6 +1467,9 @@ #define STAT_get_misses MS_STAT " get_misses " #define STAT_get_misses_LEN (sizeof(STAT_get_misses)-1) +#define STAT_evictions MS_STAT " evictions " +#define STAT_evictions_LEN (sizeof(STAT_evictions)-1) + #define STAT_bytes_read MS_STAT " bytes_read " #define STAT_bytes_read_LEN (sizeof(STAT_bytes_read)-1) @@ -1060,41 +1479,54 @@ #define STAT_limit_maxbytes MS_STAT " limit_maxbytes " #define STAT_limit_maxbytes_LEN (sizeof(STAT_limit_maxbytes)-1) +#define STAT_threads MS_STAT " threads " +#define STAT_threads_LEN (sizeof(STAT_threads)-1) -static const char* stat_read_string(apr_pool_t *p, char* buf, int len) +static const char *stat_read_string(apr_pool_t *p, char *buf, int len) { /* remove trailing \r\n and null char */ return apr_pstrmemdup(p, buf, len-2); } -static apr_uint32_t stat_read_uint32(apr_pool_t *p, char* buf, int len) +static apr_uint32_t stat_read_uint32(apr_pool_t *p, char *buf, int len) { buf[len-2] = '\0'; return atoi(buf); } -static apr_uint64_t stat_read_uint64(apr_pool_t *p, char* buf, int len) +static apr_uint64_t stat_read_uint64(apr_pool_t *p, char *buf, int len) { buf[len-2] = '\0'; return apr_atoi64(buf); } -static apr_time_t stat_read_time(apr_pool_t *p, char* buf, int len) +static apr_time_t stat_read_time(apr_pool_t *p, char *buf, int len) { buf[len-2] = '\0'; return apr_time_from_sec(atoi(buf)); } -static apr_time_t stat_read_rtime(apr_pool_t *p, char* buf, int len) +static apr_time_t stat_read_rtime(apr_pool_t *p, char *buf, int len) { - char* tok; - char* secs; - char* usecs; + char *tok; + char *secs; + char *usecs; + const char *sep = ":"; buf[len-2] = '\0'; - secs = apr_strtok(buf,":", &tok); - usecs = apr_strtok(NULL,":", &tok); - return apr_time_make(atoi(secs), atoi(usecs)); + + secs = apr_strtok(buf, sep, &tok); + if (secs == NULL) { + sep = "."; + secs = apr_strtok(buf, sep, &tok); + } + usecs = apr_strtok(NULL, sep, &tok); + if (secs && usecs) { + return apr_time_make(atoi(secs), atoi(usecs)); + } + else { + return apr_time_make(0, 0); + } } /** @@ -1132,13 +1564,14 @@ stats-> name = mc_stat_ ## type ((STAT_ ## name ## _LEN)); \ } -static void update_stats(apr_pool_t *p, apr_memcache_conn_t* conn, +static void update_stats(apr_pool_t *p, apr_memcache_conn_t *conn, apr_memcache_stats_t *stats) { mc_do_stat(version, str) else mc_do_stat(pid, uint32) else mc_do_stat(uptime, uint32) + else mc_do_stat(pointer_size, uint32) else mc_do_stat(time, time) else mc_do_stat(rusage_user, rtime) else mc_do_stat(rusage_system, rtime) @@ -1152,9 +1585,11 @@ else mc_do_stat(cmd_set, uint32) else mc_do_stat(get_hits, uint32) else mc_do_stat(get_misses, uint32) + else mc_do_stat(evictions, uint64) else mc_do_stat(bytes_read, uint64) else mc_do_stat(bytes_written, uint64) else mc_do_stat(limit_maxbytes, uint32) + else mc_do_stat(threads, uint32) } APR_DECLARE(apr_status_t) @@ -1164,7 +1599,7 @@ { apr_memcache_stats_t *ret; apr_status_t rv; - apr_memcache_conn_t* conn; + apr_memcache_conn_t *conn; apr_size_t written; struct iovec vec[2]; @@ -1188,7 +1623,7 @@ return rv; } - ret = apr_pcalloc(p, sizeof(apr_memcache_stats_t *)); + ret = apr_pcalloc(p, sizeof(apr_memcache_stats_t)); do { rv = get_server_line(conn); @@ -1214,8 +1649,9 @@ ms_release_conn(ms, conn); - if(stats) + if (stats) { *stats = ret; + } return rv; } diff -Naur apr_memcache-0.7.0/memcache/apr_memcache.h apr_memcache-0.7.0.oden/memcache/apr_memcache.h --- apr_memcache-0.7.0/memcache/apr_memcache.h 2005-04-14 09:47:50.000000000 +0200 +++ apr_memcache-0.7.0.oden/memcache/apr_memcache.h 2007-07-15 16:35:14.000000000 +0200 @@ -1,8 +1,9 @@ -/* Copyright 2004 Paul Querna - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -32,6 +33,7 @@ #include "apr_ring.h" #include "apr_buckets.h" #include "apr_reslist.h" +#include "apr_hash.h" #ifdef __cplusplus extern "C" { @@ -66,19 +68,55 @@ apr_memcache_conn_t *conn; #endif apr_pool_t *p; /** Pool to use for private allocations */ +#if APR_HAS_THREADS apr_thread_mutex_t *lock; +#endif apr_time_t btime; }; +/* Custom hash callback function prototype, user for server selection. +* @param baton user selected baton +* @param data data to hash +* @param data_len length of data +*/ +typedef apr_uint32_t (*apr_memcache_hash_func)(void *baton, + const char *data, + apr_size_t data_len); + +typedef struct apr_memcache_t apr_memcache_t; + +/* Custom Server Select callback function prototype. +* @param baton user selected baton +* @param mc memcache instance, use mc->live_servers to select a node +* @param hash hash of the selected key. +*/ +typedef apr_memcache_server_t* (*apr_memcache_server_func)(void *baton, + apr_memcache_t *mc, + const apr_uint32_t hash); + /** Container for a set of memcached servers */ -typedef struct +struct apr_memcache_t { apr_uint32_t flags; /**< Flags, Not currently used */ apr_uint16_t nalloc; /**< Number of Servers Allocated */ apr_uint16_t ntotal; /**< Number of Servers Added */ apr_memcache_server_t **live_servers; /**< Array of Servers */ apr_pool_t *p; /** Pool to use for allocations */ -} apr_memcache_t; + void *hash_baton; + apr_memcache_hash_func hash_func; + void *server_baton; + apr_memcache_server_func server_func; +}; + +/** Returned Data from a multiple get */ +typedef struct +{ + apr_status_t status; + const char* key; + apr_size_t len; + char *data; + apr_uint16_t flags; +} apr_memcache_value_t; /** * Creates a crc32 hash used to split keys between servers @@ -87,8 +125,23 @@ * @return crc32 hash of data * @remark The crc32 hash is not compatible with old memcached clients. */ -APR_DECLARE(apr_uint32_t) -apr_memcache_hash(const char* data, apr_uint32_t data_len); +APR_DECLARE(apr_uint32_t) apr_memcache_hash(apr_memcache_t *mc, + const char *data, + apr_size_t data_len); + +/** + * Pure CRC32 Hash. Used by some clients. + */ +APR_DECLARE(apr_uint32_t) apr_memcache_hash_crc32(void *baton, + const char *data, + apr_size_t data_len); + +/** + * hash compatible with the standard Perl Client. + */ +APR_DECLARE(apr_uint32_t) apr_memcache_hash_default(void *baton, + const char *data, + apr_size_t data_len); /** * Picks a server based on a hash @@ -97,8 +150,16 @@ * @return server that controls specified hash * @see apr_memcache_hash */ +APR_DECLARE(apr_memcache_server_t *) apr_memcache_find_server_hash(apr_memcache_t *mc, + const apr_uint32_t hash); + +/** + * server selection compatible with the standard Perl Client. + */ APR_DECLARE(apr_memcache_server_t *) -apr_memcache_find_server_hash(apr_memcache_t *mc, const apr_uint32_t hash); +apr_memcache_find_server_hash_default(void *baton, + apr_memcache_t *mc, + const apr_uint32_t hash); /** * Adds a server to a client object @@ -108,9 +169,8 @@ * @warning Changing servers after startup may cause keys to go to * different servers. */ -APR_DECLARE(apr_status_t) -apr_memcache_add_server(apr_memcache_t * mc, - apr_memcache_server_t *server); +APR_DECLARE(apr_status_t) apr_memcache_add_server(apr_memcache_t *mc, + apr_memcache_server_t *server); /** @@ -120,19 +180,17 @@ * @param port Port of the server * @return Server with matching Hostname and Port, or NULL if none was found. */ -APR_DECLARE(apr_memcache_server_t *) -apr_memcache_find_server(apr_memcache_t *mc, - const char *host, - apr_port_t port); +APR_DECLARE(apr_memcache_server_t *) apr_memcache_find_server(apr_memcache_t *mc, + const char *host, + apr_port_t port); /** * Enables a Server for use again * @param mc The memcache client object to use * @param ms Server to Activate */ -APR_DECLARE(apr_status_t) -apr_memcache_enable_server(apr_memcache_t *mc, - apr_memcache_server_t *ms); +APR_DECLARE(apr_status_t) apr_memcache_enable_server(apr_memcache_t *mc, + apr_memcache_server_t *ms); /** @@ -140,9 +198,8 @@ * @param mc The memcache client object to use * @param ms Server to Disable */ -APR_DECLARE(apr_status_t) -apr_memcache_disable_server(apr_memcache_t *mc, - apr_memcache_server_t *ms); +APR_DECLARE(apr_status_t) apr_memcache_disable_server(apr_memcache_t *mc, + apr_memcache_server_t *ms); /** * Creates a new Server Object @@ -157,15 +214,14 @@ * @see apr_reslist_create * @remark min, smax, and max are only used when APR_HAS_THREADS */ -APR_DECLARE(apr_status_t) -apr_memcache_server_create(apr_pool_t *p, - const char *host, - apr_port_t port, - apr_uint32_t min, - apr_uint32_t smax, - apr_uint32_t max, - apr_uint32_t ttl, - apr_memcache_server_t **ns); +APR_DECLARE(apr_status_t) apr_memcache_server_create(apr_pool_t *p, + const char *host, + apr_port_t port, + apr_uint32_t min, + apr_uint32_t smax, + apr_uint32_t max, + apr_uint32_t ttl, + apr_memcache_server_t **ns); /** * Creates a new memcached client object * @param p Pool to use @@ -173,11 +229,10 @@ * @param flags Not currently used * @param mc location of the new memcache client object */ -APR_DECLARE(apr_status_t) -apr_memcache_create(apr_pool_t *p, - apr_uint16_t max_servers, - apr_uint32_t flags, - apr_memcache_t **mc); +APR_DECLARE(apr_status_t) apr_memcache_create(apr_pool_t *p, + apr_uint16_t max_servers, + apr_uint32_t flags, + apr_memcache_t **mc); /** * Gets a value from the server, allocating the value out of p @@ -189,13 +244,42 @@ * @param flags any flags set by the client for this key * @return */ -APR_DECLARE(apr_status_t) -apr_memcache_getp(apr_memcache_t *mc, - apr_pool_t *p, - const char* key, - char **baton, - apr_size_t *len, - apr_uint16_t *flags); +APR_DECLARE(apr_status_t) apr_memcache_getp(apr_memcache_t *mc, + apr_pool_t *p, + const char* key, + char **baton, + apr_size_t *len, + apr_uint16_t *flags); + + +/** + * Add a key to a hash for a multiget query + * if the hash (*value) is NULL it will be created + * @param data_pool pool from where the hash and their items are created from + * @param key null terminated string containing the key + * @param values hash of keys and values that this key will be added to + * @return + */ +APR_DECLARE(void) +apr_memcache_add_multget_key(apr_pool_t *data_pool, + const char* key, + apr_hash_t **values); + +/** + * Gets multiple values from the server, allocating the values out of p + * @param mc client to use + * @param temp_pool Pool used for tempoary allocations. May be cleared inside this + * call. + * @param data_pool Pool used to allocate data for the returned values. + * @param values hash of apr_memcache_value_t keyed by strings, contains the + * result of the multiget call. + * @return + */ +APR_DECLARE(apr_status_t) +apr_memcache_multgetp(apr_memcache_t *mc, + apr_pool_t *temp_pool, + apr_pool_t *data_pool, + apr_hash_t *values); /** * Sets a value by key on the server @@ -206,13 +290,12 @@ * @param timeout time in seconds for the data to live on the server * @param flags any flags set by the client for this key */ -APR_DECLARE(apr_status_t) -apr_memcache_set(apr_memcache_t *mc, - const char* key, - char *baton, - const apr_uint32_t data_size, - apr_uint32_t timeout, - apr_uint16_t flags); +APR_DECLARE(apr_status_t) apr_memcache_set(apr_memcache_t *mc, + const char *key, + char *baton, + const apr_size_t data_size, + apr_uint32_t timeout, + apr_uint16_t flags); /** * Adds value by key on the server @@ -225,13 +308,12 @@ * @return APR_SUCCESS if the key was added, APR_EEXIST if the key * already exists on the server. */ -APR_DECLARE(apr_status_t) -apr_memcache_add(apr_memcache_t *mc, - const char* key, - char *baton, - const apr_uint32_t data_size, - apr_uint32_t timeout, - apr_uint16_t flags); +APR_DECLARE(apr_status_t) apr_memcache_add(apr_memcache_t *mc, + const char *key, + char *baton, + const apr_size_t data_size, + apr_uint32_t timeout, + apr_uint16_t flags); /** * Replaces value by key on the server @@ -244,23 +326,21 @@ * @return APR_SUCCESS if the key was added, APR_EEXIST if the key * did not exist on the server. */ -APR_DECLARE(apr_status_t) -apr_memcache_replace(apr_memcache_t *mc, - const char* key, - char *data, - const apr_uint32_t data_size, - apr_uint32_t timeout, - apr_uint16_t flags); +APR_DECLARE(apr_status_t) apr_memcache_replace(apr_memcache_t *mc, + const char *key, + char *data, + const apr_size_t data_size, + apr_uint32_t timeout, + apr_uint16_t flags); /** * Deletes a key from a server * @param mc client to use * @param key null terminated string containing the key * @param timeout time for the delete to stop other clients from adding */ -APR_DECLARE(apr_status_t) -apr_memcache_delete(apr_memcache_t *mc, - const char* key, - apr_uint32_t timeout); +APR_DECLARE(apr_status_t) apr_memcache_delete(apr_memcache_t *mc, + const char *key, + apr_uint32_t timeout); /** * Increments a value @@ -269,11 +349,10 @@ * @param n number to increment by * @param nv new value after incrmenting */ -APR_DECLARE(apr_status_t) -apr_memcache_incr(apr_memcache_t *mc, - const char* key, - apr_int32_t n, - apr_uint32_t *nv); +APR_DECLARE(apr_status_t) apr_memcache_incr(apr_memcache_t *mc, + const char *key, + apr_int32_t n, + apr_uint32_t *nv); /** * Decrements a value @@ -282,11 +361,10 @@ * @param n number to decrement by * @param nv new value after decrementing */ -APR_DECLARE(apr_status_t) -apr_memcache_decr(apr_memcache_t *mc, - const char* key, - apr_int32_t n, - apr_uint32_t *new_value); +APR_DECLARE(apr_status_t) apr_memcache_decr(apr_memcache_t *mc, + const char *key, + apr_int32_t n, + apr_uint32_t *new_value); /** * Query a server's version @@ -295,21 +373,22 @@ * @param baton location to store server version string * @param len length of the server version string */ -APR_DECLARE(apr_status_t) -apr_memcache_version(apr_memcache_server_t *ms, - apr_pool_t *p, - char **baton); +APR_DECLARE(apr_status_t) apr_memcache_version(apr_memcache_server_t *ms, + apr_pool_t *p, + char **baton); typedef struct { /** Version string of this server */ - const char* version; + const char *version; /** Process id of this server process */ apr_uint32_t pid; /** Number of seconds this server has been running */ apr_uint32_t uptime; /** current UNIX time according to the server */ apr_time_t time; + /** The size of a pointer on the current machine */ + apr_uint32_t pointer_size; /** Accumulated user time for this process */ apr_time_t rusage_user; /** Accumulated system time for this process */ @@ -334,12 +413,17 @@ apr_uint32_t get_hits; /** Number of items that have been requested and not found */ apr_uint32_t get_misses; + /** Number of items removed from cache because they passed their + expiration time */ + apr_uint64_t evictions; /** Total number of bytes read by this server */ apr_uint64_t bytes_read; /** Total number of bytes sent by this server */ apr_uint64_t bytes_written; /** Number of bytes this server is allowed to use for storage. */ apr_uint32_t limit_maxbytes; + /** Number of threads the server is running (if built with threading) */ + apr_uint32_t threads; } apr_memcache_stats_t; /** @@ -348,10 +432,9 @@ * @param p Pool to allocate answer from * @param stats location of the new statistics structure */ -APR_DECLARE(apr_status_t) -apr_memcache_stats(apr_memcache_server_t *ms, - apr_pool_t *p, - apr_memcache_stats_t **stats); +APR_DECLARE(apr_status_t) apr_memcache_stats(apr_memcache_server_t *ms, + apr_pool_t *p, + apr_memcache_stats_t **stats); /** @} */