diff options
Diffstat (limited to 'cache/src/cache_evbase_client.cpp')
| -rw-r--r-- | cache/src/cache_evbase_client.cpp | 640 |
1 files changed, 0 insertions, 640 deletions
diff --git a/cache/src/cache_evbase_client.cpp b/cache/src/cache_evbase_client.cpp deleted file mode 100644 index e7835ef..0000000 --- a/cache/src/cache_evbase_client.cpp +++ /dev/null @@ -1,640 +0,0 @@ -#include <sys/ioctl.h> -#include <sys/socket.h> -#include <sys/types.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#include <arpa/inet.h> -#include <net/if.h> -#include <unistd.h> -#include <stdio.h> -#include <stdlib.h> -#include <assert.h> -#include <errno.h> -#include <sys/prctl.h> -#include <string.h> -#include <pthread.h> - -#include "cache_evbase_client.h" -#include "tango_cache_transfer.h" -#include "tango_cache_tools.h" - -enum CACHE_ASYN_CMD -{ - CACHE_ASYN_FETCH=0, - CACHE_ASYN_UPLOAD_ONCE_DATA, - CACHE_ASYN_UPLOAD_ONCE_EVBUF, - CACHE_ASYN_UPLOAD_START, - CACHE_ASYN_UPLOAD_FRAG_DATA, - CACHE_ASYN_UPLOAD_FRAG_EVBUF, - CACHE_ASYN_UPLOAD_END, - CACHE_ASYN_UPLOAD_CANCEL, - CACHE_ASYN_DELETE, - CACHE_ASYN_HEAD, -}; - -struct databuffer -{ - char *data; - size_t size; - struct evbuffer *evbuf; - enum CACHE_ASYN_CMD cmd_type; - enum OBJECT_LOCATION where_to_get; - struct cache_evbase_ctx *ctx_asyn; -}; - -enum CACHE_ERR_CODE cache_evbase_get_last_error(const struct cache_evbase_ctx *ctx_asyn) -{ - return tango_cache_get_last_error(ctx_asyn->ctx); -} -enum CACHE_ERR_CODE cache_evbase_ctx_error(const struct cache_evbase_instance *instance) -{ - return tango_cache_ctx_error(instance->instance); -} - -void cache_evbase_get_statistics(const struct cache_evbase_instance *instance, struct cache_statistics *out) -{ - tango_cache_get_statistics(instance->instance, out); -} - -struct tango_cache_result *cache_evbase_read_result(void *promise_result) -{ - return tango_cache_read_result(promise_result); -} - -static int create_notification_pipe(evutil_socket_t sv[2]) -{ - if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, sv) == -1) - { - return -1; - } - if(evutil_make_socket_nonblocking(sv[0])<0 || evutil_make_socket_nonblocking(sv[1])<0) - { - return -1; - } - if(evutil_make_socket_closeonexec(sv[0])<0 || evutil_make_socket_closeonexec(sv[1])<0) - { - return -1; - } - return 0; -} - -static int32_t mesa_tcp_sock_write (int32_t write_fd, void *buf, int32_t bufsize) -{ - int32_t res; - - do{ - res = send(write_fd, buf, bufsize, MSG_NOSIGNAL); - }while (res==-1 &&(errno == EINTR)); - - return res; -} - -static int32_t iothread_notify_event(int32_t socket_fd, void *content, int32_t len, int32_t s_time_out) -{ - fd_set w_set, e_set; - struct timeval tv; - int32_t res=0, sndlen=0, sendsize=0; - - while(len > sndlen) - { - FD_ZERO (&w_set); - FD_ZERO (&e_set); - FD_SET (socket_fd, &w_set); - FD_SET (socket_fd, &e_set); - if(s_time_out == 0) - { - res = select (socket_fd + 1, NULL, &w_set, &e_set, NULL); - } - else - { - tv.tv_sec = s_time_out; - tv.tv_usec = 0; - res = select (socket_fd + 1, NULL, &w_set, &e_set, &tv); - } - if(res <= 0) - { - printf("log_error: select io res=%d, error: %s\n", res, strerror(errno)); - return -1; - } - - if(FD_ISSET(socket_fd, &e_set)) - { - printf("log_error: select io is in efds, error: %s\n", strerror(errno)); - return -2; - } - - if(FD_ISSET(socket_fd, &w_set)) - { - sendsize = mesa_tcp_sock_write(socket_fd, (char*)content + sndlen, len - sndlen); - if (sendsize < 0) - { - if(errno == EAGAIN) - { - continue; - } - return -1; - } - sndlen += sendsize; - } - } - - return sndlen; -} - -static void cache_asyn_ctx_destroy(struct cache_evbase_ctx *ctx_asyn) -{ - free(ctx_asyn); -} - -static void cache_asyn_ioevent_dispatch(struct databuffer *buffer) -{ - struct cache_evbase_ctx *ctx_asyn=buffer->ctx_asyn; - struct promise *p; - int ret=0; - - switch(buffer->cmd_type) - { - case CACHE_ASYN_FETCH: - p = ctx_asyn->ctx->promise; - if(do_tango_cache_fetch_object(ctx_asyn->ctx, buffer->where_to_get) < 0) - { - promise_failed(p, FUTURE_ERROR_CANCEL, "CACHE_ASYN_FETCH failed"); - } - cache_asyn_ctx_destroy(ctx_asyn); - break; - case CACHE_ASYN_HEAD: - p = ctx_asyn->ctx->promise; - ret = do_tango_cache_head_object(ctx_asyn->ctx, buffer->where_to_get); - if(ret<0) - { - promise_failed(p, FUTURE_ERROR_CANCEL, "CACHE_ASYN_HEAD failed"); - } - cache_asyn_ctx_destroy(ctx_asyn); - break; - - case CACHE_ASYN_DELETE: - cache_delete_minio_object(ctx_asyn->ctx, true); - cache_asyn_ctx_destroy(ctx_asyn); - break; - - case CACHE_ASYN_UPLOAD_ONCE_DATA: - do_tango_cache_upload_once_data(ctx_asyn->ctx, PUT_MEM_FREE, buffer->data, buffer->size, true); - cache_asyn_ctx_destroy(ctx_asyn); - break; - case CACHE_ASYN_UPLOAD_ONCE_EVBUF: - do_tango_cache_upload_once_evbuf(ctx_asyn->ctx, EVBUFFER_MOVE, buffer->evbuf, true); - evbuffer_free(buffer->evbuf); - cache_asyn_ctx_destroy(ctx_asyn); - break; - - case CACHE_ASYN_UPLOAD_START: - ctx_asyn->ctx->instance->statistic.put_recv_num += 1; - ctx_asyn->ctx->instance->error_code = CACHE_OK; - break; - - case CACHE_ASYN_UPLOAD_FRAG_DATA: - tango_cache_update_frag_data(ctx_asyn->ctx, buffer->data, buffer->size); - free(buffer->data); - break; - - case CACHE_ASYN_UPLOAD_FRAG_EVBUF: - tango_cache_update_frag_evbuf(ctx_asyn->ctx, EVBUFFER_MOVE, buffer->evbuf); - evbuffer_free(buffer->evbuf); - break; - - case CACHE_ASYN_UPLOAD_END: - do_tango_cache_update_end(ctx_asyn->ctx, true); - cache_asyn_ctx_destroy(ctx_asyn); - break; - case CACHE_ASYN_UPLOAD_CANCEL: - tango_cache_update_cancel(ctx_asyn->ctx); - cache_asyn_ctx_destroy(ctx_asyn); - break; - default: assert(0);break; - } -} - -static void sockpair_notification_handler(evutil_socket_t fd, short events, void *arg) -{ - ssize_t readlen, needlen; - struct cache_evbase_instance *instance_asyn = (struct cache_evbase_instance *)arg; - struct databuffer *buffer; - - while(1) - { - needlen=sizeof(struct cache_evbase_ctx *); - readlen = recv(fd, &buffer, needlen, 0); - if(readlen == needlen) - { - cache_asyn_ioevent_dispatch(buffer); - free(buffer); - } - else - { - if(errno!=EWOULDBLOCK && errno!=EAGAIN) - { - MESA_HANDLE_RUNTIME_LOGV2(instance_asyn->instance->runtime_log, RLOG_LV_FATAL, "read pipe error: %s.", strerror(errno)); - assert(0); - return; - } - break; - } - } -} - -static void* thread_listen_sockpair(void *arg) -{ - struct cache_evbase_instance *instance_asyn = (struct cache_evbase_instance *)arg; - struct event listen_event; - - prctl(PR_SET_NAME, "tango_cache"); - - event_assign(&listen_event, instance_asyn->evbase, instance_asyn->notify_readfd, EV_READ|EV_PERSIST, sockpair_notification_handler, instance_asyn); - event_add(&listen_event, NULL); - event_base_dispatch(instance_asyn->evbase); - - printf("Libevent dispath error, should not run here.\n"); - MESA_HANDLE_RUNTIME_LOGV2(instance_asyn->instance->runtime_log, RLOG_LV_FATAL, "Libevent dispath error, should not run here."); - assert(0); - return NULL; -} - -int cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn, char *path/*OUT*/, size_t pathsize) -{ - struct databuffer *buffer; - - if(ctx_asyn->ctx->fail_state) - { - tango_cache_ctx_destroy(ctx_asyn->ctx, false); - cache_asyn_ctx_destroy(ctx_asyn); - return -1; - } - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_UPLOAD_END; - - //ENDʱ����δ��ʼ�ֶ��ϴ��������ϴ�֮ǰ����locateһ��λ�� - ctx_asyn->ctx->locate = tango_cache_object_locate(ctx_asyn->ctx->instance, ctx_asyn->object_size); - tango_cache_get_object_path(ctx_asyn->ctx, path, pathsize); - if(ctx_asyn->ctx->instance->param->object_store_way != CACHE_ALL_HOS) - { - cJSON_AddNumberToObject(ctx_asyn->ctx->put.object_meta, "Content-Length", ctx_asyn->object_size); - } - - if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - tango_cache_ctx_destroy(ctx_asyn->ctx, false); - cache_asyn_ctx_destroy(ctx_asyn); - free(buffer); - return -2; - } - return 0; -} - -void cache_evbase_update_cancel(struct cache_evbase_ctx *ctx_asyn) -{ - struct databuffer *buffer; - - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_UPLOAD_CANCEL; - - if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - if(!ctx_asyn->ctx->fail_state) - { - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - } - tango_cache_ctx_destroy(ctx_asyn->ctx, false); - cache_asyn_ctx_destroy(ctx_asyn); - free(buffer); - } -} - -int cache_evbase_update_frag_data(struct cache_evbase_ctx *ctx_asyn, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size) -{ - struct databuffer *buffer; - - if(ctx_asyn->ctx->fail_state) - { - if(way == PUT_MEM_FREE) free((void *)data); - return 0;//��ʱ�Ⱥ��Է���ֵ���Իص���ʽ֪ͨ����������û���֪��ʱEND�����⡣ - } - ctx_asyn->object_size += size; - - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - if(way == PUT_MEM_COPY) - { - buffer->data = (char *)malloc(size); - memcpy(buffer->data, data, size); - } - else - { - buffer->data = (char*)data; - } - buffer->size = size; - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_UPLOAD_FRAG_DATA; - - if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - free(buffer->data); - free(buffer); - return -2; - } - return 0; -} - -int cache_evbase_update_frag_evbuf(struct cache_evbase_ctx *ctx_asyn, struct evbuffer *evbuf) -{ - struct databuffer *buffer; - - if(ctx_asyn->ctx->fail_state) - { - return 0; - } - ctx_asyn->object_size += evbuffer_get_length(evbuf); - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_UPLOAD_FRAG_EVBUF; - buffer->evbuf = evbuffer_new(); - evbuffer_add_buffer(buffer->evbuf, evbuf); - - if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - evbuffer_free(buffer->evbuf); - free(buffer); - return -2; - } - return 0; -} - -struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance *instance, struct future* f, struct tango_cache_meta_put *meta) -{ - struct cache_evbase_ctx *ctx_asyn; - struct tango_cache_ctx *ctx; - struct databuffer *buffer; - enum OBJECT_LOCATION maybe_loc=OBJECT_IN_UNKNOWN; - - if(instance->instance->param->object_store_way != CACHE_SMALL_REDIS) - { - maybe_loc = OBJECT_IN_HOS; - } - ctx = tango_cache_update_prepare(instance->instance, f, meta, maybe_loc); - if(ctx == NULL) - { - return NULL; - } - ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); - ctx_asyn->instance_asyn = instance; - ctx_asyn->ctx = ctx; - - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_UPLOAD_START; - - //�¼�֪ͨ��Ϊ������ͳ����Ϣ - if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - instance->instance->error_code = CACHE_ERR_SOCKPAIR; - tango_cache_set_fail_state(ctx, CACHE_ERR_SOCKPAIR); - tango_cache_ctx_destroy(ctx, false); - cache_asyn_ctx_destroy(ctx_asyn); - free(buffer); - return NULL; - } - return ctx_asyn; -} - -int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct future* f, - enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta_put *meta, char *path, size_t pathsize) -{ - struct cache_evbase_ctx *ctx_asyn; - struct tango_cache_ctx *ctx; - struct databuffer *buffer; - - ctx = tango_cache_update_once_prepare(instance->instance, f, meta, size, path, pathsize); - if(ctx == NULL) - { - if(way == PUT_MEM_FREE) free((void *)data); - return -1; - } - ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); - ctx_asyn->instance_asyn = instance; - ctx_asyn->ctx = ctx; - - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - if(way == PUT_MEM_COPY) - { - buffer->data = (char *)malloc(size); - memcpy(buffer->data, data, size); - } - else - { - buffer->data = (char*)data; - } - buffer->size = size; - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_UPLOAD_ONCE_DATA; - - if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - free(buffer->data); - free(buffer); - instance->instance->error_code = CACHE_ERR_SOCKPAIR; - tango_cache_set_fail_state(ctx, CACHE_ERR_SOCKPAIR); - tango_cache_ctx_destroy(ctx, false); - cache_asyn_ctx_destroy(ctx_asyn); - return -2; - } - return 0; -} - -int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struct future* f, - struct evbuffer *evbuf, struct tango_cache_meta_put *meta, char *path, size_t pathsize) -{ - struct cache_evbase_ctx *ctx_asyn; - struct tango_cache_ctx *ctx; - struct databuffer *buffer; - - ctx = tango_cache_update_once_prepare(instance->instance, f, meta, evbuffer_get_length(evbuf), path, pathsize); - if(ctx == NULL) - { - return -1; - } - ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); - ctx_asyn->instance_asyn = instance; - ctx_asyn->ctx = ctx; - - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_UPLOAD_ONCE_EVBUF; - buffer->evbuf = evbuffer_new(); - evbuffer_add_buffer(buffer->evbuf, evbuf); - - if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - evbuffer_free(buffer->evbuf); - free(buffer); - instance->instance->error_code = CACHE_ERR_SOCKPAIR; - tango_cache_set_fail_state(ctx, CACHE_ERR_SOCKPAIR); - tango_cache_ctx_destroy(ctx, false); - cache_asyn_ctx_destroy(ctx_asyn); - return -2; - } - return 0; -} - -int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get) -{ - struct cache_evbase_ctx *ctx_asyn; - struct databuffer *buffer; - - if(instance->instance->param->object_store_way != CACHE_SMALL_REDIS) - { - where_to_get = OBJECT_IN_HOS; - } - ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); - ctx_asyn->instance_asyn = instance; - ctx_asyn->ctx = tango_cache_fetch_prepare(instance->instance, CACHE_REQUEST_GET, f, meta, where_to_get); - if(ctx_asyn->ctx == NULL) - { - free(ctx_asyn); - return -1; - } - - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_FETCH; - buffer->where_to_get = where_to_get; - - if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - instance->instance->error_code = CACHE_ERR_SOCKPAIR; - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - tango_cache_ctx_destroy(ctx_asyn->ctx, false); - cache_asyn_ctx_destroy(ctx_asyn); - free(buffer); - return -2; - } - return 0; -} - -int cache_evbase_head_object(struct cache_evbase_instance *instance, struct future* f, struct tango_cache_meta_get *meta) -{ - struct cache_evbase_ctx *ctx_asyn; - struct databuffer *buffer; - enum OBJECT_LOCATION location = OBJECT_IN_HOS; - - if(instance->instance->param->object_store_way != CACHE_ALL_HOS) - { - location = OBJECT_IN_REDIS; - } - ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); - ctx_asyn->instance_asyn = instance; - ctx_asyn->ctx = tango_cache_fetch_prepare(instance->instance, CACHE_REQUEST_HEAD, f, meta, location); - if(ctx_asyn->ctx == NULL) - { - free(ctx_asyn); - return -1; - } - - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_HEAD; - buffer->where_to_get = location; - - if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - instance->instance->error_code = CACHE_ERR_SOCKPAIR; - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - tango_cache_ctx_destroy(ctx_asyn->ctx, false); - cache_asyn_ctx_destroy(ctx_asyn); - free(buffer); - return -2; - } - return 0; -} - -int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct future* f, const char *objkey, const char *minio_addr, const char *bucket) -{ - struct cache_evbase_ctx *ctx_asyn; - struct databuffer *buffer; - - ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); - ctx_asyn->instance_asyn = instance; - ctx_asyn->ctx = tango_cache_delete_prepare(instance->instance, f, objkey, minio_addr, bucket); - if(ctx_asyn->ctx == NULL) - { - free(ctx_asyn); - return -1; - } - - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_DELETE; - - //�ο�Unix�����432ҳ���ڶ��߳�д�İ�ȫ������ - if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) - { - instance->instance->error_code = CACHE_ERR_SOCKPAIR; - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - tango_cache_ctx_destroy(ctx_asyn->ctx, false); - cache_asyn_ctx_destroy(ctx_asyn); - free(buffer); - return -2; - } - return 0; -} - -struct tango_cache_parameter *cache_evbase_parameter_new(const char* profile_path, const char* section, void *runtimelog) -{ - return tango_cache_parameter_new(profile_path, section, runtimelog); -} - -struct cache_evbase_instance *cache_evbase_instance_new(struct tango_cache_parameter *param, void *runtimelog) -{ - evutil_socket_t notification_fd[2]; - struct cache_evbase_instance *instance_asyn; - struct event_base *evbase; - pthread_t thread_tid; - pthread_attr_t attr; - - if(create_notification_pipe(notification_fd)) - { - return NULL; - } - if((evbase = event_base_new()) == NULL) - { - return NULL; - } - - instance_asyn = (struct cache_evbase_instance *)calloc(1, sizeof(struct cache_evbase_instance)); - instance_asyn->evbase = evbase; - instance_asyn->notify_readfd = notification_fd[0]; - instance_asyn->notify_sendfd = notification_fd[1]; - instance_asyn->instance = tango_cache_instance_new(param, evbase, runtimelog); - if(instance_asyn->instance == NULL) - { - free(instance_asyn); - return NULL; - } - - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - if(pthread_create(&thread_tid, &attr, thread_listen_sockpair, instance_asyn)) - { - free(instance_asyn); - return NULL; - } - return instance_asyn; -} - -void cache_evbase_global_init(void) -{ - tango_cache_global_init(); -} - |
