diff options
| author | fengweihao <[email protected]> | 2023-12-29 14:39:03 +0800 |
|---|---|---|
| committer | fengweihao <[email protected]> | 2023-12-29 14:39:03 +0800 |
| commit | ced991b4a0569f52fbae33aa8ce15c9d4ea6bda7 (patch) | |
| tree | f28257f66772f6dcdf130d82272054dddfe6dc59 /cache/src/tango_cache_client.cpp | |
| parent | b011a9268042db22cc54ca8171640dbfb2ab617c (diff) | |
TSG-18286 Proxy支持虚拟表表名变更,删除代理本地缓存,删除tsg-http相关配置v4.8.56-20231229
Diffstat (limited to 'cache/src/tango_cache_client.cpp')
| -rw-r--r-- | cache/src/tango_cache_client.cpp | 1251 |
1 files changed, 0 insertions, 1251 deletions
diff --git a/cache/src/tango_cache_client.cpp b/cache/src/tango_cache_client.cpp deleted file mode 100644 index 07674ad..0000000 --- a/cache/src/tango_cache_client.cpp +++ /dev/null @@ -1,1251 +0,0 @@ -#include <sys/types.h> -#include <sys/stat.h> -#include <unistd.h> -#include <stdio.h> -#include <stdlib.h> -#include <assert.h> -#include <errno.h> -#include <sys/time.h> -#include <time.h> -#include <string.h> -#include <openssl/sha.h> -#include <openssl/md5.h> - -#include <MESA/MESA_prof_load.h> - -#include "tango_cache_client_in.h" -#include "tango_cache_transfer.h" -#include "tango_cache_tools.h" -#include "tango_cache_xml.h" -#include "tango_cache_redis.h" - -int TANGO_CACHE_VERSION_20181009=0; - -static int caculate_base64_md5(const char *data, unsigned long len, unsigned char *result, unsigned int size) -{ - MD5_CTX c; - unsigned char md5[17]={0}; - - if(size < 33) - return -1; - - MD5_Init(&c); - MD5_Update(&c, data, len); - MD5_Final(md5, &c); - - Base64_EncodeBlock(md5, 16, result); - return 0; -} - -void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_t size) -{ - SHA256_CTX c; - unsigned char sha256[128]; - u_int32_t length; - - SHA256_Init(&c); - SHA256_Update(&c, data, len); - SHA256_Final(sha256, &c); - - length = (size > 64)?32:(size-1)/2; //Ԥ��һ���ռ� - for(u_int32_t i=0; i<length; i++) - { - sprintf(result + i*2, "%02x", sha256[i]); - } - result[length*2] = '\0'; -} - -static int wired_load_balancer_lookup(WLB_handle_t wiredlb, const char *key, int keylen, char *host, size_t hostsize) -{ - struct WLB_consumer_t chosen; - - if(wiredLB_lookup(wiredlb, key, keylen, &chosen)) - { - return -1; - } - snprintf(host, hostsize, "%s:%hu", chosen.ip_addr, chosen.data_port); - return 0; -} - -enum CACHE_ERR_CODE tango_cache_get_last_error(const struct tango_cache_ctx *ctx) -{ - return ctx->error_code; -} -enum CACHE_ERR_CODE tango_cache_ctx_error(const struct tango_cache_instance *instance) -{ - return instance->error_code; -} - -void tango_cache_set_fail_state(struct tango_cache_ctx *ctx, enum CACHE_ERR_CODE error_code) -{ - ctx->fail_state = true; - ctx->error_code = error_code; -} - -const char *tango_cache_get_errstring(const struct tango_cache_ctx *ctx) -{ - switch(ctx->error_code) - { - case CACHE_CACHE_MISS: return "cache not hit"; - case CACHE_TIMEOUT: return "cache not fresh"; - case CACHE_OUTOF_MEMORY: return "outof memory"; - case CACHE_ERR_WIREDLB: return "wiredlb error"; - case CACHE_ERR_SOCKPAIR: return "socketpair error"; - case CACHE_ERR_INTERNAL: return "internal error"; - case CACHE_ERR_REDIS_JSON: return "parse redis json error"; - case CACHE_ERR_REDIS_CONNECT:return "redis is not connected"; - case CACHE_ERR_REDIS_EXEC: return "redis command reply error"; - case CACHE_OUTOF_SESSION: return "two many curl sessions"; - case CACHE_UPDATE_CANCELED: return "update was canceled"; - case CACHE_ERR_EVBUFFER: return "evbuffer read write error"; - default: return ctx->error; - } -} - -void tango_cache_get_statistics(const struct tango_cache_instance *instance, struct cache_statistics *out) -{ - *out = instance->statistic; -} - -struct tango_cache_result *tango_cache_read_result(future_result_t *promise_result) -{ - return (struct tango_cache_result *)promise_result; -} - -static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statistics *statistic) -{ - switch(ctx->method) - { - case CACHE_REQUEST_PUT: - if(ctx->fail_state) - { - if(ctx->locate == OBJECT_IN_HOS) - statistic->put_err_http += 1; - else - statistic->put_err_redis += 1; - } - else - { - if(ctx->locate == OBJECT_IN_HOS) - statistic->put_succ_http += 1; - else - statistic->put_succ_redis += 1; - } - break; - case CACHE_REQUEST_GET: - case CACHE_REQUEST_HEAD: - if(ctx->fail_state) - { - if(ctx->error_code == CACHE_CACHE_MISS || ctx->error_code == CACHE_TIMEOUT) - statistic->get_miss_num += 1; - else if(ctx->locate == OBJECT_IN_HOS) - statistic->get_err_http += 1; - else - statistic->get_err_redis += 1; - } - else - { - if(ctx->locate == OBJECT_IN_HOS) - statistic->get_succ_http += 1; - else - statistic->get_succ_redis += 1; - } - break; - case CACHE_REQUEST_DELETE: - if(ctx->fail_state) - { - statistic->del_error_num += 1; - } - else - { - statistic->del_succ_num += 1; - } - break; - case CACHE_REQUEST_DELETE_MUL: - statistic->del_succ_num += ctx->del.succ_num; - statistic->del_error_num += ctx->del.fail_num; - break; - default:break; - } -} - -void easy_string_destroy(struct easy_string *estr) -{ - if(estr->buff != NULL) - { - free(estr->buff); - estr->buff = NULL; - estr->len = estr->size = 0; - } -} - -void easy_string_savedata(struct easy_string *estr, const char *data, size_t len) -{ - if(estr->size-estr->len < len+1) - { - estr->size += len*4+1; - estr->buff = (char*)realloc(estr->buff, estr->size); - } - - memcpy(estr->buff+estr->len, data, len); - estr->len += len; - estr->buff[estr->len]='\0'; -} - -//callback: �Ƿ���ûص���������ҪΪ���ֱ�ӵ���APIʱʧ�ܣ����ٵ��ûص�������ͨ������ֵ�ж� -void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback) -{ - struct multipart_etag_list *etag; - - if(ctx->curl != NULL) - { - curl_multi_remove_handle(ctx->instance->multi_hd, ctx->curl); - curl_easy_cleanup(ctx->curl); - } - easy_string_destroy(&ctx->response); - - switch(ctx->method) - { - case CACHE_REQUEST_GET: - case CACHE_REQUEST_HEAD: - easy_string_destroy(&ctx->get.response_tag); - break; - - case CACHE_REQUEST_PUT: - if(ctx->put.uploadID != NULL) free(ctx->put.uploadID); - if(ctx->put.combine_xml != NULL) free(ctx->put.combine_xml); - if(ctx->put.object_meta != NULL) cJSON_Delete(ctx->put.object_meta); - if(ctx->put.once_request.len > 0) - { - ctx->instance->statistic.memory_used -= ctx->put.once_request.size; - easy_string_destroy(&ctx->put.once_request); - } - if(ctx->put.evbuf!=NULL) - { - ctx->instance->statistic.memory_used -= evbuffer_get_length(ctx->put.evbuf); - evbuffer_free(ctx->put.evbuf); - } - while(NULL != (etag = TAILQ_FIRST(&ctx->put.etag_head))) - { - TAILQ_REMOVE(&ctx->put.etag_head, etag, node); - free(etag->etag); - free(etag); - }//no break here - case CACHE_REQUEST_DELETE_MUL: - if(ctx->headers != NULL) - { - curl_slist_free_all(ctx->headers); - }//no break here - case CACHE_REQUEST_DELETE: - if(callback && ctx->promise != NULL) - { - if(ctx->fail_state) - { - promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); - } - else - { - promise_success(ctx->promise, NULL); - } - } - break; - default: break; - } - update_statistics(ctx, &ctx->instance->statistic); - free(ctx); -} - -//�ж�session�Ƿ����ƣ�����ȡ��ʼ���ж�where_to_get�Ƿ�ȫ����MINIO�� -bool sessions_exceeds_limit(struct tango_cache_instance *instance, enum OBJECT_LOCATION where_to_get) -{ - if(where_to_get == OBJECT_IN_HOS) - { - return (instance->statistic.session_http>=instance->param->maximum_sessions); - } - else - { - return (instance->statistic.session_redis>=instance->param->maximum_sessions); - } -} - - -//�����ϴ�API��ʹ��ctx��evbuffer������������ctx��ȡ���� -enum OBJECT_LOCATION tango_cache_object_locate(struct tango_cache_instance *instance, size_t object_size) -{ - if(instance->param->fsstatid_trig) - { - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_histlen_id, 0, FS_OP_SET, object_size); - } - if(instance->param->object_store_way!=CACHE_SMALL_REDIS || object_size > instance->param->redis_object_maxsize) - { - return OBJECT_IN_HOS; - } - else - { - return OBJECT_IN_REDIS; - } -} - -void tango_cache_get_object_path(struct tango_cache_ctx *ctx, char *path/*OUT*/, size_t pathsize) -{ - if(path != NULL) - { - if(ctx->locate == OBJECT_IN_HOS) - { - snprintf(path, pathsize, "http://%s/%s", ctx->hostaddr, ctx->object_key); - } - else - { - snprintf(path, pathsize, "redis://%s/%s", ctx->instance->redisaddr, ctx->object_key); - } - } -} - -int tango_cache_update_end(struct tango_cache_ctx *ctx, char *path/*OUT*/, size_t pathsize) -{ - if(!ctx->fail_state) - { - ctx->locate = tango_cache_object_locate(ctx->instance, ctx->put.object_size); - tango_cache_get_object_path(ctx, path, pathsize); - - if(ctx->instance->param->object_store_way != CACHE_ALL_HOS) - { - cJSON_AddNumberToObject(ctx->put.object_meta, "Content-Length", ctx->put.object_size); - } - } - return do_tango_cache_update_end(ctx, false); -} - -void tango_cache_update_cancel(struct tango_cache_ctx *ctx) -{ - ctx->put.close_state = true; - if(ctx->curl != NULL) - { - curl_multi_remove_handle(ctx->instance->multi_hd, ctx->curl); - curl_easy_cleanup(ctx->curl); - ctx->curl = NULL; - } - tango_cache_set_fail_state(ctx, CACHE_UPDATE_CANCELED); - if(ctx->put.uploadID!=NULL && cache_cancel_upload_minio(ctx)) - { - ctx->put.state = PUT_STATE_CANCEL; - } - else - { - tango_cache_ctx_destroy(ctx); - } -} - -int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data, size_t size) -{ - if(ctx->fail_state) - { - return 0; //TODO: ��ʱ���Է���ֵ!! - } - if(evbuffer_add(ctx->put.evbuf, data, size)) - { - tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY); - return 0; - } - ctx->instance->statistic.memory_used += size; - ctx->put.object_size += size; - if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->param->upload_block_size) - { - cache_kick_upload_minio_multipart(ctx, ctx->instance->param->upload_block_size); - } - return 0; -} - -int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf) -{ - size_t size; - - if(ctx->fail_state) - { - return 0;//TODO: ��ʱ���Է���ֵ!! - } - - size = evbuffer_get_length(evbuf); - if(way == EVBUFFER_MOVE) - { - if(evbuffer_add_buffer(ctx->put.evbuf, evbuf)) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER); - return 0; - } - } - else - { - if(evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf)) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER); - return 0; - } - } - ctx->instance->statistic.memory_used += size; - ctx->put.object_size += size; - if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->param->upload_block_size) - { - cache_kick_upload_minio_multipart(ctx, ctx->instance->param->upload_block_size); - } - return 0; -} - -struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta, enum OBJECT_LOCATION maybe_loc) -{ - struct tango_cache_ctx *ctx; - char buffer[2064], *user_tag=NULL, *user_tag_value=NULL; - time_t expires, now, last_modify; - struct easy_string hdr_estr={NULL, 0, 0}; - - if(sessions_exceeds_limit(instance, maybe_loc) || (u_int64_t)instance->statistic.memory_used>=instance->param->maximum_used_mem) - { - instance->error_code = CACHE_OUTOF_MEMORY; - instance->statistic.totaldrop_num += 1; - return NULL; - } - - ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); - ctx->instance = instance; - ctx->promise = future_to_promise(f); - ctx->method = CACHE_REQUEST_PUT; - ctx->locate = maybe_loc; - - if(instance->param->hash_object_key) - { - caculate_sha256(meta->url, strlen(meta->url), buffer, 72); - if(meta->user_log_name) - { - struct timespec start_time; - clock_gettime(CLOCK_REALTIME,&start_time); - snprintf(ctx->object_key, 256, "%s/%lu_%c%c_%c%c_%s", instance->param->bucketname, start_time.tv_nsec, buffer[0], buffer[1], buffer[2], buffer[3], buffer+4); - } - else - { - snprintf(ctx->object_key, 256, "%s/%c%c_%c%c_%s", instance->param->bucketname, buffer[0], buffer[1], buffer[2], buffer[3], buffer+4); - } - //����ԭʼURL - snprintf(buffer, 2064, "x-amz-meta-url: %s", meta->url); - ctx->headers = curl_slist_append(ctx->headers, buffer); - } - else - { - snprintf(ctx->object_key, 256, "%s/%s", instance->param->bucketname, meta->url); - } - if(wired_load_balancer_lookup(instance->param->cache.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48)) - { - instance->error_code = CACHE_ERR_WIREDLB; - instance->statistic.totaldrop_num += 1; - if(ctx->headers!=NULL) curl_slist_free_all(ctx->headers); - free(ctx); - return NULL; - } - - //Expires�ֶΣ����ڻ����ڲ��ж������Ƿ�ʱ - now = time(NULL); - expires = (meta->put.timeout==0||meta->put.timeout>instance->param->relative_ttl)?instance->param->relative_ttl:meta->put.timeout; - if(expires_timestamp2hdr_str(now + expires, buffer, 256)) - { - ctx->headers = curl_slist_append(ctx->headers, buffer); - } - ctx->put.object_ttl = expires; - //Last-Modify�ֶΣ�����GETʱ�ж��Ƿ����� - last_modify = (meta->put.date > meta->put.last_modified)?meta->put.date:meta->put.last_modified; - if(last_modify == 0) - { - last_modify = get_gmtime_timestamp(now); - } - sprintf(buffer, "x-amz-meta-lm: %lu", last_modify); - ctx->headers = curl_slist_append(ctx->headers, buffer); - //�б���֧�ֵı�ͷ�� - for(int i=0; i<HDR_CONTENT_NUM; i++) - { - if(meta->std_hdr[i] != NULL) - { - ctx->headers = curl_slist_append(ctx->headers, meta->std_hdr[i]); - if(ctx->instance->param->object_store_way != CACHE_ALL_HOS) - { - easy_string_savedata(&hdr_estr, meta->std_hdr[i], strlen(meta->std_hdr[i])); - easy_string_savedata(&hdr_estr, "\r\n", 2); - } - } - } - if(meta->std_hdr[HDR_CONTENT_TYPE] == NULL) - { - ctx->headers = curl_slist_append(ctx->headers, "Content-Type:"); - if(ctx->instance->param->object_store_way != CACHE_ALL_HOS) - { - easy_string_savedata(&hdr_estr, "Content-Type: application/octet-stream\r\n", strlen("Content-Type: application/octet-stream\r\n")); - } - } - ctx->headers = curl_slist_append(ctx->headers, "Expect:");//ע��POST������Expect��ϵ��Ҫ��ȷ����CURLOPT_POSTFIELDSIZE - //���������ͷ����GETʱ��ԭ������ - if(meta->usertag_len>0 && meta->usertag_len<=USER_TAG_MAX_LEN) - { - user_tag = (char *)malloc((meta->usertag_len/3 + 1)*4 + 18); //������������ռ䣻18=17+1: ͷ��+�ַ��������� - memcpy(user_tag, "x-amz-meta-user: ", 17); - user_tag_value = user_tag+17; - Base64_EncodeBlock((const unsigned char*)meta->usertag, meta->usertag_len, (unsigned char*)user_tag_value); - ctx->headers = curl_slist_append(ctx->headers, user_tag); - } - - if(ctx->instance->param->object_store_way != CACHE_ALL_HOS) - { - ctx->put.object_meta = cJSON_CreateObject(); - if(instance->param->hash_object_key) - { - cJSON_AddStringToObject(ctx->put.object_meta, "X-Amz-Meta-Url", meta->url); - } - cJSON_AddNumberToObject(ctx->put.object_meta, "Expires", now + expires); - cJSON_AddNumberToObject(ctx->put.object_meta, "X-Amz-Meta-Lm", last_modify); - cJSON_AddStringToObject(ctx->put.object_meta, "Headers", hdr_estr.buff); - if(user_tag_value != NULL) - { - cJSON_AddStringToObject(ctx->put.object_meta, "X-Amz-Meta-User", user_tag_value); - } - easy_string_destroy(&hdr_estr); - } - if(user_tag != NULL) - { - free(user_tag); - } - - ctx->put.evbuf = evbuffer_new(); - TAILQ_INIT(&ctx->put.etag_head); - return ctx; -} - -struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta) -{ - struct tango_cache_ctx *ctx; - enum OBJECT_LOCATION maybe_loc=OBJECT_IN_UNKNOWN; - - if(instance->param->object_store_way != CACHE_SMALL_REDIS) - { - maybe_loc = OBJECT_IN_HOS; - } - - ctx = tango_cache_update_prepare(instance, f, meta, maybe_loc); - if(ctx == NULL) - { - return NULL; - } - ctx->instance->statistic.put_recv_num += 1; - ctx->instance->error_code = CACHE_OK; - return ctx; -} - -//һ�����ϴ�ʱ��ֱ�Ӷ�λ�����ϴ���λ�� -struct tango_cache_ctx *tango_cache_update_once_prepare(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta, - size_t object_size, char *path, size_t pathsize) -{ - struct tango_cache_ctx *ctx; - enum OBJECT_LOCATION location; - - location = tango_cache_object_locate(instance, object_size); - ctx = tango_cache_update_prepare(instance, f, meta, location); - if(ctx == NULL) - { - return NULL; - } - tango_cache_get_object_path(ctx, path, pathsize); - - if(ctx->instance->param->object_store_way != CACHE_ALL_HOS) - { - cJSON_AddNumberToObject(ctx->put.object_meta, "Content-Length", object_size); - } - return ctx; -} - -int tango_cache_upload_once_data(struct tango_cache_instance *instance, struct future* f, - enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta_put *meta, char *path, size_t pathsize) -{ - struct tango_cache_ctx *ctx; - - ctx = tango_cache_update_once_prepare(instance, f, meta, size, path, pathsize); - if(ctx == NULL) - { - if(way == PUT_MEM_FREE) free((void *)data); - return -1; - } - return do_tango_cache_upload_once_data(ctx, way, data, size); -} - -int tango_cache_upload_once_evbuf(struct tango_cache_instance *instance, struct future* f, - enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, struct tango_cache_meta_put *meta, char *path, size_t pathsize) -{ - struct tango_cache_ctx *ctx; - - ctx = tango_cache_update_once_prepare(instance, f, meta, evbuffer_get_length(evbuf), path, pathsize); - if(ctx == NULL) - { - return -1; - } - return do_tango_cache_upload_once_evbuf(ctx, way, evbuf); -} - -struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *instance, enum CACHE_REQUEST_METHOD method, - struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get) -{ - struct tango_cache_ctx *ctx; - char sha256[72]; - - if(sessions_exceeds_limit(instance, where_to_get)) - { - instance->error_code = CACHE_OUTOF_SESSION; - instance->statistic.totaldrop_num += 1; - return NULL; - } - - ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); - ctx->instance = instance; - ctx->promise = future_to_promise(f); - promise_allow_many_successes(ctx->promise); //��λص��������ʱ����promise_finish - ctx->method = method; - ctx->get.state = GET_STATE_START; - ctx->get.max_age = meta->get.max_age; - ctx->get.min_fresh = meta->get.min_fresh; - - if(instance->param->hash_object_key) - { - caculate_sha256(meta->url, strlen(meta->url), sha256, 72); - snprintf(ctx->object_key, 256, "%s/%c%c_%c%c_%s", instance->param->bucketname, sha256[0], sha256[1], sha256[2], sha256[3], sha256+4); - } - else - { - snprintf(ctx->object_key, 256, "%s/%s", instance->param->bucketname, meta->url); - } - if(wired_load_balancer_lookup(instance->param->cache.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48)) - { - instance->error_code = CACHE_ERR_WIREDLB; - instance->statistic.totaldrop_num += 1; - free(ctx); - return NULL; - } - return ctx; -} - -int tango_cache_fetch_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get) -{ - struct tango_cache_ctx *ctx; - - if(instance->param->object_store_way != CACHE_SMALL_REDIS) - { - where_to_get = OBJECT_IN_HOS; - } - - ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_GET, f, meta, where_to_get); - if(ctx == NULL) - { - return -1; - } - return do_tango_cache_fetch_object(ctx, where_to_get); -} - -int tango_cache_head_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta) -{ - struct tango_cache_ctx *ctx; - enum OBJECT_LOCATION location; - - //���������Redis����Ԫ��Ϣ�洢��Redis�� - location = (instance->param->object_store_way != CACHE_ALL_HOS)?OBJECT_IN_REDIS:OBJECT_IN_HOS; - ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_HEAD, f, meta, location); - if(ctx == NULL) - { - return -1; - } - return do_tango_cache_head_object(ctx, location); -} - -struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *instance, struct future* f, const char *objkey, const char *minio_addr, const char *bucket) -{ - struct tango_cache_ctx *ctx; - char sha256[72]; - const char *pbucket; - - if(sessions_exceeds_limit(instance, OBJECT_IN_HOS)) - { - instance->error_code = CACHE_OUTOF_SESSION; - instance->statistic.totaldrop_num += 1; - return NULL; - } - - ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); - ctx->instance = instance; - ctx->promise = future_to_promise(f); - ctx->method = CACHE_REQUEST_DELETE; - - pbucket = (bucket==NULL)?instance->param->bucketname:bucket; - if(instance->param->hash_object_key) - { - caculate_sha256(objkey, strlen(objkey), sha256, 72); - snprintf(ctx->object_key, 256, "%s/%c%c_%c%c_%s", pbucket, sha256[0], sha256[1], sha256[2], sha256[3], sha256+4); - } - else - { - snprintf(ctx->object_key, 256, "%s/%s", pbucket, objkey); - } - if(minio_addr != NULL) - { - snprintf(ctx->hostaddr, 48, "%s", minio_addr); - } - else if(wired_load_balancer_lookup(instance->param->cache.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48)) - { - instance->error_code = CACHE_ERR_WIREDLB; - instance->statistic.totaldrop_num += 1; - free(ctx); - return NULL; - } - return ctx; -} - -int tango_cache_delete_object(struct tango_cache_instance *instance, struct future* f, const char *objkey, const char *minio_addr, const char *bucket) -{ - struct tango_cache_ctx *ctx; - - ctx = tango_cache_delete_prepare(instance, f, objkey, minio_addr, bucket); - if(ctx == NULL) - { - return -1; - } - return (cache_delete_minio_object(ctx)==1)?0:-1; -} - -struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_instance *instance, struct future* f, char *objlist[], u_int32_t num) -{ - struct tango_cache_ctx *ctx; - char md5[48]={0}, content_md5[64]; - - if(sessions_exceeds_limit(instance, OBJECT_IN_HOS)) - { - instance->error_code = CACHE_OUTOF_SESSION; - instance->statistic.totaldrop_num += 1; - return NULL; - } - - ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); - ctx->instance = instance; - ctx->promise = future_to_promise(f); - ctx->method = CACHE_REQUEST_DELETE_MUL; - ctx->del.succ_num = num; - - if(wired_load_balancer_lookup(instance->param->cache.wiredlb, objlist[0], strlen(objlist[0]), ctx->hostaddr, 48)) - { - instance->error_code = CACHE_ERR_WIREDLB; - instance->statistic.totaldrop_num += num; - free(ctx); - return NULL; - } - - construct_multiple_delete_xml(instance->param->bucketname, objlist, num, instance->param->hash_object_key, &ctx->response.buff, &ctx->response.size); - caculate_base64_md5(ctx->response.buff, ctx->response.size, (unsigned char *)md5, 48); - sprintf(content_md5, "Content-MD5: %s", md5); - ctx->headers = curl_slist_append(ctx->headers, content_md5); - ctx->headers = curl_slist_append(ctx->headers, "Content-Type: application/xml"); - ctx->headers = curl_slist_append(ctx->headers, "Expect:"); - return ctx; -} - -//TODO: AccessDenied -int tango_cache_multi_delete(struct tango_cache_instance *instance, struct future* f, char *objlist[], u_int32_t num) -{ - struct tango_cache_ctx *ctx; - - ctx = tango_cache_multi_delete_prepare(instance, f, objlist, num); - if(ctx == NULL) - { - return -1; - } - return do_tango_cache_multi_delete(ctx); -} - -static void check_multi_info(CURLM *multi) -{ - CURLMsg *msg; - int msgs_left; - struct tango_cache_ctx *ctx; - CURL *easy; - CURLcode res; - long res_code; - - while((msg = curl_multi_info_read(multi, &msgs_left))) - { - if(msg->msg != CURLMSG_DONE) - { - continue; - } - - easy = msg->easy_handle; - res = msg->data.result; - curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ctx); - curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &res_code); - curl_multi_remove_handle(multi, easy); - curl_easy_cleanup(easy); - ctx->curl = NULL; - ctx->res_code = 0; - - switch(ctx->method) - { - case CACHE_REQUEST_GET: - case CACHE_REQUEST_HEAD: - tango_cache_curl_get_done(ctx, res, res_code); - break; - case CACHE_REQUEST_PUT: - tango_cache_curl_put_done(ctx, res, res_code); - break; - case CACHE_REQUEST_DELETE: - tango_cache_curl_del_done(ctx, res, res_code); - break; - case CACHE_REQUEST_DELETE_MUL: - tango_cache_curl_muldel_done(ctx, res, res_code); - break; - default: break; - } - } -} - -/* Called by libevent when we get action on a multi socket */ -static void libevent_socket_event_cb(int fd, short action, void *userp) -{ - struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; //from event_assign - UNUSED CURLMcode rc; - int what, still_running; - - what = ((action&EV_READ)?CURL_CSELECT_IN:0) | ((action & EV_WRITE)?CURL_CSELECT_OUT:0); - - rc = curl_multi_socket_action(instance->multi_hd, fd, what, &still_running); - instance->statistic.session_http = still_running; - assert(rc==CURLM_OK); - - check_multi_info(instance->multi_hd); - if(still_running<=0 && evtimer_pending(&instance->timer_event, NULL)) - { - evtimer_del(&instance->timer_event); - } -} - -/* Called by libevent when our timeout expires */ -static void libevent_timer_event_cb(int fd, short kind, void *userp) -{ - struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; - UNUSED CURLMcode rc; - int still_running; - - rc = curl_multi_socket_action(instance->multi_hd, CURL_SOCKET_TIMEOUT, 0, &still_running); - instance->statistic.session_http = still_running; - assert(rc==CURLM_OK); - check_multi_info(instance->multi_hd); -} - -static int curl_socket_function_cb(CURL *curl, curl_socket_t sockfd, int what, void *userp, void *sockp) -{ - struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; //from multi handle - struct curl_socket_data *sockinfo = (struct curl_socket_data *)sockp; //curl_multi_assign, for socket - int action; - - if(what == CURL_POLL_REMOVE) - { - if(sockinfo != NULL) - { - event_del(&sockinfo->sock_event); - free(sockinfo); - } - } - else - { - if(sockinfo == NULL) - { - sockinfo = (struct curl_socket_data *)calloc(1, sizeof(struct curl_socket_data)); - curl_multi_assign(instance->multi_hd, sockfd, sockinfo); - } - else - { - event_del(&sockinfo->sock_event); - } - - action = (what&CURL_POLL_IN?EV_READ:0)|(what&CURL_POLL_OUT?EV_WRITE:0)|EV_PERSIST; - event_assign(&sockinfo->sock_event, instance->evbase, sockfd, action, libevent_socket_event_cb, instance); - event_add(&sockinfo->sock_event, NULL); - } - - return 0; -} - -static int curl_timer_function_cb(CURLM *multi, long timeout_ms, void *userp) -{ - struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; - struct timeval timeout; - UNUSED CURLMcode rc; - int still_running; - - timeout.tv_sec = timeout_ms/1000; - timeout.tv_usec = (timeout_ms%1000)*1000; - - if(timeout_ms == 0) - { - //timeout_ms is 0 means we should call curl_multi_socket_action/curl_multi_perform at once. - //To initiate the whole process(inform CURLMOPT_SOCKETFUNCTION callback) or when timeout occurs. - rc = curl_multi_socket_action(multi, CURL_SOCKET_TIMEOUT, 0, &still_running); - instance->statistic.session_http = still_running; - assert(rc==CURLM_OK); - } - else if(timeout_ms == -1) //timeout_ms is -1 means we should delete the timer. - { - evtimer_del(&instance->timer_event); - } - else //update the timer to the new value. - { - evtimer_add(&instance->timer_event, &timeout); - } - - return 0; //0-success; -1-error -} - -static void instance_statistic_timer_cb(int fd, short kind, void *userp) -{ - struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; - struct timeval tv; - struct cache_statistics incr_statistic; - long long *plast_statistic = (long long*)&instance->statistic_last; - long long *pnow_statistic = (long long*)&instance->statistic; - long long *pinc_statistic = (long long*)&incr_statistic; - - for(u_int32_t i=0; i<sizeof(struct cache_statistics)/sizeof(long long); i++) - { - pinc_statistic[i] = pnow_statistic[i] - plast_statistic[i]; - } - instance->statistic_last = instance->statistic; - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_RECV], 0, FS_OP_ADD, incr_statistic.get_recv_num); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_S_TOTAL], 0, FS_OP_ADD, incr_statistic.get_succ_http+incr_statistic.get_succ_redis); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_S_HTTP], 0, FS_OP_ADD, incr_statistic.get_succ_http); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_S_REDIS], 0, FS_OP_ADD, incr_statistic.get_succ_redis); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_MISS], 0, FS_OP_ADD, incr_statistic.get_miss_num); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_E_TOTAL], 0, FS_OP_ADD, incr_statistic.get_err_http+incr_statistic.get_err_redis); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_E_HTTP], 0, FS_OP_ADD, incr_statistic.get_err_http); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_E_REDIS], 0, FS_OP_ADD, incr_statistic.get_err_redis); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_RECV], 0, FS_OP_ADD, incr_statistic.put_recv_num); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_S_TOTAL], 0, FS_OP_ADD, incr_statistic.put_succ_http+incr_statistic.put_succ_redis); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_S_HTTP], 0, FS_OP_ADD, incr_statistic.put_succ_http); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_S_REDIS], 0, FS_OP_ADD, incr_statistic.put_succ_redis); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_E_TOTAL], 0, FS_OP_ADD, incr_statistic.put_err_http+incr_statistic.put_err_redis); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_E_HTTP], 0, FS_OP_ADD, incr_statistic.put_err_http); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_E_REDIS], 0, FS_OP_ADD, incr_statistic.put_err_redis); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_DEL_RECV], 0, FS_OP_ADD, incr_statistic.del_recv_num); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_DEL_SUCC], 0, FS_OP_ADD, incr_statistic.del_succ_num); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_DEL_ERROR], 0, FS_OP_ADD, incr_statistic.del_error_num); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_TOTAL_DROP], 0, FS_OP_ADD, incr_statistic.totaldrop_num); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_MEM_USED], 0, FS_OP_ADD, incr_statistic.memory_used); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_SESS_HTTP], 0, FS_OP_ADD, incr_statistic.session_http); - FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_SESS_REDIS], 0, FS_OP_ADD, incr_statistic.session_redis); - tv.tv_sec = instance->param->fsstat_period; - tv.tv_usec = 0; - event_add(&instance->timer_statistic, &tv); -} - -static int _unfold_IP_range(char* ip_range, char***ip_list, int size) -{ - int i=0,count=0, ret=0; - int range_digits[5]; - memset(range_digits,0,sizeof(range_digits)); - ret=sscanf(ip_range,"%d.%d.%d.%d-%d",&range_digits[0],&range_digits[1],&range_digits[2],&range_digits[3],&range_digits[4]); - if(ret!=4&&ret!=5) - { - return 0; - } - if(ret==4&&range_digits[4]==0) - { - range_digits[4]=range_digits[3]; - } - for(i=0;i<5;i++) - { - if(range_digits[i]<0||range_digits[i]>255) - { - return 0; - } - } - count=range_digits[4]-range_digits[3]+1; - *ip_list=(char**)realloc(*ip_list, sizeof(char*)*(size+count)); - for(i=0;i<count;i++) - { - (*ip_list)[size+i]=(char*)malloc(64); - snprintf((*ip_list)[size+i],64,"%d.%d.%d.%d",range_digits[0],range_digits[1],range_digits[2],range_digits[3]+i); - } - return count; -} - -static int unfold_IP_range(const char* ip_range, char***ip_list) -{ - char *token=NULL,*sub_token=NULL,*saveptr; - char *buffer=(char*)calloc(sizeof(char),strlen(ip_range)+1); - int count=0; - strcpy(buffer,ip_range); - for (token = buffer; ; token= NULL) - { - sub_token= strtok_r(token,";", &saveptr); - if (sub_token == NULL) - break; - count+=_unfold_IP_range(sub_token, ip_list,count); - } - free(buffer); - return count; -} - -static int build_redis_cluster_addrs(const char *iplist, const char *ports, char *redisaddrs, size_t size, void *runtimelog) -{ - u_int32_t redis_ip_num; - u_int32_t redis_port_start, redis_port_end; - char **redis_iplist=NULL; - size_t addrlen; - int ret; - - redis_ip_num = unfold_IP_range(iplist, &redis_iplist); - if(redis_ip_num ==0 ) - { - MESA_HANDLE_RUNTIME_LOGV2(runtimelog, RLOG_LV_FATAL, "decode REDIS_CLUSTER_IP_LIST %s failed.", iplist); - return -1; - } - ret = sscanf(ports, "%u-%u", &redis_port_start, &redis_port_end); - if(ret!=1 && ret!=2) - { - MESA_HANDLE_RUNTIME_LOGV2(runtimelog, RLOG_LV_FATAL, "decode REDIS_CLUSTER_PORT_RANGE %s failed.", iplist); - return -2; - } - - memset(redisaddrs, 0, size); - for(u_int32_t i=0; i<redis_ip_num; i++) - { - addrlen = strlen(redisaddrs); - snprintf(redisaddrs+addrlen, size-addrlen, "%s:%u,", redis_iplist[i], redis_port_start); - } - addrlen = strlen(redisaddrs); - redisaddrs[addrlen-1] = '\0'; - return 0; -} - -static int wired_load_balancer_init(struct wiredlb_parameter *wparam, void *runtime_log) -{ - wparam->wiredlb = wiredLB_create(wparam->wiredlb_topic, wparam->wiredlb_group, WLB_PRODUCER); - if(wparam->wiredlb == NULL) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n"); - return -1; - } - wiredLB_set_opt(wparam->wiredlb, WLB_OPT_HEALTH_CHECK_PORT, &wparam->wiredlb_ha_port, sizeof(wparam->wiredlb_ha_port)); - wiredLB_set_opt(wparam->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &wparam->wiredlb_override, sizeof(wparam->wiredlb_override)); - if(strlen(wparam->wiredlb_datacenter) > 0) - { - wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_DATACENTER, wparam->wiredlb_datacenter, strlen(wparam->wiredlb_datacenter)+1); - } - if(wparam->wiredlb_override) - { - wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, wparam->iplist, strlen(wparam->iplist)+1); - wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_DATAPORT, &wparam->port, sizeof(wparam->port)); - } - if(wiredLB_init(wparam->wiredlb) < 0) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "wiredLB_init group %s failed.\n", wparam->wiredlb_group); - return -1; - } - return 0; -} - -int register_field_stat(struct tango_cache_parameter *param, void *runtime_log) -{ - int value; - const char *field_names[FS_FILED_NUM]={"GET_RECV", "GET_S_TOTAL", "GET_S_HTTP", "GET_S_REDIS", "GET_MISS", "GET_E_TOTAL", "GET_E_HTTP", "GET_E_REDIS", - "PUT_RECV", "PUT_S_TOTAL", "PUT_S_HTTP", "PUT_S_REDIS", "PUT_E_TOTAL", "PUT_E_HTTP", "PUT_E_REDIS", - "DEL_RECV", "DEL_SUCC", "DEL_ERROR", "TOTAL_DROP", "MEM_USED", "SESSION_HTTP", "SESSION_REDIS"}; - - param->fsstat_handle = FS_create_handle(); - FS_set_para(param->fsstat_handle, OUTPUT_DEVICE, param->fsstat_filepath, strlen(param->fsstat_filepath)+1); - value = 1; - FS_set_para(param->fsstat_handle, PRINT_MODE, &value, sizeof(value)); - value = 2; - FS_set_para(param->fsstat_handle, STAT_CYCLE, &value, sizeof(value)); - value = 1; - FS_set_para(param->fsstat_handle, CREATE_THREAD, &value, sizeof(value)); - FS_set_para(param->fsstat_handle, APP_NAME, param->fsstat_appname, strlen(param->fsstat_appname)+1); - FS_set_para(param->fsstat_handle, STATS_SERVER_IP, param->fsstat_dst_ip, strlen(param->fsstat_dst_ip)+1); - FS_set_para(param->fsstat_handle, STATS_SERVER_PORT, ¶m->fsstat_dst_port, sizeof(param->fsstat_dst_port)); - if(strlen(param->fsstat_histlen)>0 && FS_set_para(param->fsstat_handle, HISTOGRAM_GLOBAL_BINS, param->fsstat_histlen, strlen(param->fsstat_histlen)+1) < 0) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "FS_set_para %s failed.", param->fsstat_histlen); - return -1; - } - - for(int i=0; i<=FS_FILED_TOTAL_DROP; i++) - { - param->fsstat_field_ids[i] = FS_register(param->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]); - } - for(int i=FS_FILED_MEM_USED; i<=FS_FILED_SESS_REDIS; i++) - { - param->fsstat_field_ids[i] = FS_register(param->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, field_names[i]); - } - param->fsstat_histlen_id = FS_register_histogram(param->fsstat_handle, FS_CALC_CURRENT, "length(bytes)", 1L, 17179869184L, 3); - if(param->fsstat_histlen_id < 0) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "FS_register_histogram failed."); - return -1; - } - FS_start(param->fsstat_handle); - return 0; -} - -struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path, const char* section, void *runtime_log) -{ - u_int32_t intval; - u_int64_t longval; - struct tango_cache_parameter *param; - char redis_cluster_ip[512], redis_ports[256]; - - param = (struct tango_cache_parameter *)calloc(1, sizeof(struct tango_cache_parameter)); - - //multi curl - MESA_load_profile_uint_def(profile_path, section, "MAX_CONNECTION_PER_HOST", &intval, 1); - param->maximum_host_cnns = intval; - MESA_load_profile_uint_def(profile_path, section, "MAX_CNNT_PIPELINE_NUM", &intval, 20); - param->maximum_pipelines = intval; - MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_TRANSFER_TIMEOUT_S", &intval, 0); - param->transfer_timeout = intval; - - //instance - MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_SESSION_NUM", ¶m->maximum_sessions, 100); - MESA_load_profile_uint_def(profile_path, section, "MAX_USED_MEMORY_SIZE_MB", &intval, 5120); - longval = intval; - param->maximum_used_mem = longval * 1024 * 1024; - MESA_load_profile_uint_def(profile_path, section, "CACHE_OBJECT_KEY_HASH_SWITCH", ¶m->hash_object_key, 1); - MESA_load_profile_string_def(profile_path, section, "CACHE_TOKEN", param->cache_token, 256, "c21f969b5f03d33d43e04f8f136e7682"); - if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_BUCKET_NAME", param->bucketname, 256) < 0) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BUCKET_NAME not found.\n", profile_path, section); - return NULL; - } - MESA_load_profile_uint_def(profile_path, section, "CACHE_UPLOAD_BLOCK_SIZE", ¶m->upload_block_size, 5242880); - if(param->upload_block_size < 5242880) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_UPLOAD_BLOCK_SIZE too small, must bigger than 5242880(5MB).\n", profile_path, section); - return NULL; - } - MESA_load_profile_uint_def(profile_path, section, "CACHE_DEFAULT_TTL_SECOND", &intval, 604800); - if(intval < 60) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_DEFAULT_TTL_SECOND too small, must bigger than 60s.\n", profile_path, section); - return NULL; - } - param->relative_ttl = intval; - - //wiredlb - MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", param->cache.wiredlb_topic, 64, "TANGO_CACHE_PRODUCER"); - MESA_load_profile_string_nodef(profile_path, section, "WIREDLB_DATACENTER", param->cache.wiredlb_datacenter, 64); - MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", ¶m->cache.wiredlb_override, 1); - MESA_load_profile_uint_def(profile_path, section, "WIREDLB_HEALTH_PORT", &intval, 52100); - param->cache.wiredlb_ha_port = intval; - MESA_load_profile_string_def(profile_path, section, "WIREDLB_GROUP", param->cache.wiredlb_group, 64, "MINIO_GROUP"); - MESA_load_profile_uint_def(profile_path, section, "CACHE_LISTEN_PORT", ¶m->cache.port, 9000); - if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_IP_LIST", param->cache.iplist, 4096) < 0) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BROKERS_LIST not found.", profile_path, section); - return NULL; - } - if(wired_load_balancer_init(¶m->cache, runtime_log)) - { - return NULL; - } - - MESA_load_profile_int_def(profile_path, section, "CACHE_STORE_OBJECT_WAY", ¶m->object_store_way, CACHE_ALL_HOS); - if(param->object_store_way!=CACHE_ALL_HOS && param->object_store_way!=CACHE_META_REDIS && param->object_store_way!=CACHE_SMALL_REDIS) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "CACHE_STORE_OBJECT_WAY is not 1/2/3.", profile_path, section); - return NULL; - } - if(param->object_store_way != CACHE_ALL_HOS) - { - if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_IP_LIST", redis_cluster_ip, 512) < 0) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CLUSTER_IP_LIST not found.", profile_path, section); - return NULL; - } - if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_PORT_RANGE", redis_ports, 256) < 0) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CLUSTER_PORT_RANGE not found.", profile_path, section); - return NULL; - } - if(build_redis_cluster_addrs(redis_cluster_ip, redis_ports, param->redisaddrs, 4096, runtime_log)) - { - return NULL; - } - MESA_load_profile_uint_def(profile_path, section, "REDIS_CACHE_OBJECT_SIZE", ¶m->redis_object_maxsize, 10240); - if(param->redis_object_maxsize >= param->upload_block_size) - { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CACHE_OBJECT_SIZE must be smaller than CACHE_UPLOAD_BLOCK_SIZE.", profile_path, section); - return NULL; - } - } - - //FieldStat LOG - MESA_load_profile_string_def(profile_path, section, "LOG_FSSTAT_APPNAME", param->fsstat_appname, 16, "TANGO_CACHE"); - MESA_load_profile_string_def(profile_path, section, "LOG_FSSTAT_FILEPATH", param->fsstat_filepath, 256, "./log/tangocache_fsstat.log"); - MESA_load_profile_uint_def(profile_path, section, "LOG_FSSTAT_INTERVAL", ¶m->fsstat_period, 10); - MESA_load_profile_uint_def(profile_path, section, "LOG_FSSTAT_TRIG", ¶m->fsstatid_trig, 0); - MESA_load_profile_string_def(profile_path, section, "LOG_FSSTAT_DST_IP", param->fsstat_dst_ip, 64, "10.172.128.2"); - MESA_load_profile_int_def(profile_path, section, "LOG_FSSTAT_DST_PORT", ¶m->fsstat_dst_port, 8125); - MESA_load_profile_string_nodef(profile_path, section, "LOG_FSSTAT_HISTBINS", param->fsstat_histlen, 256); - if(param->fsstatid_trig && register_field_stat(param, runtime_log)) - { - return NULL; - } - return param; -} - -struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_parameter *param, struct event_base* evbase, void *runtimelog) -{ - struct tango_cache_instance *instance; - char *redis_sep, *save_ptr=NULL; - struct timeval tv; - time_t now, remain; - - instance = (struct tango_cache_instance *)malloc(sizeof(struct tango_cache_instance)); - memset(instance, 0, sizeof(struct tango_cache_instance)); - instance->runtime_log = runtimelog; - instance->evbase = evbase; - instance->param = param; - - instance->multi_hd = curl_multi_init(); - curl_multi_setopt(instance->multi_hd, CURLMOPT_PIPELINING, CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX); - curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_HOST_CONNECTIONS, param->maximum_host_cnns); - curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_PIPELINE_LENGTH, param->maximum_pipelines); - curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETFUNCTION, curl_socket_function_cb); - curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETDATA, instance); //curl_socket_function_cb *userp - curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERFUNCTION, curl_timer_function_cb); - curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERDATA, instance); - - if(param->object_store_way != CACHE_ALL_HOS) - { - if(redis_asyn_connect_init(instance)) - { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "redis_asyn_connect_init %s failed.", instance->param->redisaddrs); - free(instance); - return NULL; - } - redis_sep = strtok_r(param->redisaddrs, ",", &save_ptr); - sprintf(instance->redisaddr, "%s", redis_sep); - } - evtimer_assign(&instance->timer_event, evbase, libevent_timer_event_cb, instance); - - if(param->fsstatid_trig) - { - evtimer_assign(&instance->timer_statistic, evbase, instance_statistic_timer_cb, instance); - now = time(NULL); - remain = instance->param->fsstat_period - (now % instance->param->fsstat_period); - tv.tv_sec = remain; - tv.tv_usec = 0; - evtimer_add(&instance->timer_statistic, &tv); - } - return instance; -} - -void tango_cache_global_init(void) -{ - curl_global_init(CURL_GLOBAL_NOTHING); -} - |
