diff options
| author | zhangchengwei <[email protected]> | 2018-12-14 15:07:09 +0800 |
|---|---|---|
| committer | zhangchengwei <[email protected]> | 2018-12-14 15:07:09 +0800 |
| commit | e725b460e22d7616ab18e5586e937226f474426f (patch) | |
| tree | 74fabf41e0c570469732df6a655701dddf64e12d | |
| parent | 0e23a077ddf417fc0656c84b959630323a90e9c8 (diff) | |
增加集群版redis作为元信息和对象缓存,去除Minio事件通知的redis元信息获取方式。
32 files changed, 1549 insertions, 897 deletions
diff --git a/cache/CMakeLists.txt b/cache/CMakeLists.txt index 430a413..9065655 100644 --- a/cache/CMakeLists.txt +++ b/cache/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(tango-cache-client src/cache_evbase_client.cpp src/tango_cache_client.cpp src/tango_cache_redis.cpp src/tango_cache_pending.cpp src/tango_cache_tools.cpp src/tango_cache_transfer.cpp src/tango_cache_xml.cpp) +add_library(tango-cache-client src/object_store_client.cpp src/cache_evbase_client.cpp src/tango_cache_client.cpp src/tango_cache_redis.cpp src/tango_cache_pending.cpp src/tango_cache_tools.cpp src/tango_cache_transfer.cpp src/tango_cache_xml.cpp) target_link_libraries(tango-cache-client http) target_include_directories(tango-cache-client PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include) target_link_libraries(tango-cache-client libevent-static openssl-crypto-static openssl-ssl-static libxml2-static libcurl-static hiredis-static cjson) diff --git a/cache/README.txt b/cache/README.txt index 1360813..535fe08 100644 --- a/cache/README.txt +++ b/cache/README.txt @@ -1,3 +1,12 @@ -1��HEAD����֧�ִ�minio��redis����������ȡ�����������ļ������� +����Minio�ͼ�Ⱥ��Redis�����Ļ���ͻ��ˣ�����redis��ȡ�Ǹÿͻ���ʵ�֣�����Minio���¼�֪ͨredis�� +1��֧�����»��淽ʽ + ��1��Ԫ��Ϣ�Ͷ���ȫ���洢��Minio�� + ��2��Ԫ��Ϣ�洢��Redis������洢��Minio��˳��Ҳ�洢��Ԫ��Ϣ����HEAD������Redisȡ���� + ��3��Ԫ��Ϣ��С�ļ��洢��Redis�����ļ��洢��Minio����ֵ�������ļ��趨���� +2��������İ汾���£� + libevent��2.1�Ժ�汾�� + libcurl�� 7.43�Ժ�汾���Ƽ��汾7.59��һ��Ҫע��汾���⣡������������ + libxml2: 2.6�Ժ�汾 + hiredis-vip��0.3.0���ÿ��Ǿ����ĵİ汾���������صIJ���ֱ��ʹ�ã����������� -2��ʹ�õ�redis�ͻ������Լ��Ĺ��İ汾��vendor���ṩ���� +3������ο�ͷ�ļ�tango_cache_client.h���ǽṹ����־API�ο�ͷ�ļ�cache_evbase_client.h��
\ No newline at end of file diff --git a/cache/include/cache_evbase_client.h b/cache/include/cache_evbase_client.h index 5bead2f..86d757a 100644 --- a/cache/include/cache_evbase_client.h +++ b/cache/include/cache_evbase_client.h @@ -6,6 +6,8 @@ #include "tango_cache_client.h" +/* API��ʹ��˵���ο�tango_cache_client.h */ + struct cache_evbase_instance { struct tango_cache_instance *instance; @@ -16,11 +18,12 @@ struct cache_evbase_instance struct cache_evbase_ctx { + size_t object_size; //tango_ctx�������Ա�����ع�����ֱ�ӻ�ȡ�Ļ����ڶ�дһ�������� struct tango_cache_ctx *ctx; struct cache_evbase_instance *instance_asyn; }; -/*����API�̰߳�ȫ��API��ʹ��˵���ο�tango_cache_client.h*/ +/*����API�̰߳�ȫ������ͬһ��cache_evbase_ctx�����Կ��̷߳��ʡ�*/ enum CACHE_ERR_CODE cache_evbase_get_last_error(const struct cache_evbase_ctx *ctx_asyn); enum CACHE_ERR_CODE cache_evbase_ctx_error(const struct cache_evbase_instance *instance); @@ -34,9 +37,8 @@ struct tango_cache_parameter *cache_evbase_parameter_new(const char* profile_pat /*����ʵ�����̰߳�ȫ���ڲ�������һ���߳�*/ struct cache_evbase_instance *cache_evbase_instance_new(struct tango_cache_parameter *param, void *runtimelog); - //GET�ӿڣ��ɹ�����0��ʧ�ܷ���-1��future�ص���������������߳���ִ�У���ͬ -int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct future* f, struct tango_cache_meta_get *meta); +int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get); int cache_evbase_head_object(struct cache_evbase_instance *instance, struct future* f, struct tango_cache_meta_get *meta); struct tango_cache_result *cache_evbase_read_result(void *promise_result); @@ -57,10 +59,8 @@ int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struc struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance *instance, struct future* f, struct tango_cache_meta_put *meta); int cache_evbase_update_frag_data(struct cache_evbase_ctx *ctx_asyn, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size); int cache_evbase_update_frag_evbuf(struct cache_evbase_ctx *ctx_asyn, struct evbuffer *evbuf); -void cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn); +int cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn, char *path/*OUT*/, size_t pathsize); void cache_evbase_update_cancel(struct cache_evbase_ctx *ctx_asyn); -void cache_evbase_get_object_path(const struct cache_evbase_ctx *ctx, char *path/*OUT*/, size_t pathsize); - #endif diff --git a/cache/include/object_store_client.h b/cache/include/object_store_client.h new file mode 100644 index 0000000..e7d72bc --- /dev/null +++ b/cache/include/object_store_client.h @@ -0,0 +1,52 @@ +#ifndef __OBJECT_STORE_CLIENT_H__ +#define __OBJECT_STORE_CLIENT_H__ + +#include <event2/event.h> +#include <event.h> + +#include "cache_evbase_client.h" + +struct object_store_instance +{ + struct cache_evbase_instance **instances; + u_int32_t instance_num; +}; + +/*����API�̰߳�ȫ��API��ʹ��˵���ο�tango_cache_client.h*/ + +enum CACHE_ERR_CODE object_store_get_last_error(const struct cache_evbase_ctx *ctx_asyn); +void object_store_get_statistics(const struct object_store_instance *instance, struct cache_statistics *out); + +void object_store_global_init(void); + +/*����ʵ�����̰߳�ȫ���ڲ�������һ���߳�*/ +struct object_store_instance *object_store_instance_new(const char* profile_path, const char* section, int thread_num, void *runtimelog); + + +//GET�ӿڣ��ɹ�����0��ʧ�ܷ���-1��future�ص���������������߳���ִ�У���ͬ +int object_store_fetch_object(struct object_store_instance *instance, struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get); +int object_store_head_object(struct object_store_instance *instance, struct future* f, struct tango_cache_meta_get *meta); +struct tango_cache_result *object_store_read_result(void *promise_result); + +//DELETE�ӿ� +int object_store_delete_object(struct object_store_instance *instance, struct future* f, const char *objkey); + +//һ�����ϴ��ӿ� +int object_store_upload_once_data(struct object_store_instance *instance, struct future* f, + enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, + struct tango_cache_meta_put *meta, + char *path/*OUT*/, size_t pathsize); +int object_store_upload_once_evbuf(struct object_store_instance *instance, struct future* f, + struct evbuffer *evbuf, + struct tango_cache_meta_put *meta, + char *path/*OUT*/, size_t pathsize); + +//��ʽ�ϴ��ӿ� +struct cache_evbase_ctx *object_store_update_start(struct object_store_instance *instance, struct future* f, struct tango_cache_meta_put *meta); +int object_store_update_frag_data(struct cache_evbase_ctx *ctx_asyn, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size); +int object_store_update_frag_evbuf(struct cache_evbase_ctx *ctx_asyn, struct evbuffer *evbuf); +int object_store_update_end(struct cache_evbase_ctx *ctx_asyn, char *path/*OUT*/, size_t pathsize); +void object_store_update_cancel(struct cache_evbase_ctx *ctx_asyn); + +#endif + diff --git a/cache/include/tango_cache_client.h b/cache/include/tango_cache_client.h index c551e8f..83e87f2 100644 --- a/cache/include/tango_cache_client.h +++ b/cache/include/tango_cache_client.h @@ -12,47 +12,63 @@ enum CACHE_ERR_CODE { CACHE_OK=0, - CACHE_CACHE_MISS, //����δ���� - CACHE_TIMEOUT, //���泬ʱ - CACHE_OUTOF_MEMORY,//��ǰ�ڴ�ռ�ó������ƣ��鿴MAX_USED_MEMORY_SIZE_MB�Ƿ��С���ߵ�ǰ�ϴ����ʸ����ϵ����ߵ����� - CACHE_ERR_CURL, - CACHE_ERR_WIREDLB, - CACHE_ERR_SOCKPAIR, - CACHE_ERR_INTERNAL, - CACHE_ERR_REDIS_JSON, - CACHE_ERR_REDIS_CONNECT, - CACHE_OUTOF_SESSION, - CACHE_UPDATE_CANCELED, -}; - -enum PUT_MEMORY_COPY_WAY -{ - PUT_MEM_COPY=0, //��������ڴ� - PUT_MEM_FREE, //�������ڴ棬��������ɱ�����ģ���ͷŸ��ڴ� -}; -enum EVBUFFER_COPY_WAY -{ - EVBUFFER_MOVE=0,//evbuffer_add_buffer - EVBUFFER_COPY, //evbuffer_add_buffer_reference + CACHE_CACHE_MISS = -101, //����δ���� + CACHE_TIMEOUT = -102, //���泬ʱ + CACHE_OUTOF_MEMORY= -103,//��ǰ�ڴ�ռ�ó������ƣ��鿴MAX_USED_MEMORY_SIZE_MB�Ƿ��С���ߵ�ǰ�ϴ����ʸ����ϵ����ߵ����� + CACHE_ERR_CURL = -104, + CACHE_ERR_WIREDLB = -105, + CACHE_ERR_SOCKPAIR= -106, + CACHE_ERR_INTERNAL= -107, + CACHE_ERR_REDIS_JSON = -108, + CACHE_ERR_REDIS_CONNECT= -109, + CACHE_ERR_REDIS_EXEC = -110, + CACHE_OUTOF_SESSION = -111, + CACHE_ERR_EVBUFFER = -112, + CACHE_UPDATE_CANCELED = -113, }; struct cache_statistics { - long long get_recv_num; //����GET�Ĵ��� - long long get_succ_num; //GET�ɹ��Ĵ��� - long long get_miss_num; //GETδ���еĴ��� - long long get_error_num;//GETʧ�ܵĴ��� - long long put_recv_num; //����UPLOAD�Ĵ��� - long long put_succ_num; //UPLOAD�ɹ��Ĵ��� - long long put_error_num;//UPLOADʧ�ܵĴ��� - long long del_recv_num; //����DELETE�Ĵ��� - long long del_succ_num; //DELETE�ɹ��Ĵ��� - long long del_error_num;//DELETEʧ�ܵĴ��� - long long totaldrop_num;//�ڴ����Լ�WiredLB����ʱDROP�Ĵ��� - long long memory_used; //��ǰUPLOAD BODY��ռ�ڴ��С - long long session_num; //��ǰ���ڽ���GET/PUT��HTTP�Ự�� + long long get_recv_num; //����GET�Ĵ��� + long long get_succ_http; //GET minio�ɹ��Ĵ��� + long long get_succ_redis;//GET redis�ɹ��Ĵ��� + long long get_miss_num; //GETδ���еĴ��� + long long get_err_http; //GET minioʧ�ܵĴ��� + long long get_err_redis; //GET redisʧ�ܵĴ��� + long long put_recv_num; //����UPLOAD�Ĵ��� + long long put_succ_http; //UPLOAD minio�ɹ��Ĵ��� + long long put_succ_redis;//UPLOAD redis�ɹ��Ĵ��� + long long put_err_http; //UPLOAD minioʧ�ܵĴ��� + long long put_err_redis; //UPLOAD redisʧ�ܵĴ��� + long long del_recv_num; //����DELETE�Ĵ��� + long long del_succ_num; //DELETE�ɹ��Ĵ��� + long long del_error_num; //DELETEʧ�ܵĴ��� + long long totaldrop_num; //�ڴ����Լ�WiredLB����ʱDROP�Ĵ��� + long long memory_used; //��ǰUPLOAD BODY��ռ�ڴ��С + long long session_http; //��ǰ���ڽ���GET/PUT��HTTP�Ự�� + long long session_redis; //��ǰ���ڽ���GET/PUT��redis�Ự�� }; + +struct tango_cache_parameter; +struct tango_cache_instance; +struct tango_cache_ctx; + +enum CACHE_ERR_CODE tango_cache_get_last_error(const struct tango_cache_ctx *ctx); +enum CACHE_ERR_CODE tango_cache_ctx_error(const struct tango_cache_instance *instance); +void tango_cache_get_statistics(const struct tango_cache_instance *instance, struct cache_statistics *out); + +/*ÿ������ִ��һ�γ�ʼ��*/ +void tango_cache_global_init(void); + +//ÿ��minio��Ⱥ��bucket����һ��parameter�����instance�ɹ���һ��parameter +struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path, const char* section, void *runtimelog); +/*��������API�̲߳���ȫ*/ +//ÿ�������̴߳���һ��instance +struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_parameter *param, struct event_base* evbase, void *runtimelog); + + +/****************************************** GET�ӿڵ�API ******************************************/ enum CACHE_RESULT_TYPE { RESULT_TYPE_HEADER=0, //���ֻ����һ�� @@ -62,6 +78,19 @@ enum CACHE_RESULT_TYPE RESULT_TYPE_MISS, //����δ���У����������ͻ��⣬ֻ����һ��(��END֮��)������������ }; +enum OBJECT_LOCATION +{ + OBJECT_IN_UNKNOWN=0, + OBJECT_IN_MINIO, + OBJECT_IN_REDIS +}; + +struct tango_cache_meta_get +{ + const char* url; //����:URL���ǽṹ����־:�ļ�·������CACHE_OBJECT_KEY_HASH_SWITCH=0ʱ���256�ֽڣ�=1ʱ������ + struct request_freshness get; +}; + //promise_success�Ľ��result struct tango_cache_result { @@ -69,6 +98,34 @@ struct tango_cache_result size_t size; size_t tlength; //������ܳ��ȣ��ص�ʱ����Ч enum CACHE_RESULT_TYPE type; + enum OBJECT_LOCATION location; +}; + +//�ɹ�ʱ�ص�promise_success +//ʧ��ʱ�ص�promise_failed(��һ��)��ʹ��get_last_error��ȡ�����룻 +//future������ΪNULL +int tango_cache_fetch_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get); +int tango_cache_head_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta); +//��promise_success��result������ȡ��� +struct tango_cache_result *tango_cache_read_result(future_result_t *promise_result); + + +/****************************************** DELETE�ӿڵ�API ******************************************/ +int tango_cache_delete_object(struct tango_cache_instance *instance, struct future* f, const char *objkey); + + +/****************************************** UPLOAD�ӿڵ�API ******************************************/ +/* ע��: ��future��ΪNULL�������ϴ�����ʱ�����֪ͨ�ص������������ã�*/ + +enum PUT_MEMORY_COPY_WAY +{ + PUT_MEM_COPY=0, //��������ڴ� + PUT_MEM_FREE, //�������ڴ棬��������ɱ�����ģ���ͷŸ��ڴ� +}; +enum EVBUFFER_COPY_WAY +{ + EVBUFFER_MOVE=0,//evbuffer_add_buffer + EVBUFFER_COPY, //evbuffer_add_buffer_reference }; enum CACHE_HTTP_HDR_TYPE @@ -81,12 +138,6 @@ enum CACHE_HTTP_HDR_TYPE HDR_CONTENT_NUM, }; -struct tango_cache_meta_get -{ - const char* url; //����:URL���ǽṹ����־:�ļ�·������CACHE_OBJECT_KEY_HASH_SWITCH=0ʱ���256�ֽڣ�=1ʱ������ - struct request_freshness get; -}; - struct tango_cache_meta_put { const char* url; @@ -96,43 +147,7 @@ struct tango_cache_meta_put struct response_freshness put; }; -struct tango_cache_parameter; -struct tango_cache_instance; -struct tango_cache_ctx; - -enum CACHE_ERR_CODE tango_cache_get_last_error(const struct tango_cache_ctx *ctx); -enum CACHE_ERR_CODE tango_cache_ctx_error(const struct tango_cache_instance *instance); -void tango_cache_get_statistics(const struct tango_cache_instance *instance, struct cache_statistics *out); - -/*ÿ������ִ��һ�γ�ʼ��*/ -void tango_cache_global_init(void); - -//ÿ��minio��Ⱥ��bucket����һ��parameter�����instance�ɹ���һ��parameter -struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path, const char* section, void *runtimelog); -/*��������API�̲߳���ȫ*/ -//ÿ�������̴߳���һ��instance -struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_parameter *param, struct event_base* evbase, void *runtimelog); - - -/* GET�ӿڵ�API*/ -//�ɹ�ʱ�ص�promise_success -//ʧ��ʱ�ص�promise_failed(��һ��)��ʹ��get_last_error��ȡ�����룻 -//future������ΪNULL -int tango_cache_fetch_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta); -int tango_cache_head_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta); -//��promise_success��result������ȡ��� -struct tango_cache_result *tango_cache_read_result(future_result_t *promise_result); - - -/* DELETE�ӿڵ�API*/ -int tango_cache_delete_object(struct tango_cache_instance *instance, struct future* f, const char *objkey); - - -/* UPLOAD�ӿڵ�API - * ע��: UPLOAD�ӿڵ�API����future��ΪNULL�������ϴ�����ʱ�����֪ͨ�ص������������ã� - */ - -/*����һ���ϴ�API*/ +/****************************************** ����һ��UPLOAD�ӿڵ�API ******************************************/ //��path��Ϊ�գ����������Ĵ洢·�� //����ֵ: 0-�ɹ���<0ʧ�ܣ���ͬ int tango_cache_upload_once_data(struct tango_cache_instance *instance, struct future* f, @@ -143,18 +158,17 @@ int tango_cache_upload_once_evbuf(struct tango_cache_instance *instance, struct enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, struct tango_cache_meta_put *meta, char *path/*OUT*/, size_t pathsize); -/*��ʽ�ϴ�API*/ + +/****************************************** ��ʽUPLOAD�ӿڵ�API ******************************************/ //����ֵ: ��ΪNULL���ʾ����ʧ�ܣ�����tango_cache_ctx_error�鿴�������Ƿ���CACHE_OUTOF_MEMORY(�����������)�� struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta); //����ֵ: 0-�ɹ���<0ʧ�ܣ�����tango_cache_get_last_error�鿴�����룻 int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data, size_t size); int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf); -void tango_cache_update_end(struct tango_cache_ctx *ctx); +//ע��: ����ʧ��ʱ���ٵ���promise�ص�������path��ΪNULLʱ���ش洢·�� +int tango_cache_update_end(struct tango_cache_ctx *ctx, char *path/*OUT*/, size_t pathsize); //����cancel���������ص�����������ʧ�� void tango_cache_update_cancel(struct tango_cache_ctx *ctx); -//��ȡ����keyֵ����CACHE_OBJECT_KEY_HASH_SWITCH=1������URL/�ļ�����ϣʱ���� -void tango_cache_get_object_path(const struct tango_cache_ctx *ctx, char *path/*OUT*/, size_t pathsize); - #endif diff --git a/cache/include/tango_cache_pending.h b/cache/include/tango_cache_pending.h index 0c72635..a8d5858 100644 --- a/cache/include/tango_cache_pending.h +++ b/cache/include/tango_cache_pending.h @@ -1,13 +1,48 @@ #pragma once #include<time.h> -#include<tfe_http.h> + +enum tfe_http_std_field +{ + TFE_HTTP_UNKNOWN_FIELD = 0, + TFE_HTTP_HOST, + TFE_HTTP_REFERER, + TFE_HTTP_USER_AGENT, + TFE_HTTP_COOKIE, + TFE_HTTP_PROXY_AUTHORIZATION, + TFE_HTTP_AUTHORIZATION, + TFE_HTTP_LOCATION, + TFE_HTTP_SERVER, + TFE_HTTP_ETAG, + TFE_HTTP_DATE, + TFE_HTTP_TRAILER, + TFE_HTTP_TRANSFER_ENCODING, + TFE_HTTP_VIA, + TFE_HTTP_PRAGMA, + TFE_HTTP_CONNECTION, + TFE_HTTP_CONT_ENCODING, + TFE_HTTP_CONT_LANGUAGE, + TFE_HTTP_CONT_LOCATION, + TFE_HTTP_CONT_RANGE, + TFE_HTTP_CONT_LENGTH, + TFE_HTTP_CONT_TYPE, + TFE_HTTP_CONT_DISPOSITION, + TFE_HTTP_EXPIRES, + TFE_HTTP_ACCEPT_ENCODING, + TFE_HTTP_CACHE_CONTROL, + TLF_HTTP_IF_MATCH, + TLF_HTTP_IF_NONE_MATCH, + TLF_HTTP_IF_MODIFIED_SINCE, + TLF_HTTP_IF_UNMODIFIED_SINCE, + TLF_HTTP_LAST_MODIFIED +}; + enum cache_pending_action { UNDEFINED = 0, ALLOWED, FORBIDDEN, - REVALIDATE + VERIFY }; @@ -30,8 +65,6 @@ struct response_freshness{ time_t timeout; }; - -time_t read_GMT_time(const char* gmt_string); /* 函数功能: 根据请求头字段判断是否允许将缓存作为该请求的响应,并且将请求字段对缓存新鲜度的约束范围作为传出参数返回给调用者 @@ -43,9 +76,9 @@ restrict:如果该函数返回值为ALLOWED,则返回请求Cache-Control字段 UNDEFINED = 0,//请求字段中未定义缓存的行为 ALLOWED ,//允许使用缓存作为该请求的响应 FORBIDDEN,//禁止使用缓存作为该请求的响应,需要向源服务器请求 -REVALIDATE,//禁止使用未验证有效性的缓存作为该请求的响应 +VERIFY,//禁止使用未验证有效性的缓存作为该请求的响应 */ -enum cache_pending_action tfe_cache_get_pending(const struct tfe_http_half *request, struct request_freshness* restrict); +enum cache_pending_action tfe_cache_get_pending(const struct tfe_http_field *request, size_t n_fields,struct request_freshness* restrict); @@ -61,4 +94,4 @@ UNDEFINED = 0,//响应字段中未定义缓存的行为 ALLOWED ,//允许缓存该响应 FORBIDDEN,//禁止缓存该响应 */ -enum cache_pending_action tfe_cache_put_pending(const struct tfe_http_half *response, struct response_freshness* freshness); +enum cache_pending_action tfe_cache_put_pending(const struct tfe_http_field *response, size_t n_fields, struct response_freshness* freshness);
\ No newline at end of file diff --git a/cache/src/cache_evbase_client.cpp b/cache/src/cache_evbase_client.cpp index ca2f6c5..27aa420 100644 --- a/cache/src/cache_evbase_client.cpp +++ b/cache/src/cache_evbase_client.cpp @@ -17,7 +17,6 @@ #include "cache_evbase_client.h" #include "tango_cache_transfer.h" #include "tango_cache_tools.h" -#include "tango_cache_redis.h" enum CACHE_ASYN_CMD { @@ -39,6 +38,7 @@ struct databuffer size_t size; struct evbuffer *evbuf; enum CACHE_ASYN_CMD cmd_type; + enum OBJECT_LOCATION where_to_get; struct cache_evbase_ctx *ctx_asyn; }; @@ -61,11 +61,6 @@ struct tango_cache_result *cache_evbase_read_result(void *promise_result) return tango_cache_read_result(promise_result); } -void cache_evbase_get_object_path(const struct cache_evbase_ctx *ctx_asyn, char *path, size_t pathsize) -{ - tango_cache_get_object_path(ctx_asyn->ctx, path, pathsize); -} - static int create_notification_pipe(evutil_socket_t sv[2]) { if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, sv) == -1) @@ -161,7 +156,7 @@ static void cache_asyn_ioevent_dispatch(struct databuffer *buffer) { case CACHE_ASYN_FETCH: p = ctx_asyn->ctx->promise; - if(tango_cache_fetch_start(ctx_asyn->ctx) < 0) + if(do_tango_cache_fetch_object(ctx_asyn->ctx, buffer->where_to_get) < 0) { promise_failed(p, FUTURE_ERROR_CANCEL, "CACHE_ASYN_FETCH failed"); } @@ -169,14 +164,7 @@ static void cache_asyn_ioevent_dispatch(struct databuffer *buffer) break; case CACHE_ASYN_HEAD: p = ctx_asyn->ctx->promise; - if(ctx_asyn->instance_asyn->instance->param->head_meta_source == HEAD_META_FROM_REDIS) - { - ret = tango_cache_head_redis(ctx_asyn->ctx); - } - else - { - ret = tango_cache_fetch_start(ctx_asyn->ctx); - } + ret = do_tango_cache_head_object(ctx_asyn->ctx, buffer->where_to_get); if(ret<0) { promise_failed(p, FUTURE_ERROR_CANCEL, "CACHE_ASYN_HEAD failed"); @@ -190,11 +178,11 @@ static void cache_asyn_ioevent_dispatch(struct databuffer *buffer) break; case CACHE_ASYN_UPLOAD_ONCE_DATA: - tango_cache_upload_once_start_data(ctx_asyn->ctx, PUT_MEM_FREE, buffer->data, buffer->size, true); + do_tango_cache_upload_once_data(ctx_asyn->ctx, PUT_MEM_FREE, buffer->data, buffer->size, true); cache_asyn_ctx_destroy(ctx_asyn); break; case CACHE_ASYN_UPLOAD_ONCE_EVBUF: - tango_cache_upload_once_start_evbuf(ctx_asyn->ctx, EVBUFFER_MOVE, buffer->evbuf, true); + do_tango_cache_upload_once_evbuf(ctx_asyn->ctx, EVBUFFER_MOVE, buffer->evbuf, true); evbuffer_free(buffer->evbuf); cache_asyn_ctx_destroy(ctx_asyn); break; @@ -215,7 +203,7 @@ static void cache_asyn_ioevent_dispatch(struct databuffer *buffer) break; case CACHE_ASYN_UPLOAD_END: - tango_cache_update_end(ctx_asyn->ctx); + do_tango_cache_update_end(ctx_asyn->ctx, true); cache_asyn_ctx_destroy(ctx_asyn); break; case CACHE_ASYN_UPLOAD_CANCEL: @@ -271,24 +259,37 @@ static void* thread_listen_sockpair(void *arg) return NULL; } -void cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn) +int cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn, char *path/*OUT*/, size_t pathsize) { struct databuffer *buffer; + if(ctx_asyn->ctx->fail_state) + { + tango_cache_ctx_destroy(ctx_asyn->ctx, false); + cache_asyn_ctx_destroy(ctx_asyn); + return -1; + } buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); buffer->ctx_asyn = ctx_asyn; buffer->cmd_type = CACHE_ASYN_UPLOAD_END; - + + //ENDʱ����δ��ʼ�ֶ��ϴ��������ϴ�֮ǰ����locateһ��λ�� + ctx_asyn->ctx->locate = tango_cache_object_locate(ctx_asyn->ctx->instance, ctx_asyn->object_size); + tango_cache_get_object_path(ctx_asyn->ctx, path, pathsize); + if(ctx_asyn->ctx->instance->param->object_store_way != CACHE_ALL_MINIO) + { + cJSON_AddNumberToObject(ctx_asyn->ctx->put.object_meta, "Content-Length", ctx_asyn->object_size); + } + if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { - if(!ctx_asyn->ctx->fail_state) - { - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - } - tango_cache_ctx_destroy(ctx_asyn->ctx); + tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); + tango_cache_ctx_destroy(ctx_asyn->ctx, false); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); + return -2; } + return 0; } void cache_evbase_update_cancel(struct cache_evbase_ctx *ctx_asyn) @@ -317,8 +318,10 @@ int cache_evbase_update_frag_data(struct cache_evbase_ctx *ctx_asyn, enum PUT_ME if(ctx_asyn->ctx->fail_state) { - return -1; + if(way == PUT_MEM_FREE) free((void *)data); + return 0;//��ʱ�Ⱥ��Է���ֵ���Իص���ʽ֪ͨ����������û���֪��ʱEND�����⡣ } + ctx_asyn->object_size += size; buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); if(way == PUT_MEM_COPY) @@ -336,10 +339,7 @@ int cache_evbase_update_frag_data(struct cache_evbase_ctx *ctx_asyn, enum PUT_ME if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { - if(!ctx_asyn->ctx->fail_state) - { - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - } + tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); free(buffer->data); free(buffer); return -2; @@ -353,8 +353,9 @@ int cache_evbase_update_frag_evbuf(struct cache_evbase_ctx *ctx_asyn, struct evb if(ctx_asyn->ctx->fail_state) { - return -1; + return 0; } + ctx_asyn->object_size += evbuffer_get_length(evbuf); buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); buffer->ctx_asyn = ctx_asyn; buffer->cmd_type = CACHE_ASYN_UPLOAD_FRAG_EVBUF; @@ -363,10 +364,7 @@ int cache_evbase_update_frag_evbuf(struct cache_evbase_ctx *ctx_asyn, struct evb if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { - if(!ctx_asyn->ctx->fail_state) - { - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - } + tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); evbuffer_free(buffer->evbuf); free(buffer); return -2; @@ -379,13 +377,17 @@ struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance struct cache_evbase_ctx *ctx_asyn; struct tango_cache_ctx *ctx; struct databuffer *buffer; + enum OBJECT_LOCATION maybe_loc=OBJECT_IN_UNKNOWN; - ctx = tango_cache_update_prepare(instance->instance, f, meta); + if(instance->instance->param->object_store_way != CACHE_SMALL_REDIS) + { + maybe_loc = OBJECT_IN_MINIO; + } + ctx = tango_cache_update_prepare(instance->instance, f, meta, maybe_loc); if(ctx == NULL) { return NULL; } - ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); ctx_asyn->instance_asyn = instance; ctx_asyn->ctx = ctx; @@ -413,16 +415,12 @@ int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct struct tango_cache_ctx *ctx; struct databuffer *buffer; - ctx = tango_cache_update_prepare(instance->instance, f, meta); + ctx = tango_cache_update_once_prepare(instance->instance, f, meta, size, path, pathsize); if(ctx == NULL) { + if(way == PUT_MEM_FREE) free((void *)data); return -1; } - if(path != NULL) - { - snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->instance->param->bucketname, ctx->object_key); - } - ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); ctx_asyn->instance_asyn = instance; ctx_asyn->ctx = ctx; @@ -460,16 +458,11 @@ int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struc struct tango_cache_ctx *ctx; struct databuffer *buffer; - ctx = tango_cache_update_prepare(instance->instance, f, meta); + ctx = tango_cache_update_once_prepare(instance->instance, f, meta, evbuffer_get_length(evbuf), path, pathsize); if(ctx == NULL) { return -1; } - if(path != NULL) - { - snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->instance->param->bucketname, ctx->object_key); - } - ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); ctx_asyn->instance_asyn = instance; ctx_asyn->ctx = ctx; @@ -492,14 +485,18 @@ int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struc return 0; } -int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct future* f, struct tango_cache_meta_get *meta) +int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get) { struct cache_evbase_ctx *ctx_asyn; struct databuffer *buffer; + if(instance->instance->param->object_store_way != CACHE_SMALL_REDIS) + { + where_to_get = OBJECT_IN_MINIO; + } ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); ctx_asyn->instance_asyn = instance; - ctx_asyn->ctx = tango_cache_fetch_prepare(instance->instance, CACHE_REQUEST_GET, f, meta); + ctx_asyn->ctx = tango_cache_fetch_prepare(instance->instance, CACHE_REQUEST_GET, f, meta, where_to_get); if(ctx_asyn->ctx == NULL) { free(ctx_asyn); @@ -509,6 +506,7 @@ int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct fut buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); buffer->ctx_asyn = ctx_asyn; buffer->cmd_type = CACHE_ASYN_FETCH; + buffer->where_to_get = where_to_get; if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { @@ -525,10 +523,15 @@ int cache_evbase_head_object(struct cache_evbase_instance *instance, struct futu { struct cache_evbase_ctx *ctx_asyn; struct databuffer *buffer; + enum OBJECT_LOCATION location = OBJECT_IN_MINIO; + if(instance->instance->param->object_store_way != CACHE_ALL_MINIO) + { + location = OBJECT_IN_REDIS; + } ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); ctx_asyn->instance_asyn = instance; - ctx_asyn->ctx = tango_cache_fetch_prepare(instance->instance, CACHE_REQUEST_HEAD, f, meta); + ctx_asyn->ctx = tango_cache_fetch_prepare(instance->instance, CACHE_REQUEST_HEAD, f, meta, location); if(ctx_asyn->ctx == NULL) { free(ctx_asyn); @@ -538,6 +541,7 @@ int cache_evbase_head_object(struct cache_evbase_instance *instance, struct futu buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); buffer->ctx_asyn = ctx_asyn; buffer->cmd_type = CACHE_ASYN_HEAD; + buffer->where_to_get = location; if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { diff --git a/cache/src/object_store_client.cpp b/cache/src/object_store_client.cpp new file mode 100644 index 0000000..bd2ecd9 --- /dev/null +++ b/cache/src/object_store_client.cpp @@ -0,0 +1,145 @@ +#include <sys/ioctl.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <arpa/inet.h> +#include <net/if.h> +#include <unistd.h> +#include <stdio.h> +#include <stdlib.h> +#include <assert.h> +#include <errno.h> +#include <sys/prctl.h> +#include <string.h> +#include <pthread.h> + +#include "object_store_client.h" +#include "tango_cache_tools.h" + +enum CACHE_ERR_CODE object_store_get_last_error(const struct cache_evbase_ctx *ctx) +{ + return cache_evbase_get_last_error(ctx); +} + +void object_store_get_statistics(const struct object_store_instance *instance, struct cache_statistics *out) +{ + struct cache_statistics out_cache; + + memset(out, 0, sizeof(struct cache_statistics)); + for(u_int32_t i=0; i<instance->instance_num; i++) + { + cache_evbase_get_statistics(instance->instances[i], &out_cache); + + out->del_error_num += out_cache.del_error_num; + out->del_recv_num += out_cache.del_recv_num; + out->del_succ_num += out_cache.del_succ_num; + out->get_err_http += out_cache.get_err_http; + out->get_err_redis += out_cache.get_err_redis; + out->get_miss_num += out_cache.get_miss_num; + out->get_recv_num += out_cache.get_recv_num; + out->get_succ_http += out_cache.get_succ_http; + out->get_succ_redis+= out_cache.get_succ_redis; + out->put_err_http += out_cache.put_err_http; + out->put_err_redis += out_cache.put_err_redis; + out->put_recv_num += out_cache.put_recv_num; + out->put_succ_http += out_cache.put_succ_http; + out->put_succ_redis+= out_cache.put_succ_redis; + out->session_http += out_cache.session_http; + out->session_redis += out_cache.session_redis; + out->memory_used += out_cache.memory_used; + out->totaldrop_num += out_cache.totaldrop_num; + } +} + +struct tango_cache_result *object_store_read_result(void *promise_result) +{ + return cache_evbase_read_result(promise_result); +} + +int object_store_update_end(struct cache_evbase_ctx *ctx, char *path/*OUT*/, size_t pathsize) +{ + return cache_evbase_update_end(ctx, path, pathsize); +} + +void object_store_update_cancel(struct cache_evbase_ctx *ctx) +{ + cache_evbase_update_cancel(ctx); +} + +int object_store_update_frag_data(struct cache_evbase_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size) +{ + return cache_evbase_update_frag_data(ctx, way, data, size); +} + +int object_store_update_frag_evbuf(struct cache_evbase_ctx *ctx, struct evbuffer *evbuf) +{ + return cache_evbase_update_frag_evbuf(ctx, evbuf); +} + +struct cache_evbase_ctx *object_store_update_start(struct object_store_instance *instance, struct future* f, struct tango_cache_meta_put *meta) +{ + return cache_evbase_update_start(instance->instances[rand()%instance->instance_num], f, meta); +} + +int object_store_upload_once_data(struct object_store_instance *instance, struct future* f, + enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta_put *meta, char *path, size_t pathsize) +{ + return cache_evbase_upload_once_data(instance->instances[rand()%instance->instance_num], f, way, data, size, meta, path, pathsize); +} + +int object_store_upload_once_evbuf(struct object_store_instance *instance, struct future* f, + struct evbuffer *evbuf, struct tango_cache_meta_put *meta, char *path, size_t pathsize) +{ + return cache_evbase_upload_once_evbuf(instance->instances[rand()%instance->instance_num], f, evbuf, meta, path, pathsize); +} + +int object_store_fetch_object(struct object_store_instance *instance, struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get) +{ + return cache_evbase_fetch_object(instance->instances[rand()%instance->instance_num], f, meta, where_to_get); +} + +int object_store_head_object(struct object_store_instance *instance, struct future* f, struct tango_cache_meta_get *meta) +{ + return cache_evbase_head_object(instance->instances[rand()%instance->instance_num], f, meta); +} + +int object_store_delete_object(struct object_store_instance *instance, struct future* f, const char *objkey) +{ + return cache_evbase_delete_object(instance->instances[rand()%instance->instance_num], f, objkey); +} + +struct object_store_instance *object_store_instance_new(const char* profile_path, const char* section, int thread_num, void *runtimelog) +{ + struct object_store_instance *object_instance; + struct tango_cache_parameter *parameter; + + parameter = tango_cache_parameter_new(profile_path, section, runtimelog); + if(parameter == NULL) + { + return NULL; + } + + object_instance = (struct object_store_instance *)calloc(1, sizeof(struct object_store_instance)); + object_instance->instance_num = thread_num; + object_instance->instances = (struct cache_evbase_instance **)calloc(1, sizeof(struct cache_evbase_instance *)*object_instance->instance_num); + + for(u_int32_t i=0; i<object_instance->instance_num; i++) + { + object_instance->instances[i] = cache_evbase_instance_new(parameter, runtimelog); + if(object_instance->instances[i] == NULL) + { + free(parameter); + free(object_instance); + return NULL; + } + } + srandom(time(NULL)); + return object_instance; +} + +void object_store_global_init(void) +{ + cache_evbase_global_init(); +} + diff --git a/cache/src/tango_cache_client.cpp b/cache/src/tango_cache_client.cpp index 3a93a17..7010e8b 100644 --- a/cache/src/tango_cache_client.cpp +++ b/cache/src/tango_cache_client.cpp @@ -86,35 +86,25 @@ const char *tango_cache_get_errstring(const struct tango_cache_ctx *ctx) { switch(ctx->error_code) { - case CACHE_CACHE_MISS: return "cache not hit"; - case CACHE_TIMEOUT: return "cache not fresh"; - case CACHE_OUTOF_MEMORY:return "outof memory"; - case CACHE_ERR_WIREDLB: return "wiredlb error"; - case CACHE_ERR_SOCKPAIR:return "socketpair error"; - case CACHE_ERR_INTERNAL:return "internal error"; - case CACHE_ERR_REDIS_JSON:return "parse redis json error"; + case CACHE_CACHE_MISS: return "cache not hit"; + case CACHE_TIMEOUT: return "cache not fresh"; + case CACHE_OUTOF_MEMORY: return "outof memory"; + case CACHE_ERR_WIREDLB: return "wiredlb error"; + case CACHE_ERR_SOCKPAIR: return "socketpair error"; + case CACHE_ERR_INTERNAL: return "internal error"; + case CACHE_ERR_REDIS_JSON: return "parse redis json error"; case CACHE_ERR_REDIS_CONNECT:return "redis is not connected"; - case CACHE_OUTOF_SESSION:return "two many curl sessions"; - case CACHE_UPDATE_CANCELED:return "update was canceled"; + case CACHE_ERR_REDIS_EXEC: return "redis command reply error"; + case CACHE_OUTOF_SESSION: return "two many curl sessions"; + case CACHE_UPDATE_CANCELED: return "update was canceled"; + case CACHE_ERR_EVBUFFER: return "evbuffer read write error"; default: return ctx->error; } } void tango_cache_get_statistics(const struct tango_cache_instance *instance, struct cache_statistics *out) { - out->get_recv_num = instance->statistic.get_recv_num; - out->get_succ_num = instance->statistic.get_succ_num; - out->get_error_num= instance->statistic.get_error_num; - out->get_miss_num = instance->statistic.get_miss_num; - out->put_recv_num = instance->statistic.put_recv_num; - out->put_succ_num = instance->statistic.put_succ_num; - out->put_error_num= instance->statistic.put_error_num; - out->del_recv_num = instance->statistic.del_recv_num; - out->del_succ_num = instance->statistic.del_succ_num; - out->del_error_num= instance->statistic.del_error_num; - out->totaldrop_num= instance->statistic.totaldrop_num; - out->session_num = instance->statistic.session_num; - out->memory_used = instance->statistic.memory_used; + *out = instance->statistic; } struct tango_cache_result *tango_cache_read_result(future_result_t *promise_result) @@ -122,11 +112,6 @@ struct tango_cache_result *tango_cache_read_result(future_result_t *promise_resu return (struct tango_cache_result *)promise_result; } -void tango_cache_get_object_path(const struct tango_cache_ctx *ctx, char *path, size_t pathsize) -{ - snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key); -} - static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statistics *statistic) { switch(ctx->method) @@ -134,11 +119,17 @@ static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statisti case CACHE_REQUEST_PUT: if(ctx->fail_state) { - statistic->put_error_num += 1; + if(ctx->locate == OBJECT_IN_MINIO) + statistic->put_err_http += 1; + else + statistic->put_err_redis += 1; } else { - statistic->put_succ_num += 1; + if(ctx->locate == OBJECT_IN_MINIO) + statistic->put_succ_http += 1; + else + statistic->put_succ_redis += 1; } break; case CACHE_REQUEST_GET: @@ -147,12 +138,17 @@ static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statisti { if(ctx->error_code == CACHE_CACHE_MISS || ctx->error_code == CACHE_TIMEOUT) statistic->get_miss_num += 1; + else if(ctx->locate == OBJECT_IN_MINIO) + statistic->get_err_http += 1; else - statistic->get_error_num += 1; + statistic->get_err_redis += 1; } else { - statistic->get_succ_num += 1; + if(ctx->locate == OBJECT_IN_MINIO) + statistic->get_succ_http += 1; + else + statistic->get_succ_redis += 1; } break; case CACHE_REQUEST_DELETE: @@ -218,6 +214,7 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback) case CACHE_REQUEST_PUT: if(ctx->put.uploadID != NULL) free(ctx->put.uploadID); if(ctx->put.combine_xml != NULL) free(ctx->put.combine_xml); + if(ctx->put.object_meta == NULL) cJSON_Delete(ctx->put.object_meta); if(ctx->put.evbuf!=NULL) { ctx->instance->statistic.memory_used -= evbuffer_get_length(ctx->put.evbuf); @@ -253,9 +250,61 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback) free(ctx); } -void tango_cache_update_end(struct tango_cache_ctx *ctx) +//�ж�session�Ƿ����ƣ�����ȡ��ʼ���ж�where_to_get�Ƿ�ȫ����MINIO�� +bool sessions_exceeds_limit(struct tango_cache_instance *instance, enum OBJECT_LOCATION where_to_get) +{ + if(where_to_get == OBJECT_IN_MINIO) + { + return (instance->statistic.session_http>=instance->param->maximum_sessions); + } + else + { + return (instance->statistic.session_redis>=instance->param->maximum_sessions); + } +} + + +//�����ϴ�API��ʹ��ctx��evbuffer������������ctx��ȡ���� +enum OBJECT_LOCATION tango_cache_object_locate(struct tango_cache_instance *instance, size_t object_size) +{ + if(instance->param->object_store_way!=CACHE_SMALL_REDIS || object_size > instance->param->redis_object_maxsize) + { + return OBJECT_IN_MINIO; + } + else + { + return OBJECT_IN_REDIS; + } +} + +void tango_cache_get_object_path(struct tango_cache_ctx *ctx, char *path/*OUT*/, size_t pathsize) { - cache_kick_upload_minio_end(ctx); + if(path != NULL) + { + if(ctx->locate == OBJECT_IN_MINIO) + { + snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key); + } + else + { + snprintf(path, pathsize, "redis://%s/%s/%s", ctx->instance->redisaddr, ctx->instance->param->bucketname, ctx->object_key); + } + } +} + +int tango_cache_update_end(struct tango_cache_ctx *ctx, char *path/*OUT*/, size_t pathsize) +{ + if(!ctx->fail_state) + { + ctx->locate = tango_cache_object_locate(ctx->instance, ctx->put.object_size); + tango_cache_get_object_path(ctx, path, pathsize); + + if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO) + { + cJSON_AddNumberToObject(ctx->put.object_meta, "Content-Length", ctx->put.object_size); + } + } + return do_tango_cache_update_end(ctx, false); } void tango_cache_update_cancel(struct tango_cache_ctx *ctx) @@ -268,7 +317,6 @@ void tango_cache_update_cancel(struct tango_cache_ctx *ctx) ctx->curl = NULL; } tango_cache_set_fail_state(ctx, CACHE_UPDATE_CANCELED); - //�Ѿ������ֶ��ϴ�IDʱ������cancelɾ�� if(ctx->put.uploadID!=NULL && cache_cancel_upload_minio(ctx)) { ctx->put.state = PUT_STATE_CANCEL; @@ -291,6 +339,7 @@ int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data, return 0; } ctx->instance->statistic.memory_used += size; + ctx->put.object_size += size; if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->param->upload_block_size) { cache_kick_upload_minio_multipart(ctx, ctx->instance->param->upload_block_size); @@ -312,7 +361,7 @@ int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COP { if(evbuffer_add_buffer(ctx->put.evbuf, evbuf)) { - tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY); + tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER); return 0; } } @@ -320,11 +369,12 @@ int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COP { if(evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf)) { - tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY); + tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER); return 0; } } ctx->instance->statistic.memory_used += size; + ctx->put.object_size += size; if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->param->upload_block_size) { cache_kick_upload_minio_multipart(ctx, ctx->instance->param->upload_block_size); @@ -332,13 +382,14 @@ int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COP return 0; } -struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta) +struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta, enum OBJECT_LOCATION maybe_loc) { struct tango_cache_ctx *ctx; - char buffer[2064]; + char buffer[2064], *user_tag=NULL, *user_tag_value=NULL; time_t expires, now, last_modify; + struct easy_string hdr_estr={NULL, 0, 0}; - if((u_int64_t)instance->statistic.memory_used>=instance->param->cache_limit_size || instance->statistic.session_num>=instance->param->max_session_num) + if(sessions_exceeds_limit(instance, maybe_loc) || (u_int64_t)instance->statistic.memory_used>=instance->param->maximum_used_mem) { instance->error_code = CACHE_OUTOF_MEMORY; instance->statistic.totaldrop_num += 1; @@ -349,6 +400,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * ctx->instance = instance; ctx->promise = future_to_promise(f); ctx->method = CACHE_REQUEST_PUT; + ctx->locate = maybe_loc; if(instance->param->hash_object_key) { @@ -366,6 +418,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * { instance->error_code = CACHE_ERR_WIREDLB; instance->statistic.totaldrop_num += 1; + if(ctx->headers!=NULL) curl_slist_free_all(ctx->headers); free(ctx); return NULL; } @@ -377,6 +430,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * { ctx->headers = curl_slist_append(ctx->headers, buffer); } + ctx->put.object_ttl = expires; //Last-Modify�ֶΣ�����GETʱ�ж��Ƿ����� last_modify = (meta->put.date > meta->put.last_modified)?meta->put.date:meta->put.last_modified; if(last_modify == 0) @@ -391,21 +445,51 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * if(meta->std_hdr[i] != NULL) { ctx->headers = curl_slist_append(ctx->headers, meta->std_hdr[i]); + if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO) + { + easy_string_savedata(&hdr_estr, meta->std_hdr[i], strlen(meta->std_hdr[i])); + easy_string_savedata(&hdr_estr, "\r\n", 2); + } } } if(meta->std_hdr[HDR_CONTENT_TYPE] == NULL) { ctx->headers = curl_slist_append(ctx->headers, "Content-Type:"); + if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO) + { + easy_string_savedata(&hdr_estr, "Content-Type: application/octet-stream\r\n", strlen("Content-Type: application/octet-stream\r\n")); + } } ctx->headers = curl_slist_append(ctx->headers, "Expect:");//ע��POST������Expect��ϵ��Ҫ��ȷ����CURLOPT_POSTFIELDSIZE //���������ͷ����GETʱ��ԭ������ if(meta->usertag_len>0 && meta->usertag_len<=USER_TAG_MAX_LEN) { - char *p = (char *)malloc((meta->usertag_len/3 + 1)*4 + 18); //������������ռ䣻18=17+1: ͷ��+�ַ��������� - memcpy(p, "x-amz-meta-user: ", 17); - Base64_EncodeBlock((const unsigned char*)meta->usertag, meta->usertag_len, (unsigned char*)p+17); - ctx->headers = curl_slist_append(ctx->headers, p); - free(p); + user_tag = (char *)malloc((meta->usertag_len/3 + 1)*4 + 18); //������������ռ䣻18=17+1: ͷ��+�ַ��������� + memcpy(user_tag, "x-amz-meta-user: ", 17); + user_tag_value = user_tag+17; + Base64_EncodeBlock((const unsigned char*)meta->usertag, meta->usertag_len, (unsigned char*)user_tag_value); + ctx->headers = curl_slist_append(ctx->headers, user_tag); + } + + if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO) + { + ctx->put.object_meta = cJSON_CreateObject(); + if(instance->param->hash_object_key) + { + cJSON_AddStringToObject(ctx->put.object_meta, "X-Amz-Meta-Url", meta->url); + } + cJSON_AddNumberToObject(ctx->put.object_meta, "Expires", now + expires); + cJSON_AddNumberToObject(ctx->put.object_meta, "X-Amz-Meta-Lm", last_modify); + cJSON_AddStringToObject(ctx->put.object_meta, "Headers", hdr_estr.buff); + if(user_tag_value != NULL) + { + cJSON_AddStringToObject(ctx->put.object_meta, "X-Amz-Meta-User", user_tag_value); + } + easy_string_destroy(&hdr_estr); + } + if(user_tag != NULL) + { + free(user_tag); } ctx->put.evbuf = evbuffer_new(); @@ -415,9 +499,15 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta) { - struct tango_cache_ctx *ctx; + struct tango_cache_ctx *ctx; + enum OBJECT_LOCATION maybe_loc=OBJECT_IN_UNKNOWN; + + if(instance->param->object_store_way != CACHE_SMALL_REDIS) + { + maybe_loc = OBJECT_IN_MINIO; + } - ctx = tango_cache_update_prepare(instance, f, meta); + ctx = tango_cache_update_prepare(instance, f, meta, maybe_loc); if(ctx == NULL) { return NULL; @@ -427,22 +517,40 @@ struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *in return ctx; } -int tango_cache_upload_once_data(struct tango_cache_instance *instance, struct future* f, - enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta_put *meta, char *path, size_t pathsize) +//һ�����ϴ�ʱ��ֱ�Ӷ�λ�����ϴ���λ�� +struct tango_cache_ctx *tango_cache_update_once_prepare(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta, + size_t object_size, char *path, size_t pathsize) { - struct tango_cache_ctx *ctx; + struct tango_cache_ctx *ctx; + enum OBJECT_LOCATION location; - ctx = tango_cache_update_prepare(instance, f, meta); + location = tango_cache_object_locate(instance, object_size); + ctx = tango_cache_update_prepare(instance, f, meta, location); if(ctx == NULL) { - return -1; + return NULL; } - if(path != NULL) + tango_cache_get_object_path(ctx, path, pathsize); + + if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO) { - snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->param->bucketname, ctx->object_key); + cJSON_AddNumberToObject(ctx->put.object_meta, "Content-Length", object_size); } + return ctx; +} - return tango_cache_upload_once_start_data(ctx, way, data, size); +int tango_cache_upload_once_data(struct tango_cache_instance *instance, struct future* f, + enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta_put *meta, char *path, size_t pathsize) +{ + struct tango_cache_ctx *ctx; + + ctx = tango_cache_update_once_prepare(instance, f, meta, size, path, pathsize); + if(ctx == NULL) + { + if(way == PUT_MEM_FREE) free((void *)data); + return -1; + } + return do_tango_cache_upload_once_data(ctx, way, data, size); } int tango_cache_upload_once_evbuf(struct tango_cache_instance *instance, struct future* f, @@ -450,25 +558,21 @@ int tango_cache_upload_once_evbuf(struct tango_cache_instance *instance, struct { struct tango_cache_ctx *ctx; - ctx = tango_cache_update_prepare(instance, f, meta); + ctx = tango_cache_update_once_prepare(instance, f, meta, evbuffer_get_length(evbuf), path, pathsize); if(ctx == NULL) { return -1; } - if(path != NULL) - { - snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->param->bucketname, ctx->object_key); - } - - return tango_cache_upload_once_start_evbuf(ctx, way, evbuf); + return do_tango_cache_upload_once_evbuf(ctx, way, evbuf); } -struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *instance, enum CACHE_REQUEST_METHOD method, struct future* f, struct tango_cache_meta_get *meta) +struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *instance, enum CACHE_REQUEST_METHOD method, + struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get) { struct tango_cache_ctx *ctx; char sha256[72]; - if(instance->param->head_meta_source!=HEAD_META_FROM_REDIS && instance->statistic.session_num>=instance->param->max_session_num) + if(sessions_exceeds_limit(instance, where_to_get)) { instance->error_code = CACHE_OUTOF_SESSION; instance->statistic.totaldrop_num += 1; @@ -478,7 +582,7 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); ctx->instance = instance; ctx->promise = future_to_promise(f); - promise_allow_many_successes(ctx->promise); + promise_allow_many_successes(ctx->promise); //��λص��������ʱ����promise_finish ctx->method = method; ctx->get.state = GET_STATE_START; ctx->get.max_age = meta->get.max_age; @@ -503,36 +607,36 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i return ctx; } -int tango_cache_fetch_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta) +int tango_cache_fetch_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get) { struct tango_cache_ctx *ctx; - ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_GET, f, meta); + if(instance->param->object_store_way != CACHE_SMALL_REDIS) + { + where_to_get = OBJECT_IN_MINIO; + } + + ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_GET, f, meta, where_to_get); if(ctx == NULL) { return -1; } - return (tango_cache_fetch_start(ctx)==1)?0:-1; + return do_tango_cache_fetch_object(ctx, where_to_get); } int tango_cache_head_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta) { struct tango_cache_ctx *ctx; + enum OBJECT_LOCATION location; - ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_HEAD, f, meta); + //���������Redis����Ԫ��Ϣ�洢��Redis�� + location = (instance->param->object_store_way != CACHE_ALL_MINIO)?OBJECT_IN_REDIS:OBJECT_IN_MINIO; + ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_HEAD, f, meta, location); if(ctx == NULL) { return -1; } - - if(instance->param->head_meta_source == HEAD_META_FROM_REDIS) - { - return tango_cache_head_redis(ctx); - } - else - { - return (tango_cache_fetch_start(ctx)==1)?0:-1; - } + return do_tango_cache_head_object(ctx, location); } struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *instance, struct future* f, const char *objkey) @@ -540,7 +644,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance * struct tango_cache_ctx *ctx; char sha256[72]; - if(instance->statistic.session_num >= instance->param->max_session_num) + if(sessions_exceeds_limit(instance, OBJECT_IN_MINIO)) { instance->error_code = CACHE_OUTOF_SESSION; instance->statistic.totaldrop_num += 1; @@ -588,7 +692,7 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst struct tango_cache_ctx *ctx; char md5[48]={0}, content_md5[48]; - if(instance->statistic.session_num >= instance->param->max_session_num) + if(sessions_exceeds_limit(instance, OBJECT_IN_MINIO)) { instance->error_code = CACHE_OUTOF_SESSION; instance->statistic.totaldrop_num += 1; @@ -628,7 +732,7 @@ int tango_cache_multi_delete(struct tango_cache_instance *instance, struct futur { return -1; } - return tango_cache_multi_delete_start(ctx); + return do_tango_cache_multi_delete(ctx); } static void check_multi_info(CURLM *multi) @@ -686,7 +790,7 @@ static void libevent_socket_event_cb(int fd, short action, void *userp) what = ((action&EV_READ)?CURL_CSELECT_IN:0) | ((action & EV_WRITE)?CURL_CSELECT_OUT:0); rc = curl_multi_socket_action(instance->multi_hd, fd, what, &still_running); - instance->statistic.session_num = still_running; + instance->statistic.session_http = still_running; assert(rc==CURLM_OK); check_multi_info(instance->multi_hd); @@ -704,7 +808,7 @@ static void libevent_timer_event_cb(int fd, short kind, void *userp) int still_running; rc = curl_multi_socket_action(instance->multi_hd, CURL_SOCKET_TIMEOUT, 0, &still_running); - instance->statistic.session_num = still_running; + instance->statistic.session_http = still_running; assert(rc==CURLM_OK); check_multi_info(instance->multi_hd); } @@ -758,7 +862,7 @@ static int curl_timer_function_cb(CURLM *multi, long timeout_ms, void *userp) //timeout_ms is 0 means we should call curl_multi_socket_action/curl_multi_perform at once. //To initiate the whole process(inform CURLMOPT_SOCKETFUNCTION callback) or when timeout occurs. rc = curl_multi_socket_action(multi, CURL_SOCKET_TIMEOUT, 0, &still_running); - instance->statistic.session_num = still_running; + instance->statistic.session_http = still_running; assert(rc==CURLM_OK); } else if(timeout_ms == -1) //timeout_ms is -1 means we should delete the timer. @@ -773,18 +877,18 @@ static int curl_timer_function_cb(CURLM *multi, long timeout_ms, void *userp) return 0; //0-success; -1-error } -static int wired_load_balancer_init(const char *topic, const char *datacenter, int override, struct wiredlb_parameter *wparam, void *runtime_log) +static int wired_load_balancer_init(struct wiredlb_parameter *wparam, void *runtime_log) { - wparam->wiredlb = wiredLB_create(topic, wparam->wiredlb_group, WLB_PRODUCER); + wparam->wiredlb = wiredLB_create(wparam->wiredlb_topic, wparam->wiredlb_group, WLB_PRODUCER); if(wparam->wiredlb == NULL) { MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n"); return -1; } wiredLB_set_opt(wparam->wiredlb, WLB_OPT_HEALTH_CHECK_PORT, &wparam->wiredlb_ha_port, sizeof(wparam->wiredlb_ha_port)); - wiredLB_set_opt(wparam->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &override, sizeof(override)); - wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_DATACENTER, datacenter, strlen(datacenter)+1); - if(override) + wiredLB_set_opt(wparam->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &wparam->wiredlb_override, sizeof(wparam->wiredlb_override)); + wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_DATACENTER, wparam->wiredlb_datacenter, strlen(wparam->wiredlb_datacenter)+1); + if(wparam->wiredlb_override) { wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, wparam->iplist, strlen(wparam->iplist)+1); wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_DATAPORT, &wparam->port, sizeof(wparam->port)); @@ -807,17 +911,17 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path //multi curl MESA_load_profile_uint_def(profile_path, section, "MAX_CONNECTION_PER_HOST", &intval, 1); - param->max_cnn_host = intval; + param->maximum_host_cnns = intval; MESA_load_profile_uint_def(profile_path, section, "MAX_CNNT_PIPELINE_NUM", &intval, 20); - param->max_pipeline_num = intval; + param->maximum_pipelines = intval; MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_TRANSFER_TIMEOUT_S", &intval, 0); param->transfer_timeout = intval; //instance - MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_SESSION_NUM", ¶m->max_session_num, 200); + MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_SESSION_NUM", ¶m->maximum_sessions, 100); MESA_load_profile_uint_def(profile_path, section, "MAX_USED_MEMORY_SIZE_MB", &intval, 5120); longval = intval; - param->cache_limit_size = longval * 1024 * 1024; + param->maximum_used_mem = longval * 1024 * 1024; MESA_load_profile_uint_def(profile_path, section, "CACHE_OBJECT_KEY_HASH_SWITCH", ¶m->hash_object_key, 1); if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_BUCKET_NAME", param->bucketname, 256) < 0) { @@ -830,7 +934,7 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_UPLOAD_BLOCK_SIZE too small, must bigger than 5242880(5MB).\n", profile_path, section); return NULL; } - MESA_load_profile_uint_def(profile_path, section, "CACHE_DEFAULT_TTL_SECOND", &intval, 999999999); + MESA_load_profile_uint_def(profile_path, section, "CACHE_DEFAULT_TTL_SECOND", &intval, 604800); if(intval < 60) { MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_DEFAULT_TTL_SECOND too small, must bigger than 60s.\n", profile_path, section); @@ -838,47 +942,41 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path } param->relative_ttl = intval; - //wiredlb common - MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", param->wiredlb_topic, 64, "TANGO_CACHE_PRODUCER"); - MESA_load_profile_string_def(profile_path, section, "WIREDLB_DATACENTER", param->wiredlb_datacenter, 64, "ASTANA"); - MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", ¶m->wiredlb_override, 1); - - //wiredlb minio - MESA_load_profile_uint_def(profile_path, section, "WIREDLB_MINIO_HEALTH_PORT", &intval, 52100); + //wiredlb + MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", param->minio.wiredlb_topic, 64, "TANGO_CACHE_PRODUCER"); + MESA_load_profile_string_def(profile_path, section, "WIREDLB_DATACENTER", param->minio.wiredlb_datacenter, 64, "ASTANA"); + MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", ¶m->minio.wiredlb_override, 1); + MESA_load_profile_uint_def(profile_path, section, "WIREDLB_HEALTH_PORT", &intval, 52100); param->minio.wiredlb_ha_port = intval; - MESA_load_profile_string_def(profile_path, section, "WIREDLB_MINIO_GROUP", param->minio.wiredlb_group, 64, "MINIO_GROUP"); + MESA_load_profile_string_def(profile_path, section, "WIREDLB_GROUP", param->minio.wiredlb_group, 64, "MINIO_GROUP"); MESA_load_profile_uint_def(profile_path, section, "MINIO_LISTEN_PORT", ¶m->minio.port, 9000); if(MESA_load_profile_string_nodef(profile_path, section, "MINIO_IP_LIST", param->minio.iplist, 4096) < 0) { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] MINIO_BROKERS_LIST not found.\n", profile_path, section); + MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] MINIO_BROKERS_LIST not found.", profile_path, section); return NULL; } - if(wired_load_balancer_init(param->wiredlb_topic, param->wiredlb_datacenter, param->wiredlb_override, ¶m->minio, runtime_log)) + if(wired_load_balancer_init(¶m->minio, runtime_log)) { return NULL; } - //wiredlb redis - MESA_load_profile_int_def(profile_path, section, "CACHE_HEAD_FROM_SOURCE", ¶m->head_meta_source, HEAD_META_FROM_MINIO); - if(param->head_meta_source == HEAD_META_FROM_REDIS) + MESA_load_profile_int_def(profile_path, section, "CACHE_STORE_OBJECT_WAY", ¶m->object_store_way, CACHE_ALL_MINIO); + if(param->object_store_way!=CACHE_ALL_MINIO && param->object_store_way!=CACHE_META_REDIS && param->object_store_way!=CACHE_SMALL_REDIS) { - MESA_load_profile_uint_def(profile_path, section, "WIREDLB_REDIS_HEALTH_PORT", &intval, 0); - param->redis.wiredlb_ha_port = intval; - MESA_load_profile_string_def(profile_path, section, "WIREDLB_REDIS_GROUP", param->redis.wiredlb_group, 64, "REDIS_GROUP"); - MESA_load_profile_string_def(profile_path, section, "CACHE_HEAD_REDIS_KEY", param->redis_key, 256, param->bucketname); - MESA_load_profile_uint_def(profile_path, section, "CACHE_HEAD_REDIS_PORT", ¶m->redis.port, 6379); - if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_HEAD_REDIS_IPLIST", param->redis.iplist, 256) < 0) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_HEAD_REDIS_IPLIST not found.\n", profile_path, section); - return NULL; - } - if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_HEAD_MAIN_REDIS_IP", param->redis.mainip, 64) < 0) + MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "CACHE_STORE_OBJECT_WAY is not 1/2/3.", profile_path, section); + return NULL; + } + if(param->object_store_way != CACHE_ALL_MINIO) + { + if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_ADDRS", param->redisaddrs, 4096) < 0) { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_HEAD_MAIN_REDIS_IP not found.\n", profile_path, section); + MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CLUSTER_ADDRS not found.", profile_path, section); return NULL; } - if(wired_load_balancer_init(param->wiredlb_topic, param->wiredlb_datacenter, param->wiredlb_override, ¶m->redis, runtime_log)) + MESA_load_profile_uint_def(profile_path, section, "REDIS_CACHE_OBJECT_SIZE", ¶m->redis_object_maxsize, 10240); + if(param->redis_object_maxsize >= param->upload_block_size) { + MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CACHE_OBJECT_SIZE must be smaller than CACHE_UPLOAD_BLOCK_SIZE.", profile_path, section); return NULL; } } @@ -888,6 +986,7 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_parameter *param, struct event_base* evbase, void *runtimelog) { struct tango_cache_instance *instance; + char *redis_sep, *save_ptr=NULL; instance = (struct tango_cache_instance *)malloc(sizeof(struct tango_cache_instance)); memset(instance, 0, sizeof(struct tango_cache_instance)); @@ -897,22 +996,23 @@ struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_paramet instance->multi_hd = curl_multi_init(); curl_multi_setopt(instance->multi_hd, CURLMOPT_PIPELINING, CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX); - curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_HOST_CONNECTIONS, param->max_cnn_host); - curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_PIPELINE_LENGTH, param->max_pipeline_num); + curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_HOST_CONNECTIONS, param->maximum_host_cnns); + curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_PIPELINE_LENGTH, param->maximum_pipelines); curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETFUNCTION, curl_socket_function_cb); curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETDATA, instance); //curl_socket_function_cb *userp curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERFUNCTION, curl_timer_function_cb); curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERDATA, instance); - if(param->head_meta_source == HEAD_META_FROM_REDIS) + if(param->object_store_way != CACHE_ALL_MINIO) { - if(redis_asyn_connect_init(instance, instance->param->redis.mainip)) + if(redis_asyn_connect_init(instance)) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "redis_asyn_connect_init %s:%u failed.", - instance->current_redisip, instance->param->redis.port); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "redis_asyn_connect_init %s failed.", instance->param->redisaddrs); free(instance); return NULL; - } + } + redis_sep = strtok_r(param->redisaddrs, ",", &save_ptr); + sprintf(instance->redisaddr, "%s", redis_sep); } evtimer_assign(&instance->timer_event, evbase, libevent_timer_event_cb, instance); return instance; diff --git a/cache/src/tango_cache_client_in.h b/cache/src/tango_cache_client_in.h index 8a76868..5f0a938 100644 --- a/cache/src/tango_cache_client_in.h +++ b/cache/src/tango_cache_client_in.h @@ -6,7 +6,9 @@ #include <event2/event.h> #include <event.h> -#include <hiredis/async.h> +#include <hiredis-vip/async.h> +#include <hiredis-vip/hircluster.h> +#include <cjson/cJSON.h> #include <MESA/wiredLB.h> #include "tango_cache_client.h" @@ -15,8 +17,9 @@ #define RESPONSE_HDR_LAST_MOD 2 #define RESPONSE_HDR_ALL 3 -#define HEAD_META_FROM_MINIO 1 -#define HEAD_META_FROM_REDIS 2 +#define CACHE_ALL_MINIO 0 //Ԫ��Ϣ�Ͷ�����MINIO +#define CACHE_META_REDIS 1 //Ԫ��Ϣ��REDIS������MINIO +#define CACHE_SMALL_REDIS 2 //Ԫ��Ϣ��С�ļ���REDIS�����ļ���MINIO enum CACHE_REQUEST_METHOD { @@ -31,6 +34,9 @@ enum GET_OBJECT_STATE { GET_STATE_START=0, GET_STATE_DELETE, + GET_STATE_REDIS_META, + GET_STATE_REDIS_ALL, + GET_STATE_REDIS_TRY, GET_STATE_END, }; @@ -39,8 +45,10 @@ enum PUT_OBJECT_STATE PUT_STATE_START=0, PUT_STATE_WAIT_START, PUT_STATE_PART, - PUT_STATE_END, PUT_STATE_CANCEL, + PUT_STATE_REDIS_META, + PUT_STATE_REDIS_ALL, + PUT_STATE_END, }; struct easy_string @@ -52,35 +60,33 @@ struct easy_string struct wiredlb_parameter { + char wiredlb_topic[64]; + char wiredlb_datacenter[64]; char wiredlb_group[64]; - char mainip[64]; //Ĭ�Ϸ��ʵ�redis��ַ - char iplist[4096];//minio: minio�б���redis: mainip���˺�ѡ���б�������mainip + char iplist[4096];//minio�б� + WLB_handle_t wiredlb; + u_int32_t wiredlb_override; u_int32_t port; short wiredlb_ha_port; - WLB_handle_t wiredlb; }; struct tango_cache_parameter { char bucketname[256]; char redis_key[256]; - long max_cnn_host; + long maximum_host_cnns; long transfer_timeout;//������ʱ������ - long max_pipeline_num; - u_int64_t cache_limit_size; - u_int32_t max_session_num; + long maximum_pipelines; + u_int64_t maximum_used_mem; + u_int32_t maximum_sessions; u_int32_t upload_block_size; //minio�ֶ��ϴ������С���� time_t relative_ttl; //����������Ч�� u_int32_t hash_object_key; - //wiredlb - int head_meta_source; //���Դ�MINIO��REDIS��ȡԪ��Ϣ - u_int32_t wiredlb_override; - char wiredlb_topic[64]; - char wiredlb_datacenter[64]; - + int object_store_way; //��ȡobject��Ϣ�ķ�ʽ struct wiredlb_parameter minio; - struct wiredlb_parameter redis; + char redisaddrs[4096]; + u_int32_t redis_object_maxsize;//С�ļ�����redisʱ�����������С }; struct tango_cache_instance @@ -90,11 +96,9 @@ struct tango_cache_instance CURLM *multi_hd; enum CACHE_ERR_CODE error_code; - //Ԫ��Ϣ��ȡ��ʽRedis int redis_connecting; - redisAsyncContext *redis_ac; - char current_redisip[64]; - struct event timer_redis; + redisClusterAsyncContext *redis_ac; + char redisaddr[128]; const struct tango_cache_parameter *param; void *runtime_log; @@ -108,6 +112,8 @@ struct multipart_etag_list TAILQ_ENTRY(multipart_etag_list) node; }; +typedef void (redisRedirectMinioCallback)(struct tango_cache_ctx *ctx); + struct cache_ctx_data_get { time_t max_age; @@ -118,6 +124,7 @@ struct cache_ctx_data_get enum GET_OBJECT_STATE state; struct easy_string response_tag; struct tango_cache_result result; + redisRedirectMinioCallback *redis_redirect_minio_cb; }; struct cache_ctx_data_put @@ -128,9 +135,12 @@ struct cache_ctx_data_put char *uploadID; char *combine_xml; TAILQ_HEAD(__etag_list_head, multipart_etag_list) etag_head; + cJSON *object_meta; enum PUT_OBJECT_STATE state; u_int32_t part_index; //��RESPONSE_HDR_ + u_int32_t object_ttl; bool close_state; //���������ùر� + size_t object_size; }; struct cache_ctx_multi_delete @@ -153,6 +163,7 @@ struct tango_cache_ctx struct easy_string response; bool fail_state; + enum OBJECT_LOCATION locate; //�ɳ��������϶�����λ�� long res_code; union{ @@ -177,9 +188,19 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback=true); void tango_cache_set_fail_state(struct tango_cache_ctx *ctx, enum CACHE_ERR_CODE error_code); const char *tango_cache_get_errstring(const struct tango_cache_ctx *ctx); -struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta); -struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *instance, enum CACHE_REQUEST_METHOD method, struct future* f, struct tango_cache_meta_get *meta); +bool sessions_exceeds_limit(struct tango_cache_instance *instance, enum OBJECT_LOCATION where_to_get); + +struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *instance, + struct future* f, struct tango_cache_meta_put *meta, enum OBJECT_LOCATION maybe_loc); +struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *instance, + enum CACHE_REQUEST_METHOD method, struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get); struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *instance, struct future* f, const char *objkey); + +enum OBJECT_LOCATION tango_cache_object_locate(struct tango_cache_instance *instance, size_t object_size); +void tango_cache_get_object_path(struct tango_cache_ctx *ctx, char *path/*OUT*/, size_t pathsize); +struct tango_cache_ctx *tango_cache_update_once_prepare(struct tango_cache_instance *instance, + struct future* f, struct tango_cache_meta_put *meta, size_t object_size, char *path, size_t pathsize); + #endif diff --git a/cache/src/tango_cache_redis.cpp b/cache/src/tango_cache_redis.cpp index d4e6edf..dbd1537 100644 --- a/cache/src/tango_cache_redis.cpp +++ b/cache/src/tango_cache_redis.cpp @@ -8,9 +8,9 @@ #include <time.h> #include <string.h> -#include <hiredis/hiredis.h> -#include <hiredis/async.h> -#include <hiredis/adapters/libevent.h> +#include <hiredis-vip/hircluster.h> +#include <hiredis-vip/async.h> +#include <hiredis-vip/adapters/libevent.h> #include <cjson/cJSON.h> #include "tango_cache_transfer.h" @@ -26,95 +26,21 @@ #define CACHE_REDIS_CONNECTED 2 #define CACHE_REDIS_DISCONNECTED 3 -struct http_hdr_name -{ - const char *json_name; - const char *http_name; -}; -struct http_hdr_name g_http_hdr_name[HDR_CONTENT_NUM]= -{ - {"content-type", "Content-Type: "}, - {"content-encoding", "Content-Encoding: "}, - {"content-disposition", "Content-Disposition: "}, - {"content-md5", "Content-MD5: "} -}; - -//һ��mainip���ӳɹ������л����� -static void main_redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status) -{ - struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac); - - if(status == REDIS_OK) - { - evtimer_del(&instance->timer_redis); - if(instance->redis_connecting == CACHE_REDIS_CONNECTED) - { - redisAsyncDisconnect(instance->redis_ac); - } - sprintf(instance->current_redisip, "%s", instance->param->redis.mainip); - instance->redis_ac = (struct redisAsyncContext *)ac; - instance->redis_connecting = CACHE_REDIS_CONNECTED; - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u success.", - instance->param->redis.mainip, instance->param->redis.port); - } - else - { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u failed: %s.", - instance->param->redis.mainip, instance->param->redis.port, ac->errstr); - } -} - static void redis_asyn_disconnect_cb(const struct redisAsyncContext *ac, int status) { struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac); if(status == REDIS_OK) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis disconnect %s:%u success.", - instance->current_redisip, instance->param->redis.port); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Redis disconnect %s:%d success.", + ac->c.tcp.host, ac->c.tcp.port); } else { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis disconnect %s:%u failed: %s.", - instance->current_redisip, instance->param->redis.port, ac->errstr); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Redis disconnect %s:%d failed: %s.", + ac->c.tcp.host, ac->c.tcp.port, ac->errstr); } instance->redis_connecting = CACHE_REDIS_DISCONNECTED; - - if(!strcmp(instance->current_redisip, instance->param->redis.mainip)) - { - main_redis_check_timer_start(instance); - } -} - -void main_redis_check_timer_cb(evutil_socket_t fd, short what, void *arg) -{ - struct tango_cache_instance *instance = (struct tango_cache_instance *)arg; - redisAsyncContext *redis_ac; - struct timeval tv; - - redis_ac = redisAsyncConnect(instance->param->redis.mainip, instance->param->redis.port); - if(redis_ac == NULL) - { - return ; - } - redisLibeventAttach(redis_ac, instance->evbase); - redisAsyncSetConnectionData(redis_ac, instance); - redisAsyncSetConnectCallback(redis_ac, main_redis_asyn_connect_cb); - redisAsyncSetDisconnectCallback(redis_ac, redis_asyn_disconnect_cb); - - tv.tv_sec = 60; - tv.tv_usec = 0; - evtimer_add(&instance->timer_redis, &tv); -} - -void main_redis_check_timer_start(struct tango_cache_instance *instance) -{ - struct timeval tv; - - tv.tv_sec = 60; - tv.tv_usec = 0; - evtimer_assign(&instance->timer_redis, instance->evbase, main_redis_check_timer_cb, instance); - evtimer_add(&instance->timer_redis, &tv); } static void redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status) @@ -123,129 +49,77 @@ static void redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status if(status == REDIS_OK) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u success.", - instance->current_redisip, instance->param->redis.port); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "RedisCluster connect %s:%d success.", + ac->c.tcp.host, ac->c.tcp.port); instance->redis_connecting = CACHE_REDIS_CONNECTED; } else { instance->redis_connecting = CACHE_REDIS_CONNECT_IDLE; - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u failed: %s.", - instance->current_redisip, instance->param->redis.port, ac->errstr); - if(!strcmp(instance->current_redisip, instance->param->redis.mainip)) - { - main_redis_check_timer_start(instance); - } + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "RedisCluster connect %s:%d failed: %s.", + ac->c.tcp.host, ac->c.tcp.port, ac->errstr); } } -int redis_asyn_connect_init(struct tango_cache_instance *instance, const char *redisip) +int redis_asyn_connect_init(struct tango_cache_instance *instance) { - sprintf(instance->current_redisip, "%s", redisip); //mainip�õ�ʱ��ʹ��mainip - - instance->redis_ac = redisAsyncConnect(instance->current_redisip, instance->param->redis.port); + instance->redis_ac = redisClusterAsyncConnect(instance->param->redisaddrs, HIRCLUSTER_FLAG_ROUTE_USE_SLOTS); if(instance->redis_ac == NULL) { return -1; } instance->redis_connecting = CACHE_REDIS_CONNECTING; - redisLibeventAttach(instance->redis_ac, instance->evbase); - redisAsyncSetConnectionData(instance->redis_ac, instance); - redisAsyncSetConnectCallback(instance->redis_ac, redis_asyn_connect_cb); - redisAsyncSetDisconnectCallback(instance->redis_ac, redis_asyn_disconnect_cb); - return 0; -} - -int wiredlb_redis_asyn_connect(struct tango_cache_instance *instance) -{ - struct WLB_consumer_t cons_array[64]; - int i, cons_num; - - cons_num = wiredLB_list(instance->param->redis.wiredlb, 64, cons_array); - for(i=0; i<cons_num; i++) - { - if(strcmp(instance->param->redis.mainip, cons_array[i].ip_addr)) - { - if(0==redis_asyn_connect_init(instance, cons_array[i].ip_addr)) - { - break; - } - } - } - if(i == cons_num) - { - return -1; - } + redisClusterLibeventAttach(instance->redis_ac, instance->evbase); + redisClusterAsyncSetConnectionData(instance->redis_ac, instance); + redisClusterAsyncSetConnectCallback(instance->redis_ac, redis_asyn_connect_cb); + redisClusterAsyncSetDisconnectCallback(instance->redis_ac, redis_asyn_disconnect_cb); return 0; } -static int parse_minio_events_json(struct tango_cache_ctx *ctx, const char *jcontent) +static int parse_object_meta_json(struct tango_cache_ctx *ctx, const char *jcontent) { - cJSON *root, *pobject = NULL, *ptarget, *plastMod, *pexpires; + cJSON *root, *ptarget; int ret = PARSE_JSON_RET_ERROR; char usertag[2048]; size_t datalen; - //Records[0]->s3->object->key...userMetaData->metas... if(NULL == (root=cJSON_Parse(jcontent))) { goto out_json; } - if(NULL==(pobject=cJSON_GetObjectItem(root, "Records")) || pobject->type!=cJSON_Array) - { - goto out_json; - } - if(NULL == (pobject=cJSON_GetArrayItem(pobject, 0))) //��һ������Ԫ�أ�һ��ֻ��һ�� - { - goto out_json; - } - if(NULL == (pobject=cJSON_GetObjectItem(pobject, "s3")) || pobject->type!=cJSON_Object) - { - goto out_json; - } - if(NULL == (pobject=cJSON_GetObjectItem(pobject, "object")) || pobject->type!=cJSON_Object) - { - goto out_json; - } - //��ȡ��� - if(NULL == (ptarget=cJSON_GetObjectItem(pobject, "size")) || ptarget->type!=cJSON_Number) + if(NULL == (ptarget=cJSON_GetObjectItem(root, "Content-Length"))) { goto out_json; } ctx->get.result.tlength = ptarget->valuedouble; - if(NULL == (ptarget=cJSON_GetObjectItem(pobject, "userMetadata")) || ptarget->type!=cJSON_Object) + if(NULL==(ptarget=cJSON_GetObjectItem(root, "X-Amz-Meta-Lm"))) { goto out_json; } - if(NULL==(plastMod=cJSON_GetObjectItem(ptarget, "X-Amz-Meta-Lm")) || NULL==(pexpires=cJSON_GetObjectItem(ptarget, "expires"))) + ctx->get.last_modify = ptarget->valuedouble; + if(NULL==(ptarget=cJSON_GetObjectItem(root, "Expires"))) { goto out_json; } + ctx->get.expires = ptarget->valuedouble; ctx->get.need_hdrs = RESPONSE_HDR_ALL; - ctx->get.last_modify = atol(plastMod->valuestring); - ctx->get.expires = expires_hdr2timestamp(pexpires->valuestring, strlen(pexpires->valuestring)); if(!check_expires_fresh_header(ctx)) { ret = PARSE_JSON_RET_TIMEOUT; goto out_json; } - if(NULL!=(plastMod=cJSON_GetObjectItem(ptarget, "X-Amz-Meta-User"))) + if(NULL!=(ptarget=cJSON_GetObjectItem(root, "Headers"))) { - if((datalen = Base64_DecodeBlock((unsigned char*)plastMod->valuestring, strlen(plastMod->valuestring), (unsigned char*)usertag, 2048))>0) - { - easy_string_savedata(&ctx->get.response_tag, usertag, datalen); - } + easy_string_savedata(&ctx->response, ptarget->valuestring, strlen(ptarget->valuestring)); } - for(int i=0; i<HDR_CONTENT_NUM; i++) + if(NULL!=(ptarget=cJSON_GetObjectItem(root, "X-Amz-Meta-User"))) { - if(NULL != (plastMod=cJSON_GetObjectItem(ptarget, g_http_hdr_name[i].json_name))) + if((datalen = Base64_DecodeBlock((unsigned char*)ptarget->valuestring, strlen(ptarget->valuestring), (unsigned char*)usertag, 2048))>0) { - easy_string_savedata(&ctx->response, g_http_hdr_name[i].http_name, strlen(g_http_hdr_name[i].http_name)); - easy_string_savedata(&ctx->response, plastMod->valuestring, strlen(plastMod->valuestring)); - easy_string_savedata(&ctx->response, "\r\n", strlen("\r\n")); + easy_string_savedata(&ctx->get.response_tag, usertag, datalen); } - } + } cJSON_Delete(root); return PARSE_JSON_RET_SUCC; @@ -254,38 +128,59 @@ out_json: return ret; } -static void redis_hget_command_cb(struct redisAsyncContext *ac, void *vreply, void *privdata) +static void redis_hget_command_cb(struct redisClusterAsyncContext *ac, void *vreply, void *privdata) { redisReply *reply = (redisReply *)vreply; struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)privdata; int ret; - if(reply == NULL || reply->type!=REDIS_REPLY_STRING) + ctx->instance->statistic.session_redis -= 1; + if(reply == NULL || reply->type!=REDIS_REPLY_ARRAY) { - if(reply!=NULL && reply->type == REDIS_REPLY_NIL) + tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_EXEC); + if(reply!=NULL && reply->type==REDIS_REPLY_ERROR) { - tango_cache_set_fail_state(ctx, CACHE_CACHE_MISS); - ctx->get.result.type = RESULT_TYPE_MISS; - promise_success(ctx->promise, &ctx->get.result); - promise_finish(ctx->promise); + promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, reply->str); } else { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_JSON); - if(reply!=NULL && reply->type==REDIS_REPLY_ERROR) - { - promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, reply->str); - } - else - { - promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); - } + promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); } tango_cache_ctx_destroy(ctx); return; } + else if(reply->element[0]->type == REDIS_REPLY_NIL) + { + tango_cache_set_fail_state(ctx, CACHE_CACHE_MISS); + ctx->get.result.type = RESULT_TYPE_MISS; + promise_success(ctx->promise, &ctx->get.result); + promise_finish(ctx->promise); + tango_cache_ctx_destroy(ctx); + return; + } + + switch(ctx->get.state) + { + case GET_STATE_REDIS_META: + ctx->get.result.location = (strcmp(reply->element[1]->str, "redis"))?OBJECT_IN_MINIO:OBJECT_IN_REDIS; + break; + case GET_STATE_REDIS_ALL: + ctx->get.result.location = OBJECT_IN_REDIS; + break; - ret = parse_minio_events_json(ctx, reply->str); + case GET_STATE_REDIS_TRY: + ctx->get.result.location = (strcmp(reply->element[1]->str, "redis"))?OBJECT_IN_MINIO:OBJECT_IN_REDIS; + if(ctx->get.result.location == OBJECT_IN_MINIO) + { + ctx->get.redis_redirect_minio_cb(ctx); + return; + } + ctx->locate = OBJECT_IN_REDIS; + break; + default: assert(0);break; + } + + ret = parse_object_meta_json(ctx, reply->element[0]->str); switch(ret) { case PARSE_JSON_RET_ERROR: @@ -294,14 +189,25 @@ static void redis_hget_command_cb(struct redisAsyncContext *ac, void *vreply, vo tango_cache_ctx_destroy(ctx); break; case PARSE_JSON_RET_TIMEOUT: - if(ctx->get.state == GET_STATE_DELETE) + if(ctx->get.state == GET_STATE_DELETE && ctx->get.result.location==OBJECT_IN_MINIO) { ctx->get.state = GET_STATE_END; cache_delete_minio_object(ctx); } + else + { + tango_cache_ctx_destroy(ctx); + } break; case PARSE_JSON_RET_SUCC: fetch_header_over_biz(ctx); + if(ctx->get.state != GET_STATE_REDIS_META) + { + ctx->get.result.data_frag = reply->element[2]->str; + ctx->get.result.size = reply->element[2]->len; + ctx->get.result.type = RESULT_TYPE_BODY; + promise_success(ctx->promise, &ctx->get.result); + } ctx->get.result.type = RESULT_TYPE_END; promise_success(ctx->promise, &ctx->get.result); promise_finish(ctx->promise); @@ -315,32 +221,163 @@ int tango_cache_head_redis(struct tango_cache_ctx *ctx) { int ret = -1; - ctx->instance->statistic.get_recv_num += 1; - switch(ctx->instance->redis_connecting) + ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hget_command_cb, ctx, "HMGET %s/%s OBJECT_META OBJECT_LOCATION", + ctx->instance->param->bucketname, ctx->object_key); + if(ret != REDIS_OK) + { + tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); + tango_cache_ctx_destroy(ctx); + } + else + { + ctx->instance->statistic.session_redis += 1; + ctx->get.state = GET_STATE_REDIS_META; + } + return ret; +} + +int tango_cache_fetch_redis(struct tango_cache_ctx *ctx) +{ + int ret = -1; + + ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hget_command_cb, ctx, "HMGET %s/%s OBJECT_META OBJECT_LOCATION OBJECT_BODY", + ctx->instance->param->bucketname, ctx->object_key); + if(ret != REDIS_OK) { - case CACHE_REDIS_CONNECTED: - ret = redisAsyncCommand(ctx->instance->redis_ac, redis_hget_command_cb, ctx, "HGET %s %s/%s", - ctx->instance->param->redis_key, ctx->instance->param->bucketname, ctx->object_key); + tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); + tango_cache_ctx_destroy(ctx); + } + else + { + ctx->instance->statistic.session_redis += 1; + ctx->get.state = GET_STATE_REDIS_ALL; + } + return ret; +} + +int tango_cache_try_fetch_redis(struct tango_cache_ctx *ctx) +{ + int ret = -1; + + ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hget_command_cb, ctx, "HMGET %s/%s OBJECT_META OBJECT_LOCATION OBJECT_BODY", + ctx->instance->param->bucketname, ctx->object_key); + if(ret != REDIS_OK) + { + tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); + tango_cache_ctx_destroy(ctx); + } + else + { + ctx->instance->statistic.session_redis += 1; + ctx->get.state = GET_STATE_REDIS_TRY; + } + return ret; +} + +static void redis_hset_command_cb(struct redisClusterAsyncContext *ac, void *vreply, void *privdata) +{ + struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)privdata; + redisReply *reply = (redisReply *)vreply; + int ret; + + ctx->instance->statistic.session_redis -= 1; + if(reply == NULL || reply->type==REDIS_REPLY_ERROR || ac->err) + { + tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_EXEC); + } + if(ctx->fail_state) + { + tango_cache_ctx_destroy(ctx, true); + return; + } + + switch(ctx->put.state) + { + case PUT_STATE_REDIS_META: + case PUT_STATE_REDIS_ALL: + ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, "EXPIRE %s/%s %u", + ctx->instance->param->bucketname, ctx->object_key, ctx->put.object_ttl); if(ret != REDIS_OK) { - ctx->instance->redis_connecting = CACHE_REDIS_CONNECT_IDLE; - if(!strcmp(ctx->instance->current_redisip, ctx->instance->param->redis.mainip)) - { - main_redis_check_timer_start(ctx->instance); - } - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - tango_cache_ctx_destroy(ctx); + tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_EXEC); + tango_cache_ctx_destroy(ctx, true); + } + else + { + ctx->instance->statistic.session_redis += 1; + ctx->put.state = PUT_STATE_END; } break; - case CACHE_REDIS_DISCONNECTED: - case CACHE_REDIS_CONNECT_IDLE: - wiredlb_redis_asyn_connect(ctx->instance); - case CACHE_REDIS_CONNECTING: - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - tango_cache_ctx_destroy(ctx); + case PUT_STATE_END: + tango_cache_ctx_destroy(ctx, true); break; default: assert(0);break; } +} + +int redis_put_minio_object_meta(struct tango_cache_ctx *ctx, bool callback) +{ + int ret; + char *meta; + + meta = cJSON_PrintUnformatted(ctx->put.object_meta); + ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, "HMSET %s/%s OBJECT_LOCATION minio OBJECT_META %s", + ctx->instance->param->bucketname, ctx->object_key, meta); + if(ret != REDIS_OK) + { + tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); + tango_cache_ctx_destroy(ctx, callback); + } + else + { + ctx->instance->statistic.session_redis += 1; + ctx->put.state = PUT_STATE_REDIS_META; + } + free(meta); return ret; } +int redis_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback) +{ + int ret; + char *meta; + + ctx->instance->statistic.memory_used -= size; + meta = cJSON_PrintUnformatted(ctx->put.object_meta); + ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, "HMSET %s/%s OBJECT_LOCATION redis OBJECT_META %s OBJECT_BODY %b", + ctx->instance->param->bucketname, ctx->object_key, meta, data, size); + if(ret != REDIS_OK) + { + tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); + tango_cache_ctx_destroy(ctx, callback); + } + else + { + ctx->instance->statistic.session_redis += 1; + ctx->put.state = PUT_STATE_REDIS_ALL; + } + if(way == PUT_MEM_FREE) + { + free((void *)data); + } + free(meta); + return ret; +} + +int redis_put_complete_part_evbuf(struct tango_cache_ctx *ctx, size_t object_size, bool callback) +{ + char *data; + size_t size; + + data = (char *)malloc(object_size); + size = evbuffer_remove(ctx->put.evbuf, data, object_size); + if(size != object_size) + { + tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER); + tango_cache_ctx_destroy(ctx, callback); + free(data); + return CACHE_ERR_EVBUFFER; + } + return redis_put_complete_part_data(ctx, PUT_MEM_FREE, data, object_size, callback); +} + diff --git a/cache/src/tango_cache_redis.h b/cache/src/tango_cache_redis.h index 7edf119..74e11e4 100644 --- a/cache/src/tango_cache_redis.h +++ b/cache/src/tango_cache_redis.h @@ -7,8 +7,15 @@ #include "tango_cache_client_in.h" int tango_cache_head_redis(struct tango_cache_ctx *ctx); -int redis_asyn_connect_init(struct tango_cache_instance *instance, const char *redisip); -void main_redis_check_timer_start(struct tango_cache_instance *instance); +int redis_asyn_connect_init(struct tango_cache_instance *instance); + + +int tango_cache_fetch_redis(struct tango_cache_ctx *ctx); +int tango_cache_try_fetch_redis(struct tango_cache_ctx *ctx); + +int redis_put_minio_object_meta(struct tango_cache_ctx *ctx, bool callback); +int redis_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback); +int redis_put_complete_part_evbuf(struct tango_cache_ctx *ctx, size_t object_size, bool callback); #endif diff --git a/cache/src/tango_cache_transfer.cpp b/cache/src/tango_cache_transfer.cpp index 4b8b890..7413e2f 100644 --- a/cache/src/tango_cache_transfer.cpp +++ b/cache/src/tango_cache_transfer.cpp @@ -12,6 +12,7 @@ #include "tango_cache_transfer.h" #include "tango_cache_xml.h" #include "tango_cache_tools.h" +#include "tango_cache_redis.h" static inline void curl_set_common_options(CURL *curl, long transfer_timeout, char *errorbuf) { @@ -297,6 +298,11 @@ bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block switch(ctx->put.state) { case PUT_STATE_START: + if(sessions_exceeds_limit(ctx->instance, OBJECT_IN_MINIO)) + { + tango_cache_set_fail_state(ctx, CACHE_OUTOF_SESSION); + return false; + } ctx->put.state = PUT_STATE_WAIT_START; ret = curl_get_minio_uploadID(ctx); break; @@ -343,20 +349,33 @@ static int http_put_complete_part_evbuf(struct tango_cache_ctx *ctx, bool callba return ret; } -void cache_kick_upload_minio_end(struct tango_cache_ctx *ctx) +int do_tango_cache_update_end(struct tango_cache_ctx *ctx, bool callback) { DBG_CACHE("state: %d, key: %s, curl %s NULL\n", ctx->put.state, ctx->object_key, (ctx->curl==NULL)?"is":"is not"); ctx->put.close_state = true;//������״̬�����������رգ��ڲ�״̬����ת�������ٹر� if(ctx->fail_state) { - tango_cache_ctx_destroy(ctx); - return; + tango_cache_ctx_destroy(ctx, callback); + return -1; } switch(ctx->put.state) { - case PUT_STATE_START: - http_put_complete_part_evbuf(ctx, true); + case PUT_STATE_START: //��ʱ��ͬ����һ�����ϴ� + if(sessions_exceeds_limit(ctx->instance, ctx->locate)) + { + tango_cache_set_fail_state(ctx, CACHE_OUTOF_SESSION); + tango_cache_ctx_destroy(ctx, callback); + return -1; + } + if(ctx->locate == OBJECT_IN_MINIO) + { + return http_put_complete_part_evbuf(ctx, callback); + } + else + { + return redis_put_complete_part_evbuf(ctx, ctx->put.object_size, callback); + } break; case PUT_STATE_PART: @@ -373,6 +392,7 @@ void cache_kick_upload_minio_end(struct tango_cache_ctx *ctx) { tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); tango_cache_ctx_destroy(ctx); + return -1; } } else if(http_put_bodypart_request_evbuf(ctx, false) <= 0) @@ -385,6 +405,7 @@ void cache_kick_upload_minio_end(struct tango_cache_ctx *ctx) else { tango_cache_ctx_destroy(ctx); + return -1; } } } @@ -394,6 +415,7 @@ void cache_kick_upload_minio_end(struct tango_cache_ctx *ctx) case PUT_STATE_WAIT_START: //��ʱδ��ȡ��uploadId�������������ϴ� default: break; } + return 0; } void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code) @@ -417,7 +439,7 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r ctx->put.state = PUT_STATE_PART; if(ctx->put.close_state) { - cache_kick_upload_minio_end(ctx); + do_tango_cache_update_end(ctx, true); } else { @@ -448,7 +470,7 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r } else if(ctx->put.close_state) { - cache_kick_upload_minio_end(ctx); + do_tango_cache_update_end(ctx, true); } else { @@ -472,24 +494,30 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r { tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); } - tango_cache_ctx_destroy(ctx); + if(ctx->instance->param->object_store_way!=CACHE_ALL_MINIO && !ctx->fail_state) + { + redis_put_minio_object_meta(ctx, true); + } + else + { + tango_cache_ctx_destroy(ctx); + } break; default: break; } } -int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback) +int http_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback) { CURLMcode rc; char minio_url[256]; - ctx->instance->statistic.put_recv_num += 1; - ctx->instance->error_code = CACHE_OK; if(NULL == (ctx->curl=curl_easy_init())) { tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); tango_cache_ctx_destroy(ctx, callback); if(way == PUT_MEM_FREE) free((void *)data); + ctx->instance->statistic.memory_used -= size; return -1; } ctx->put.state = PUT_STATE_END; @@ -513,7 +541,6 @@ int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEM } ctx->response.size = size; ctx->response.len = 0; - ctx->instance->statistic.memory_used += size; curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L); curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->response.size); curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_once_send_cb); @@ -524,19 +551,34 @@ int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEM return 0; } -int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, bool callback) +int do_tango_cache_upload_once_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback) +{ + ctx->instance->statistic.put_recv_num += 1; + ctx->instance->statistic.memory_used += size; + ctx->instance->error_code = CACHE_OK; + + if(ctx->locate == OBJECT_IN_MINIO) + { + return http_put_complete_part_data(ctx, way, data, size, false); + } + else + { + return redis_put_complete_part_data(ctx, way, data, size, false); + } +} + +int do_tango_cache_upload_once_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, bool callback) { size_t size; ctx->instance->statistic.put_recv_num += 1; ctx->instance->error_code = CACHE_OK; - size = evbuffer_get_length(evbuf); if(way == EVBUFFER_MOVE) { if(evbuffer_add_buffer(ctx->put.evbuf, evbuf)) { - tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY); + tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER); tango_cache_ctx_destroy(ctx, callback); return -1; } @@ -545,14 +587,22 @@ int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFF { if(evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf)) { - tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY); + tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER); tango_cache_ctx_destroy(ctx, callback); return -1; } } + size = evbuffer_get_length(ctx->put.evbuf); ctx->instance->statistic.memory_used += size; - return http_put_complete_part_evbuf(ctx, callback); + if(ctx->locate == OBJECT_IN_MINIO) + { + return http_put_complete_part_evbuf(ctx, callback); + } + else + { + return redis_put_complete_part_evbuf(ctx, size, callback); + } } void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code) @@ -594,7 +644,7 @@ void tango_cache_curl_muldel_done(struct tango_cache_ctx *ctx, CURLcode res, lon tango_cache_ctx_destroy(ctx); } -int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx, bool callback) +int do_tango_cache_multi_delete(struct tango_cache_ctx *ctx, bool callback) { CURLMcode rc; char minio_url[256]; @@ -754,6 +804,7 @@ static size_t curl_get_response_header_cb(void *ptr, size_t size, size_t count, { return raw_len; } + ctx->get.result.location = OBJECT_IN_MINIO; } pos_colon = (char*)memchr(start, ':', raw_len); if(pos_colon == NULL) @@ -839,12 +890,11 @@ void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long r } } -int tango_cache_fetch_start(struct tango_cache_ctx *ctx) +static int tango_cache_fetch_minio(struct tango_cache_ctx *ctx) { CURLMcode rc; char minio_url[256]; - ctx->instance->statistic.get_recv_num += 1; if(NULL == (ctx->curl=curl_easy_init())) { tango_cache_ctx_destroy(ctx); @@ -869,3 +919,52 @@ int tango_cache_fetch_start(struct tango_cache_ctx *ctx) return 1; } +static void redis_redirect_object2minio_cb(struct tango_cache_ctx *ctx) +{ + struct promise *p = ctx->promise; + + ctx->get.state = GET_STATE_START; + ctx->locate = OBJECT_IN_MINIO; + if(ctx->instance->statistic.session_http>=ctx->instance->param->maximum_sessions) + { + tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY); + promise_failed(p, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); + tango_cache_ctx_destroy(ctx); + } + else if(tango_cache_fetch_minio(ctx) != 1) + { + promise_failed(p, FUTURE_ERROR_CANCEL, "tango_cache_fetch_minio failed"); + } +} + +int do_tango_cache_fetch_object(struct tango_cache_ctx *ctx, enum OBJECT_LOCATION where_to_get) +{ + ctx->instance->statistic.get_recv_num += 1; + switch(where_to_get) + { + case OBJECT_IN_MINIO: + ctx->locate = OBJECT_IN_MINIO; + return (tango_cache_fetch_minio(ctx)==1)?0:-2; + case OBJECT_IN_REDIS: + ctx->locate = OBJECT_IN_REDIS; + return tango_cache_fetch_redis(ctx); + default: + ctx->get.redis_redirect_minio_cb = redis_redirect_object2minio_cb; + return tango_cache_try_fetch_redis(ctx); + } + return 0; +} + +int do_tango_cache_head_object(struct tango_cache_ctx *ctx, enum OBJECT_LOCATION where_to_head) +{ + ctx->instance->statistic.get_recv_num += 1; + if(where_to_head == OBJECT_IN_REDIS) + { + return tango_cache_head_redis(ctx); + } + else + { + return (tango_cache_fetch_minio(ctx)==1)?0:-1; + } +} + diff --git a/cache/src/tango_cache_transfer.h b/cache/src/tango_cache_transfer.h index 12bfa0b..ec17215 100644 --- a/cache/src/tango_cache_transfer.h +++ b/cache/src/tango_cache_transfer.h @@ -15,16 +15,17 @@ void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long r void tango_cache_curl_muldel_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code); int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back=false); -int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx, bool callback=false); +int do_tango_cache_multi_delete(struct tango_cache_ctx *ctx, bool callback=false); bool cache_cancel_upload_minio(struct tango_cache_ctx *ctx); -void cache_kick_upload_minio_end(struct tango_cache_ctx *ctx); bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block_len); +int do_tango_cache_update_end(struct tango_cache_ctx *ctx, bool callback); -int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool call_back=false); -int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, bool call_back=false); +int do_tango_cache_upload_once_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool call_back=false); +int do_tango_cache_upload_once_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, bool call_back=false); -int tango_cache_fetch_start(struct tango_cache_ctx *ctx); +int do_tango_cache_head_object(struct tango_cache_ctx *ctx, enum OBJECT_LOCATION where_to_head); +int do_tango_cache_fetch_object(struct tango_cache_ctx *ctx, enum OBJECT_LOCATION where_to_get); #endif diff --git a/cache/test/CMakeLists.txt b/cache/test/CMakeLists.txt new file mode 100644 index 0000000..dc4a0d1 --- /dev/null +++ b/cache/test/CMakeLists.txt @@ -0,0 +1,22 @@ +add_definitions(-fPIC -Wall -g) + +add_executable (cache_evbase_test cache_evbase_test.cpp) +target_link_libraries(cache_evbase_test tango_cache_client_static libevent-static openssl-crypto-static openssl-ssl-static libxml2-static libcurl-static hiredis-static cjson) +target_link_libraries (cache_evbase_test MESA_handle_logger MESA_htable MESA_prof_load wiredLB pthread z) + +add_executable (cache_evbase_benchmark cache_evbase_benchmark.cpp) +target_link_libraries(cache_evbase_benchmark tango_cache_client_static libevent-static openssl-crypto-static openssl-ssl-static libxml2-static libcurl-static hiredis-static cjson) +target_link_libraries (cache_evbase_benchmark MESA_handle_logger MESA_htable MESA_prof_load wiredLB pthread z) + +#add_executable (cache_evbase_test_threads cache_evbase_test_threads.cpp) +#target_link_libraries(cache_evbase_test_threads tango_cache_client_static libevent-static openssl-crypto-static openssl-ssl-static libxml2-static libcurl-static hiredis-static cjson) +#target_link_libraries (cache_evbase_test_threads MESA_handle_logger MESA_htable MESA_prof_load wiredLB pthread z) + +add_executable (tango_cache_test tango_cache_test.cpp) +target_link_libraries(tango_cache_test tango_cache_client_static libevent-static openssl-crypto-static openssl-ssl-static libxml2-static libcurl-static hiredis-static cjson) +target_link_libraries (tango_cache_test MESA_handle_logger MESA_htable MESA_prof_load wiredLB pthread) + +#INSTALL (TARGETS cache_evbase_test cache_evbase_test_threads tango_cache_test cache_evbase_benchmark DESTINATION bin) +INSTALL (TARGETS cache_evbase_test tango_cache_test cache_evbase_benchmark DESTINATION bin) +INSTALL (FILES ${CMAKE_CURRENT_SOURCE_DIR}/pangu_tg_cahce.conf DESTINATION bin) +INSTALL (FILES ${CMAKE_CURRENT_SOURCE_DIR}/cmd.txt DESTINATION bin) diff --git a/cache/test/Makefile b/cache/test/Makefile index 07f89f6..9068db8 100644 --- a/cache/test/Makefile +++ b/cache/test/Makefile @@ -6,17 +6,17 @@ CFLAGS=-Wall -g $(INC_PATH) LIBS = -lMESA_handle_logger -lMESA_prof_load -lWiredLB LIBS += -lssl -lcrypto LIBS += ../lib/libtango_cache_client.a -LIBS += ./lib/libcurl.a ./lib/libevent.a ./lib/libxml2.a ./lib/libhiredis.a ./lib/libhiredis.a +LIBS += ./lib/libcurl.a ./lib/libevent.a ./lib/libxml2.a ./lib/libhiredis_vip.a OBJS = tango_cache_test.o OBJS_EVBASE=cache_evbase_test.o -OBJS_EVBASE_THREADS=cache_evbase_test_threads.o +OBJS_EVBASE_BENCHMARK=cache_evbase_benchmark.o TARGET_EXE=tango_cache_test TARGET_EXE_EVBASE=cache_evbase_test -TARGET_EXE_EVBASE_THREAD=cache_evbase_test_threads +TARGET_EXE_EVBASE_BENCHMARK=cache_evbase_benchmark -ALL:$(TARGET_EXE) $(TARGET_EXE_EVBASE) $(TARGET_EXE_EVBASE_THREAD) +ALL:$(TARGET_EXE) $(TARGET_EXE_EVBASE) $(TARGET_EXE_EVBASE_BENCHMARK) $(TARGET_EXE):$(OBJS) $(CCC) $(LDFLAGS) $^ -o $@ $(LIBS) @@ -24,7 +24,7 @@ $(TARGET_EXE):$(OBJS) $(TARGET_EXE_EVBASE):$(OBJS_EVBASE) $(CCC) $(LDFLAGS) $^ -o $@ $(LIBS) -lpthread -$(TARGET_EXE_EVBASE_THREAD):$(OBJS_EVBASE_THREADS) +$(TARGET_EXE_EVBASE_BENCHMARK):$(OBJS_EVBASE_BENCHMARK) $(CCC) $(LDFLAGS) $^ -o $@ $(LIBS) -lpthread .c.o: @@ -35,5 +35,5 @@ $(TARGET_EXE_EVBASE_THREAD):$(OBJS_EVBASE_THREADS) -include $(DEPS) clean: - rm -rf $(OBJS) $(TARGET_EXE) $(OBJS_EVBASE) $(TARGET_EXE_EVBASE) $(OBJS_EVBASE_THREADS) $(TARGET_EXE_EVBASE_THREAD) + rm -rf $(OBJS) $(TARGET_EXE) $(OBJS_EVBASE) $(TARGET_EXE_EVBASE) $(OBJS_EVBASE_BENCHMARK) $(TARGET_EXE_EVBASE_BENCHMARK) diff --git a/cache/test/cache_evbase_benchmark.cpp b/cache/test/cache_evbase_benchmark.cpp new file mode 100644 index 0000000..d245244 --- /dev/null +++ b/cache/test/cache_evbase_benchmark.cpp @@ -0,0 +1,429 @@ +#include <sys/ioctl.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <net/if.h> +#include <unistd.h> +#include <stdio.h> +#include <stdlib.h> +#include <assert.h> +#include <errno.h> +#include <sys/prctl.h> +#include <sys/un.h> +#include <unistd.h> +#include <sys/poll.h> +#include <curl/curl.h> +#include <fcntl.h> +#include <sys/stat.h> +#include <errno.h> +#include <sys/cdefs.h> +#include <MESA/MESA_handle_logger.h> +#include <pthread.h> +#include <sys/prctl.h> + +#include "object_store_client.h" + +#define METHOD_GET 1 +#define METHOD_PUT 2 +#define METHOD_HEAD 3 +#define METHOD_PUTONCEEV 4 +#define METHOD_PUTONCE 5 +#define METHOD_DEL 6 + +struct object_store_instance *instance_asyn; + +struct filecontent +{ + char *buf; + size_t len; +}; + +struct future_pdata +{ + struct future * future; + FILE *fp; + char filename[256]; +}; + +void get_future_success(future_result_t* result, void * user) +{ + struct tango_cache_result *res = cache_evbase_read_result(result); + struct future_pdata *pdata = (struct future_pdata *)user; + //char buffer[1024]; + + switch(res->type) + { + case RESULT_TYPE_USERTAG: + case RESULT_TYPE_HEADER: + //memcpy(buffer, res->data_frag, res->size>=1024?1023:res->size); + //buffer[res->size] = '\0'; + //printf("%s", buffer); + break; + case RESULT_TYPE_BODY: + //fwrite(res->data_frag, res->size, 1, pdata->fp); + break; + case RESULT_TYPE_MISS: + //printf("cache not hit/fresh\n"); + case RESULT_TYPE_END: + //if(res->type != RESULT_TYPE_MISS) + // printf("get cache over, total length: %ld\n", res->tlength); + future_destroy(pdata->future); + //fclose(pdata->fp); + free(pdata); + break; + default:break; + } +} + +void get_future_failed(enum e_future_error err, const char * what, void * user) +{ + struct future_pdata *pdata = (struct future_pdata *)user; + future_destroy(pdata->future); + free(pdata); + //printf("GET fail: %s\n", what); +} + +void head_future_success(future_result_t* result, void * user) +{ + struct tango_cache_result *res = cache_evbase_read_result(result); + struct future_pdata *pdata = (struct future_pdata *)user; + char buffer[1024]; + + switch(res->type) + { + case RESULT_TYPE_USERTAG: + case RESULT_TYPE_HEADER: + memcpy(buffer, res->data_frag, res->size>=1024?1023:res->size); + buffer[res->size] = '\0'; + printf("%s", buffer); + break; + case RESULT_TYPE_BODY: + assert(0); + break; + case RESULT_TYPE_MISS: + printf("cache not hit/fresh\n"); + case RESULT_TYPE_END: + if(res->type != RESULT_TYPE_MISS) + printf("HEAD cache over, total length: %ld\n", res->tlength); + future_destroy(pdata->future); + free(pdata); + break; + default:break; + } +} + +void head_future_failed(enum e_future_error err, const char * what, void * user) +{ + printf("HEAD fail: %s\n", what); +} + + +void put_future_success(future_result_t* result, void * user) +{ + struct future_pdata *pdata = (struct future_pdata *)user; + + //printf("PUT %s succ\n", pdata->filename); + future_destroy(pdata->future); + free(pdata); +} +void put_future_failed(enum e_future_error err, const char * what, void * user) +{ + struct future_pdata *pdata = (struct future_pdata *)user; + + //printf("PUT %s fail: %s\n", what, pdata->filename); + future_destroy(pdata->future); + free(pdata); +} + +void del_future_success(future_result_t* result, void * user) +{ + struct future_pdata *pdata = (struct future_pdata *)user; + + printf("DEL %s succ\n", pdata->filename); + future_destroy(pdata->future); + free(pdata); +} +void del_future_failed(enum e_future_error err, const char * what, void * user) +{ + struct future_pdata *pdata = (struct future_pdata *)user; + + printf("DEL %s fail: %s\n", pdata->filename, what); + future_destroy(pdata->future); + free(pdata); +} + +char * get_file_content(const char *filename, size_t *filelen_out) +{ + char *buffer; + FILE *fp; + size_t filelen = 0; + struct stat filestat; + int readlen; + + fp = fopen(filename, "rb"); + if(fp == NULL) + { + printf("fopen file %s failed.\n", filename); + return NULL; + } + if(fstat(fileno(fp), &filestat)) + { + printf("fstat %s failed.\n", filename); + return NULL; + } + + buffer = (char *)malloc(filestat.st_size); + + while(filelen < (size_t)filestat.st_size) + { + readlen = fread(buffer + filelen, 1, filestat.st_size - filelen, fp); + if(readlen < 0) + { + printf("read error: %s\n", strerror(errno)); + return NULL; + } + + filelen += readlen; + } + fclose(fp); + *filelen_out = filestat.st_size; + + return buffer; +} + +struct cache_statistics g_out_last; +void timer_cb(evutil_socket_t fd, short what, void *arg) +{ + struct timeval tv; + struct cache_statistics out_now; + struct cache_statistics out; + + tv.tv_sec = 10; + tv.tv_usec = 0; + + /*static int num=0; + num++; + if(ctx_global!=NULL && num>5) + { + tango_cache_update_end(ctx_global); + ctx_global = NULL; + }*/ + + object_store_get_statistics(instance_asyn, &out_now); + out.del_error_num = out_now.del_error_num - g_out_last.del_error_num; + out.del_recv_num = out_now.del_recv_num - g_out_last.del_recv_num; + out.del_succ_num = out_now.del_succ_num - g_out_last.del_succ_num; + out.get_err_http = out_now.get_err_http - g_out_last.get_err_http; + out.get_err_redis = out_now.get_err_redis - g_out_last.get_err_redis; + out.get_miss_num = out_now.get_miss_num - g_out_last.get_miss_num; + out.get_recv_num = out_now.get_recv_num - g_out_last.get_recv_num; + out.get_succ_http = out_now.get_succ_http - g_out_last.get_succ_http; + out.get_succ_redis = out_now.get_succ_redis - g_out_last.get_succ_redis; + out.put_err_http = out_now.put_err_http - g_out_last.put_err_http; + out.put_err_redis = out_now.put_err_redis - g_out_last.put_err_redis; + out.put_recv_num = out_now.put_recv_num - g_out_last.put_recv_num; + out.put_succ_http = out_now.put_succ_http - g_out_last.put_succ_http; + out.put_succ_redis = out_now.put_succ_redis - g_out_last.put_succ_redis; + out.session_http = out_now.session_http; + out.session_redis = out_now.session_redis; + out.memory_used = out_now.memory_used; + out.totaldrop_num = out_now.totaldrop_num - g_out_last.totaldrop_num; + + printf("-------------------------------------------------------------------------------------------\n" + "get_recv: %llu, get_http: %llu, get_redis: %llu, get_fail_http: %llu, get_fail_redis: %llu, get_miss: %llu\n" + "put_recv: %llu, put_http: %llu, put_redis: %llu, put_fail_http: %llu, put_fail_redis: %llu\n" + "del_recv: %llu, del_succ: %llu, del_fail: %llu, drop_num: %llu, session_redis: %llu, session_http: %llu, memory: %llu\n", + out.get_recv_num, out.get_succ_http, out.get_succ_redis, out.get_err_http, out.get_err_redis, out.get_miss_num, + out.put_recv_num, out.put_succ_http, out.put_succ_redis, out.put_err_http, out.put_err_redis, + out.del_recv_num, out.del_succ_num, out.del_error_num, out.totaldrop_num, out.session_redis, out.session_http, out.memory_used); + + g_out_last = out_now; + event_add((struct event *)arg, &tv); +} + +struct filecontentcmd +{ + int method; + int threads; + int total_num; + int sess_limit; + char file[256]; +}; + + +static void* thread_transfer_cmd(void *arg) +{ + int index=0; + char filename_in[256]; + struct tango_cache_meta_put putmeta; + struct tango_cache_meta_get getmeta; + struct future_pdata *pdata; + struct cache_evbase_ctx *ctx; + struct filecontent filecont; + size_t remain_len; + struct cache_statistics out; + struct evbuffer *evbuf; + struct filecontentcmd *filecmd = (struct filecontentcmd *)arg; + + prctl(PR_SET_NAME, "transfer_cmd"); + + memset(&putmeta, 0, sizeof(struct tango_cache_meta_put)); + memset(&getmeta, 0, sizeof(struct tango_cache_meta_get)); + putmeta.std_hdr[HDR_CONTENT_TYPE] = "Content-Type: maintype/subtype"; + putmeta.std_hdr[HDR_CONTENT_ENCODING] = "Content-Encoding: gzip"; + putmeta.usertag = "Etag: hgdkqkwdwqekdfjwjfjwelkjfkwfejwhf\r\n"; + putmeta.usertag_len = strlen(putmeta.usertag); + + filecont.buf = get_file_content(filecmd->file, &filecont.len); + assert(filecont.buf != NULL); + + while(1) + { + object_store_get_statistics(instance_asyn, &out); + if(out.session_http >= filecmd->sess_limit || out.session_redis>=filecmd->sess_limit) + { + usleep(1000); + continue; + } + + switch(filecmd->method) + { + case METHOD_GET: + sprintf(filename_in, "%s_%u", filecmd->file, index); + getmeta.url = filename_in; + pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); + pdata->future = future_create(get_future_success, get_future_failed, pdata); + object_store_fetch_object(instance_asyn, pdata->future, &getmeta, OBJECT_IN_UNKNOWN); + break; + + case METHOD_PUT: + remain_len = filecont.len; + + sprintf(filename_in, "%s_%u", filecmd->file, index); + putmeta.url = filename_in; + + pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); + pdata->future = future_create(put_future_success, put_future_failed, pdata); + ctx = object_store_update_start(instance_asyn, pdata->future, &putmeta); + if(ctx == NULL) + { + future_destroy(pdata->future); + free(pdata); + continue; + } + while(remain_len >= 1024) + { + object_store_update_frag_data(ctx, PUT_MEM_COPY, filecont.buf+(filecont.len-remain_len), 1024); + remain_len -= 1024; + } + if(remain_len > 0) + { + object_store_update_frag_data(ctx, PUT_MEM_COPY, filecont.buf+(filecont.len-remain_len), remain_len); + } + object_store_update_end(ctx, pdata->filename, 256); + break; + + case METHOD_HEAD: + pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); + pdata->future = future_create(head_future_success, head_future_failed, pdata); + object_store_head_object(instance_asyn, pdata->future, &getmeta); + break; + + case METHOD_DEL: + pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); + pdata->future = future_create(del_future_success, del_future_failed, pdata); + sprintf(pdata->filename, "%s_%u", filecmd->file, index); + object_store_delete_object(instance_asyn, pdata->future, pdata->filename); + break; + case METHOD_PUTONCE: + remain_len = filecont.len; + + sprintf(filename_in, "%s_%u", filecmd->file, index); + putmeta.url = filename_in; + + pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); + pdata->future = future_create(put_future_success, put_future_failed, pdata); + object_store_upload_once_data(instance_asyn, pdata->future, PUT_MEM_FREE, filecont.buf, filecont.len, &putmeta, pdata->filename, 256); + break; + case METHOD_PUTONCEEV: + remain_len = filecont.len; + + sprintf(filename_in, "%s_%u", filecmd->file, index); + putmeta.url = filename_in; + + pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); + pdata->future = future_create(put_future_success, put_future_failed, pdata); + evbuf = evbuffer_new(); + + remain_len = filecont.len; + while(remain_len >= 1024) + { + evbuffer_add(evbuf, filecont.buf+(filecont.len-remain_len), 1024); + remain_len -= 1024; + } + if(remain_len > 0) + { + evbuffer_add(evbuf, filecont.buf+(filecont.len-remain_len), remain_len); + } + object_store_upload_once_evbuf(instance_asyn, pdata->future, evbuf, &putmeta, pdata->filename, 256); + break; + default:break; + } + + index = (index+1) % filecmd->total_num; + } + + return NULL; +} + +int main(int argc, char **argv) +{ + pthread_t thread_tid; + pthread_attr_t attr; + void *runtime_log; + struct filecontentcmd filecmd; + + struct event ev_timer; + struct timeval tv; + struct event_base *ev_base; + + if(argc != 6) + { + printf("USAGE: %s <method,1-GET,2-PUT> <file> <threads> <total_num> <limit_session_num>\n", argv[0]); + return -1; + } + + runtime_log = MESA_create_runtime_log_handle("./runtime.log", 10); + if(NULL==runtime_log) + { + return -1; + } + filecmd.method = atoi(argv[1]); + filecmd.threads = atoi(argv[3]); + filecmd.total_num = atoi(argv[4]); + filecmd.sess_limit = atoi(argv[5]); + sprintf(filecmd.file, "%s", argv[2]); + + object_store_global_init(); + instance_asyn = object_store_instance_new("./pangu_tg_cahce.conf", "TANGO_CACHE", filecmd.threads, runtime_log); + assert(instance_asyn!=NULL); + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + for(int i=0; i<4; i++) + { + if(pthread_create(&thread_tid, &attr, thread_transfer_cmd, &filecmd)) + { + return -1; + } + } + + ev_base = event_base_new(); + tv.tv_sec = 10; + tv.tv_usec = 0; + evtimer_assign(&ev_timer, ev_base, timer_cb, &ev_timer); + evtimer_add(&ev_timer, &tv); + event_base_dispatch(ev_base); + return 0; +} + diff --git a/cache/test/cache_evbase_test.cpp b/cache/test/cache_evbase_test.cpp index b3f0350..e1c7462 100644 --- a/cache/test/cache_evbase_test.cpp +++ b/cache/test/cache_evbase_test.cpp @@ -243,7 +243,7 @@ int main(int argc, char **argv) pdata->future = future_create(get_future_success, get_future_failed, pdata); pdata->fp = fopen(filename_out, "w"); - cache_evbase_fetch_object(instance_asyn, pdata->future, &getmeta); + cache_evbase_fetch_object(instance_asyn, pdata->future, &getmeta, OBJECT_IN_UNKNOWN); } else if(!strcasecmp(p, "HEAD")) { @@ -289,8 +289,6 @@ int main(int argc, char **argv) pdata->future = future_create(put_future_success, put_future_failed, pdata); ctx = cache_evbase_update_start(instance_asyn, pdata->future, &putmeta); - cache_evbase_get_object_path(ctx, pdata->filename, 256); - char buffer[1024]; FILE *fp = fopen(filename_in, "r"); while(!feof(fp)) @@ -300,7 +298,7 @@ int main(int argc, char **argv) cache_evbase_update_frag_data(ctx, PUT_MEM_COPY, buffer, n); } - cache_evbase_update_end(ctx); + cache_evbase_update_end(ctx, pdata->filename, 256); } } @@ -321,8 +319,13 @@ int main(int argc, char **argv) struct cache_statistics out; cache_evbase_get_statistics(instance_asyn, &out); - printf("get_recv: %llu, get_succ: %llu, get_miss: %llu, get_fail: %llu, put_recv: %llu, put_succ: %llu, put_fail: %llu, del_recv: %llu, del_succ: %llu, del_fail: %llu, drop_num: %llu, session: %llu, memory: %llu\n", - out.get_recv_num, out.get_succ_num, out.get_miss_num, out.get_error_num, out.put_recv_num, out.put_succ_num, out.put_error_num, out.del_recv_num, out.del_succ_num, out.del_error_num, out.totaldrop_num, out.session_num, out.memory_used); + printf("-------------------------------------------------------------------------------------------\n" + "get_recv: %llu, get_http: %llu, get_redis: %llu, get_fail_http: %llu, get_fail_redis: %llu, get_miss: %llu\n" + "put_recv: %llu, put_http: %llu, put_redis: %llu, put_fail_http: %llu, put_fail_redis: %llu\n" + "del_recv: %llu, del_succ: %llu, del_fail: %llu, drop_num: %llu, session_redis: %llu, session_http: %llu, memory: %llu\n", + out.get_recv_num, out.get_succ_http, out.get_succ_redis, out.get_err_http, out.get_err_redis, out.get_miss_num, + out.put_recv_num, out.put_succ_http, out.put_succ_redis, out.put_err_http, out.put_err_redis, + out.del_recv_num, out.del_succ_num, out.del_error_num, out.totaldrop_num, out.session_redis, out.session_http, out.memory_used); return 0; } diff --git a/cache/test/cache_evbase_test_threads.cpp b/cache/test/cache_evbase_test_threads.cpp deleted file mode 100644 index f2924b7..0000000 --- a/cache/test/cache_evbase_test_threads.cpp +++ /dev/null @@ -1,322 +0,0 @@ -#include <sys/ioctl.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <net/if.h> -#include <unistd.h> -#include <stdio.h> -#include <stdlib.h> -#include <assert.h> -#include <errno.h> -#include <sys/prctl.h> -#include <sys/un.h> -#include <unistd.h> -#include <sys/poll.h> -#include <curl/curl.h> -#include <fcntl.h> -#include <sys/stat.h> -#include <errno.h> -#include <sys/cdefs.h> -#include <pthread.h> -#include <MESA/MESA_handle_logger.h> - -#include "cache_evbase_client.h" - -struct cache_evbase_instance *instance_asyn; -int runing_over=0; - -struct future_pdata -{ - struct future * future; - FILE *fp; - char filename[256]; -}; - -void get_future_success(future_result_t* result, void * user) -{ - struct tango_cache_result *res = cache_evbase_read_result(result); - struct future_pdata *pdata = (struct future_pdata *)user; - char buffer[1024]; - - switch(res->type) - { - case RESULT_TYPE_USERTAG: - case RESULT_TYPE_HEADER: - memcpy(buffer, res->data_frag, res->size>=1024?1023:res->size); - buffer[res->size] = '\0'; - printf("%s", buffer); - break; - case RESULT_TYPE_BODY: - fwrite(res->data_frag, res->size, 1, pdata->fp); - break; - case RESULT_TYPE_MISS: - printf("cache not hit/fresh\n"); - case RESULT_TYPE_END: - if(res->type != RESULT_TYPE_MISS) - printf("get cache over, total length: %ld\n", res->tlength); - future_destroy(pdata->future); - fclose(pdata->fp); - free(pdata); - runing_over = 1; - break; - default:break; - } -} - -void get_future_failed(enum e_future_error err, const char * what, void * user) -{ - printf("GET fail: %s\n", what); - runing_over = 2; -} - -void put_future_success(future_result_t* result, void * user) -{ - struct future_pdata *pdata = (struct future_pdata *)user; - - printf("PUT %s succ\n", pdata->filename); - future_destroy(pdata->future); - free(pdata); - runing_over = 1; -} -void put_future_failed(enum e_future_error err, const char * what, void * user) -{ - struct future_pdata *pdata = (struct future_pdata *)user; - - printf("PUT %s fail: %s\n", what, pdata->filename); - future_destroy(pdata->future); - free(pdata); - runing_over = 1; -} - -void del_future_success(future_result_t* result, void * user) -{ - struct future_pdata *pdata = (struct future_pdata *)user; - - printf("DEL %s succ\n", pdata->filename); - future_destroy(pdata->future); - free(pdata); - runing_over = 1; -} -void del_future_failed(enum e_future_error err, const char * what, void * user) -{ - struct future_pdata *pdata = (struct future_pdata *)user; - - printf("DEL %s fail: %s\n", pdata->filename, what); - future_destroy(pdata->future); - free(pdata); - runing_over = 1; -} - -char * get_file_content(const char *filename, size_t *filelen_out) -{ - char *buffer; - FILE *fp; - size_t filelen = 0; - struct stat filestat; - int readlen; - - fp = fopen(filename, "rb"); - if(fp == NULL) - { - printf("fopen file %s failed.\n", filename); - return NULL; - } - if(fstat(fileno(fp), &filestat)) - { - printf("fstat %s failed.\n", filename); - return NULL; - } - - buffer = (char *)malloc(filestat.st_size); - - while(filelen < (size_t)filestat.st_size) - { - readlen = fread(buffer + filelen, 1, filestat.st_size - filelen, fp); - if(readlen < 0) - { - printf("read error: %s\n", strerror(errno)); - return NULL; - } - - filelen += readlen; - } - fclose(fp); - *filelen_out = filestat.st_size; - - return buffer; -} - -struct pthread_data -{ - char *argv; - int upload_times; - int thread_id; -}; - -void* thread_upload_download(void *arg) -{ - int n; - char method[16], filename_in[256], filename_out[256], *p; - struct tango_cache_meta_put putmeta; - struct tango_cache_meta_get getmeta; - struct future_pdata *pdata; - struct cache_evbase_ctx *ctx; - struct pthread_data *thread_data = (struct pthread_data *)arg; - - if(sscanf(thread_data->argv, "%[^:]:%1023s%n", method, filename_in, &n) != 2) - { - assert(0); - } - if(strlen(filename_in) <= 0) - { - return NULL; - } - - memset(&putmeta, 0, sizeof(struct tango_cache_meta_put)); - putmeta.url = filename_in; - putmeta.std_hdr[HDR_CONTENT_TYPE] = "Content-Type: maintype/subtype"; - putmeta.std_hdr[HDR_CONTENT_ENCODING] = "Content-Encoding: gzip"; - putmeta.usertag = "Etag: hgdkqkwdwqekdfjwjfjwelkjfkwfejwhf\r\n"; - putmeta.usertag_len = strlen(putmeta.usertag); - - getmeta.url = filename_in; - - p = method; - while(*p=='\r'||*p=='\n') p++; - assert(*p!='\0'); - - for(int i=0; i<thread_data->upload_times; i++) - { - pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); - - if(!strcasecmp(p, "GET")) - { - sprintf(filename_out, "file_index_%d_%d.bin", thread_data->thread_id, i); - pdata->future = future_create(get_future_success, get_future_failed, pdata); - pdata->fp = fopen(filename_out, "w"); - - cache_evbase_fetch_object(instance_asyn, pdata->future, &getmeta); - } - else if(!strcasecmp(p, "DEL")) - { - pdata->future = future_create(del_future_success, del_future_failed, pdata); - sprintf(pdata->filename, "%s", filename_in); - cache_evbase_delete_object(instance_asyn, pdata->future, filename_in); - } - else if(!strcasecmp(p, "PUTONCE")) - { - size_t filelen; - p = get_file_content(filename_in, &filelen); - pdata->future = future_create(put_future_success, put_future_failed, pdata); - - if(cache_evbase_upload_once_data(instance_asyn, pdata->future, PUT_MEM_FREE, p, filelen, &putmeta, pdata->filename, 256)) - { - printf("cache_evbase_upload_once_data fail: %d\n", cache_evbase_ctx_error(instance_asyn)); - future_destroy(pdata->future); - free(pdata); - } - } - else if(!strcasecmp(p, "PUTONCEEV")) - { - size_t readlen; - pdata->future = future_create(put_future_success, put_future_failed, pdata); - struct evbuffer *evbuf = evbuffer_new(); - char buffer[1024]; - - FILE *fp = fopen(filename_in, "rb"); - while(!feof(fp)) - { - readlen = fread(buffer, 1, 1024, fp); - if(readlen < 0) - { - assert(0); - } - evbuffer_add(evbuf, buffer, readlen); - } - fclose(fp); - if(cache_evbase_upload_once_evbuf(instance_asyn, pdata->future, evbuf, &putmeta, pdata->filename, 256)) - { - printf("cache_evbase_upload_once_evbuf fail: %d\n", cache_evbase_ctx_error(instance_asyn)); - future_destroy(pdata->future); - free(pdata); - } - evbuffer_free(evbuf); - } - else - { - pdata->future = future_create(put_future_success, put_future_failed, pdata); - - ctx = cache_evbase_update_start(instance_asyn, pdata->future, &putmeta); - if(ctx==NULL) - { - printf("cache_evbase_update_start fail: %d\n", cache_evbase_ctx_error(instance_asyn)); - future_destroy(pdata->future); - free(pdata); - continue; - } - cache_evbase_get_object_path(ctx, pdata->filename, 256); - - char buffer[1024]; - FILE *fp = fopen(filename_in, "r"); - while(!feof(fp)) - { - n = fread(buffer, 1, 1024, fp); - assert(n>=0); - cache_evbase_update_frag_data(ctx, PUT_MEM_COPY, buffer, n); - } - - cache_evbase_update_end(ctx); - } - } - - printf("transfer over\n"); - return NULL; -} - -int main(int argc, char **argv) -{ - struct cache_statistics out; - void *runtime_log; - pthread_t thread_tid; - pthread_attr_t attr; - struct pthread_data pdata[20]; - struct tango_cache_parameter *parameter; - - if(argc!=3) - { - printf("USGAE: %s <PUT/PUTONCE/PUTONCEEV/GET/DEL:filename> <how many times>\n", argv[0]); - return -1; - } - - runtime_log = MESA_create_runtime_log_handle("./runtime.log", 10); - if(NULL==runtime_log) - { - return -1; - } - - cache_evbase_global_init(); - parameter = cache_evbase_parameter_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log); - assert(parameter != NULL); - instance_asyn = cache_evbase_instance_new(parameter, runtime_log); - assert(instance_asyn!=NULL); - - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - for(int i=0; i<20; i++) - { - pdata[i].argv = argv[1]; - pdata[i].thread_id = i; - pdata[i].upload_times = atoi(argv[2]); - pthread_create(&thread_tid, &attr, thread_upload_download, &pdata[i]); - } - - while(1) - { - sleep(30); - cache_evbase_get_statistics(instance_asyn, &out); - printf("get_recv: %llu, get_succ: %llu, get_miss: %llu, get_fail: %llu, put_recv: %llu, put_succ: %llu, put_fail: %llu, del_recv: %llu, del_succ: %llu, del_fail: %llu, drop_num: %llu, session: %llu, memory: %llu\n", - out.get_recv_num, out.get_succ_num, out.get_miss_num, out.get_error_num, out.put_recv_num, out.put_succ_num, out.put_error_num, out.del_recv_num, out.del_succ_num, out.del_error_num, out.totaldrop_num, out.session_num, out.memory_used); - } - return 0; -} - diff --git a/cache/test/lib/libcjson.a b/cache/test/lib/libcjson.a Binary files differnew file mode 100644 index 0000000..4bf20ae --- /dev/null +++ b/cache/test/lib/libcjson.a diff --git a/cache/test/lib/libcrypto.a b/cache/test/lib/libcrypto.a Binary files differnew file mode 100644 index 0000000..4aea07e --- /dev/null +++ b/cache/test/lib/libcrypto.a diff --git a/cache/test/lib/libcurl.a b/cache/test/lib/libcurl.a Binary files differindex 7b0c467..88b1b70 100644 --- a/cache/test/lib/libcurl.a +++ b/cache/test/lib/libcurl.a diff --git a/cache/test/lib/libevent.a b/cache/test/lib/libevent.a Binary files differindex 5495d20..772ecb0 100644 --- a/cache/test/lib/libevent.a +++ b/cache/test/lib/libevent.a diff --git a/cache/test/lib/libhiredis.a b/cache/test/lib/libhiredis.a Binary files differdeleted file mode 100644 index e0c95b6..0000000 --- a/cache/test/lib/libhiredis.a +++ /dev/null diff --git a/cache/test/lib/libhiredis_vip.a b/cache/test/lib/libhiredis_vip.a Binary files differnew file mode 100644 index 0000000..a87868b --- /dev/null +++ b/cache/test/lib/libhiredis_vip.a diff --git a/cache/test/lib/libssl.a b/cache/test/lib/libssl.a Binary files differnew file mode 100644 index 0000000..ffb498e --- /dev/null +++ b/cache/test/lib/libssl.a diff --git a/cache/test/lib/libxml2.a b/cache/test/lib/libxml2.a Binary files differindex f39781b..28fea0d 100644 --- a/cache/test/lib/libxml2.a +++ b/cache/test/lib/libxml2.a diff --git a/cache/test/pangu_tg_cahce.conf b/cache/test/pangu_tg_cahce.conf index 8316e11..e3ef66c 100644 --- a/cache/test/pangu_tg_cahce.conf +++ b/cache/test/pangu_tg_cahce.conf @@ -1,40 +1,35 @@ -[tango_cache] -#Addresses of minio cluster. Format is defined by WiredLB. -minio_ip_list=192.168.10.61-64; -minio_listen_port=9000 +[TANGO_CACHE] +#Addresses of minio. Format is defined by WiredLB. +MINIO_IP_LIST=10.3.35.1; +MINIO_LISTEN_PORT=9000 #Maximum number of connections opened by per host. -#max_connection_per_host=1 +#MAX_CONNECTION_PER_HOST=1 #Maximum number of requests in a pipeline. -max_cnnt_pipeline_num=20 -#max_curl_session_num=100 +#MAX_CNNT_PIPELINE_NUM=20 +#Maximum parellel sessions(http and redis) is allowed to open. +#MAX_CURL_SESSION_NUM=100 #Maximum time the request is allowed to take(seconds). -max_curl_transfer_timeout_s=15 +#MAX_CURL_TRANSFER_TIMEOUT_S=0 #Bucket name in minio. -cache_bucket_name=openbucket +CACHE_BUCKET_NAME=openbucket #Maximum size of memory used by tango_cache_client. Upload will fail if the current size of memory used exceeds this value. -max_used_memory_size_mb=5120 +MAX_USED_MEMORY_SIZE_MB=5120 #Default TTL of objects, i.e. the time after which the object will expire(minumun 60s, i.e. 1 minute). -cache_default_ttl_second=600 -#Whether to hash the object key before cache actions. GET/PUT will be faster if you open it. -cache_object_key_hash_switch=1 +CACHE_DEFAULT_TTL_SECOND=3600 +#Whether to hash the object key before cache actions. GET/PUT may be faster if you open it. +CACHE_OBJECT_KEY_HASH_SWITCH=1 -#Where to HEAD meta information of objects. 1-minio;2-redis. -cache_head_from_source=2 -#If cache_head_from_source is 2, fill in the following configs. -cache_head_redis_key=MINIO_EVENTS_INFO -#The IP tango_cache_client will always first use to HEAD meta. -cache_head_main_redis_ip=192.168.10.63 -#Only when cache_head_main_redis_ip fails will tango_cache_client chose one IP from the list as backup. -cache_head_redis_iplist=192.168.10.62-63; -cache_head_redis_port=6379 - -#Configs for WiredLB. -#wiredlb_override=1 -#wiredlb_topic= -#wiredlb_datacenter= -wiredlb_minio_health_port=52100 -#wiredlb_minio_group= -wiredlb_redis_health_port=52101 -#wiredlb_redis_group= +#Store way: 0-MINIO; 1-META in REDIS, object in minio; 2-META and small object in Redis, large object in minio; +CACHE_STORE_OBJECT_WAY=2 +#If CACHE_STORE_OBJECT_WAY is 2 and the size of a object is not bigger than this value, object will be stored in redis. +REDIS_CACHE_OBJECT_SIZE=20480 +#If CACHE_STORE_OBJECT_WAY is not 0, we will use redis to store meta and object. +REDIS_CLUSTER_ADDRS=10.4.35.33:9001,10.4.35.34:9001 +#Configs of WiredLB for Minios load balancer. +#WIREDLB_OVERRIDE=1 +#WIREDLB_TOPIC= +#WIREDLB_DATACENTER= +WIREDLB_HEALTH_PORT=52101 +#WIREDLB_GROUP= diff --git a/cache/test/tango_cache_test.cpp b/cache/test/tango_cache_test.cpp index b449a11..00ba43c 100644 --- a/cache/test/tango_cache_test.cpp +++ b/cache/test/tango_cache_test.cpp @@ -240,7 +240,7 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) pdata->fp = fopen(filename, "w"); pdata->future = future_create(get_future_success, get_future_failed, pdata); - if(tango_cache_fetch_object(tango_instance, pdata->future, &getmeta) < 0) + if(tango_cache_fetch_object(tango_instance, pdata->future, &getmeta, OBJECT_IN_UNKNOWN) < 0) { get_future_failed(FUTURE_ERROR_CANCEL, "", pdata); } @@ -315,8 +315,6 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) put_future_failed(FUTURE_ERROR_CANCEL, "tango_cache_update_start_NULL", pdata); continue; } - tango_cache_get_object_path(ctx, pdata->filename, 256); - FILE *fp = fopen(s, "r"); while(!feof(fp)) { @@ -325,7 +323,7 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) tango_cache_update_frag_data(ctx, buffer, n); } fclose(fp); - tango_cache_update_end(ctx); + tango_cache_update_end(ctx, pdata->filename, 256); } } else @@ -387,8 +385,13 @@ void timer_cb(evutil_socket_t fd, short what, void *arg) }*/ tango_cache_get_statistics(tango_instance, &out); - printf("get_recv: %llu, get_succ: %llu, get_miss: %llu, get_fail: %llu, put_recv: %llu, put_succ: %llu, put_fail: %llu, del_recv: %llu, del_succ: %llu, del_fail: %llu, drop_num: %llu, session: %llu, memory: %llu\n", - out.get_recv_num, out.get_succ_num, out.get_miss_num, out.get_error_num, out.put_recv_num, out.put_succ_num, out.put_error_num, out.del_recv_num, out.del_succ_num, out.del_error_num, out.totaldrop_num, out.session_num, out.memory_used); + printf("-------------------------------------------------------------------------------------------\n" + "get_recv: %llu, get_http: %llu, get_redis: %llu, get_fail_http: %llu, get_fail_redis: %llu, get_miss: %llu\n" + "put_recv: %llu, put_http: %llu, put_redis: %llu, put_fail_http: %llu, put_fail_redis: %llu\n" + "del_recv: %llu, del_succ: %llu, del_fail: %llu, drop_num: %llu, session_redis: %llu, session_http: %llu, memory: %llu\n", + out.get_recv_num, out.get_succ_http, out.get_succ_redis, out.get_err_http, out.get_err_redis, out.get_miss_num, + out.put_recv_num, out.put_succ_http, out.put_succ_redis, out.put_err_http, out.put_err_redis, + out.del_recv_num, out.del_succ_num, out.del_error_num, out.totaldrop_num, out.session_redis, out.session_http, out.memory_used); event_add((struct event *)arg, &tv); } diff --git a/vendor/CMakeLists.txt b/vendor/CMakeLists.txt index 788ffcd..b3f07a5 100644 --- a/vendor/CMakeLists.txt +++ b/vendor/CMakeLists.txt @@ -249,21 +249,21 @@ set_property(TARGET libcurl-static PROPERTY IMPORTED_LOCATION ${INSTALL_DIR}/lib set_property(TARGET libcurl-static PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${INSTALL_DIR}/include) ### hiredis -ExternalProject_Add(hiredis PREFIX hiredis - URL ${CMAKE_CURRENT_SOURCE_DIR}/hiredis-0.14.0.zip - URL_MD5 376af92277701fae52a8c917c3ce3044 +ExternalProject_Add(hiredisCluster PREFIX hiredisCluster + URL ${CMAKE_CURRENT_SOURCE_DIR}/hiredis-vip-0.3.0.zip + URL_MD5 950ccc040a705ebe6c6b1854a744ae2d CONFIGURE_COMMAND "" BUILD_COMMAND make INSTALL_COMMAND make install BUILD_IN_SOURCE 1) -ExternalProject_Get_Property(hiredis SOURCE_DIR) +ExternalProject_Get_Property(hiredisCluster SOURCE_DIR) set(HIREDIS_INCLUDE_DIRECTORIES ${SOURCE_DIR}/out/include/) file(MAKE_DIRECTORY ${HIREDIS_INCLUDE_DIRECTORIES}) add_library(hiredis-static STATIC IMPORTED GLOBAL) -add_dependencies(libcurl-static hiredis) -set_property(TARGET hiredis-static PROPERTY IMPORTED_LOCATION ${SOURCE_DIR}/libhiredis.a) +add_dependencies(libcurl-static hiredisCluster) +set_property(TARGET hiredis-static PROPERTY IMPORTED_LOCATION ${SOURCE_DIR}/libhiredis_vip.a) set_property(TARGET hiredis-static PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${HIREDIS_INCLUDE_DIRECTORIES}) ### dablooms diff --git a/vendor/hiredis-0.14.0.zip b/vendor/hiredis-0.14.0.zip Binary files differdeleted file mode 100644 index 6f97da4..0000000 --- a/vendor/hiredis-0.14.0.zip +++ /dev/null diff --git a/vendor/hiredis-vip-0.3.0.zip b/vendor/hiredis-vip-0.3.0.zip Binary files differnew file mode 100644 index 0000000..e99c1c6 --- /dev/null +++ b/vendor/hiredis-vip-0.3.0.zip |
