diff options
Diffstat (limited to 'cache/src')
| -rw-r--r-- | cache/src/cache_evbase_client.cpp | 640 | ||||
| -rw-r--r-- | cache/src/object_store_client.cpp | 145 | ||||
| -rw-r--r-- | cache/src/tango_cache_client.cpp | 1251 | ||||
| -rw-r--r-- | cache/src/tango_cache_client_in.h | 255 | ||||
| -rw-r--r-- | cache/src/tango_cache_pending.cpp | 306 | ||||
| -rw-r--r-- | cache/src/tango_cache_redis.cpp | 422 | ||||
| -rw-r--r-- | cache/src/tango_cache_redis.h | 21 | ||||
| -rw-r--r-- | cache/src/tango_cache_tools.cpp | 258 | ||||
| -rw-r--r-- | cache/src/tango_cache_tools.h | 26 | ||||
| -rw-r--r-- | cache/src/tango_cache_transfer.cpp | 986 | ||||
| -rw-r--r-- | cache/src/tango_cache_transfer.h | 31 | ||||
| -rw-r--r-- | cache/src/tango_cache_xml.cpp | 172 | ||||
| -rw-r--r-- | cache/src/tango_cache_xml.h | 12 |
13 files changed, 0 insertions, 4525 deletions
diff --git a/cache/src/cache_evbase_client.cpp b/cache/src/cache_evbase_client.cpp deleted file mode 100644 index e7835ef..0000000 --- a/cache/src/cache_evbase_client.cpp +++ /dev/null @@ -1,640 +0,0 @@ -#include <sys/ioctl.h> -#include <sys/socket.h> -#include <sys/types.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#include <arpa/inet.h> -#include <net/if.h> -#include <unistd.h> -#include <stdio.h> -#include <stdlib.h> -#include <assert.h> -#include <errno.h> -#include <sys/prctl.h> -#include <string.h> -#include <pthread.h> - -#include "cache_evbase_client.h" -#include "tango_cache_transfer.h" -#include "tango_cache_tools.h" - -enum CACHE_ASYN_CMD -{ - CACHE_ASYN_FETCH=0, - CACHE_ASYN_UPLOAD_ONCE_DATA, - CACHE_ASYN_UPLOAD_ONCE_EVBUF, - CACHE_ASYN_UPLOAD_START, - CACHE_ASYN_UPLOAD_FRAG_DATA, - CACHE_ASYN_UPLOAD_FRAG_EVBUF, - CACHE_ASYN_UPLOAD_END, - CACHE_ASYN_UPLOAD_CANCEL, - CACHE_ASYN_DELETE, - CACHE_ASYN_HEAD, -}; - -struct databuffer -{ - char *data; - size_t size; - struct evbuffer *evbuf; - enum CACHE_ASYN_CMD cmd_type; - enum OBJECT_LOCATION where_to_get; - struct cache_evbase_ctx *ctx_asyn; -}; - -enum CACHE_ERR_CODE cache_evbase_get_last_error(const struct cache_evbase_ctx *ctx_asyn) -{ - return tango_cache_get_last_error(ctx_asyn->ctx); -} -enum CACHE_ERR_CODE cache_evbase_ctx_error(const struct cache_evbase_instance *instance) -{ - return tango_cache_ctx_error(instance->instance); -} - -void cache_evbase_get_statistics(const struct cache_evbase_instance *instance, struct cache_statistics *out) -{ - tango_cache_get_statistics(instance->instance, out); -} - -struct tango_cache_result *cache_evbase_read_result(void *promise_result) -{ - return tango_cache_read_result(promise_result); -} - -static int create_notification_pipe(evutil_socket_t sv[2]) -{ - if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, sv) == -1) - { - return -1; - } - if(evutil_make_socket_nonblocking(sv[0])<0 || evutil_make_socket_nonblocking(sv[1])<0) - { - return -1; - } - if(evutil_make_socket_closeonexec(sv[0])<0 || evutil_make_socket_closeonexec(sv[1])<0) - { - return -1; - } - return 0; -} - -static int32_t mesa_tcp_sock_write (int32_t write_fd, void *buf, int32_t bufsize) -{ - int32_t res; - - do{ - res = send(write_fd, buf, bufsize, MSG_NOSIGNAL); - }while (res==-1 &&(errno == EINTR)); - - return res; -} - -static int32_t iothread_notify_event(int32_t socket_fd, void *content, int32_t len, int32_t s_time_out) -{ - fd_set w_set, e_set; - struct timeval tv; - int32_t res=0, sndlen=0, sendsize=0; - - while(len > sndlen) - { - FD_ZERO (&w_set); - FD_ZERO (&e_set); - FD_SET (socket_fd, &w_set); - FD_SET (socket_fd, &e_set); - if(s_time_out == 0) - { - res = select (socket_fd + 1, NULL, &w_set, &e_set, NULL); - } - else - { - tv.tv_sec = s_time_out; - tv.tv_usec = 0; - res = select (socket_fd + 1, NULL, &w_set, &e_set, &tv); - } - if(res <= 0) - { - printf("log_error: select io res=%d, error: %s\n", res, strerror(errno)); - return -1; - } - - if(FD_ISSET(socket_fd, &e_set)) - { - printf("log_error: select io is in efds, error: %s\n", strerror(errno)); - return -2; - } - - if(FD_ISSET(socket_fd, &w_set)) - { - sendsize = mesa_tcp_sock_write(socket_fd, (char*)content + sndlen, len - sndlen); - if (sendsize < 0) - { - if(errno == EAGAIN) - { - continue; - } - return -1; - } - sndlen += sendsize; - } - } - - return sndlen; -} - -static void cache_asyn_ctx_destroy(struct cache_evbase_ctx *ctx_asyn) -{ - free(ctx_asyn); -} - -static void cache_asyn_ioevent_dispatch(struct databuffer *buffer) -{ - struct cache_evbase_ctx *ctx_asyn=buffer->ctx_asyn; - struct promise *p; - int ret=0; - - switch(buffer->cmd_type) - { - case CACHE_ASYN_FETCH: - p = ctx_asyn->ctx->promise; - if(do_tango_cache_fetch_object(ctx_asyn->ctx, buffer->where_to_get) < 0) - { - promise_failed(p, FUTURE_ERROR_CANCEL, "CACHE_ASYN_FETCH failed"); - } - cache_asyn_ctx_destroy(ctx_asyn); - break; - case CACHE_ASYN_HEAD: - p = ctx_asyn->ctx->promise; - ret = do_tango_cache_head_object(ctx_asyn->ctx, buffer->where_to_get); - if(ret<0) - { - promise_failed(p, FUTURE_ERROR_CANCEL, "CACHE_ASYN_HEAD failed"); - } - cache_asyn_ctx_destroy(ctx_asyn); - break; - - case CACHE_ASYN_DELETE: - cache_delete_minio_object(ctx_asyn->ctx, true); - cache_asyn_ctx_destroy(ctx_asyn); - break; - - case CACHE_ASYN_UPLOAD_ONCE_DATA: - do_tango_cache_upload_once_data(ctx_asyn->ctx, PUT_MEM_FREE, buffer->data, buffer->size, true); - cache_asyn_ctx_destroy(ctx_asyn); - break; - case CACHE_ASYN_UPLOAD_ONCE_EVBUF: - do_tango_cache_upload_once_evbuf(ctx_asyn->ctx, EVBUFFER_MOVE, buffer->evbuf, true); - evbuffer_free(buffer->evbuf); - cache_asyn_ctx_destroy(ctx_asyn); - break; - - case CACHE_ASYN_UPLOAD_START: - ctx_asyn->ctx->instance->statistic.put_recv_num += 1; - ctx_asyn->ctx->instance->error_code = CACHE_OK; - break; - - case CACHE_ASYN_UPLOAD_FRAG_DATA: - tango_cache_update_frag_data(ctx_asyn->ctx, buffer->data, buffer->size); - free(buffer->data); - break; - - case CACHE_ASYN_UPLOAD_FRAG_EVBUF: - tango_cache_update_frag_evbuf(ctx_asyn->ctx, EVBUFFER_MOVE, buffer->evbuf); - evbuffer_free(buffer->evbuf); - break; - - case CACHE_ASYN_UPLOAD_END: - do_tango_cache_update_end(ctx_asyn->ctx, true); - cache_asyn_ctx_destroy(ctx_asyn); - break; - case CACHE_ASYN_UPLOAD_CANCEL: - tango_cache_update_cancel(ctx_asyn->ctx); - cache_asyn_ctx_destroy(ctx_asyn); - break; - default: assert(0);break; - } -} - -static void sockpair_notification_handler(evutil_socket_t fd, short events, void *arg) -{ - ssize_t readlen, needlen; - struct cache_evbase_instance *instance_asyn = (struct cache_evbase_instance *)arg; - struct databuffer *buffer; - - while(1) - { - needlen=sizeof(struct cache_evbase_ctx *); - readlen = recv(fd, &buffer, needlen, 0); - if(readlen == needlen) - { - cache_asyn_ioevent_dispatch(buffer); - free(buffer); - } - else - { - if(errno!=EWOULDBLOCK && errno!=EAGAIN) - { - MESA_HANDLE_RUNTIME_LOGV2(instance_asyn->instance->runtime_log, RLOG_LV_FATAL, "read pipe error: %s.", strerror(errno)); - assert(0); - return; - } - break; - } - } -} - -static void* thread_listen_sockpair(void *arg) -{ - struct cache_evbase_instance *instance_asyn = (struct cache_evbase_instance *)arg; - struct event listen_event; - - prctl(PR_SET_NAME, "tango_cache"); - - event_assign(&listen_event, instance_asyn->evbase, instance_asyn->notify_readfd, EV_READ|EV_PERSIST, sockpair_notification_handler, instance_asyn); - event_add(&listen_event, NULL); - event_base_dispatch(instance_asyn->evbase); - - printf("Libevent dispath error, should not run here.\n"); - MESA_HANDLE_RUNTIME_LOGV2(instance_asyn->instance->runtime_log, RLOG_LV_FATAL, "Libevent dispath error, should not run here."); - assert(0); - return NULL; -} - -int cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn, char *path/*OUT*/, size_t pathsize) -{ - struct databuffer *buffer; - - if(ctx_asyn->ctx->fail_state) - { - tango_cache_ctx_destroy(ctx_asyn->ctx, false); - cache_asyn_ctx_destroy(ctx_asyn); - return -1; - } - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_UPLOAD_END; - - //ENDʱ����δ��ʼ�ֶ��ϴ��������ϴ�֮ǰ����locateһ��λ�� - ctx_asyn->ctx->locate = tango_cache_object_locate(ctx_asyn->ctx->instance, ctx_asyn->object_size); - tango_cache_get_object_path(ctx_asyn->ctx, path, pathsize); - if(ctx_asyn->ctx->instance->param->object_store_way != CACHE_ALL_HOS) - { - cJSON_AddNumberToObject(ctx_asyn->ctx->put.object_meta, "Content-Length", ctx_asyn->object_size); - } - - if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - tango_cache_ctx_destroy(ctx_asyn->ctx, false); - cache_asyn_ctx_destroy(ctx_asyn); - free(buffer); - return -2; - } - return 0; -} - -void cache_evbase_update_cancel(struct cache_evbase_ctx *ctx_asyn) -{ - struct databuffer *buffer; - - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_UPLOAD_CANCEL; - - if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - if(!ctx_asyn->ctx->fail_state) - { - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - } - tango_cache_ctx_destroy(ctx_asyn->ctx, false); - cache_asyn_ctx_destroy(ctx_asyn); - free(buffer); - } -} - -int cache_evbase_update_frag_data(struct cache_evbase_ctx *ctx_asyn, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size) -{ - struct databuffer *buffer; - - if(ctx_asyn->ctx->fail_state) - { - if(way == PUT_MEM_FREE) free((void *)data); - return 0;//��ʱ�Ⱥ��Է���ֵ���Իص���ʽ֪ͨ����������û���֪��ʱEND�����⡣ - } - ctx_asyn->object_size += size; - - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - if(way == PUT_MEM_COPY) - { - buffer->data = (char *)malloc(size); - memcpy(buffer->data, data, size); - } - else - { - buffer->data = (char*)data; - } - buffer->size = size; - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_UPLOAD_FRAG_DATA; - - if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - free(buffer->data); - free(buffer); - return -2; - } - return 0; -} - -int cache_evbase_update_frag_evbuf(struct cache_evbase_ctx *ctx_asyn, struct evbuffer *evbuf) -{ - struct databuffer *buffer; - - if(ctx_asyn->ctx->fail_state) - { - return 0; - } - ctx_asyn->object_size += evbuffer_get_length(evbuf); - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_UPLOAD_FRAG_EVBUF; - buffer->evbuf = evbuffer_new(); - evbuffer_add_buffer(buffer->evbuf, evbuf); - - if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - evbuffer_free(buffer->evbuf); - free(buffer); - return -2; - } - return 0; -} - -struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance *instance, struct future* f, struct tango_cache_meta_put *meta) -{ - struct cache_evbase_ctx *ctx_asyn; - struct tango_cache_ctx *ctx; - struct databuffer *buffer; - enum OBJECT_LOCATION maybe_loc=OBJECT_IN_UNKNOWN; - - if(instance->instance->param->object_store_way != CACHE_SMALL_REDIS) - { - maybe_loc = OBJECT_IN_HOS; - } - ctx = tango_cache_update_prepare(instance->instance, f, meta, maybe_loc); - if(ctx == NULL) - { - return NULL; - } - ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); - ctx_asyn->instance_asyn = instance; - ctx_asyn->ctx = ctx; - - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_UPLOAD_START; - - //�¼�֪ͨ��Ϊ������ͳ����Ϣ - if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - instance->instance->error_code = CACHE_ERR_SOCKPAIR; - tango_cache_set_fail_state(ctx, CACHE_ERR_SOCKPAIR); - tango_cache_ctx_destroy(ctx, false); - cache_asyn_ctx_destroy(ctx_asyn); - free(buffer); - return NULL; - } - return ctx_asyn; -} - -int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct future* f, - enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta_put *meta, char *path, size_t pathsize) -{ - struct cache_evbase_ctx *ctx_asyn; - struct tango_cache_ctx *ctx; - struct databuffer *buffer; - - ctx = tango_cache_update_once_prepare(instance->instance, f, meta, size, path, pathsize); - if(ctx == NULL) - { - if(way == PUT_MEM_FREE) free((void *)data); - return -1; - } - ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); - ctx_asyn->instance_asyn = instance; - ctx_asyn->ctx = ctx; - - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - if(way == PUT_MEM_COPY) - { - buffer->data = (char *)malloc(size); - memcpy(buffer->data, data, size); - } - else - { - buffer->data = (char*)data; - } - buffer->size = size; - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_UPLOAD_ONCE_DATA; - - if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - free(buffer->data); - free(buffer); - instance->instance->error_code = CACHE_ERR_SOCKPAIR; - tango_cache_set_fail_state(ctx, CACHE_ERR_SOCKPAIR); - tango_cache_ctx_destroy(ctx, false); - cache_asyn_ctx_destroy(ctx_asyn); - return -2; - } - return 0; -} - -int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struct future* f, - struct evbuffer *evbuf, struct tango_cache_meta_put *meta, char *path, size_t pathsize) -{ - struct cache_evbase_ctx *ctx_asyn; - struct tango_cache_ctx *ctx; - struct databuffer *buffer; - - ctx = tango_cache_update_once_prepare(instance->instance, f, meta, evbuffer_get_length(evbuf), path, pathsize); - if(ctx == NULL) - { - return -1; - } - ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); - ctx_asyn->instance_asyn = instance; - ctx_asyn->ctx = ctx; - - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_UPLOAD_ONCE_EVBUF; - buffer->evbuf = evbuffer_new(); - evbuffer_add_buffer(buffer->evbuf, evbuf); - - if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - evbuffer_free(buffer->evbuf); - free(buffer); - instance->instance->error_code = CACHE_ERR_SOCKPAIR; - tango_cache_set_fail_state(ctx, CACHE_ERR_SOCKPAIR); - tango_cache_ctx_destroy(ctx, false); - cache_asyn_ctx_destroy(ctx_asyn); - return -2; - } - return 0; -} - -int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get) -{ - struct cache_evbase_ctx *ctx_asyn; - struct databuffer *buffer; - - if(instance->instance->param->object_store_way != CACHE_SMALL_REDIS) - { - where_to_get = OBJECT_IN_HOS; - } - ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); - ctx_asyn->instance_asyn = instance; - ctx_asyn->ctx = tango_cache_fetch_prepare(instance->instance, CACHE_REQUEST_GET, f, meta, where_to_get); - if(ctx_asyn->ctx == NULL) - { - free(ctx_asyn); - return -1; - } - - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_FETCH; - buffer->where_to_get = where_to_get; - - if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - instance->instance->error_code = CACHE_ERR_SOCKPAIR; - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - tango_cache_ctx_destroy(ctx_asyn->ctx, false); - cache_asyn_ctx_destroy(ctx_asyn); - free(buffer); - return -2; - } - return 0; -} - -int cache_evbase_head_object(struct cache_evbase_instance *instance, struct future* f, struct tango_cache_meta_get *meta) -{ - struct cache_evbase_ctx *ctx_asyn; - struct databuffer *buffer; - enum OBJECT_LOCATION location = OBJECT_IN_HOS; - - if(instance->instance->param->object_store_way != CACHE_ALL_HOS) - { - location = OBJECT_IN_REDIS; - } - ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); - ctx_asyn->instance_asyn = instance; - ctx_asyn->ctx = tango_cache_fetch_prepare(instance->instance, CACHE_REQUEST_HEAD, f, meta, location); - if(ctx_asyn->ctx == NULL) - { - free(ctx_asyn); - return -1; - } - - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_HEAD; - buffer->where_to_get = location; - - if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - instance->instance->error_code = CACHE_ERR_SOCKPAIR; - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - tango_cache_ctx_destroy(ctx_asyn->ctx, false); - cache_asyn_ctx_destroy(ctx_asyn); - free(buffer); - return -2; - } - return 0; -} - -int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct future* f, const char *objkey, const char *minio_addr, const char *bucket) -{ - struct cache_evbase_ctx *ctx_asyn; - struct databuffer *buffer; - - ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); - ctx_asyn->instance_asyn = instance; - ctx_asyn->ctx = tango_cache_delete_prepare(instance->instance, f, objkey, minio_addr, bucket); - if(ctx_asyn->ctx == NULL) - { - free(ctx_asyn); - return -1; - } - - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_DELETE; - - //�ο�Unix�����432ҳ���ڶ��߳�д�İ�ȫ������ - if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - instance->instance->error_code = CACHE_ERR_SOCKPAIR; - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - tango_cache_ctx_destroy(ctx_asyn->ctx, false); - cache_asyn_ctx_destroy(ctx_asyn); - free(buffer); - return -2; - } - return 0; -} - -struct tango_cache_parameter *cache_evbase_parameter_new(const char* profile_path, const char* section, void *runtimelog) -{ - return tango_cache_parameter_new(profile_path, section, runtimelog); -} - -struct cache_evbase_instance *cache_evbase_instance_new(struct tango_cache_parameter *param, void *runtimelog) -{ - evutil_socket_t notification_fd[2]; - struct cache_evbase_instance *instance_asyn; - struct event_base *evbase; - pthread_t thread_tid; - pthread_attr_t attr; - - if(create_notification_pipe(notification_fd)) - { - return NULL; - } - if((evbase = event_base_new()) == NULL) - { - return NULL; - } - - instance_asyn = (struct cache_evbase_instance *)calloc(1, sizeof(struct cache_evbase_instance)); - instance_asyn->evbase = evbase; - instance_asyn->notify_readfd = notification_fd[0]; - instance_asyn->notify_sendfd = notification_fd[1]; - instance_asyn->instance = tango_cache_instance_new(param, evbase, runtimelog); - if(instance_asyn->instance == NULL) - { - free(instance_asyn); - return NULL; - } - - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - if(pthread_create(&thread_tid, &attr, thread_listen_sockpair, instance_asyn)) - { - free(instance_asyn); - return NULL; - } - return instance_asyn; -} - -void cache_evbase_global_init(void) -{ - tango_cache_global_init(); -} - diff --git a/cache/src/object_store_client.cpp b/cache/src/object_store_client.cpp deleted file mode 100644 index 58eb3fb..0000000 --- a/cache/src/object_store_client.cpp +++ /dev/null @@ -1,145 +0,0 @@ -#include <sys/ioctl.h> -#include <sys/socket.h> -#include <sys/types.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#include <arpa/inet.h> -#include <net/if.h> -#include <unistd.h> -#include <stdio.h> -#include <stdlib.h> -#include <assert.h> -#include <errno.h> -#include <sys/prctl.h> -#include <string.h> -#include <pthread.h> - -#include "object_store_client.h" -#include "tango_cache_tools.h" - -enum CACHE_ERR_CODE object_store_get_last_error(const struct cache_evbase_ctx *ctx) -{ - return cache_evbase_get_last_error(ctx); -} - -void object_store_get_statistics(const struct object_store_instance *instance, struct cache_statistics *out) -{ - struct cache_statistics out_cache; - - memset(out, 0, sizeof(struct cache_statistics)); - for(u_int32_t i=0; i<instance->instance_num; i++) - { - cache_evbase_get_statistics(instance->instances[i], &out_cache); - - out->del_error_num += out_cache.del_error_num; - out->del_recv_num += out_cache.del_recv_num; - out->del_succ_num += out_cache.del_succ_num; - out->get_err_http += out_cache.get_err_http; - out->get_err_redis += out_cache.get_err_redis; - out->get_miss_num += out_cache.get_miss_num; - out->get_recv_num += out_cache.get_recv_num; - out->get_succ_http += out_cache.get_succ_http; - out->get_succ_redis+= out_cache.get_succ_redis; - out->put_err_http += out_cache.put_err_http; - out->put_err_redis += out_cache.put_err_redis; - out->put_recv_num += out_cache.put_recv_num; - out->put_succ_http += out_cache.put_succ_http; - out->put_succ_redis+= out_cache.put_succ_redis; - out->session_http += out_cache.session_http; - out->session_redis += out_cache.session_redis; - out->memory_used += out_cache.memory_used; - out->totaldrop_num += out_cache.totaldrop_num; - } -} - -struct tango_cache_result *object_store_read_result(void *promise_result) -{ - return cache_evbase_read_result(promise_result); -} - -int object_store_update_end(struct cache_evbase_ctx *ctx, char *path/*OUT*/, size_t pathsize) -{ - return cache_evbase_update_end(ctx, path, pathsize); -} - -void object_store_update_cancel(struct cache_evbase_ctx *ctx) -{ - cache_evbase_update_cancel(ctx); -} - -int object_store_update_frag_data(struct cache_evbase_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size) -{ - return cache_evbase_update_frag_data(ctx, way, data, size); -} - -int object_store_update_frag_evbuf(struct cache_evbase_ctx *ctx, struct evbuffer *evbuf) -{ - return cache_evbase_update_frag_evbuf(ctx, evbuf); -} - -struct cache_evbase_ctx *object_store_update_start(struct object_store_instance *instance, struct future* f, struct tango_cache_meta_put *meta) -{ - return cache_evbase_update_start(instance->instances[rand()%instance->instance_num], f, meta); -} - -int object_store_upload_once_data(struct object_store_instance *instance, struct future* f, - enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta_put *meta, char *path, size_t pathsize) -{ - return cache_evbase_upload_once_data(instance->instances[rand()%instance->instance_num], f, way, data, size, meta, path, pathsize); -} - -int object_store_upload_once_evbuf(struct object_store_instance *instance, struct future* f, - struct evbuffer *evbuf, struct tango_cache_meta_put *meta, char *path, size_t pathsize) -{ - return cache_evbase_upload_once_evbuf(instance->instances[rand()%instance->instance_num], f, evbuf, meta, path, pathsize); -} - -int object_store_fetch_object(struct object_store_instance *instance, struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get) -{ - return cache_evbase_fetch_object(instance->instances[rand()%instance->instance_num], f, meta, where_to_get); -} - -int object_store_head_object(struct object_store_instance *instance, struct future* f, struct tango_cache_meta_get *meta) -{ - return cache_evbase_head_object(instance->instances[rand()%instance->instance_num], f, meta); -} - -int object_store_delete_object(struct object_store_instance *instance, struct future* f, const char *objkey, const char *minio_addr, const char *bucket) -{ - return cache_evbase_delete_object(instance->instances[rand()%instance->instance_num], f, objkey, minio_addr, bucket); -} - -struct object_store_instance *object_store_instance_new(const char* profile_path, const char* section, int thread_num, void *runtimelog) -{ - struct object_store_instance *object_instance; - struct tango_cache_parameter *parameter; - - parameter = tango_cache_parameter_new(profile_path, section, runtimelog); - if(parameter == NULL) - { - return NULL; - } - - object_instance = (struct object_store_instance *)calloc(1, sizeof(struct object_store_instance)); - object_instance->instance_num = thread_num; - object_instance->instances = (struct cache_evbase_instance **)calloc(1, sizeof(struct cache_evbase_instance *)*object_instance->instance_num); - - for(u_int32_t i=0; i<object_instance->instance_num; i++) - { - object_instance->instances[i] = cache_evbase_instance_new(parameter, runtimelog); - if(object_instance->instances[i] == NULL) - { - free(parameter); - free(object_instance); - return NULL; - } - } - srandom(time(NULL)); - return object_instance; -} - -void object_store_global_init(void) -{ - cache_evbase_global_init(); -} - diff --git a/cache/src/tango_cache_client.cpp b/cache/src/tango_cache_client.cpp deleted file mode 100644 index 07674ad..0000000 --- a/cache/src/tango_cache_client.cpp +++ /dev/null @@ -1,1251 +0,0 @@ -#include <sys/types.h> -#include <sys/stat.h> -#include <unistd.h> -#include <stdio.h> -#include <stdlib.h> -#include <assert.h> -#include <errno.h> -#include <sys/time.h> -#include <time.h> -#include <string.h> -#include <openssl/sha.h> -#include <openssl/md5.h> - -#include <MESA/MESA_prof_load.h> - -#include "tango_cache_client_in.h" -#include "tango_cache_transfer.h" -#include "tango_cache_tools.h" -#include "tango_cache_xml.h" -#include "tango_cache_redis.h" - -int TANGO_CACHE_VERSION_20181009=0; - -static int caculate_base64_md5(const char *data, unsigned long len, unsigned char *result, unsigned int size) -{ - MD5_CTX c; - unsigned char md5[17]={0}; - - if(size < 33) - return -1; - - MD5_Init(&c); - MD5_Update(&c, data, len); - MD5_Final(md5, &c); - - Base64_EncodeBlock(md5, 16, result); - return 0; -} - -void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_t size) -{ - SHA256_CTX c; - unsigned char sha256[128]; - u_int32_t length; - - SHA256_Init(&c); - SHA256_Update(&c, data, len); - SHA256_Final(sha256, &c); - - length = (size > 64)?32:(size-1)/2; //Ԥ��һ���ռ� - for(u_int32_t i=0; i<length; i++) - { - sprintf(result + i*2, "%02x", sha256[i]); - } - result[length*2] = '\0'; -} - -static int wired_load_balancer_lookup(WLB_handle_t wiredlb, const char *key, int keylen, char *host, size_t hostsize) -{ - struct WLB_consumer_t chosen; - - if(wiredLB_lookup(wiredlb, key, keylen, &chosen)) - { - return -1; - } - snprintf(host, hostsize, "%s:%hu", chosen.ip_addr, chosen.data_port); - return 0; -} - -enum CACHE_ERR_CODE tango_cache_get_last_error(const struct tango_cache_ctx *ctx) -{ - return ctx->error_code; -} -enum CACHE_ERR_CODE tango_cache_ctx_error(const struct tango_cache_instance *instance) -{ - return instance->error_code; -} - -void tango_cache_set_fail_state(struct tango_cache_ctx *ctx, enum CACHE_ERR_CODE error_code) -{ - ctx->fail_state = true; - ctx->error_code = error_code; -} - -const char *tango_cache_get_errstring(const struct tango_cache_ctx *ctx) -{ - switch(ctx->error_code) - { - case CACHE_CACHE_MISS: return "cache not hit"; - case CACHE_TIMEOUT: return "cache not fresh"; - case CACHE_OUTOF_MEMORY: return "outof memory"; - case CACHE_ERR_WIREDLB: return "wiredlb error"; - case CACHE_ERR_SOCKPAIR: return "socketpair error"; - case CACHE_ERR_INTERNAL: return "internal error"; - case CACHE_ERR_REDIS_JSON: return "parse redis json error"; - case CACHE_ERR_REDIS_CONNECT:return "redis is not connected"; - case CACHE_ERR_REDIS_EXEC: return "redis command reply error"; - case CACHE_OUTOF_SESSION: return "two many curl sessions"; - case CACHE_UPDATE_CANCELED: return "update was canceled"; - case CACHE_ERR_EVBUFFER: return "evbuffer read write error"; - default: return ctx->error; - } -} - -void tango_cache_get_statistics(const struct tango_cache_instance *instance, struct cache_statistics *out) -{ - *out = instance->statistic; -} - -struct tango_cache_result *tango_cache_read_result(future_result_t *promise_result) -{ - return (struct tango_cache_result *)promise_result; -} - -static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statistics *statistic) -{ - switch(ctx->method) - { - case CACHE_REQUEST_PUT: - if(ctx->fail_state) - { - if(ctx->locate == OBJECT_IN_HOS) - statistic->put_err_http += 1; - else - statistic->put_err_redis += 1; - } - else - { - if(ctx->locate == OBJECT_IN_HOS) - statistic->put_succ_http += 1; - else - statistic->put_succ_redis += 1; - } - break; - case CACHE_REQUEST_GET: - case CACHE_REQUEST_HEAD: - if(ctx->fail_state) - { - if(ctx->error_code == CACHE_CACHE_MISS || ctx->error_code == CACHE_TIMEOUT) - statistic->get_miss_num += 1; - else if(ctx->locate == OBJECT_IN_HOS) - statistic->get_err_http += 1; - else - statistic->get_err_redis += 1; - } - else - { - if(ctx->locate == OBJECT_IN_HOS) - statistic->get_succ_http += 1; - else - statistic->get_succ_redis += 1; - } - break; - case CACHE_REQUEST_DELETE: - if(ctx->fail_state) - { - statistic->del_error_num += 1; - } - else - { - statistic->del_succ_num += 1; - } - break; - case CACHE_REQUEST_DELETE_MUL: - statistic->del_succ_num += ctx->del.succ_num; - statistic->del_error_num += ctx->del.fail_num; - break; - default:break; - } -} - -void easy_string_destroy(struct easy_string *estr) -{ - if(estr->buff != NULL) - { - free(estr->buff); - estr->buff = NULL; - estr->len = estr->size = 0; - } -} - -void easy_string_savedata(struct easy_string *estr, const char *data, size_t len) -{ - if(estr->size-estr->len < len+1) - { - estr->size += len*4+1; - estr->buff = (char*)realloc(estr->buff, estr->size); - } - - memcpy(estr->buff+estr->len, data, len); - estr->len += len; - estr->buff[estr->len]='\0'; -} - -//callback: �Ƿ���ûص���������ҪΪ���ֱ�ӵ���APIʱʧ�ܣ����ٵ��ûص�������ͨ������ֵ�ж� -void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback) -{ - struct multipart_etag_list *etag; - - if(ctx->curl != NULL) - { - curl_multi_remove_handle(ctx->instance->multi_hd, ctx->curl); - curl_easy_cleanup(ctx->curl); - } - easy_string_destroy(&ctx->response); - - switch(ctx->method) - { - case CACHE_REQUEST_GET: - case CACHE_REQUEST_HEAD: - easy_string_destroy(&ctx->get.response_tag); - break; - - case CACHE_REQUEST_PUT: - if(ctx->put.uploadID != NULL) free(ctx->put.uploadID); - if(ctx->put.combine_xml != NULL) free(ctx->put.combine_xml); - if(ctx->put.object_meta != NULL) cJSON_Delete(ctx->put.object_meta); - if(ctx->put.once_request.len > 0) - { - ctx->instance->statistic.memory_used -= ctx->put.once_request.size; - easy_string_destroy(&ctx->put.once_request); - } - if(ctx->put.evbuf!=NULL) - { - ctx->instance->statistic.memory_used -= evbuffer_get_length(ctx->put.evbuf); - evbuffer_free(ctx->put.evbuf); - } - while(NULL != (etag = TAILQ_FIRST(&ctx->put.etag_head))) - { - TAILQ_REMOVE(&ctx->put.etag_head, etag, node); - free(etag->etag); - free(etag); - }//no break here - case CACHE_REQUEST_DELETE_MUL: - if(ctx->headers != NULL) - { - curl_slist_free_all(ctx->headers); - }//no break here - case CACHE_REQUEST_DELETE: - if(callback && ctx->promise != NULL) - { - if(ctx->fail_state) - { - promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); - } - else - { - promise_success(ctx->promise, NULL); - } - } - break; - default: break; - } - update_statistics(ctx, &ctx->instance->statistic); - free(ctx); -} - -//�ж�session�Ƿ����ƣ�����ȡ��ʼ���ж�where_to_get�Ƿ�ȫ����MINIO�� -bool sessions_exceeds_limit(struct tango_cache_instance *instance, enum OBJECT_LOCATION where_to_get) -{ - if(where_to_get == OBJECT_IN_HOS) - { - return (instance->statistic.session_http>=instance->param->maximum_sessions); - } - else - { - return (instance->statistic.session_redis>=instance->param->maximum_sessions); - } -} - - -//�����ϴ�API��ʹ��ctx��evbuffer������������ctx��ȡ���� -enum OBJECT_LOCATION tango_cache_object_locate(struct tango_cache_instance *instance, size_t object_size) -{ - if(instance->param->fsstatid_trig) - { - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_histlen_id, 0, FS_OP_SET, object_size); - } - if(instance->param->object_store_way!=CACHE_SMALL_REDIS || object_size > instance->param->redis_object_maxsize) - { - return OBJECT_IN_HOS; - } - else - { - return OBJECT_IN_REDIS; - } -} - -void tango_cache_get_object_path(struct tango_cache_ctx *ctx, char *path/*OUT*/, size_t pathsize) -{ - if(path != NULL) - { - if(ctx->locate == OBJECT_IN_HOS) - { - snprintf(path, pathsize, "http://%s/%s", ctx->hostaddr, ctx->object_key); - } - else - { - snprintf(path, pathsize, "redis://%s/%s", ctx->instance->redisaddr, ctx->object_key); - } - } -} - -int tango_cache_update_end(struct tango_cache_ctx *ctx, char *path/*OUT*/, size_t pathsize) -{ - if(!ctx->fail_state) - { - ctx->locate = tango_cache_object_locate(ctx->instance, ctx->put.object_size); - tango_cache_get_object_path(ctx, path, pathsize); - - if(ctx->instance->param->object_store_way != CACHE_ALL_HOS) - { - cJSON_AddNumberToObject(ctx->put.object_meta, "Content-Length", ctx->put.object_size); - } - } - return do_tango_cache_update_end(ctx, false); -} - -void tango_cache_update_cancel(struct tango_cache_ctx *ctx) -{ - ctx->put.close_state = true; - if(ctx->curl != NULL) - { - curl_multi_remove_handle(ctx->instance->multi_hd, ctx->curl); - curl_easy_cleanup(ctx->curl); - ctx->curl = NULL; - } - tango_cache_set_fail_state(ctx, CACHE_UPDATE_CANCELED); - if(ctx->put.uploadID!=NULL && cache_cancel_upload_minio(ctx)) - { - ctx->put.state = PUT_STATE_CANCEL; - } - else - { - tango_cache_ctx_destroy(ctx); - } -} - -int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data, size_t size) -{ - if(ctx->fail_state) - { - return 0; //TODO: ��ʱ���Է���ֵ!! - } - if(evbuffer_add(ctx->put.evbuf, data, size)) - { - tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY); - return 0; - } - ctx->instance->statistic.memory_used += size; - ctx->put.object_size += size; - if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->param->upload_block_size) - { - cache_kick_upload_minio_multipart(ctx, ctx->instance->param->upload_block_size); - } - return 0; -} - -int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf) -{ - size_t size; - - if(ctx->fail_state) - { - return 0;//TODO: ��ʱ���Է���ֵ!! - } - - size = evbuffer_get_length(evbuf); - if(way == EVBUFFER_MOVE) - { - if(evbuffer_add_buffer(ctx->put.evbuf, evbuf)) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER); - return 0; - } - } - else - { - if(evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf)) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER); - return 0; - } - } - ctx->instance->statistic.memory_used += size; - ctx->put.object_size += size; - if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->param->upload_block_size) - { - cache_kick_upload_minio_multipart(ctx, ctx->instance->param->upload_block_size); - } - return 0; -} - -struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta, enum OBJECT_LOCATION maybe_loc) -{ - struct tango_cache_ctx *ctx; - char buffer[2064], *user_tag=NULL, *user_tag_value=NULL; - time_t expires, now, last_modify; - struct easy_string hdr_estr={NULL, 0, 0}; - - if(sessions_exceeds_limit(instance, maybe_loc) || (u_int64_t)instance->statistic.memory_used>=instance->param->maximum_used_mem) - { - instance->error_code = CACHE_OUTOF_MEMORY; - instance->statistic.totaldrop_num += 1; - return NULL; - } - - ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); - ctx->instance = instance; - ctx->promise = future_to_promise(f); - ctx->method = CACHE_REQUEST_PUT; - ctx->locate = maybe_loc; - - if(instance->param->hash_object_key) - { - caculate_sha256(meta->url, strlen(meta->url), buffer, 72); - if(meta->user_log_name) - { - struct timespec start_time; - clock_gettime(CLOCK_REALTIME,&start_time); - snprintf(ctx->object_key, 256, "%s/%lu_%c%c_%c%c_%s", instance->param->bucketname, start_time.tv_nsec, buffer[0], buffer[1], buffer[2], buffer[3], buffer+4); - } - else - { - snprintf(ctx->object_key, 256, "%s/%c%c_%c%c_%s", instance->param->bucketname, buffer[0], buffer[1], buffer[2], buffer[3], buffer+4); - } - //����ԭʼURL - snprintf(buffer, 2064, "x-amz-meta-url: %s", meta->url); - ctx->headers = curl_slist_append(ctx->headers, buffer); - } - else - { - snprintf(ctx->object_key, 256, "%s/%s", instance->param->bucketname, meta->url); - } - if(wired_load_balancer_lookup(instance->param->cache.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48)) - { - instance->error_code = CACHE_ERR_WIREDLB; - instance->statistic.totaldrop_num += 1; - if(ctx->headers!=NULL) curl_slist_free_all(ctx->headers); - free(ctx); - return NULL; - } - - //Expires�ֶΣ����ڻ����ڲ��ж������Ƿ�ʱ - now = time(NULL); - expires = (meta->put.timeout==0||meta->put.timeout>instance->param->relative_ttl)?instance->param->relative_ttl:meta->put.timeout; - if(expires_timestamp2hdr_str(now + expires, buffer, 256)) - { - ctx->headers = curl_slist_append(ctx->headers, buffer); - } - ctx->put.object_ttl = expires; - //Last-Modify�ֶΣ�����GETʱ�ж��Ƿ����� - last_modify = (meta->put.date > meta->put.last_modified)?meta->put.date:meta->put.last_modified; - if(last_modify == 0) - { - last_modify = get_gmtime_timestamp(now); - } - sprintf(buffer, "x-amz-meta-lm: %lu", last_modify); - ctx->headers = curl_slist_append(ctx->headers, buffer); - //�б���֧�ֵı�ͷ�� - for(int i=0; i<HDR_CONTENT_NUM; i++) - { - if(meta->std_hdr[i] != NULL) - { - ctx->headers = curl_slist_append(ctx->headers, meta->std_hdr[i]); - if(ctx->instance->param->object_store_way != CACHE_ALL_HOS) - { - easy_string_savedata(&hdr_estr, meta->std_hdr[i], strlen(meta->std_hdr[i])); - easy_string_savedata(&hdr_estr, "\r\n", 2); - } - } - } - if(meta->std_hdr[HDR_CONTENT_TYPE] == NULL) - { - ctx->headers = curl_slist_append(ctx->headers, "Content-Type:"); - if(ctx->instance->param->object_store_way != CACHE_ALL_HOS) - { - easy_string_savedata(&hdr_estr, "Content-Type: application/octet-stream\r\n", strlen("Content-Type: application/octet-stream\r\n")); - } - } - ctx->headers = curl_slist_append(ctx->headers, "Expect:");//ע��POST������Expect��ϵ��Ҫ��ȷ����CURLOPT_POSTFIELDSIZE - //���������ͷ����GETʱ��ԭ������ - if(meta->usertag_len>0 && meta->usertag_len<=USER_TAG_MAX_LEN) - { - user_tag = (char *)malloc((meta->usertag_len/3 + 1)*4 + 18); //������������ռ䣻18=17+1: ͷ��+�ַ��������� - memcpy(user_tag, "x-amz-meta-user: ", 17); - user_tag_value = user_tag+17; - Base64_EncodeBlock((const unsigned char*)meta->usertag, meta->usertag_len, (unsigned char*)user_tag_value); - ctx->headers = curl_slist_append(ctx->headers, user_tag); - } - - if(ctx->instance->param->object_store_way != CACHE_ALL_HOS) - { - ctx->put.object_meta = cJSON_CreateObject(); - if(instance->param->hash_object_key) - { - cJSON_AddStringToObject(ctx->put.object_meta, "X-Amz-Meta-Url", meta->url); - } - cJSON_AddNumberToObject(ctx->put.object_meta, "Expires", now + expires); - cJSON_AddNumberToObject(ctx->put.object_meta, "X-Amz-Meta-Lm", last_modify); - cJSON_AddStringToObject(ctx->put.object_meta, "Headers", hdr_estr.buff); - if(user_tag_value != NULL) - { - cJSON_AddStringToObject(ctx->put.object_meta, "X-Amz-Meta-User", user_tag_value); - } - easy_string_destroy(&hdr_estr); - } - if(user_tag != NULL) - { - free(user_tag); - } - - ctx->put.evbuf = evbuffer_new(); - TAILQ_INIT(&ctx->put.etag_head); - return ctx; -} - -struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta) -{ - struct tango_cache_ctx *ctx; - enum OBJECT_LOCATION maybe_loc=OBJECT_IN_UNKNOWN; - - if(instance->param->object_store_way != CACHE_SMALL_REDIS) - { - maybe_loc = OBJECT_IN_HOS; - } - - ctx = tango_cache_update_prepare(instance, f, meta, maybe_loc); - if(ctx == NULL) - { - return NULL; - } - ctx->instance->statistic.put_recv_num += 1; - ctx->instance->error_code = CACHE_OK; - return ctx; -} - -//һ�����ϴ�ʱ��ֱ�Ӷ�λ�����ϴ���λ�� -struct tango_cache_ctx *tango_cache_update_once_prepare(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta, - size_t object_size, char *path, size_t pathsize) -{ - struct tango_cache_ctx *ctx; - enum OBJECT_LOCATION location; - - location = tango_cache_object_locate(instance, object_size); - ctx = tango_cache_update_prepare(instance, f, meta, location); - if(ctx == NULL) - { - return NULL; - } - tango_cache_get_object_path(ctx, path, pathsize); - - if(ctx->instance->param->object_store_way != CACHE_ALL_HOS) - { - cJSON_AddNumberToObject(ctx->put.object_meta, "Content-Length", object_size); - } - return ctx; -} - -int tango_cache_upload_once_data(struct tango_cache_instance *instance, struct future* f, - enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta_put *meta, char *path, size_t pathsize) -{ - struct tango_cache_ctx *ctx; - - ctx = tango_cache_update_once_prepare(instance, f, meta, size, path, pathsize); - if(ctx == NULL) - { - if(way == PUT_MEM_FREE) free((void *)data); - return -1; - } - return do_tango_cache_upload_once_data(ctx, way, data, size); -} - -int tango_cache_upload_once_evbuf(struct tango_cache_instance *instance, struct future* f, - enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, struct tango_cache_meta_put *meta, char *path, size_t pathsize) -{ - struct tango_cache_ctx *ctx; - - ctx = tango_cache_update_once_prepare(instance, f, meta, evbuffer_get_length(evbuf), path, pathsize); - if(ctx == NULL) - { - return -1; - } - return do_tango_cache_upload_once_evbuf(ctx, way, evbuf); -} - -struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *instance, enum CACHE_REQUEST_METHOD method, - struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get) -{ - struct tango_cache_ctx *ctx; - char sha256[72]; - - if(sessions_exceeds_limit(instance, where_to_get)) - { - instance->error_code = CACHE_OUTOF_SESSION; - instance->statistic.totaldrop_num += 1; - return NULL; - } - - ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); - ctx->instance = instance; - ctx->promise = future_to_promise(f); - promise_allow_many_successes(ctx->promise); //��λص��������ʱ����promise_finish - ctx->method = method; - ctx->get.state = GET_STATE_START; - ctx->get.max_age = meta->get.max_age; - ctx->get.min_fresh = meta->get.min_fresh; - - if(instance->param->hash_object_key) - { - caculate_sha256(meta->url, strlen(meta->url), sha256, 72); - snprintf(ctx->object_key, 256, "%s/%c%c_%c%c_%s", instance->param->bucketname, sha256[0], sha256[1], sha256[2], sha256[3], sha256+4); - } - else - { - snprintf(ctx->object_key, 256, "%s/%s", instance->param->bucketname, meta->url); - } - if(wired_load_balancer_lookup(instance->param->cache.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48)) - { - instance->error_code = CACHE_ERR_WIREDLB; - instance->statistic.totaldrop_num += 1; - free(ctx); - return NULL; - } - return ctx; -} - -int tango_cache_fetch_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get) -{ - struct tango_cache_ctx *ctx; - - if(instance->param->object_store_way != CACHE_SMALL_REDIS) - { - where_to_get = OBJECT_IN_HOS; - } - - ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_GET, f, meta, where_to_get); - if(ctx == NULL) - { - return -1; - } - return do_tango_cache_fetch_object(ctx, where_to_get); -} - -int tango_cache_head_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta) -{ - struct tango_cache_ctx *ctx; - enum OBJECT_LOCATION location; - - //���������Redis����Ԫ��Ϣ�洢��Redis�� - location = (instance->param->object_store_way != CACHE_ALL_HOS)?OBJECT_IN_REDIS:OBJECT_IN_HOS; - ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_HEAD, f, meta, location); - if(ctx == NULL) - { - return -1; - } - return do_tango_cache_head_object(ctx, location); -} - -struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *instance, struct future* f, const char *objkey, const char *minio_addr, const char *bucket) -{ - struct tango_cache_ctx *ctx; - char sha256[72]; - const char *pbucket; - - if(sessions_exceeds_limit(instance, OBJECT_IN_HOS)) - { - instance->error_code = CACHE_OUTOF_SESSION; - instance->statistic.totaldrop_num += 1; - return NULL; - } - - ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); - ctx->instance = instance; - ctx->promise = future_to_promise(f); - ctx->method = CACHE_REQUEST_DELETE; - - pbucket = (bucket==NULL)?instance->param->bucketname:bucket; - if(instance->param->hash_object_key) - { - caculate_sha256(objkey, strlen(objkey), sha256, 72); - snprintf(ctx->object_key, 256, "%s/%c%c_%c%c_%s", pbucket, sha256[0], sha256[1], sha256[2], sha256[3], sha256+4); - } - else - { - snprintf(ctx->object_key, 256, "%s/%s", pbucket, objkey); - } - if(minio_addr != NULL) - { - snprintf(ctx->hostaddr, 48, "%s", minio_addr); - } - else if(wired_load_balancer_lookup(instance->param->cache.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48)) - { - instance->error_code = CACHE_ERR_WIREDLB; - instance->statistic.totaldrop_num += 1; - free(ctx); - return NULL; - } - return ctx; -} - -int tango_cache_delete_object(struct tango_cache_instance *instance, struct future* f, const char *objkey, const char *minio_addr, const char *bucket) -{ - struct tango_cache_ctx *ctx; - - ctx = tango_cache_delete_prepare(instance, f, objkey, minio_addr, bucket); - if(ctx == NULL) - { - return -1; - } - return (cache_delete_minio_object(ctx)==1)?0:-1; -} - -struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_instance *instance, struct future* f, char *objlist[], u_int32_t num) -{ - struct tango_cache_ctx *ctx; - char md5[48]={0}, content_md5[64]; - - if(sessions_exceeds_limit(instance, OBJECT_IN_HOS)) - { - instance->error_code = CACHE_OUTOF_SESSION; - instance->statistic.totaldrop_num += 1; - return NULL; - } - - ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); - ctx->instance = instance; - ctx->promise = future_to_promise(f); - ctx->method = CACHE_REQUEST_DELETE_MUL; - ctx->del.succ_num = num; - - if(wired_load_balancer_lookup(instance->param->cache.wiredlb, objlist[0], strlen(objlist[0]), ctx->hostaddr, 48)) - { - instance->error_code = CACHE_ERR_WIREDLB; - instance->statistic.totaldrop_num += num; - free(ctx); - return NULL; - } - - construct_multiple_delete_xml(instance->param->bucketname, objlist, num, instance->param->hash_object_key, &ctx->response.buff, &ctx->response.size); - caculate_base64_md5(ctx->response.buff, ctx->response.size, (unsigned char *)md5, 48); - sprintf(content_md5, "Content-MD5: %s", md5); - ctx->headers = curl_slist_append(ctx->headers, content_md5); - ctx->headers = curl_slist_append(ctx->headers, "Content-Type: application/xml"); - ctx->headers = curl_slist_append(ctx->headers, "Expect:"); - return ctx; -} - -//TODO: AccessDenied -int tango_cache_multi_delete(struct tango_cache_instance *instance, struct future* f, char *objlist[], u_int32_t num) -{ - struct tango_cache_ctx *ctx; - - ctx = tango_cache_multi_delete_prepare(instance, f, objlist, num); - if(ctx == NULL) - { - return -1; - } - return do_tango_cache_multi_delete(ctx); -} - -static void check_multi_info(CURLM *multi) -{ - CURLMsg *msg; - int msgs_left; - struct tango_cache_ctx *ctx; - CURL *easy; - CURLcode res; - long res_code; - - while((msg = curl_multi_info_read(multi, &msgs_left))) - { - if(msg->msg != CURLMSG_DONE) - { - continue; - } - - easy = msg->easy_handle; - res = msg->data.result; - curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ctx); - curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &res_code); - curl_multi_remove_handle(multi, easy); - curl_easy_cleanup(easy); - ctx->curl = NULL; - ctx->res_code = 0; - - switch(ctx->method) - { - case CACHE_REQUEST_GET: - case CACHE_REQUEST_HEAD: - tango_cache_curl_get_done(ctx, res, res_code); - break; - case CACHE_REQUEST_PUT: - tango_cache_curl_put_done(ctx, res, res_code); - break; - case CACHE_REQUEST_DELETE: - tango_cache_curl_del_done(ctx, res, res_code); - break; - case CACHE_REQUEST_DELETE_MUL: - tango_cache_curl_muldel_done(ctx, res, res_code); - break; - default: break; - } - } -} - -/* Called by libevent when we get action on a multi socket */ -static void libevent_socket_event_cb(int fd, short action, void *userp) -{ - struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; //from event_assign - UNUSED CURLMcode rc; - int what, still_running; - - what = ((action&EV_READ)?CURL_CSELECT_IN:0) | ((action & EV_WRITE)?CURL_CSELECT_OUT:0); - - rc = curl_multi_socket_action(instance->multi_hd, fd, what, &still_running); - instance->statistic.session_http = still_running; - assert(rc==CURLM_OK); - - check_multi_info(instance->multi_hd); - if(still_running<=0 && evtimer_pending(&instance->timer_event, NULL)) - { - evtimer_del(&instance->timer_event); - } -} - -/* Called by libevent when our timeout expires */ -static void libevent_timer_event_cb(int fd, short kind, void *userp) -{ - struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; - UNUSED CURLMcode rc; - int still_running; - - rc = curl_multi_socket_action(instance->multi_hd, CURL_SOCKET_TIMEOUT, 0, &still_running); - instance->statistic.session_http = still_running; - assert(rc==CURLM_OK); - check_multi_info(instance->multi_hd); -} - -static int curl_socket_function_cb(CURL *curl, curl_socket_t sockfd, int what, void *userp, void *sockp) -{ - struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; //from multi handle - struct curl_socket_data *sockinfo = (struct curl_socket_data *)sockp; //curl_multi_assign, for socket - int action; - - if(what == CURL_POLL_REMOVE) - { - if(sockinfo != NULL) - { - event_del(&sockinfo->sock_event); - free(sockinfo); - } - } - else - { - if(sockinfo == NULL) - { - sockinfo = (struct curl_socket_data *)calloc(1, sizeof(struct curl_socket_data)); - curl_multi_assign(instance->multi_hd, sockfd, sockinfo); - } - else - { - event_del(&sockinfo->sock_event); - } - - action = (what&CURL_POLL_IN?EV_READ:0)|(what&CURL_POLL_OUT?EV_WRITE:0)|EV_PERSIST; - event_assign(&sockinfo->sock_event, instance->evbase, sockfd, action, libevent_socket_event_cb, instance); - event_add(&sockinfo->sock_event, NULL); - } - - return 0; -} - -static int curl_timer_function_cb(CURLM *multi, long timeout_ms, void *userp) -{ - struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; - struct timeval timeout; - UNUSED CURLMcode rc; - int still_running; - - timeout.tv_sec = timeout_ms/1000; - timeout.tv_usec = (timeout_ms%1000)*1000; - - if(timeout_ms == 0) - { - //timeout_ms is 0 means we should call curl_multi_socket_action/curl_multi_perform at once. - //To initiate the whole process(inform CURLMOPT_SOCKETFUNCTION callback) or when timeout occurs. - rc = curl_multi_socket_action(multi, CURL_SOCKET_TIMEOUT, 0, &still_running); - instance->statistic.session_http = still_running; - assert(rc==CURLM_OK); - } - else if(timeout_ms == -1) //timeout_ms is -1 means we should delete the timer. - { - evtimer_del(&instance->timer_event); - } - else //update the timer to the new value. - { - evtimer_add(&instance->timer_event, &timeout); - } - - return 0; //0-success; -1-error -} - -static void instance_statistic_timer_cb(int fd, short kind, void *userp) -{ - struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; - struct timeval tv; - struct cache_statistics incr_statistic; - long long *plast_statistic = (long long*)&instance->statistic_last; - long long *pnow_statistic = (long long*)&instance->statistic; - long long *pinc_statistic = (long long*)&incr_statistic; - - for(u_int32_t i=0; i<sizeof(struct cache_statistics)/sizeof(long long); i++) - { - pinc_statistic[i] = pnow_statistic[i] - plast_statistic[i]; - } - instance->statistic_last = instance->statistic; - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_RECV], 0, FS_OP_ADD, incr_statistic.get_recv_num); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_S_TOTAL], 0, FS_OP_ADD, incr_statistic.get_succ_http+incr_statistic.get_succ_redis); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_S_HTTP], 0, FS_OP_ADD, incr_statistic.get_succ_http); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_S_REDIS], 0, FS_OP_ADD, incr_statistic.get_succ_redis); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_MISS], 0, FS_OP_ADD, incr_statistic.get_miss_num); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_E_TOTAL], 0, FS_OP_ADD, incr_statistic.get_err_http+incr_statistic.get_err_redis); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_E_HTTP], 0, FS_OP_ADD, incr_statistic.get_err_http); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_E_REDIS], 0, FS_OP_ADD, incr_statistic.get_err_redis); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_RECV], 0, FS_OP_ADD, incr_statistic.put_recv_num); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_S_TOTAL], 0, FS_OP_ADD, incr_statistic.put_succ_http+incr_statistic.put_succ_redis); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_S_HTTP], 0, FS_OP_ADD, incr_statistic.put_succ_http); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_S_REDIS], 0, FS_OP_ADD, incr_statistic.put_succ_redis); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_E_TOTAL], 0, FS_OP_ADD, incr_statistic.put_err_http+incr_statistic.put_err_redis); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_E_HTTP], 0, FS_OP_ADD, incr_statistic.put_err_http); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_E_REDIS], 0, FS_OP_ADD, incr_statistic.put_err_redis); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_DEL_RECV], 0, FS_OP_ADD, incr_statistic.del_recv_num); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_DEL_SUCC], 0, FS_OP_ADD, incr_statistic.del_succ_num); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_DEL_ERROR], 0, FS_OP_ADD, incr_statistic.del_error_num); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_TOTAL_DROP], 0, FS_OP_ADD, incr_statistic.totaldrop_num); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_MEM_USED], 0, FS_OP_ADD, incr_statistic.memory_used); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_SESS_HTTP], 0, FS_OP_ADD, incr_statistic.session_http); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_SESS_REDIS], 0, FS_OP_ADD, incr_statistic.session_redis); - tv.tv_sec = instance->param->fsstat_period; - tv.tv_usec = 0; - event_add(&instance->timer_statistic, &tv); -} - -static int _unfold_IP_range(char* ip_range, char***ip_list, int size) -{ - int i=0,count=0, ret=0; - int range_digits[5]; - memset(range_digits,0,sizeof(range_digits)); - ret=sscanf(ip_range,"%d.%d.%d.%d-%d",&range_digits[0],&range_digits[1],&range_digits[2],&range_digits[3],&range_digits[4]); - if(ret!=4&&ret!=5) - { - return 0; - } - if(ret==4&&range_digits[4]==0) - { - range_digits[4]=range_digits[3]; - } - for(i=0;i<5;i++) - { - if(range_digits[i]<0||range_digits[i]>255) - { - return 0; - } - } - count=range_digits[4]-range_digits[3]+1; - *ip_list=(char**)realloc(*ip_list, sizeof(char*)*(size+count)); - for(i=0;i<count;i++) - { - (*ip_list)[size+i]=(char*)malloc(64); - snprintf((*ip_list)[size+i],64,"%d.%d.%d.%d",range_digits[0],range_digits[1],range_digits[2],range_digits[3]+i); - } - return count; -} - -static int unfold_IP_range(const char* ip_range, char***ip_list) -{ - char *token=NULL,*sub_token=NULL,*saveptr; - char *buffer=(char*)calloc(sizeof(char),strlen(ip_range)+1); - int count=0; - strcpy(buffer,ip_range); - for (token = buffer; ; token= NULL) - { - sub_token= strtok_r(token,";", &saveptr); - if (sub_token == NULL) - break; - count+=_unfold_IP_range(sub_token, ip_list,count); - } - free(buffer); - return count; -} - -static int build_redis_cluster_addrs(const char *iplist, const char *ports, char *redisaddrs, size_t size, void *runtimelog) -{ - u_int32_t redis_ip_num; - u_int32_t redis_port_start, redis_port_end; - char **redis_iplist=NULL; - size_t addrlen; - int ret; - - redis_ip_num = unfold_IP_range(iplist, &redis_iplist); - if(redis_ip_num ==0 ) - { - MESA_HANDLE_RUNTIME_LOGV2(runtimelog, RLOG_LV_FATAL, "decode REDIS_CLUSTER_IP_LIST %s failed.", iplist); - return -1; - } - ret = sscanf(ports, "%u-%u", &redis_port_start, &redis_port_end); - if(ret!=1 && ret!=2) - { - MESA_HANDLE_RUNTIME_LOGV2(runtimelog, RLOG_LV_FATAL, "decode REDIS_CLUSTER_PORT_RANGE %s failed.", iplist); - return -2; - } - - memset(redisaddrs, 0, size); - for(u_int32_t i=0; i<redis_ip_num; i++) - { - addrlen = strlen(redisaddrs); - snprintf(redisaddrs+addrlen, size-addrlen, "%s:%u,", redis_iplist[i], redis_port_start); - } - addrlen = strlen(redisaddrs); - redisaddrs[addrlen-1] = '\0'; - return 0; -} - -static int wired_load_balancer_init(struct wiredlb_parameter *wparam, void *runtime_log) -{ - wparam->wiredlb = wiredLB_create(wparam->wiredlb_topic, wparam->wiredlb_group, WLB_PRODUCER); - if(wparam->wiredlb == NULL) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n"); - return -1; - } - wiredLB_set_opt(wparam->wiredlb, WLB_OPT_HEALTH_CHECK_PORT, &wparam->wiredlb_ha_port, sizeof(wparam->wiredlb_ha_port)); - wiredLB_set_opt(wparam->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &wparam->wiredlb_override, sizeof(wparam->wiredlb_override)); - if(strlen(wparam->wiredlb_datacenter) > 0) - { - wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_DATACENTER, wparam->wiredlb_datacenter, strlen(wparam->wiredlb_datacenter)+1); - } - if(wparam->wiredlb_override) - { - wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, wparam->iplist, strlen(wparam->iplist)+1); - wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_DATAPORT, &wparam->port, sizeof(wparam->port)); - } - if(wiredLB_init(wparam->wiredlb) < 0) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "wiredLB_init group %s failed.\n", wparam->wiredlb_group); - return -1; - } - return 0; -} - -int register_field_stat(struct tango_cache_parameter *param, void *runtime_log) -{ - int value; - const char *field_names[FS_FILED_NUM]={"GET_RECV", "GET_S_TOTAL", "GET_S_HTTP", "GET_S_REDIS", "GET_MISS", "GET_E_TOTAL", "GET_E_HTTP", "GET_E_REDIS", - "PUT_RECV", "PUT_S_TOTAL", "PUT_S_HTTP", "PUT_S_REDIS", "PUT_E_TOTAL", "PUT_E_HTTP", "PUT_E_REDIS", - "DEL_RECV", "DEL_SUCC", "DEL_ERROR", "TOTAL_DROP", "MEM_USED", "SESSION_HTTP", "SESSION_REDIS"}; - - param->fsstat_handle = FS_create_handle(); - FS_set_para(param->fsstat_handle, OUTPUT_DEVICE, param->fsstat_filepath, strlen(param->fsstat_filepath)+1); - value = 1; - FS_set_para(param->fsstat_handle, PRINT_MODE, &value, sizeof(value)); - value = 2; - FS_set_para(param->fsstat_handle, STAT_CYCLE, &value, sizeof(value)); - value = 1; - FS_set_para(param->fsstat_handle, CREATE_THREAD, &value, sizeof(value)); - FS_set_para(param->fsstat_handle, APP_NAME, param->fsstat_appname, strlen(param->fsstat_appname)+1); - FS_set_para(param->fsstat_handle, STATS_SERVER_IP, param->fsstat_dst_ip, strlen(param->fsstat_dst_ip)+1); - FS_set_para(param->fsstat_handle, STATS_SERVER_PORT, ¶m->fsstat_dst_port, sizeof(param->fsstat_dst_port)); - if(strlen(param->fsstat_histlen)>0 && FS_set_para(param->fsstat_handle, HISTOGRAM_GLOBAL_BINS, param->fsstat_histlen, strlen(param->fsstat_histlen)+1) < 0) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "FS_set_para %s failed.", param->fsstat_histlen); - return -1; - } - - for(int i=0; i<=FS_FILED_TOTAL_DROP; i++) - { - param->fsstat_field_ids[i] = FS_register(param->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]); - } - for(int i=FS_FILED_MEM_USED; i<=FS_FILED_SESS_REDIS; i++) - { - param->fsstat_field_ids[i] = FS_register(param->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, field_names[i]); - } - param->fsstat_histlen_id = FS_register_histogram(param->fsstat_handle, FS_CALC_CURRENT, "length(bytes)", 1L, 17179869184L, 3); - if(param->fsstat_histlen_id < 0) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "FS_register_histogram failed."); - return -1; - } - FS_start(param->fsstat_handle); - return 0; -} - -struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path, const char* section, void *runtime_log) -{ - u_int32_t intval; - u_int64_t longval; - struct tango_cache_parameter *param; - char redis_cluster_ip[512], redis_ports[256]; - - param = (struct tango_cache_parameter *)calloc(1, sizeof(struct tango_cache_parameter)); - - //multi curl - MESA_load_profile_uint_def(profile_path, section, "MAX_CONNECTION_PER_HOST", &intval, 1); - param->maximum_host_cnns = intval; - MESA_load_profile_uint_def(profile_path, section, "MAX_CNNT_PIPELINE_NUM", &intval, 20); - param->maximum_pipelines = intval; - MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_TRANSFER_TIMEOUT_S", &intval, 0); - param->transfer_timeout = intval; - - //instance - MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_SESSION_NUM", ¶m->maximum_sessions, 100); - MESA_load_profile_uint_def(profile_path, section, "MAX_USED_MEMORY_SIZE_MB", &intval, 5120); - longval = intval; - param->maximum_used_mem = longval * 1024 * 1024; - MESA_load_profile_uint_def(profile_path, section, "CACHE_OBJECT_KEY_HASH_SWITCH", ¶m->hash_object_key, 1); - MESA_load_profile_string_def(profile_path, section, "CACHE_TOKEN", param->cache_token, 256, "c21f969b5f03d33d43e04f8f136e7682"); - if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_BUCKET_NAME", param->bucketname, 256) < 0) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BUCKET_NAME not found.\n", profile_path, section); - return NULL; - } - MESA_load_profile_uint_def(profile_path, section, "CACHE_UPLOAD_BLOCK_SIZE", ¶m->upload_block_size, 5242880); - if(param->upload_block_size < 5242880) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_UPLOAD_BLOCK_SIZE too small, must bigger than 5242880(5MB).\n", profile_path, section); - return NULL; - } - MESA_load_profile_uint_def(profile_path, section, "CACHE_DEFAULT_TTL_SECOND", &intval, 604800); - if(intval < 60) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_DEFAULT_TTL_SECOND too small, must bigger than 60s.\n", profile_path, section); - return NULL; - } - param->relative_ttl = intval; - - //wiredlb - MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", param->cache.wiredlb_topic, 64, "TANGO_CACHE_PRODUCER"); - MESA_load_profile_string_nodef(profile_path, section, "WIREDLB_DATACENTER", param->cache.wiredlb_datacenter, 64); - MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", ¶m->cache.wiredlb_override, 1); - MESA_load_profile_uint_def(profile_path, section, "WIREDLB_HEALTH_PORT", &intval, 52100); - param->cache.wiredlb_ha_port = intval; - MESA_load_profile_string_def(profile_path, section, "WIREDLB_GROUP", param->cache.wiredlb_group, 64, "MINIO_GROUP"); - MESA_load_profile_uint_def(profile_path, section, "CACHE_LISTEN_PORT", ¶m->cache.port, 9000); - if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_IP_LIST", param->cache.iplist, 4096) < 0) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BROKERS_LIST not found.", profile_path, section); - return NULL; - } - if(wired_load_balancer_init(¶m->cache, runtime_log)) - { - return NULL; - } - - MESA_load_profile_int_def(profile_path, section, "CACHE_STORE_OBJECT_WAY", ¶m->object_store_way, CACHE_ALL_HOS); - if(param->object_store_way!=CACHE_ALL_HOS && param->object_store_way!=CACHE_META_REDIS && param->object_store_way!=CACHE_SMALL_REDIS) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "CACHE_STORE_OBJECT_WAY is not 1/2/3.", profile_path, section); - return NULL; - } - if(param->object_store_way != CACHE_ALL_HOS) - { - if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_IP_LIST", redis_cluster_ip, 512) < 0) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CLUSTER_IP_LIST not found.", profile_path, section); - return NULL; - } - if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_PORT_RANGE", redis_ports, 256) < 0) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CLUSTER_PORT_RANGE not found.", profile_path, section); - return NULL; - } - if(build_redis_cluster_addrs(redis_cluster_ip, redis_ports, param->redisaddrs, 4096, runtime_log)) - { - return NULL; - } - MESA_load_profile_uint_def(profile_path, section, "REDIS_CACHE_OBJECT_SIZE", ¶m->redis_object_maxsize, 10240); - if(param->redis_object_maxsize >= param->upload_block_size) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CACHE_OBJECT_SIZE must be smaller than CACHE_UPLOAD_BLOCK_SIZE.", profile_path, section); - return NULL; - } - } - - //FieldStat LOG - MESA_load_profile_string_def(profile_path, section, "LOG_FSSTAT_APPNAME", param->fsstat_appname, 16, "TANGO_CACHE"); - MESA_load_profile_string_def(profile_path, section, "LOG_FSSTAT_FILEPATH", param->fsstat_filepath, 256, "./log/tangocache_fsstat.log"); - MESA_load_profile_uint_def(profile_path, section, "LOG_FSSTAT_INTERVAL", ¶m->fsstat_period, 10); - MESA_load_profile_uint_def(profile_path, section, "LOG_FSSTAT_TRIG", ¶m->fsstatid_trig, 0); - MESA_load_profile_string_def(profile_path, section, "LOG_FSSTAT_DST_IP", param->fsstat_dst_ip, 64, "10.172.128.2"); - MESA_load_profile_int_def(profile_path, section, "LOG_FSSTAT_DST_PORT", ¶m->fsstat_dst_port, 8125); - MESA_load_profile_string_nodef(profile_path, section, "LOG_FSSTAT_HISTBINS", param->fsstat_histlen, 256); - if(param->fsstatid_trig && register_field_stat(param, runtime_log)) - { - return NULL; - } - return param; -} - -struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_parameter *param, struct event_base* evbase, void *runtimelog) -{ - struct tango_cache_instance *instance; - char *redis_sep, *save_ptr=NULL; - struct timeval tv; - time_t now, remain; - - instance = (struct tango_cache_instance *)malloc(sizeof(struct tango_cache_instance)); - memset(instance, 0, sizeof(struct tango_cache_instance)); - instance->runtime_log = runtimelog; - instance->evbase = evbase; - instance->param = param; - - instance->multi_hd = curl_multi_init(); - curl_multi_setopt(instance->multi_hd, CURLMOPT_PIPELINING, CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX); - curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_HOST_CONNECTIONS, param->maximum_host_cnns); - curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_PIPELINE_LENGTH, param->maximum_pipelines); - curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETFUNCTION, curl_socket_function_cb); - curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETDATA, instance); //curl_socket_function_cb *userp - curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERFUNCTION, curl_timer_function_cb); - curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERDATA, instance); - - if(param->object_store_way != CACHE_ALL_HOS) - { - if(redis_asyn_connect_init(instance)) - { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "redis_asyn_connect_init %s failed.", instance->param->redisaddrs); - free(instance); - return NULL; - } - redis_sep = strtok_r(param->redisaddrs, ",", &save_ptr); - sprintf(instance->redisaddr, "%s", redis_sep); - } - evtimer_assign(&instance->timer_event, evbase, libevent_timer_event_cb, instance); - - if(param->fsstatid_trig) - { - evtimer_assign(&instance->timer_statistic, evbase, instance_statistic_timer_cb, instance); - now = time(NULL); - remain = instance->param->fsstat_period - (now % instance->param->fsstat_period); - tv.tv_sec = remain; - tv.tv_usec = 0; - evtimer_add(&instance->timer_statistic, &tv); - } - return instance; -} - -void tango_cache_global_init(void) -{ - curl_global_init(CURL_GLOBAL_NOTHING); -} - diff --git a/cache/src/tango_cache_client_in.h b/cache/src/tango_cache_client_in.h deleted file mode 100644 index 82345de..0000000 --- a/cache/src/tango_cache_client_in.h +++ /dev/null @@ -1,255 +0,0 @@ -#ifndef __TANGO_CACHE_CLIENT_IN_H__ -#define __TANGO_CACHE_CLIENT_IN_H__ - -#include <curl/curl.h> -#include <sys/queue.h> -#include <pthread.h> - -#include <event2/event.h> -#include <event.h> -#include <hiredis-vip/async.h> -#include <hiredis-vip/hircluster.h> -#include <cjson/cJSON.h> - -#include <MESA/wiredLB.h> -#include <MESA/field_stat2.h> -#include "tango_cache_client.h" - -#define RESPONSE_HDR_EXPIRES 1 -#define RESPONSE_HDR_LAST_MOD 2 -#define RESPONSE_HDR_ALL 3 - -#define CACHE_ALL_HOS 0 //Ԫ��Ϣ�Ͷ�����MINIO -#define CACHE_META_REDIS 1 //Ԫ��Ϣ��REDIS������MINIO -#define CACHE_SMALL_REDIS 2 //Ԫ��Ϣ��С�ļ���REDIS�����ļ���MINIO - -enum FIELD_STAT_FILEDS -{ - FS_FILED_GET_RECV=0, - FS_FILED_GET_S_TOTAL, - FS_FILED_GET_S_HTTP, - FS_FILED_GET_S_REDIS, - FS_FILED_GET_MISS, - FS_FILED_GET_E_TOTAL, - FS_FILED_GET_E_HTTP, - FS_FILED_GET_E_REDIS, - FS_FILED_PUT_RECV, - FS_FILED_PUT_S_TOTAL, - FS_FILED_PUT_S_HTTP, - FS_FILED_PUT_S_REDIS, - FS_FILED_PUT_E_TOTAL, - FS_FILED_PUT_E_HTTP, - FS_FILED_PUT_E_REDIS, - FS_FILED_DEL_RECV, - FS_FILED_DEL_SUCC, - FS_FILED_DEL_ERROR, - FS_FILED_TOTAL_DROP, - - //Next use Status - FS_FILED_MEM_USED, - FS_FILED_SESS_HTTP, - FS_FILED_SESS_REDIS, - - FS_FILED_NUM, -}; - -enum CACHE_REQUEST_METHOD -{ - CACHE_REQUEST_GET=0, - CACHE_REQUEST_PUT, - CACHE_REQUEST_DELETE, - CACHE_REQUEST_DELETE_MUL, - CACHE_REQUEST_HEAD, -}; - -enum GET_OBJECT_STATE -{ - GET_STATE_START=0, - GET_STATE_DELETE, - GET_STATE_REDIS_META, - GET_STATE_REDIS_ALL, - GET_STATE_REDIS_TRY, - GET_STATE_END, -}; - -enum PUT_OBJECT_STATE -{ - PUT_STATE_START=0, - PUT_STATE_WAIT_START, - PUT_STATE_PART, - PUT_STATE_CANCEL, - PUT_STATE_REDIS_META, - PUT_STATE_REDIS_EXPIRE, - PUT_STATE_REDIS_SETEX, //��״̬���ڵȴ�����ִ�н�� - PUT_STATE_END, -}; - -struct easy_string -{ - char* buff; - size_t len; - size_t size; -}; - -struct wiredlb_parameter -{ - char wiredlb_topic[64]; - char wiredlb_datacenter[64]; - char wiredlb_group[64]; - char iplist[4096];//minio�б� - WLB_handle_t wiredlb; - u_int32_t wiredlb_override; - u_int32_t port; - short wiredlb_ha_port; -}; - -struct tango_cache_parameter -{ - char bucketname[256]; - char cache_token[256]; - char redis_key[256]; - long maximum_host_cnns; - long transfer_timeout;//������ʱ������ - long maximum_pipelines; - u_int64_t maximum_used_mem; - u_int32_t maximum_sessions; - u_int32_t upload_block_size; //minio�ֶ��ϴ������С���� - time_t relative_ttl; //����������Ч�� - u_int32_t hash_object_key; - //wiredlb - int object_store_way; //��ȡobject��Ϣ�ķ�ʽ - struct wiredlb_parameter cache; - char redisaddrs[4096]; - u_int32_t redis_object_maxsize;//С�ļ�����redisʱ�����������С - - //FieldStatLog - int32_t fsstat_dst_port; - char fsstat_dst_ip[64]; - char fsstat_appname[16]; - char fsstat_filepath[256]; - u_int32_t fsstat_period; - u_int32_t fsstatid_trig; - char fsstat_histlen[256]; - screen_stat_handle_t fsstat_handle; - int32_t fsstat_histlen_id; - int32_t fsstat_field_ids[FS_FILED_NUM]; -}; - -struct tango_cache_instance -{ - struct event_base* evbase; - struct event timer_event; - struct event timer_statistic; - CURLM *multi_hd; - enum CACHE_ERR_CODE error_code; - - int redis_connecting; - redisClusterAsyncContext *redis_ac; - char redisaddr[128]; - - const struct tango_cache_parameter *param; - void *runtime_log; - struct cache_statistics statistic; - struct cache_statistics statistic_last; //���ڶ��instanceʹ��ͬһ��fieldstat�ۼ� -}; - -struct multipart_etag_list -{ - char *etag; - u_int32_t part_number; - TAILQ_ENTRY(multipart_etag_list) node; -}; - -typedef void (redisRedirectMinioCallback)(struct tango_cache_ctx *ctx); - -struct cache_ctx_data_get -{ - time_t max_age; - time_t min_fresh; - time_t expires; - time_t last_modify; - u_int32_t need_hdrs; - enum GET_OBJECT_STATE state; - struct easy_string response_tag; - struct tango_cache_result result; - redisRedirectMinioCallback *redis_redirect_minio_cb; -}; - -struct cache_ctx_data_put -{ - struct evbuffer *evbuf; - size_t upload_length; - size_t upload_offset; - char *uploadID; - char *combine_xml; - TAILQ_HEAD(__etag_list_head, multipart_etag_list) etag_head; - cJSON *object_meta; - struct easy_string once_request; //һ����PUTʱ�洢�����ݣ�ʧ�ܵ�ʱ��������������ܸ��������ṹ - enum PUT_OBJECT_STATE state; - u_int32_t part_index; //��RESPONSE_HDR_ - u_int32_t object_ttl; - bool close_state; //���������ùر� - size_t object_size; -}; - -struct cache_ctx_multi_delete -{ - u_int32_t succ_num; - u_int32_t fail_num; -}; - -struct tango_cache_ctx -{ - CURL *curl; - struct curl_slist *headers; - struct promise* promise; - char error[CURL_ERROR_SIZE]; - char object_key[256]; - char hostaddr[48]; - - enum CACHE_REQUEST_METHOD method; - enum CACHE_ERR_CODE error_code; - struct easy_string response; - - bool fail_state; - enum OBJECT_LOCATION locate; //�ɳ��������϶�����λ�� - long res_code; - - union{ - struct cache_ctx_data_put put; - struct cache_ctx_data_get get; - struct cache_ctx_multi_delete del; - }; - struct tango_cache_instance *instance; -}; - -struct curl_socket_data -{ - struct event sock_event; -}; - -void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_t size); - -void easy_string_savedata(struct easy_string *estr, const char *data, size_t len); -void easy_string_destroy(struct easy_string *estr); - -void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback=true); -void tango_cache_set_fail_state(struct tango_cache_ctx *ctx, enum CACHE_ERR_CODE error_code); -const char *tango_cache_get_errstring(const struct tango_cache_ctx *ctx); - -bool sessions_exceeds_limit(struct tango_cache_instance *instance, enum OBJECT_LOCATION where_to_get); - -struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *instance, - struct future* f, struct tango_cache_meta_put *meta, enum OBJECT_LOCATION maybe_loc); -struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *instance, - enum CACHE_REQUEST_METHOD method, struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get); -struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *instance, - struct future* f, const char *objkey, const char *minio_addr, const char *bucket); - -enum OBJECT_LOCATION tango_cache_object_locate(struct tango_cache_instance *instance, size_t object_size); -void tango_cache_get_object_path(struct tango_cache_ctx *ctx, char *path/*OUT*/, size_t pathsize); -struct tango_cache_ctx *tango_cache_update_once_prepare(struct tango_cache_instance *instance, - struct future* f, struct tango_cache_meta_put *meta, size_t object_size, char *path, size_t pathsize); - -#endif - diff --git a/cache/src/tango_cache_pending.cpp b/cache/src/tango_cache_pending.cpp deleted file mode 100644 index 8d803d9..0000000 --- a/cache/src/tango_cache_pending.cpp +++ /dev/null @@ -1,306 +0,0 @@ -#include "tango_cache_pending.h" -#include <tfe_utils.h> -#include <assert.h> -#include <string.h> -#include <stdlib.h> -#include <stdio.h> -#include <stdbool.h> - - -time_t get_time_value(const char* field_value, const char* field_type) -{ - time_t time; - char* time_value = NULL; - field_value += strlen(field_type); - field_value++; - int len = strlen(field_value); - time_value = ALLOC(char, len+1); - int index = 0; - while (field_value[index] != ',' && field_value[index] != '\r' && index < len) - { - time_value[index] = field_value[index]; - index++; - } - time_value[index] = '\0'; - time = (time_t)atol(time_value); - free(time_value); - return time; -} - - -void get_request_freshness(const char *value, struct request_freshness* restrict) -{ - const char* field_value = NULL; - field_value = strstr(value, "min-fresh"); - if (field_value != NULL) - { - restrict->min_fresh = get_time_value(field_value, "min-fresh");; - } - - field_value = strstr(value, "max-age"); - if (field_value != NULL) - { - restrict->max_age = get_time_value(field_value, "max-age");; - } -} - - -enum cache_pending_action request_cache_control(const char* value, struct request_freshness* restrict) -{ - if (strstr(value, "no-cache") != NULL) - { - return REVALIDATE; - } - if (strstr(value, "no-store") != NULL) - { - return FORBIDDEN; - } - get_request_freshness(value, restrict); - return ALLOWED; -} - - -bool cache_verify(const struct tfe_http_half *request) -{ - if( !tfe_http_std_field_read(request,TFE_HTTP_IF_MATCH) || - !tfe_http_std_field_read(request,TFE_HTTP_IF_NONE_MATCH) || - !tfe_http_std_field_read(request,TFE_HTTP_IF_MODIFIED_SINCE) || - !tfe_http_std_field_read(request,TFE_HTTP_IF_UNMODIFIED_SINCE) - ) - { - return true; - } - return false; -} - - -const char* get_head_value(const struct tfe_http_field *http_fields, size_t n_fields, enum tfe_http_std_field head_key) -{ - size_t i = 0; - for (i = 0; i < n_fields; i++) - { - if (http_fields[i].http_field == head_key) - { - return http_fields[i].value; - } - } - return NULL; -} - - -enum cache_pending_action get_pragma_action(const char * value) -{ - const char *pragma_value = "no-cache"; - if (strcasecmp(value, pragma_value) == 0) - { - return REVALIDATE; - } - return UNDEFINED; -} - - -enum cache_pending_action tfe_cache_get_pending(const struct tfe_http_half *request, struct request_freshness* restrict) -{ - enum cache_pending_action res = UNDEFINED; - const char *value = NULL; - memset(restrict,0,sizeof(struct request_freshness)); - if(request->req_spec.method!=TFE_HTTP_METHOD_GET) - { - return FORBIDDEN; - } - if(NULL!=tfe_http_std_field_read(request, TFE_HTTP_CONT_RANGE) || - NULL!=tfe_http_std_field_read(request, TFE_HTTP_AUTHORIZATION)|| - NULL!=tfe_http_nonstd_field_read(request, "WWW-Authenticate")) - { - return FORBIDDEN; - } - value = tfe_http_std_field_read(request, TFE_HTTP_PRAGMA); - if (value != NULL) - { - res = get_pragma_action(value); - } - else - { - value = tfe_http_std_field_read(request, TFE_HTTP_CACHE_CONTROL); - if (value != NULL) - { - res = request_cache_control(value, restrict); - } - else - { - if (cache_verify(request)) - { - res = REVALIDATE; - } - } - } - return res; -} - - - -time_t read_GMT_time(const char* gmt_string) -{ - time_t expire_rel_time; - struct tm expire_gmt_time; - memset(&expire_gmt_time, 0, sizeof(expire_gmt_time)); - strptime(gmt_string, "%a, %d %b %Y %H:%M:%S GMT", &expire_gmt_time); - expire_rel_time = mktime(&expire_gmt_time); - return expire_rel_time; -} - - -bool is_standard_gmt_format(const char* value) -{ - int str_len = strlen(value); - if(0==strcasecmp(value+str_len-3,"GMT")) - { - return true; - } - else - { - return false; - } -} - -time_t get_response_s_maxage(const char* cache_ctl) -{ - const char* s_maxage = NULL; - s_maxage = strstr(cache_ctl, "s-maxage"); - if (s_maxage != NULL) - { - return get_time_value(s_maxage, "s-maxage"); - } - else - { - return 0; - } -} - - -time_t get_response_maxage(const char* cache_ctl) -{ - const char* max_age = NULL; - max_age = strstr(cache_ctl, "max-age"); - if (max_age != NULL) - { - return get_time_value(max_age, "max-age"); - } - else - { - return 0; - } -} - - -void get_response_freshness(const struct tfe_http_half *response, struct response_freshness* freshness) -{ - time_t expire_rel_time = 0; - time_t cur_rel_time = 0; - struct tm cur_gmt_time; - const char* field_value = NULL; - field_value = tfe_http_std_field_read(response, TFE_HTTP_CACHE_CONTROL); - if (field_value != NULL) - { - freshness->timeout = get_response_s_maxage(field_value); - if (freshness->timeout == 0) - { - freshness->timeout = get_response_maxage(field_value); - } - } - else - { - field_value = tfe_http_std_field_read(response, TFE_HTTP_EXPIRES); - if (field_value != NULL && is_standard_gmt_format(field_value)) - { - expire_rel_time = read_GMT_time(field_value); - const time_t cur_ct_time = time(NULL); - if (gmtime_r(&cur_ct_time, &cur_gmt_time) == NULL) - { - assert(0); - } - cur_rel_time = mktime(&cur_gmt_time); - freshness->timeout = expire_rel_time - cur_rel_time; - } - } - field_value = tfe_http_std_field_read(response, TFE_HTTP_DATE); - if (field_value != NULL) - { - if(is_standard_gmt_format(field_value)) - { - freshness->date = read_GMT_time(field_value);; - } - } - field_value = tfe_http_std_field_read(response, TFE_HTTP_LAST_MODIFIED); - if (field_value != NULL && is_standard_gmt_format(field_value)) - { - freshness->last_modified = read_GMT_time(field_value);; - } -} - - -enum cache_pending_action response_cache_control(const char* value) -{ - const char *forbidden_vaule[] = {"no-store", "private"}; - const char *verify_vaule[] = { "no-cache", "must-revalidate","proxy-revalidate" }; - int i = 0; - for (i = 0; i < 2; i++) - { - if (strstr(value, forbidden_vaule[i]) != NULL) - { - return FORBIDDEN; - } - } - for (i = 0; i < 3; i++) - { - if (strstr(value, verify_vaule[i]) != NULL) - { - return REVALIDATE; - } - } - return ALLOWED; -} - - -enum cache_pending_action tfe_cache_put_pending(const struct tfe_http_half *response, struct response_freshness* freshness) -{ - enum cache_pending_action res = UNDEFINED; - const char *value = NULL; - memset(freshness,0,sizeof(struct response_freshness)); - if(response->resp_spec.resp_code!=TFE_HTTP_STATUS_OK - || NULL!=tfe_http_std_field_read(response, TFE_HTTP_CONT_RANGE) //NOT upload response with content-range - || NULL==response->resp_spec.content_length - || NULL!=tfe_http_std_field_read(response, TFE_HTTP_AUTHORIZATION) - || NULL!=tfe_http_nonstd_field_read(response, "WWW-Authenticate") - || NULL!=tfe_http_std_field_read(response, TFE_HTTP_SET_COOKIE)) - { - return FORBIDDEN; - } - - value = tfe_http_std_field_read(response, TFE_HTTP_PRAGMA); - if (value != NULL) - { - res = get_pragma_action(value); - } - else - { - value = tfe_http_std_field_read(response, TFE_HTTP_CACHE_CONTROL); - if (value != NULL) - { - res = response_cache_control(value); - } - else - { - value = tfe_http_std_field_read(response, TFE_HTTP_EXPIRES); - if (value != NULL && 0!= read_GMT_time(value)) - { - res = ALLOWED; - } - } - } - if (res == ALLOWED) - { - get_response_freshness(response, freshness); - } - return res; -} diff --git a/cache/src/tango_cache_redis.cpp b/cache/src/tango_cache_redis.cpp deleted file mode 100644 index 54fd8d0..0000000 --- a/cache/src/tango_cache_redis.cpp +++ /dev/null @@ -1,422 +0,0 @@ -#include <sys/types.h> -#include <unistd.h> -#include <stdio.h> -#include <stdlib.h> -#include <assert.h> -#include <errno.h> -#include <sys/time.h> -#include <time.h> -#include <string.h> - -#include <hiredis-vip/hircluster.h> -#include <hiredis-vip/async.h> -#include <hiredis-vip/adapters/libevent.h> -#include <cjson/cJSON.h> - -#include "tango_cache_transfer.h" -#include "tango_cache_tools.h" -#include "tango_cache_redis.h" - -#define PARSE_JSON_RET_ERROR -1 -#define PARSE_JSON_RET_TIMEOUT 0 -#define PARSE_JSON_RET_SUCC 1 - -#define CACHE_REDIS_CONNECT_IDLE 0 -#define CACHE_REDIS_CONNECTING 1 -#define CACHE_REDIS_CONNECTED 2 -#define CACHE_REDIS_DISCONNECTED 3 - -static void redis_asyn_disconnect_cb(const struct redisAsyncContext *ac, int status) -{ - struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac); - - if(status == REDIS_OK) - { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Redis disconnect %s:%d success.", - ac->c.tcp.host, ac->c.tcp.port); - } - else - { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Redis disconnect %s:%d failed: %s.", - ac->c.tcp.host, ac->c.tcp.port, ac->errstr); - } - instance->redis_connecting = CACHE_REDIS_DISCONNECTED; -} - -static void redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status) -{ - struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac); - - if(status == REDIS_OK) - { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "RedisCluster connect %s:%d success.", - ac->c.tcp.host, ac->c.tcp.port); - instance->redis_connecting = CACHE_REDIS_CONNECTED; - } - else - { - instance->redis_connecting = CACHE_REDIS_CONNECT_IDLE; - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "RedisCluster connect %s:%d failed: %s.", - ac->c.tcp.host, ac->c.tcp.port, ac->errstr); - } -} - -int redis_asyn_connect_init(struct tango_cache_instance *instance) -{ - instance->redis_ac = redisClusterAsyncConnect(instance->param->redisaddrs, HIRCLUSTER_FLAG_ROUTE_USE_SLOTS); - if(instance->redis_ac == NULL) - { - return -1; - } - instance->redis_connecting = CACHE_REDIS_CONNECTING; - redisClusterLibeventAttach(instance->redis_ac, instance->evbase); - redisClusterAsyncSetConnectionData(instance->redis_ac, instance); - redisClusterAsyncSetConnectCallback(instance->redis_ac, redis_asyn_connect_cb); - redisClusterAsyncSetDisconnectCallback(instance->redis_ac, redis_asyn_disconnect_cb); - return 0; -} - -static int parse_object_meta_json(struct tango_cache_ctx *ctx, const char *jcontent) -{ - cJSON *root, *ptarget; - int ret = PARSE_JSON_RET_ERROR; - char usertag[2048]; - size_t datalen; - - if(NULL == (root=cJSON_Parse(jcontent))) - { - goto out_json; - } - if(NULL == (ptarget=cJSON_GetObjectItem(root, "Content-Length"))) - { - goto out_json; - } - ctx->get.result.tlength = ptarget->valuedouble; - if(NULL==(ptarget=cJSON_GetObjectItem(root, "X-Amz-Meta-Lm"))) - { - goto out_json; - } - ctx->get.last_modify = ptarget->valuedouble; - if(NULL==(ptarget=cJSON_GetObjectItem(root, "Expires"))) - { - goto out_json; - } - ctx->get.expires = ptarget->valuedouble; - ctx->get.need_hdrs = RESPONSE_HDR_ALL; - if(!check_expires_fresh_header(ctx)) - { - ret = PARSE_JSON_RET_TIMEOUT; - goto out_json; - } - - if(NULL!=(ptarget=cJSON_GetObjectItem(root, "Headers"))) - { - easy_string_savedata(&ctx->response, ptarget->valuestring, strlen(ptarget->valuestring)); - } - if(NULL!=(ptarget=cJSON_GetObjectItem(root, "X-Amz-Meta-User"))) - { - if((datalen = Base64_DecodeBlock((unsigned char*)ptarget->valuestring, strlen(ptarget->valuestring), (unsigned char*)usertag, 2048))>0) - { - easy_string_savedata(&ctx->get.response_tag, usertag, datalen); - } - } - cJSON_Delete(root); - return PARSE_JSON_RET_SUCC; - -out_json: - cJSON_Delete(root); - return ret; -} - -static void redis_hget_command_cb(struct redisClusterAsyncContext *ac, void *vreply, void *privdata) -{ - redisReply *reply = (redisReply *)vreply; - struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)privdata; - int ret; - - ctx->instance->statistic.session_redis -= 1; - if(reply == NULL || reply->type!=REDIS_REPLY_ARRAY) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_EXEC); - if(reply!=NULL && reply->type==REDIS_REPLY_ERROR) - { - promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, reply->str); - } - else - { - promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); - } - tango_cache_ctx_destroy(ctx); - return; - } - else if(reply->element[0]->type == REDIS_REPLY_NIL) - { - tango_cache_set_fail_state(ctx, CACHE_CACHE_MISS); - ctx->get.result.type = RESULT_TYPE_MISS; - promise_success(ctx->promise, &ctx->get.result); - promise_finish(ctx->promise); - tango_cache_ctx_destroy(ctx); - return; - } - - switch(ctx->get.state) - { - case GET_STATE_REDIS_META: - ctx->get.result.location = (strcmp(reply->element[1]->str, "redis"))?OBJECT_IN_HOS:OBJECT_IN_REDIS; - break; - case GET_STATE_REDIS_ALL: - ctx->get.result.location = OBJECT_IN_REDIS; - break; - - case GET_STATE_REDIS_TRY: - ctx->get.result.location = (strcmp(reply->element[1]->str, "redis"))?OBJECT_IN_HOS:OBJECT_IN_REDIS; - if(ctx->get.result.location == OBJECT_IN_HOS) - { - ctx->get.redis_redirect_minio_cb(ctx); - return; - } - ctx->locate = OBJECT_IN_REDIS; - break; - default: assert(0);break; - } - - ret = parse_object_meta_json(ctx, reply->element[0]->str); - switch(ret) - { - case PARSE_JSON_RET_ERROR: - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_JSON); - promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); - tango_cache_ctx_destroy(ctx); - break; - case PARSE_JSON_RET_TIMEOUT: - if(ctx->get.state == GET_STATE_DELETE && ctx->get.result.location==OBJECT_IN_HOS) - { - ctx->get.state = GET_STATE_END; - cache_delete_minio_object(ctx); - } - else - { - tango_cache_ctx_destroy(ctx); - } - break; - case PARSE_JSON_RET_SUCC: - fetch_header_over_biz(ctx); - if(ctx->get.state != GET_STATE_REDIS_META) - { - ctx->get.result.data_frag = reply->element[2]->str; - ctx->get.result.size = reply->element[2]->len; - ctx->get.result.type = RESULT_TYPE_BODY; - promise_success(ctx->promise, &ctx->get.result); - } - ctx->get.result.type = RESULT_TYPE_END; - promise_success(ctx->promise, &ctx->get.result); - promise_finish(ctx->promise); - tango_cache_ctx_destroy(ctx); - break; - default: assert(0);break; - } -} - -int tango_cache_head_redis(struct tango_cache_ctx *ctx) -{ - int ret = -1; - - ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hget_command_cb, ctx, - "HMGET %s OBJECT_META OBJECT_LOCATION", ctx->object_key); - if(ret != REDIS_OK) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - tango_cache_ctx_destroy(ctx); - } - else - { - ctx->instance->statistic.session_redis += 1; - ctx->get.state = GET_STATE_REDIS_META; - } - return ret; -} - -int tango_cache_fetch_redis(struct tango_cache_ctx *ctx) -{ - int ret = -1; - - ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hget_command_cb, ctx, - "HMGET %s OBJECT_META OBJECT_LOCATION OBJECT_BODY", ctx->object_key); - if(ret != REDIS_OK) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - tango_cache_ctx_destroy(ctx); - } - else - { - ctx->instance->statistic.session_redis += 1; - ctx->get.state = GET_STATE_REDIS_ALL; - } - return ret; -} - -int tango_cache_try_fetch_redis(struct tango_cache_ctx *ctx) -{ - int ret = -1; - - ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hget_command_cb, ctx, - "HMGET %s OBJECT_META OBJECT_LOCATION OBJECT_BODY", ctx->object_key); - if(ret != REDIS_OK) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - tango_cache_ctx_destroy(ctx); - } - else - { - ctx->instance->statistic.session_redis += 1; - ctx->get.state = GET_STATE_REDIS_TRY; - } - return ret; -} - -static void redis_hset_command_cb(struct redisClusterAsyncContext *ac, void *vreply, void *privdata) -{ - struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)privdata; - redisReply *reply = (redisReply *)vreply; - int ret; - - ctx->instance->statistic.session_redis -= 1; - if(reply == NULL || reply->type==REDIS_REPLY_ERROR || ac->err) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_EXEC); - } - if(ctx->fail_state) - { - if(ctx->put.state==PUT_STATE_REDIS_META || ctx->put.state==PUT_STATE_REDIS_SETEX) //����һ��������ص� - { - ctx->put.state = PUT_STATE_END; - return; - } - tango_cache_ctx_destroy(ctx, true); - return; - } - - switch(ctx->put.state) - { - case PUT_STATE_REDIS_META: - ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, - "EXPIRE %s %u", ctx->object_key, ctx->put.object_ttl); - if(ret!=REDIS_OK) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - ctx->put.state = PUT_STATE_END; - } - else - { - ctx->instance->statistic.session_redis += 1; - ctx->put.state = PUT_STATE_REDIS_SETEX; - } - break; - case PUT_STATE_REDIS_EXPIRE: - ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, - "EXPIRE %s %u", ctx->object_key, ctx->put.object_ttl); - if(ret != REDIS_OK) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - tango_cache_ctx_destroy(ctx, true); - } - else - { - ctx->instance->statistic.session_redis += 1; - ctx->put.state = PUT_STATE_END; - } - break; - case PUT_STATE_REDIS_SETEX: - ctx->put.state = PUT_STATE_END; //����һ��EXPIRE��������� - break; - case PUT_STATE_END: - tango_cache_ctx_destroy(ctx, true); - break; - default: assert(0);break; - } -} - -int redis_put_minio_object_meta(struct tango_cache_ctx *ctx, bool callback) -{ - int ret_mset, ret_set; - char *meta; - - meta = cJSON_PrintUnformatted(ctx->put.object_meta); - - ret_mset = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, - "HMSET %s OBJECT_LOCATION minio OBJECT_META %s MINIO_ADDR %s", ctx->object_key, meta, ctx->hostaddr); - ret_set = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, - "SET http://%s/%s 1 EX %u", ctx->hostaddr, ctx->object_key, ctx->put.object_ttl); - if(ret_mset==REDIS_OK && ret_set==REDIS_OK) - { - ctx->instance->statistic.session_redis += 2; - ctx->put.state = PUT_STATE_REDIS_META; - } - else - { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - if(ret_mset==REDIS_OK) - { - ctx->instance->statistic.session_redis += 1; - ctx->put.state = PUT_STATE_REDIS_EXPIRE; //��ʱ��PUT����object��һ�� - } - else if(ret_set==REDIS_OK) - { - ctx->instance->statistic.session_redis += 1; - ctx->put.state = PUT_STATE_END; - } - else - { - tango_cache_ctx_destroy(ctx, callback); - free(meta); - return -1; - } - } - free(meta); - return 0; -} - -int redis_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback) -{ - int ret; - char *meta; - - ctx->instance->statistic.memory_used -= size; - meta = cJSON_PrintUnformatted(ctx->put.object_meta); - ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, - "HMSET %s OBJECT_LOCATION redis OBJECT_META %s OBJECT_BODY %b", ctx->object_key, meta, data, size); - if(ret != REDIS_OK) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - tango_cache_ctx_destroy(ctx, callback); - } - else - { - ctx->instance->statistic.session_redis += 1; - ctx->put.state = PUT_STATE_REDIS_EXPIRE; - } - if(way == PUT_MEM_FREE) - { - free((void *)data); - } - free(meta); - return ret; -} - -int redis_put_complete_part_evbuf(struct tango_cache_ctx *ctx, size_t object_size, bool callback) -{ - char *data; - size_t size; - - data = (char *)malloc(object_size); - size = evbuffer_remove(ctx->put.evbuf, data, object_size); - if(size != object_size) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER); - tango_cache_ctx_destroy(ctx, callback); - free(data); - return CACHE_ERR_EVBUFFER; - } - return redis_put_complete_part_data(ctx, PUT_MEM_FREE, data, object_size, callback); -} - diff --git a/cache/src/tango_cache_redis.h b/cache/src/tango_cache_redis.h deleted file mode 100644 index 74e11e4..0000000 --- a/cache/src/tango_cache_redis.h +++ /dev/null @@ -1,21 +0,0 @@ -#ifndef __TANGO_CACHE_REDIS_H__ -#define __TANGO_CACHE_REDIS_H__ - -#include <event2/event.h> -#include <event.h> - -#include "tango_cache_client_in.h" - -int tango_cache_head_redis(struct tango_cache_ctx *ctx); -int redis_asyn_connect_init(struct tango_cache_instance *instance); - - -int tango_cache_fetch_redis(struct tango_cache_ctx *ctx); -int tango_cache_try_fetch_redis(struct tango_cache_ctx *ctx); - -int redis_put_minio_object_meta(struct tango_cache_ctx *ctx, bool callback); -int redis_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback); -int redis_put_complete_part_evbuf(struct tango_cache_ctx *ctx, size_t object_size, bool callback); - -#endif - diff --git a/cache/src/tango_cache_tools.cpp b/cache/src/tango_cache_tools.cpp deleted file mode 100644 index e22a8b9..0000000 --- a/cache/src/tango_cache_tools.cpp +++ /dev/null @@ -1,258 +0,0 @@ -#include <sys/time.h> -#include <time.h> -#include <string.h> -#include <ctype.h> -#include <sys/types.h> -#include <sys/stat.h> -#include <unistd.h> -#include <stdio.h> -#include <stdlib.h> -#include <errno.h> - -#include "tango_cache_tools.h" - -static const char data_bin2ascii[65] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; - -#define conv_bin2ascii(a) (data_bin2ascii[(a)&0x3f]) - -#define B64_EOLN 0xF0 -#define B64_CR 0xF1 -#define B64_EOF 0xF2 -#define B64_WS 0xE0 -#define B64_ERROR 0xFF -#define B64_NOT_BASE64(a) (((a)|0x13) == 0xF3) -#define B64_BASE64(a) !B64_NOT_BASE64(a) - -static const unsigned char data_ascii2bin[128] = { - 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, - 0xFF, 0xE0, 0xF0, 0xFF, 0xFF, 0xF1, 0xFF, 0xFF, - 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, - 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, - 0xE0, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, - 0xFF, 0xFF, 0xFF, 0x3E, 0xFF, 0xF2, 0xFF, 0x3F, - 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x3A, 0x3B, - 0x3C, 0x3D, 0xFF, 0xFF, 0xFF, 0x00, 0xFF, 0xFF, - 0xFF, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, - 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, - 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, - 0x17, 0x18, 0x19, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, - 0xFF, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, 0x20, - 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, - 0x29, 0x2A, 0x2B, 0x2C, 0x2D, 0x2E, 0x2F, 0x30, - 0x31, 0x32, 0x33, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, -}; - -//�мDz�������˵���: conv_ascii2bin(x++); ��Ϊ��++���� -#define conv_ascii2bin(aa) (((aa) & 0x80)?(0xFF):data_ascii2bin[(aa)]) - -/********************************************************************* -�������ƣ�Base64_EncodeBlock -���ܼ�飺��һ����BASE64�������б��� -���������in����������ַ��� - inl��in�ij��� -���������out�������洢�Ļ������� -����ֵ�������ij��� -*********************************************************************/ -int Base64_EncodeBlock(const unsigned char *in, int inl, unsigned char *out) -{ - int i, ret = 0; - unsigned long l; - - for (i = inl; i > 0; i -= 3) { - if (i >= 3) { - l = (((unsigned long)in[0]) << 16L) | - (((unsigned long)in[1]) << 8L) | in[2]; - *(out++) = conv_bin2ascii(l >> 18L); - *(out++) = conv_bin2ascii(l >> 12L); - *(out++) = conv_bin2ascii(l >> 6L); - *(out++) = conv_bin2ascii(l); - } else { - l = ((unsigned long)in[0]) << 16L; - if (i == 2) - l |= ((unsigned long)in[1] << 8L); - - *(out++) = conv_bin2ascii(l >> 18L); - *(out++) = conv_bin2ascii(l >> 12L); - *(out++) = (i == 1) ? '=' : conv_bin2ascii(l >> 6L); - *(out++) = '='; - } - ret += 4; - in += 3; - } - - *out = '\0'; - return (ret); -} - -/********************************************************************* -�������ƣ�Base64_DecodeBlock -���ܼ�飺��һ����BASE64�������н������Զ�������β��BASE64�����ַ� -���������in����������ַ��� - inl��in�ij��� -���������out�������洢�Ļ������� - ���뱣֤���㹻�Ŀռ䣬һ��ﵽ@inl��С���ɣ� -����ֵ��<0��ʧ�ܣ�>=0�������ij��� -*********************************************************************/ -int Base64_DecodeBlock(const unsigned char *in, int inl, unsigned char *out, int outsize) -{ - int i, ret = 0; - unsigned char a, b, c, d; - unsigned long l; - - /* ignore not-base64-encoded charactor. */ - while ((conv_ascii2bin(*in) == B64_WS) && (inl > 0)) - { - in++; - inl--; - } - while ((inl > 3) && (B64_NOT_BASE64(conv_ascii2bin(in[inl - 1])))) - inl--; - - if (inl % 4 != 0) - return -1; - - if(outsize < (inl*3)/4) - return -2; - - for (i = 0; i < inl; i += 4) - { - a = conv_ascii2bin(*(in)); - b = conv_ascii2bin(*(in+1)); - c = conv_ascii2bin(*(in+2)); - d = conv_ascii2bin(*(in+3)); - if ((a & 0x80) || (b & 0x80) || (c & 0x80) || (d & 0x80)) - return (-1); - l = ((((unsigned long)a) << 18L) | (((unsigned long)b) << 12L) | - (((unsigned long)c) << 6L) | (((unsigned long)d))); - *(out++) = (unsigned char)(l >> 16L) & 0xff; - *(out++) = (unsigned char)(l >> 8L) & 0xff; - *(out++) = (unsigned char)(l) & 0xff; - ret += 3; - in+=4; - } - - for(i = inl; i > 0; i -= 4) - { - if(*(in-3) == '=') - { - in -= 4; - ret -= 3; - continue; - } - - while(*(--in) == '=') - ret -= 1; - - break; - } - - return ret; -} - - -//��֪������������ͬ������£��Ƚ��������Ƿ���� -int strcmp_one_word_mesa_equal_len(const char *s1_lowercase, const char *s1_uppercase, const char *s2, size_t len) -{ - unsigned char *s1,*s12; - - if (s2[len-1]-'a'>=0) - { - s1 = (unsigned char *)s1_lowercase; - s12= (unsigned char *)s1_uppercase; - } - else - { - s1 = (unsigned char *)s1_uppercase; - s12= (unsigned char *)s1_lowercase; - } - - do { - if (*s1 == *s2 || *s12 == *s2) - { - ++s1; - ++s12; - ++s2; - continue; - } - return 0; - } while (--len); - - return 1; -} - -int mkdir_according_path(const char * path) -{ - char buffer[256]; - const char *ps=path, *pc; - - if(*ps == '/') - ps += 1; - - while((pc = strchr(ps, '/')) != NULL) - { - while(*(pc+1) == '/') - pc++; - - memcpy(buffer, path, pc - path); - buffer[pc-path] = '\0'; - - if(access(buffer, F_OK)) - { - if(mkdir(buffer, 0777) && errno!=EEXIST) - { - return -1; - } - } - - ps = pc + 1; - } - if(access(path, F_OK)) - { - if(mkdir(path, 0777)) - { - return -1; - } - } - return 0; -} - -//��ʱ���ַ���ת��Ϊʱ��� -time_t expires_hdr2timestamp(const char *expires_val, int len) -{ - struct tm tm; - - while(len > 0 && (*expires_val==' '||*expires_val=='\t'||*expires_val=='\r'||*expires_val=='\n')) - { - expires_val++; - len--; - } - if(len == 0) - { - return 0; - } - - memset(&tm, 0, sizeof(struct tm)); - if(strptime(expires_val, "%a, %d %b %Y %T", &tm) == NULL) - { - return 0; - } - - return mktime(&tm); -} - -//������ʱ���ת��ΪGMTʱ���ַ��� -size_t expires_timestamp2hdr_str(time_t seconds, char *buffer, size_t size) -{ - struct tm save; - return strftime(buffer, size, "Expires: %a, %d %b %Y %T GMT", gmtime_r(&seconds, &save)); -} - -//������ʱ���ת��ΪGMTʱ��� -time_t get_gmtime_timestamp(time_t seconds) -{ - struct tm *tm, save; - - tm = gmtime_r(&seconds, &save); - return mktime(tm); -} - diff --git a/cache/src/tango_cache_tools.h b/cache/src/tango_cache_tools.h deleted file mode 100644 index e0ab730..0000000 --- a/cache/src/tango_cache_tools.h +++ /dev/null @@ -1,26 +0,0 @@ -#ifndef __TANGO_CACHE_TOOLS_H__ -#define __TANGO_CACHE_TOOLS_H__ - -#include <MESA/MESA_handle_logger.h> - -#define MESA_HANDLE_RUNTIME_LOGV2(handle, lv, fmt, args...) \ - MESA_handle_runtime_log((handle), (lv), "TANGO_CACHE", "%s:%d, " fmt, __FILE__, __LINE__, ##args) - -#ifdef CACHE_DEBUG_SWITCH -#define DBG_CACHE(fmt, args...) do{printf("%s():%d, " fmt, __FUNCTION__, __LINE__, ##args);}while(0) -#else -#define DBG_CACHE(msg...) -#endif - -int Base64_EncodeBlock(const unsigned char *in, int inl, unsigned char *out); -int Base64_DecodeBlock(const unsigned char *in, int inl, unsigned char *out, int outsize); - -int strcmp_one_word_mesa_equal_len(const char *s1_lowercase, const char *s1_uppercase, const char *s2, size_t len); -int mkdir_according_path(const char * path); - -time_t get_gmtime_timestamp(time_t seconds); -time_t expires_hdr2timestamp(const char *expires_val, int len); -size_t expires_timestamp2hdr_str(time_t seconds, char *buffer, size_t size); - -#endif - diff --git a/cache/src/tango_cache_transfer.cpp b/cache/src/tango_cache_transfer.cpp deleted file mode 100644 index e8b4cfc..0000000 --- a/cache/src/tango_cache_transfer.cpp +++ /dev/null @@ -1,986 +0,0 @@ -#include <sys/types.h> -#include <unistd.h> -#include <stdio.h> -#include <stdlib.h> -#include <assert.h> -#include <errno.h> -#include <sys/time.h> -#include <time.h> -#include <string.h> -#include <curl/curl.h> - -#include "tango_cache_transfer.h" -#include "tango_cache_xml.h" -#include "tango_cache_tools.h" -#include "tango_cache_redis.h" - -static inline void curl_set_common_options(CURL *curl, long transfer_timeout, char *errorbuf) -{ - curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, errorbuf); - curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L); - curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); - curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, 500L); - curl_easy_setopt(curl, CURLOPT_TIMEOUT, transfer_timeout); //���Է��ֶ�������ij���ӽ��տ�ס����� - //ctx->error="Operation too slow. Less than 1024 bytes/sec transferred the last 3 seconds" - curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 5L); - curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 100L); - curl_easy_setopt(curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache"); -} - -//response body�̻ܶ���ʱ -size_t curl_response_any_cb(void *ptr, size_t size, size_t count, void *userp) -{ - return size*count; -} - -static size_t curl_put_multipart_header_cb(void *ptr, size_t size, size_t count, void *userp) -{ - struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; - size_t totallen = size*count; - char *start = (char *)ptr, *end = start + totallen; - struct multipart_etag_list *etag; - - if(!strncmp(start, "Etag:", totallen>5?5:totallen)) - { - start += 5; end -= 1; totallen -= 5; - while(totallen>0 && (*start==' ')) {start++; totallen--;} - while(totallen>0 && (*end=='\r'||*end=='\n')) {end--; totallen--;} - if(totallen > 0) - { - etag = (struct multipart_etag_list *)malloc(sizeof(struct multipart_etag_list)); - totallen = end - start + 1; - etag->etag = (char *)malloc(totallen + 1); - etag->part_number = ctx->put.part_index; - memcpy(etag->etag, start, totallen); - *(etag->etag + totallen) = '\0'; - TAILQ_INSERT_TAIL(&ctx->put.etag_head, etag, node); - } - } - - return size*count; -} - -static size_t curl_put_once_send_cb(void *ptr, size_t size, size_t count, void *userp) -{ - size_t len; - struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; - - if(size==0 || count==0 || ctx->put.once_request.len>=ctx->put.once_request.size) - { - return 0; //��һ������ - } - - len = ctx->put.once_request.size - ctx->put.once_request.len; //ʣ����ϴ��ij��� - if(len > size * count) - { - len = size * count; - } - - memcpy(ptr, ctx->put.once_request.buff + ctx->put.once_request.len, len); - ctx->put.once_request.len += len; - - if(ctx->put.once_request.len >= ctx->put.once_request.size) - { - ctx->instance->statistic.memory_used -= ctx->put.once_request.size; //δʹ��cache buffer���Լ������ڴ����� - easy_string_destroy(&ctx->put.once_request); - } - return len; -} - -static size_t curl_put_multipart_send_cb(void *ptr, size_t size, size_t count, void *userp) -{ - size_t len, space=size*count, send_len; - struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; - - if(size==0 || count==0 || ctx->put.upload_offset>=ctx->put.upload_length) - { - return 0; - } - - len = ctx->put.upload_length - ctx->put.upload_offset; - if(len > space) - { - len = space; - } - send_len = evbuffer_remove(ctx->put.evbuf, ptr, len); - assert(send_len>0); - ctx->put.upload_offset += send_len; - ctx->instance->statistic.memory_used -= send_len; - - return send_len; -} - -//return value: <0:fail; =0: not exec; >0: OK -static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool full) -{ - UNUSED CURLMcode rc; - char minio_url[256]={0}, buffer[256]={0}; - - if(NULL == (ctx->curl=curl_easy_init())) - { - return -1; - } - - ctx->put.upload_offset = 0; - if(full) - { - snprintf(minio_url, 256, "http://%s/%s", ctx->hostaddr, ctx->object_key); - } - else - { - snprintf(minio_url, 256, "http://%s/%s?partNumber=%d&uploadId=%s", ctx->hostaddr, ctx->object_key, ++ctx->put.part_index, ctx->put.uploadID); - curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_put_multipart_header_cb); - curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx); - } - curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); - curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - //token�ֶΣ�����hos�洢��֤ - sprintf(buffer, "token: %s", ctx->instance->param->cache_token); - ctx->headers = curl_slist_append(ctx->headers, buffer); - curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); - curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put.upload_length); - curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_multipart_send_cb); - curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx); - curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); - - rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); - assert(rc==CURLM_OK); - DBG_CACHE("state: %d, length: %lu, key: %s\n", ctx->put.state, ctx->put.upload_length, ctx->object_key); - return 1; -} - -static size_t curl_response_body_save_cb(void *ptr, size_t size, size_t count, void *userp) -{ - struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; - struct easy_string *estr = &ctx->response; - CURLcode code; - - if(ctx->fail_state) - { - return size*count; - } - - if(ctx->res_code == 0) - { - code = curl_easy_getinfo(ctx->curl, CURLINFO_RESPONSE_CODE, &ctx->res_code); - if(code != CURLE_OK || ctx->res_code!=200L) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - return size*count; - } - } - - easy_string_savedata(estr, (const char*)ptr, size*count); - return size*count; -} - -int curl_get_minio_uploadID(struct tango_cache_ctx *ctx) -{ - UNUSED CURLMcode rc; - char minio_url[256]={0}, buffer[256]; - - if(NULL == (ctx->curl=curl_easy_init())) - { - return -1; - } - - snprintf(minio_url, 256, "http://%s/%s?uploads", ctx->hostaddr, ctx->object_key); - curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L); - curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, 0); //Ĭ��ʹ�ûص���������fread�����Է��ֹر�Expectʱ�ᵼ�¿���curl_multi_socket_action - curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); - - curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); - curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - sprintf(buffer, "token: %s", ctx->instance->param->cache_token); - ctx->headers = curl_slist_append(ctx->headers, buffer); - curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); - curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); - - rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); - assert(rc==CURLM_OK); - DBG_CACHE("state: %d, key: %s\n", ctx->put.state, ctx->object_key); - return 1; -} - -int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back) -{ - UNUSED CURLMcode rc; - char minio_url[256], buffer[256]; - - ctx->instance->statistic.del_recv_num += 1; - if(NULL == (ctx->curl=curl_easy_init())) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - tango_cache_ctx_destroy(ctx, call_back); //�ս��� - return -1; - } - - snprintf(minio_url, 256, "http://%s/%s", ctx->hostaddr, ctx->object_key); - curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE"); - curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); - curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - sprintf(buffer, "token: %s", ctx->instance->param->cache_token); - ctx->headers = curl_slist_append(ctx->headers, buffer); - curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); - curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); - - rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); - assert(rc==CURLM_OK); - return 1; -} - -//return value: true-�ɹ������¼���false-δ�����¼� -bool cache_cancel_upload_minio(struct tango_cache_ctx *ctx) -{ - UNUSED CURLMcode rc; - char minio_url[256]; - - if(NULL == (ctx->curl=curl_easy_init())) - { - return false; - } - - snprintf(minio_url, 256, "http://%s/%s?uploadId=%s", ctx->hostaddr, ctx->object_key, ctx->put.uploadID); - curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE"); - curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); - curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); - - rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); - assert(rc==CURLM_OK); - return true; -} - -//return value: true-�ɹ������¼���false-δ�����¼� -bool cache_kick_combine_minio(struct tango_cache_ctx *ctx) -{ - int len=0; - UNUSED CURLMcode rc; - char minio_url[256], buffer[256]; - - if(NULL == (ctx->curl=curl_easy_init())) - { - return false; - } - construct_complete_xml(ctx, &ctx->put.combine_xml, &len); - - snprintf(minio_url, 256, "http://%s/%s?uploadId=%s", ctx->hostaddr, ctx->object_key, ctx->put.uploadID); - curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L); - curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); - curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - - curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, len); //���Content-Length - curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDS, ctx->put.combine_xml); - curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); - - if(ctx->headers != NULL) - { - curl_slist_free_all(ctx->headers); - ctx->headers = NULL; - } - ctx->headers = curl_slist_append(ctx->headers, "Content-Type: application/xml"); - sprintf(buffer, "token: %s", ctx->instance->param->cache_token); - ctx->headers = curl_slist_append(ctx->headers, buffer); - curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); - - rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); - assert(rc==CURLM_OK); - DBG_CACHE("state: %d, key: %s\n", ctx->put.state, ctx->object_key); - return true; -} - -//return value: true-�ɹ������¼���false-δ�����¼� -bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block_len) -{ - int ret = 1; - - switch(ctx->put.state) - { - case PUT_STATE_START: - if(sessions_exceeds_limit(ctx->instance, OBJECT_IN_HOS)) - { - tango_cache_set_fail_state(ctx, CACHE_OUTOF_SESSION); - return false; - } - ctx->put.state = PUT_STATE_WAIT_START; - ret = curl_get_minio_uploadID(ctx); - break; - - case PUT_STATE_PART: - if(ctx->curl == NULL) - { - ctx->put.upload_length = block_len; - ret = http_put_bodypart_request_evbuf(ctx, false); - } - break; - - default: break;//nothing to do - } - - if(ret <= 0) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - return false; - } - return true; -} - -//callbackֱ��ʧ���Ƿ���ûص���������ʽ��Ҫ������һ���Բ���Ҫ -static int http_put_complete_part_evbuf(struct tango_cache_ctx *ctx, bool callback) -{ - int ret=-1; - - ctx->put.state = PUT_STATE_END; - ctx->put.upload_length = evbuffer_get_length(ctx->put.evbuf); - if(ctx->put.upload_length > 0) - { - ret = http_put_bodypart_request_evbuf(ctx, true); - if(ret <= 0) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - tango_cache_ctx_destroy(ctx, callback); - } - } - else - { - tango_cache_ctx_destroy(ctx, callback); - } - return ret; -} - -int do_tango_cache_update_end(struct tango_cache_ctx *ctx, bool callback) -{ - DBG_CACHE("state: %d, key: %s, curl %s NULL\n", ctx->put.state, ctx->object_key, (ctx->curl==NULL)?"is":"is not"); - ctx->put.close_state = true;//������״̬�����������رգ��ڲ�״̬����ת�������ٹر� - if(ctx->fail_state) - { - tango_cache_ctx_destroy(ctx, callback); - return -1; - } - - switch(ctx->put.state) - { - case PUT_STATE_START: //��ʱ��ͬ����һ�����ϴ� - if(sessions_exceeds_limit(ctx->instance, ctx->locate)) - { - tango_cache_set_fail_state(ctx, CACHE_OUTOF_SESSION); - tango_cache_ctx_destroy(ctx, callback); - return -1; - } - if(ctx->locate == OBJECT_IN_HOS) - { - return http_put_complete_part_evbuf(ctx, callback); - } - else - { - return redis_put_complete_part_evbuf(ctx, ctx->put.object_size, callback); - } - break; - - case PUT_STATE_PART: - if(ctx->curl == NULL) - { - ctx->put.upload_length = evbuffer_get_length(ctx->put.evbuf); - if(ctx->put.upload_length == 0) - { - if(cache_kick_combine_minio(ctx)) - { - ctx->put.state = PUT_STATE_END; - } - else - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - tango_cache_ctx_destroy(ctx); - return -1; - } - } - else if(http_put_bodypart_request_evbuf(ctx, false) <= 0) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - if(cache_cancel_upload_minio(ctx)) - { - ctx->put.state = PUT_STATE_CANCEL; - } - else - { - tango_cache_ctx_destroy(ctx); - return -1; - } - } - } - break; - - case PUT_STATE_END: assert(0); //�û���������endʱ�����ܴ��ڴ�״̬ - case PUT_STATE_WAIT_START: //��ʱδ��ȡ��uploadId�������������ϴ� - default: break; - } - return 0; -} - -void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code) -{ - DBG_CACHE("state: %d, key: %s\n", ctx->put.state, ctx->object_key); - switch(ctx->put.state) - { - case PUT_STATE_WAIT_START: - if(res!=CURLE_OK||res_code!=200L|| ctx->fail_state || !parse_uploadID_xml(ctx->response.buff, ctx->response.len, &ctx->put.uploadID)) - { - easy_string_destroy(&ctx->response); - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - if(ctx->put.close_state) - { - tango_cache_ctx_destroy(ctx); - } - } - else - { - easy_string_destroy(&ctx->response); - ctx->put.state = PUT_STATE_PART; - if(ctx->put.close_state) - { - do_tango_cache_update_end(ctx, true); - } - else - { - size_t upload_length = evbuffer_get_length(ctx->put.evbuf); - if(upload_length >= ctx->instance->param->upload_block_size) - { - cache_kick_upload_minio_multipart(ctx, upload_length); - } - } - } - break; - - case PUT_STATE_PART: - if(res != CURLE_OK || res_code!=200L) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - } - if(ctx->fail_state) - { - if(cache_cancel_upload_minio(ctx)) - { - ctx->put.state = PUT_STATE_CANCEL; - } - else if(ctx->put.close_state) - { - tango_cache_ctx_destroy(ctx); - } - } - else if(ctx->put.close_state) - { - do_tango_cache_update_end(ctx, true); - } - else - { - size_t upload_length = evbuffer_get_length(ctx->put.evbuf); - if(upload_length >= ctx->instance->param->upload_block_size) - { - cache_kick_upload_minio_multipart(ctx, upload_length); - } - } - break; - - case PUT_STATE_CANCEL: //�ȴ��ر� - if(ctx->put.close_state) - { - tango_cache_ctx_destroy(ctx); - } - break; - - case PUT_STATE_END: - if(res != CURLE_OK || res_code!=200L) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - } - if(ctx->instance->param->object_store_way!=CACHE_ALL_HOS && !ctx->fail_state) - { - redis_put_minio_object_meta(ctx, true); - } - else - { - tango_cache_ctx_destroy(ctx); - } - break; - default: break; - } -} - -int http_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback) -{ - UNUSED CURLMcode rc; - char minio_url[256], buffer[256]; - - if(NULL == (ctx->curl=curl_easy_init())) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - tango_cache_ctx_destroy(ctx, callback); - if(way == PUT_MEM_FREE) free((void *)data); - ctx->instance->statistic.memory_used -= size; - return -1; - } - ctx->put.state = PUT_STATE_END; - - snprintf(minio_url, 256, "http://%s/%s", ctx->hostaddr, ctx->object_key); - curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); - curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - sprintf(buffer, "token: %s", ctx->instance->param->cache_token); - ctx->headers = curl_slist_append(ctx->headers, buffer); - curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); - curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); - - if(way == PUT_MEM_COPY) - { - ctx->put.once_request.buff = (char *)malloc(size); - memcpy(ctx->put.once_request.buff, data, size); - } - else - { - ctx->put.once_request.buff = (char *)data; - } - ctx->put.once_request.size = size; - ctx->put.once_request.len = 0; - curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put.once_request.size); - curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_once_send_cb); - curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx); - - rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); - assert(rc==CURLM_OK); - return 0; -} - -int do_tango_cache_upload_once_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback) -{ - ctx->instance->statistic.put_recv_num += 1; - ctx->instance->statistic.memory_used += size; - ctx->instance->error_code = CACHE_OK; - - if(ctx->locate == OBJECT_IN_HOS) - { - return http_put_complete_part_data(ctx, way, data, size, false); - } - else - { - return redis_put_complete_part_data(ctx, way, data, size, false); - } -} - -int do_tango_cache_upload_once_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, bool callback) -{ - size_t size; - - ctx->instance->statistic.put_recv_num += 1; - ctx->instance->error_code = CACHE_OK; - - if(way == EVBUFFER_MOVE) - { - if(evbuffer_add_buffer(ctx->put.evbuf, evbuf)) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER); - tango_cache_ctx_destroy(ctx, callback); - return -1; - } - } - else - { - if(evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf)) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER); - tango_cache_ctx_destroy(ctx, callback); - return -1; - } - } - size = evbuffer_get_length(ctx->put.evbuf); - ctx->instance->statistic.memory_used += size; - - if(ctx->locate == OBJECT_IN_HOS) - { - return http_put_complete_part_evbuf(ctx, callback); - } - else - { - return redis_put_complete_part_evbuf(ctx, size, callback); - } -} - -void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code) -{ - if(res!=CURLE_OK || (res_code!=204L && res_code!=200L )) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - } - tango_cache_ctx_destroy(ctx); -} - -void tango_cache_curl_muldel_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code) -{ - u_int32_t errnum=0; - - if(res!=CURLE_OK || (res_code!=204L && res_code!=200L )) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - ctx->del.fail_num = ctx->del.succ_num; - ctx->del.succ_num = 0; - } - else - { - if(!parse_multidelete_xml(ctx->response.buff, ctx->response.len, &errnum, ctx->error, CURL_ERROR_SIZE)) - { - ctx->del.fail_num = ctx->del.succ_num; - ctx->del.succ_num = 0; - } - else - { - ctx->del.fail_num = errnum; - ctx->del.succ_num -= errnum; - } - if(ctx->del.fail_num > 0) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - } - } - tango_cache_ctx_destroy(ctx); -} - -int do_tango_cache_multi_delete(struct tango_cache_ctx *ctx, bool callback) -{ - UNUSED CURLMcode rc; - char minio_url[256], buffer[256]; - - ctx->instance->statistic.del_recv_num += ctx->del.succ_num; - ctx->instance->error_code = CACHE_OK; - if(NULL == (ctx->curl=curl_easy_init())) - { - tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY); - tango_cache_ctx_destroy(ctx, callback); - return -1; - } - - snprintf(minio_url, 256, "http://%s/%s/?delete", ctx->hostaddr, ctx->instance->param->bucketname); - curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L); - curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, ctx->response.size); //���Content-Length����CURLOPT_COPYPOSTFIELDS֮ǰ���� - curl_easy_setopt(ctx->curl, CURLOPT_COPYPOSTFIELDS, ctx->response.buff); - curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); - sprintf(buffer, "token: %s", ctx->instance->param->cache_token); - ctx->headers = curl_slist_append(ctx->headers, buffer); - curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); - curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); - - rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); - assert(rc==CURLM_OK); - easy_string_destroy(&ctx->response); - return 0; -} - -bool fetch_header_over_biz(struct tango_cache_ctx *ctx) -{ - if(ctx->get.need_hdrs!=RESPONSE_HDR_ALL) //��Expiresʱ - { - tango_cache_set_fail_state(ctx, CACHE_ERR_INTERNAL); - ctx->get.state = GET_STATE_DELETE; - promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); - return false; - } - - if(ctx->get.response_tag.len > 0) - { - ctx->get.result.data_frag = ctx->get.response_tag.buff; - ctx->get.result.size = ctx->get.response_tag.len; - ctx->get.result.type = RESULT_TYPE_USERTAG; - promise_success(ctx->promise, &ctx->get.result); - easy_string_destroy(&ctx->get.response_tag); - } - if(ctx->response.len > 0) - { - ctx->get.result.data_frag = ctx->response.buff; - ctx->get.result.size = ctx->response.len; - ctx->get.result.type = RESULT_TYPE_HEADER; - promise_success(ctx->promise, &ctx->get.result); - easy_string_destroy(&ctx->response); - } - return true; -} - -static size_t curl_get_response_body_cb(void *ptr, size_t size, size_t count, void *userp) -{ - struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; - - if(ctx->fail_state || ctx->get.state==GET_STATE_DELETE) - { - return size*count; - } - - if(!fetch_header_over_biz(ctx)) - { - return size*count; - } - - ctx->get.result.data_frag = (const char *)ptr; - ctx->get.result.size = size * count; - ctx->get.result.type = RESULT_TYPE_BODY; - promise_success(ctx->promise, &ctx->get.result); - return size*count; -} - -bool check_expires_fresh_header(struct tango_cache_ctx *ctx) -{ - time_t now_gmt; - - if(ctx->get.need_hdrs != RESPONSE_HDR_ALL) - return true; - - now_gmt = get_gmtime_timestamp(time(NULL)); - - if(now_gmt > ctx->get.expires) - { - tango_cache_set_fail_state(ctx, CACHE_TIMEOUT); - ctx->get.state = GET_STATE_DELETE; //����ʧЧʱ���������ʱ����ɾ������ - ctx->get.result.type = RESULT_TYPE_MISS; - promise_success(ctx->promise, &ctx->get.result); - promise_finish(ctx->promise); - easy_string_destroy(&ctx->response); - return false; - } - - if(ctx->get.last_modify+ctx->get.max_age > now_gmt || now_gmt+ctx->get.min_fresh>ctx->get.expires) - { - tango_cache_set_fail_state(ctx, CACHE_TIMEOUT); - ctx->get.result.type = RESULT_TYPE_MISS; - promise_success(ctx->promise, &ctx->get.result); - promise_finish(ctx->promise); - easy_string_destroy(&ctx->response); - return false; - } - return true; -} - -static bool check_get_result_code(struct tango_cache_ctx *ctx, CURLcode code, long res_code) -{ - if(code != CURLE_OK) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); - return false; - } - - if(res_code != 200L) - { - if(res_code == 404L) - { - tango_cache_set_fail_state(ctx, CACHE_CACHE_MISS); - ctx->get.result.type = RESULT_TYPE_MISS; - promise_success(ctx->promise, &ctx->get.result); - promise_finish(ctx->promise); - } - else - { - tango_cache_set_fail_state(ctx, CACHE_ERR_INTERNAL); - promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); - } - return false; - } - return true; -} - -static size_t curl_get_response_header_cb(void *ptr, size_t size, size_t count, void *userp) -{ - struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; - char *start=(char *)ptr, *pos_colon; - size_t raw_len = size*count, hdrlen=size*count; - char usertag[2048]; - size_t datalen; - - if(ctx->fail_state || ctx->get.state==GET_STATE_DELETE) - { - return raw_len; - } - if(ctx->res_code == 0) //�״�Ӧ��ʱ�ȿ�Ӧ�����Ƿ���200 - { - UNUSED CURLcode code = curl_easy_getinfo(ctx->curl, CURLINFO_RESPONSE_CODE, &ctx->res_code); - if(!check_get_result_code(ctx, code, ctx->res_code)) - { - return raw_len; - } - ctx->get.result.location = OBJECT_IN_HOS; - } - pos_colon = (char*)memchr(start, ':', raw_len); - if(pos_colon == NULL) - { - return raw_len; - } - - datalen = pos_colon - start; - switch(datalen) - { - case 7: - if(strcmp_one_word_mesa_equal_len("expires", "EXPIRES", start, 7)) - { - ctx->get.need_hdrs |= RESPONSE_HDR_EXPIRES; - ctx->get.expires = expires_hdr2timestamp(pos_colon + 1, raw_len - datalen - 1); - if(!check_expires_fresh_header(ctx)) - { - return raw_len; - } - } - break; - case 13: - if(strcmp_one_word_mesa_equal_len("x-amz-meta-lm", "X-AMZ-META-LM", start, 13)) - { - ctx->get.need_hdrs |= RESPONSE_HDR_LAST_MOD; - sscanf(pos_colon+1, "%lu", &ctx->get.last_modify); - if(!check_expires_fresh_header(ctx)) - { - return raw_len; - } - } - break; - case 15: - if(strcmp_one_word_mesa_equal_len("x-amz-meta-user", "X-AMZ-META-USER", start, 15)) - { - if((hdrlen = Base64_DecodeBlock((unsigned char*)pos_colon+1, raw_len-datalen-1, (unsigned char*)usertag, 2048))>0) - { - easy_string_savedata(&ctx->get.response_tag, usertag, hdrlen); - } - } - break; - case 14: - if(strcmp_one_word_mesa_equal_len("content-length", "CONTENT-LENGTH", start, 14)) - { - sscanf(pos_colon+1, "%lu", &ctx->get.result.tlength); - } - break; - case 11: if(strcmp_one_word_mesa_equal_len("content-md5", "CONTENT-MD5", start, 11)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break; - case 12: if(strcmp_one_word_mesa_equal_len("content-type", "CONTENT-TYPE", start, 12)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break; - case 16: if(strcmp_one_word_mesa_equal_len("content-encoding", "CONTENT-ENCODING", start, 16)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break; - case 19: if(strcmp_one_word_mesa_equal_len("content-disposition", "CONTENT-DISPOSITION", start, 19)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break; - default: break; - } - return raw_len; -} - -void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code) -{ - switch(ctx->get.state) - { - case GET_STATE_START: - if(!ctx->fail_state && check_get_result_code(ctx, res, res_code)) - { - if(ctx->method!=CACHE_REQUEST_HEAD || fetch_header_over_biz(ctx)) //HEAD���ֵ��ֶβ�ȫ�Ȳ�ɾ������������ޣ� - { - ctx->get.result.type = RESULT_TYPE_END; - promise_success(ctx->promise, &ctx->get.result); - promise_finish(ctx->promise); - } - } - tango_cache_ctx_destroy(ctx); - break; - - case GET_STATE_DELETE: - ctx->get.state = GET_STATE_END; - cache_delete_minio_object(ctx); - break; - - case GET_STATE_END: - tango_cache_ctx_destroy(ctx); - break; - default: assert(0);break; - } -} - -static int tango_cache_fetch_minio(struct tango_cache_ctx *ctx) -{ - UNUSED CURLMcode rc; - char minio_url[256], buffer[256]; - - if(NULL == (ctx->curl=curl_easy_init())) - { - tango_cache_ctx_destroy(ctx); - return -1; - } - - snprintf(minio_url, 256, "http://%s/%s", ctx->hostaddr, ctx->object_key); - curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); - if(ctx->method == CACHE_REQUEST_HEAD) - { - curl_easy_setopt(ctx->curl, CURLOPT_NOBODY, 1L); - } - curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_get_response_body_cb); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); - curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_get_response_header_cb); - curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx); - sprintf(buffer, "token: %s", ctx->instance->param->cache_token); - ctx->headers = curl_slist_append(ctx->headers, buffer); - curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); - curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); - - rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); - assert(rc==CURLM_OK); - return 1; -} - -static void redis_redirect_object2minio_cb(struct tango_cache_ctx *ctx) -{ - struct promise *p = ctx->promise; - - ctx->get.state = GET_STATE_START; - ctx->locate = OBJECT_IN_HOS; - if(ctx->instance->statistic.session_http>=ctx->instance->param->maximum_sessions) - { - tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY); - promise_failed(p, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); - tango_cache_ctx_destroy(ctx); - } - else if(tango_cache_fetch_minio(ctx) != 1) - { - promise_failed(p, FUTURE_ERROR_CANCEL, "tango_cache_fetch_minio failed"); - } -} - -int do_tango_cache_fetch_object(struct tango_cache_ctx *ctx, enum OBJECT_LOCATION where_to_get) -{ - ctx->instance->statistic.get_recv_num += 1; - switch(where_to_get) - { - case OBJECT_IN_HOS: - ctx->locate = OBJECT_IN_HOS; - return (tango_cache_fetch_minio(ctx)==1)?0:-2; - case OBJECT_IN_REDIS: - ctx->locate = OBJECT_IN_REDIS; - return tango_cache_fetch_redis(ctx); - default: - ctx->get.redis_redirect_minio_cb = redis_redirect_object2minio_cb; - return tango_cache_try_fetch_redis(ctx); - } - return 0; -} - -int do_tango_cache_head_object(struct tango_cache_ctx *ctx, enum OBJECT_LOCATION where_to_head) -{ - ctx->instance->statistic.get_recv_num += 1; - if(where_to_head == OBJECT_IN_REDIS) - { - return tango_cache_head_redis(ctx); - } - else - { - return (tango_cache_fetch_minio(ctx)==1)?0:-1; - } -} - diff --git a/cache/src/tango_cache_transfer.h b/cache/src/tango_cache_transfer.h deleted file mode 100644 index ec17215..0000000 --- a/cache/src/tango_cache_transfer.h +++ /dev/null @@ -1,31 +0,0 @@ -#ifndef __TANGO_CACHE_UPLOAD_H__ -#define __TANGO_CACHE_UPLOAD_H__ - -#include <sys/queue.h> -#include <curl/curl.h> - -#include "tango_cache_client_in.h" - -bool check_expires_fresh_header(struct tango_cache_ctx *ctx); -bool fetch_header_over_biz(struct tango_cache_ctx *ctx); - -void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code); -void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code); -void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code); -void tango_cache_curl_muldel_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code); - -int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back=false); -int do_tango_cache_multi_delete(struct tango_cache_ctx *ctx, bool callback=false); - -bool cache_cancel_upload_minio(struct tango_cache_ctx *ctx); -bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block_len); -int do_tango_cache_update_end(struct tango_cache_ctx *ctx, bool callback); - -int do_tango_cache_upload_once_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool call_back=false); -int do_tango_cache_upload_once_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, bool call_back=false); - -int do_tango_cache_head_object(struct tango_cache_ctx *ctx, enum OBJECT_LOCATION where_to_head); -int do_tango_cache_fetch_object(struct tango_cache_ctx *ctx, enum OBJECT_LOCATION where_to_get); - -#endif - diff --git a/cache/src/tango_cache_xml.cpp b/cache/src/tango_cache_xml.cpp deleted file mode 100644 index 97280b6..0000000 --- a/cache/src/tango_cache_xml.cpp +++ /dev/null @@ -1,172 +0,0 @@ -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <unistd.h> -#include <fcntl.h> - -#include <libxml/parser.h> -#include "tango_cache_xml.h" - -bool parse_uploadID_xml(const char *content, int len, char **uploadID) -{ - xmlDoc *pdoc; - xmlNode *pcur; - - if((pdoc = xmlParseMemory(content, len)) == NULL) - { - return false; - } - if((pcur = xmlDocGetRootElement(pdoc)) == NULL) - { - xmlFreeDoc(pdoc); - return false; - } - - while(pcur->type != XML_ELEMENT_NODE) - pcur = pcur->next; - if(xmlStrcmp(pcur->name, (const xmlChar *)"InitiateMultipartUploadResult")) - { - xmlFreeDoc(pdoc); - return false; - } - pcur = pcur->children; - while(pcur != NULL) - { - if(pcur->type != XML_ELEMENT_NODE || xmlStrcmp(pcur->name, (const xmlChar *)"UploadId")) - { - pcur = pcur->next; - continue; - } - *uploadID = (char *)xmlNodeGetContent(pcur); - xmlFreeDoc(pdoc); - return true; - } - - xmlFreeDoc(pdoc); - return false; -} - -void construct_complete_xml(struct tango_cache_ctx *ctx, char **xml, int *len) -{ - struct multipart_etag_list *etag; - xmlDoc *pdoc; - xmlNode *root, *child; - char number[20]; - - pdoc = xmlNewDoc((const xmlChar *)"1.0"); - root = xmlNewNode(NULL, (const xmlChar *)"CompleteMultipartUpload"); - /*Big data deletion of this field parsing, shielding this field**/ - //xmlNewProp(root, (const xmlChar *)"xmlns",(const xmlChar *)"http://s3.amazonaws.com/doc/2006-03-01/"); - xmlDocSetRootElement(pdoc, root); - - TAILQ_FOREACH(etag, &ctx->put.etag_head, node) - { - sprintf(number, "%u", etag->part_number); - child = xmlNewChild(root, NULL, (const xmlChar*)"Part", NULL); - xmlNewChild(child, NULL, (const xmlChar*)"ETag", (const xmlChar*)etag->etag); - xmlNewChild(child, NULL, (const xmlChar*)"PartNumber", (const xmlChar*)number); - } - - xmlDocDumpFormatMemory(pdoc, (xmlChar **)xml, len, 1); - xmlFreeDoc(pdoc); -} - -static void fill_multidelete_xml_errcode(xmlNode *error, char *out, int size) -{ - xmlChar *errcode; - xmlNode *child = error->children; - - while(child != NULL) - { - if(child->type != XML_ELEMENT_NODE || xmlStrcmp(child->name, (const xmlChar *)"Message")) - { - child = child->next; - continue; - } - errcode = xmlNodeGetContent(child); - snprintf(out, size, "%s", (char *)errcode); - xmlFree(errcode); - break; - } -} - -bool parse_multidelete_xml(const char *xml, int len, u_int32_t *errnum, char *errstr, int size) -{ - xmlDoc *pdoc; - xmlNode *pcur; - int errornum=0; - - if((pdoc = xmlParseMemory(xml, len)) == NULL) - { - return false; - } - if((pcur = xmlDocGetRootElement(pdoc)) == NULL) - { - xmlFreeDoc(pdoc); - return false; - } - - while(pcur->type != XML_ELEMENT_NODE) - pcur = pcur->next; - if(xmlStrcmp(pcur->name, (const xmlChar *)"DeleteResult")) - { - xmlFreeDoc(pdoc); - return false; - } - pcur = pcur->children; - while(pcur != NULL) - { - if(pcur->type != XML_ELEMENT_NODE || xmlStrcmp(pcur->name, (const xmlChar *)"Error")) - { - pcur = pcur->next; - continue; - } - if(errornum == 0) - { - fill_multidelete_xml_errcode(pcur, errstr, size); - } - errornum++; - pcur = pcur->next; - } - *errnum = errornum; - - xmlFreeDoc(pdoc); - return true; -} - -void construct_multiple_delete_xml(const char *bucket, char *key[], u_int32_t num, int is_hash, char **xml, size_t *len) -{ - xmlDoc *pdoc; - xmlNode *root, *child; - int xmllen; - - pdoc = xmlNewDoc((const xmlChar *)"1.0"); - root = xmlNewNode(NULL, (const xmlChar *)"Delete"); - xmlDocSetRootElement(pdoc, root); - - xmlNewChild(root, NULL, (const xmlChar*)"Quiet", (const xmlChar*)"true"); - - if(is_hash) - { - char hashkey[72], sha256[72]; - for(u_int32_t i=0; i<num; i++) - { - child = xmlNewChild(root, NULL, (const xmlChar*)"Object", NULL); - caculate_sha256(key[i], strlen(key[i]), sha256, 72); - snprintf(hashkey, 256, "%c%c_%c%c_%s", sha256[0], sha256[1], sha256[2], sha256[3], sha256+4); - xmlNewChild(child, NULL, (const xmlChar*)"Key", (const xmlChar*)hashkey); - } - } - else - { - for(u_int32_t i=0; i<num; i++) - { - child = xmlNewChild(root, NULL, (const xmlChar*)"Object", NULL); - xmlNewChild(child, NULL, (const xmlChar*)"Key", (const xmlChar*)key[i]); - } - } - xmlDocDumpFormatMemoryEnc(pdoc, (xmlChar **)xml, &xmllen, "UTF-8", 1); - xmlFreeDoc(pdoc); - *len = xmllen; -} - diff --git a/cache/src/tango_cache_xml.h b/cache/src/tango_cache_xml.h deleted file mode 100644 index 43543dc..0000000 --- a/cache/src/tango_cache_xml.h +++ /dev/null @@ -1,12 +0,0 @@ -#ifndef __TANGO_CACHE_XML_H__ -#define __TANGO_CACHE_XML_H__ - -#include "tango_cache_client_in.h" - -bool parse_uploadID_xml(const char *content, int len, char **uploadID); -void construct_complete_xml(struct tango_cache_ctx *ctx, char **xml, int *len); -void construct_multiple_delete_xml(const char *bucket, char *key[], u_int32_t num, int is_hash, char **xml, size_t *len); -bool parse_multidelete_xml(const char *xml, int len, u_int32_t *errnum, char *errstr, int size); - -#endif - |
