summaryrefslogtreecommitdiff
path: root/cache/src/tango_cache_client.cpp
diff options
context:
space:
mode:
authorfengweihao <[email protected]>2023-12-29 14:39:03 +0800
committerfengweihao <[email protected]>2023-12-29 14:39:03 +0800
commitced991b4a0569f52fbae33aa8ce15c9d4ea6bda7 (patch)
treef28257f66772f6dcdf130d82272054dddfe6dc59 /cache/src/tango_cache_client.cpp
parentb011a9268042db22cc54ca8171640dbfb2ab617c (diff)
TSG-18286 Proxy支持虚拟表表名变更,删除代理本地缓存,删除tsg-http相关配置v4.8.56-20231229
Diffstat (limited to 'cache/src/tango_cache_client.cpp')
-rw-r--r--cache/src/tango_cache_client.cpp1251
1 files changed, 0 insertions, 1251 deletions
diff --git a/cache/src/tango_cache_client.cpp b/cache/src/tango_cache_client.cpp
deleted file mode 100644
index 07674ad..0000000
--- a/cache/src/tango_cache_client.cpp
+++ /dev/null
@@ -1,1251 +0,0 @@
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <unistd.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <assert.h>
-#include <errno.h>
-#include <sys/time.h>
-#include <time.h>
-#include <string.h>
-#include <openssl/sha.h>
-#include <openssl/md5.h>
-
-#include <MESA/MESA_prof_load.h>
-
-#include "tango_cache_client_in.h"
-#include "tango_cache_transfer.h"
-#include "tango_cache_tools.h"
-#include "tango_cache_xml.h"
-#include "tango_cache_redis.h"
-
-int TANGO_CACHE_VERSION_20181009=0;
-
-static int caculate_base64_md5(const char *data, unsigned long len, unsigned char *result, unsigned int size)
-{
- MD5_CTX c;
- unsigned char md5[17]={0};
-
- if(size < 33)
- return -1;
-
- MD5_Init(&c);
- MD5_Update(&c, data, len);
- MD5_Final(md5, &c);
-
- Base64_EncodeBlock(md5, 16, result);
- return 0;
-}
-
-void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_t size)
-{
- SHA256_CTX c;
- unsigned char sha256[128];
- u_int32_t length;
-
- SHA256_Init(&c);
- SHA256_Update(&c, data, len);
- SHA256_Final(sha256, &c);
-
- length = (size > 64)?32:(size-1)/2; //Ԥ��һ���ռ�
- for(u_int32_t i=0; i<length; i++)
- {
- sprintf(result + i*2, "%02x", sha256[i]);
- }
- result[length*2] = '\0';
-}
-
-static int wired_load_balancer_lookup(WLB_handle_t wiredlb, const char *key, int keylen, char *host, size_t hostsize)
-{
- struct WLB_consumer_t chosen;
-
- if(wiredLB_lookup(wiredlb, key, keylen, &chosen))
- {
- return -1;
- }
- snprintf(host, hostsize, "%s:%hu", chosen.ip_addr, chosen.data_port);
- return 0;
-}
-
-enum CACHE_ERR_CODE tango_cache_get_last_error(const struct tango_cache_ctx *ctx)
-{
- return ctx->error_code;
-}
-enum CACHE_ERR_CODE tango_cache_ctx_error(const struct tango_cache_instance *instance)
-{
- return instance->error_code;
-}
-
-void tango_cache_set_fail_state(struct tango_cache_ctx *ctx, enum CACHE_ERR_CODE error_code)
-{
- ctx->fail_state = true;
- ctx->error_code = error_code;
-}
-
-const char *tango_cache_get_errstring(const struct tango_cache_ctx *ctx)
-{
- switch(ctx->error_code)
- {
- case CACHE_CACHE_MISS: return "cache not hit";
- case CACHE_TIMEOUT: return "cache not fresh";
- case CACHE_OUTOF_MEMORY: return "outof memory";
- case CACHE_ERR_WIREDLB: return "wiredlb error";
- case CACHE_ERR_SOCKPAIR: return "socketpair error";
- case CACHE_ERR_INTERNAL: return "internal error";
- case CACHE_ERR_REDIS_JSON: return "parse redis json error";
- case CACHE_ERR_REDIS_CONNECT:return "redis is not connected";
- case CACHE_ERR_REDIS_EXEC: return "redis command reply error";
- case CACHE_OUTOF_SESSION: return "two many curl sessions";
- case CACHE_UPDATE_CANCELED: return "update was canceled";
- case CACHE_ERR_EVBUFFER: return "evbuffer read write error";
- default: return ctx->error;
- }
-}
-
-void tango_cache_get_statistics(const struct tango_cache_instance *instance, struct cache_statistics *out)
-{
- *out = instance->statistic;
-}
-
-struct tango_cache_result *tango_cache_read_result(future_result_t *promise_result)
-{
- return (struct tango_cache_result *)promise_result;
-}
-
-static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statistics *statistic)
-{
- switch(ctx->method)
- {
- case CACHE_REQUEST_PUT:
- if(ctx->fail_state)
- {
- if(ctx->locate == OBJECT_IN_HOS)
- statistic->put_err_http += 1;
- else
- statistic->put_err_redis += 1;
- }
- else
- {
- if(ctx->locate == OBJECT_IN_HOS)
- statistic->put_succ_http += 1;
- else
- statistic->put_succ_redis += 1;
- }
- break;
- case CACHE_REQUEST_GET:
- case CACHE_REQUEST_HEAD:
- if(ctx->fail_state)
- {
- if(ctx->error_code == CACHE_CACHE_MISS || ctx->error_code == CACHE_TIMEOUT)
- statistic->get_miss_num += 1;
- else if(ctx->locate == OBJECT_IN_HOS)
- statistic->get_err_http += 1;
- else
- statistic->get_err_redis += 1;
- }
- else
- {
- if(ctx->locate == OBJECT_IN_HOS)
- statistic->get_succ_http += 1;
- else
- statistic->get_succ_redis += 1;
- }
- break;
- case CACHE_REQUEST_DELETE:
- if(ctx->fail_state)
- {
- statistic->del_error_num += 1;
- }
- else
- {
- statistic->del_succ_num += 1;
- }
- break;
- case CACHE_REQUEST_DELETE_MUL:
- statistic->del_succ_num += ctx->del.succ_num;
- statistic->del_error_num += ctx->del.fail_num;
- break;
- default:break;
- }
-}
-
-void easy_string_destroy(struct easy_string *estr)
-{
- if(estr->buff != NULL)
- {
- free(estr->buff);
- estr->buff = NULL;
- estr->len = estr->size = 0;
- }
-}
-
-void easy_string_savedata(struct easy_string *estr, const char *data, size_t len)
-{
- if(estr->size-estr->len < len+1)
- {
- estr->size += len*4+1;
- estr->buff = (char*)realloc(estr->buff, estr->size);
- }
-
- memcpy(estr->buff+estr->len, data, len);
- estr->len += len;
- estr->buff[estr->len]='\0';
-}
-
-//callback: �Ƿ���ûص���������ҪΪ���ֱ�ӵ���APIʱʧ�ܣ����ٵ��ûص�������ͨ������ֵ�ж�
-void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback)
-{
- struct multipart_etag_list *etag;
-
- if(ctx->curl != NULL)
- {
- curl_multi_remove_handle(ctx->instance->multi_hd, ctx->curl);
- curl_easy_cleanup(ctx->curl);
- }
- easy_string_destroy(&ctx->response);
-
- switch(ctx->method)
- {
- case CACHE_REQUEST_GET:
- case CACHE_REQUEST_HEAD:
- easy_string_destroy(&ctx->get.response_tag);
- break;
-
- case CACHE_REQUEST_PUT:
- if(ctx->put.uploadID != NULL) free(ctx->put.uploadID);
- if(ctx->put.combine_xml != NULL) free(ctx->put.combine_xml);
- if(ctx->put.object_meta != NULL) cJSON_Delete(ctx->put.object_meta);
- if(ctx->put.once_request.len > 0)
- {
- ctx->instance->statistic.memory_used -= ctx->put.once_request.size;
- easy_string_destroy(&ctx->put.once_request);
- }
- if(ctx->put.evbuf!=NULL)
- {
- ctx->instance->statistic.memory_used -= evbuffer_get_length(ctx->put.evbuf);
- evbuffer_free(ctx->put.evbuf);
- }
- while(NULL != (etag = TAILQ_FIRST(&ctx->put.etag_head)))
- {
- TAILQ_REMOVE(&ctx->put.etag_head, etag, node);
- free(etag->etag);
- free(etag);
- }//no break here
- case CACHE_REQUEST_DELETE_MUL:
- if(ctx->headers != NULL)
- {
- curl_slist_free_all(ctx->headers);
- }//no break here
- case CACHE_REQUEST_DELETE:
- if(callback && ctx->promise != NULL)
- {
- if(ctx->fail_state)
- {
- promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
- }
- else
- {
- promise_success(ctx->promise, NULL);
- }
- }
- break;
- default: break;
- }
- update_statistics(ctx, &ctx->instance->statistic);
- free(ctx);
-}
-
-//�ж�session�Ƿ񳬹����ƣ�����ȡ��ʼ���ж�where_to_get�Ƿ�ȫ����MINIO��
-bool sessions_exceeds_limit(struct tango_cache_instance *instance, enum OBJECT_LOCATION where_to_get)
-{
- if(where_to_get == OBJECT_IN_HOS)
- {
- return (instance->statistic.session_http>=instance->param->maximum_sessions);
- }
- else
- {
- return (instance->statistic.session_redis>=instance->param->maximum_sessions);
- }
-}
-
-
-//�����ϴ�API��ʹ��ctx��evbuffer�������޷�����ctx��ȡ����
-enum OBJECT_LOCATION tango_cache_object_locate(struct tango_cache_instance *instance, size_t object_size)
-{
- if(instance->param->fsstatid_trig)
- {
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_histlen_id, 0, FS_OP_SET, object_size);
- }
- if(instance->param->object_store_way!=CACHE_SMALL_REDIS || object_size > instance->param->redis_object_maxsize)
- {
- return OBJECT_IN_HOS;
- }
- else
- {
- return OBJECT_IN_REDIS;
- }
-}
-
-void tango_cache_get_object_path(struct tango_cache_ctx *ctx, char *path/*OUT*/, size_t pathsize)
-{
- if(path != NULL)
- {
- if(ctx->locate == OBJECT_IN_HOS)
- {
- snprintf(path, pathsize, "http://%s/%s", ctx->hostaddr, ctx->object_key);
- }
- else
- {
- snprintf(path, pathsize, "redis://%s/%s", ctx->instance->redisaddr, ctx->object_key);
- }
- }
-}
-
-int tango_cache_update_end(struct tango_cache_ctx *ctx, char *path/*OUT*/, size_t pathsize)
-{
- if(!ctx->fail_state)
- {
- ctx->locate = tango_cache_object_locate(ctx->instance, ctx->put.object_size);
- tango_cache_get_object_path(ctx, path, pathsize);
-
- if(ctx->instance->param->object_store_way != CACHE_ALL_HOS)
- {
- cJSON_AddNumberToObject(ctx->put.object_meta, "Content-Length", ctx->put.object_size);
- }
- }
- return do_tango_cache_update_end(ctx, false);
-}
-
-void tango_cache_update_cancel(struct tango_cache_ctx *ctx)
-{
- ctx->put.close_state = true;
- if(ctx->curl != NULL)
- {
- curl_multi_remove_handle(ctx->instance->multi_hd, ctx->curl);
- curl_easy_cleanup(ctx->curl);
- ctx->curl = NULL;
- }
- tango_cache_set_fail_state(ctx, CACHE_UPDATE_CANCELED);
- if(ctx->put.uploadID!=NULL && cache_cancel_upload_minio(ctx))
- {
- ctx->put.state = PUT_STATE_CANCEL;
- }
- else
- {
- tango_cache_ctx_destroy(ctx);
- }
-}
-
-int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data, size_t size)
-{
- if(ctx->fail_state)
- {
- return 0; //TODO: ��ʱ���Է���ֵ!!
- }
- if(evbuffer_add(ctx->put.evbuf, data, size))
- {
- tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY);
- return 0;
- }
- ctx->instance->statistic.memory_used += size;
- ctx->put.object_size += size;
- if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->param->upload_block_size)
- {
- cache_kick_upload_minio_multipart(ctx, ctx->instance->param->upload_block_size);
- }
- return 0;
-}
-
-int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf)
-{
- size_t size;
-
- if(ctx->fail_state)
- {
- return 0;//TODO: ��ʱ���Է���ֵ!!
- }
-
- size = evbuffer_get_length(evbuf);
- if(way == EVBUFFER_MOVE)
- {
- if(evbuffer_add_buffer(ctx->put.evbuf, evbuf))
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER);
- return 0;
- }
- }
- else
- {
- if(evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf))
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER);
- return 0;
- }
- }
- ctx->instance->statistic.memory_used += size;
- ctx->put.object_size += size;
- if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->param->upload_block_size)
- {
- cache_kick_upload_minio_multipart(ctx, ctx->instance->param->upload_block_size);
- }
- return 0;
-}
-
-struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta, enum OBJECT_LOCATION maybe_loc)
-{
- struct tango_cache_ctx *ctx;
- char buffer[2064], *user_tag=NULL, *user_tag_value=NULL;
- time_t expires, now, last_modify;
- struct easy_string hdr_estr={NULL, 0, 0};
-
- if(sessions_exceeds_limit(instance, maybe_loc) || (u_int64_t)instance->statistic.memory_used>=instance->param->maximum_used_mem)
- {
- instance->error_code = CACHE_OUTOF_MEMORY;
- instance->statistic.totaldrop_num += 1;
- return NULL;
- }
-
- ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx));
- ctx->instance = instance;
- ctx->promise = future_to_promise(f);
- ctx->method = CACHE_REQUEST_PUT;
- ctx->locate = maybe_loc;
-
- if(instance->param->hash_object_key)
- {
- caculate_sha256(meta->url, strlen(meta->url), buffer, 72);
- if(meta->user_log_name)
- {
- struct timespec start_time;
- clock_gettime(CLOCK_REALTIME,&start_time);
- snprintf(ctx->object_key, 256, "%s/%lu_%c%c_%c%c_%s", instance->param->bucketname, start_time.tv_nsec, buffer[0], buffer[1], buffer[2], buffer[3], buffer+4);
- }
- else
- {
- snprintf(ctx->object_key, 256, "%s/%c%c_%c%c_%s", instance->param->bucketname, buffer[0], buffer[1], buffer[2], buffer[3], buffer+4);
- }
- //����ԭʼURL
- snprintf(buffer, 2064, "x-amz-meta-url: %s", meta->url);
- ctx->headers = curl_slist_append(ctx->headers, buffer);
- }
- else
- {
- snprintf(ctx->object_key, 256, "%s/%s", instance->param->bucketname, meta->url);
- }
- if(wired_load_balancer_lookup(instance->param->cache.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
- {
- instance->error_code = CACHE_ERR_WIREDLB;
- instance->statistic.totaldrop_num += 1;
- if(ctx->headers!=NULL) curl_slist_free_all(ctx->headers);
- free(ctx);
- return NULL;
- }
-
- //Expires�ֶΣ����ڻ����ڲ��ж������Ƿ�ʱ
- now = time(NULL);
- expires = (meta->put.timeout==0||meta->put.timeout>instance->param->relative_ttl)?instance->param->relative_ttl:meta->put.timeout;
- if(expires_timestamp2hdr_str(now + expires, buffer, 256))
- {
- ctx->headers = curl_slist_append(ctx->headers, buffer);
- }
- ctx->put.object_ttl = expires;
- //Last-Modify�ֶΣ�����GETʱ�ж��Ƿ�����
- last_modify = (meta->put.date > meta->put.last_modified)?meta->put.date:meta->put.last_modified;
- if(last_modify == 0)
- {
- last_modify = get_gmtime_timestamp(now);
- }
- sprintf(buffer, "x-amz-meta-lm: %lu", last_modify);
- ctx->headers = curl_slist_append(ctx->headers, buffer);
- //�б���֧�ֵı�׼ͷ��
- for(int i=0; i<HDR_CONTENT_NUM; i++)
- {
- if(meta->std_hdr[i] != NULL)
- {
- ctx->headers = curl_slist_append(ctx->headers, meta->std_hdr[i]);
- if(ctx->instance->param->object_store_way != CACHE_ALL_HOS)
- {
- easy_string_savedata(&hdr_estr, meta->std_hdr[i], strlen(meta->std_hdr[i]));
- easy_string_savedata(&hdr_estr, "\r\n", 2);
- }
- }
- }
- if(meta->std_hdr[HDR_CONTENT_TYPE] == NULL)
- {
- ctx->headers = curl_slist_append(ctx->headers, "Content-Type:");
- if(ctx->instance->param->object_store_way != CACHE_ALL_HOS)
- {
- easy_string_savedata(&hdr_estr, "Content-Type: application/octet-stream\r\n", strlen("Content-Type: application/octet-stream\r\n"));
- }
- }
- ctx->headers = curl_slist_append(ctx->headers, "Expect:");//ע��POST������Expect��ϵ��Ҫ��ȷ����CURLOPT_POSTFIELDSIZE
- //���������ͷ����GETʱ��ԭ������
- if(meta->usertag_len>0 && meta->usertag_len<=USER_TAG_MAX_LEN)
- {
- user_tag = (char *)malloc((meta->usertag_len/3 + 1)*4 + 18); //������������ռ䣻18=17+1: ͷ��+�ַ���������
- memcpy(user_tag, "x-amz-meta-user: ", 17);
- user_tag_value = user_tag+17;
- Base64_EncodeBlock((const unsigned char*)meta->usertag, meta->usertag_len, (unsigned char*)user_tag_value);
- ctx->headers = curl_slist_append(ctx->headers, user_tag);
- }
-
- if(ctx->instance->param->object_store_way != CACHE_ALL_HOS)
- {
- ctx->put.object_meta = cJSON_CreateObject();
- if(instance->param->hash_object_key)
- {
- cJSON_AddStringToObject(ctx->put.object_meta, "X-Amz-Meta-Url", meta->url);
- }
- cJSON_AddNumberToObject(ctx->put.object_meta, "Expires", now + expires);
- cJSON_AddNumberToObject(ctx->put.object_meta, "X-Amz-Meta-Lm", last_modify);
- cJSON_AddStringToObject(ctx->put.object_meta, "Headers", hdr_estr.buff);
- if(user_tag_value != NULL)
- {
- cJSON_AddStringToObject(ctx->put.object_meta, "X-Amz-Meta-User", user_tag_value);
- }
- easy_string_destroy(&hdr_estr);
- }
- if(user_tag != NULL)
- {
- free(user_tag);
- }
-
- ctx->put.evbuf = evbuffer_new();
- TAILQ_INIT(&ctx->put.etag_head);
- return ctx;
-}
-
-struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta)
-{
- struct tango_cache_ctx *ctx;
- enum OBJECT_LOCATION maybe_loc=OBJECT_IN_UNKNOWN;
-
- if(instance->param->object_store_way != CACHE_SMALL_REDIS)
- {
- maybe_loc = OBJECT_IN_HOS;
- }
-
- ctx = tango_cache_update_prepare(instance, f, meta, maybe_loc);
- if(ctx == NULL)
- {
- return NULL;
- }
- ctx->instance->statistic.put_recv_num += 1;
- ctx->instance->error_code = CACHE_OK;
- return ctx;
-}
-
-//һ�����ϴ�ʱ��ֱ�Ӷ�λ�����ϴ���λ��
-struct tango_cache_ctx *tango_cache_update_once_prepare(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta,
- size_t object_size, char *path, size_t pathsize)
-{
- struct tango_cache_ctx *ctx;
- enum OBJECT_LOCATION location;
-
- location = tango_cache_object_locate(instance, object_size);
- ctx = tango_cache_update_prepare(instance, f, meta, location);
- if(ctx == NULL)
- {
- return NULL;
- }
- tango_cache_get_object_path(ctx, path, pathsize);
-
- if(ctx->instance->param->object_store_way != CACHE_ALL_HOS)
- {
- cJSON_AddNumberToObject(ctx->put.object_meta, "Content-Length", object_size);
- }
- return ctx;
-}
-
-int tango_cache_upload_once_data(struct tango_cache_instance *instance, struct future* f,
- enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta_put *meta, char *path, size_t pathsize)
-{
- struct tango_cache_ctx *ctx;
-
- ctx = tango_cache_update_once_prepare(instance, f, meta, size, path, pathsize);
- if(ctx == NULL)
- {
- if(way == PUT_MEM_FREE) free((void *)data);
- return -1;
- }
- return do_tango_cache_upload_once_data(ctx, way, data, size);
-}
-
-int tango_cache_upload_once_evbuf(struct tango_cache_instance *instance, struct future* f,
- enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, struct tango_cache_meta_put *meta, char *path, size_t pathsize)
-{
- struct tango_cache_ctx *ctx;
-
- ctx = tango_cache_update_once_prepare(instance, f, meta, evbuffer_get_length(evbuf), path, pathsize);
- if(ctx == NULL)
- {
- return -1;
- }
- return do_tango_cache_upload_once_evbuf(ctx, way, evbuf);
-}
-
-struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *instance, enum CACHE_REQUEST_METHOD method,
- struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get)
-{
- struct tango_cache_ctx *ctx;
- char sha256[72];
-
- if(sessions_exceeds_limit(instance, where_to_get))
- {
- instance->error_code = CACHE_OUTOF_SESSION;
- instance->statistic.totaldrop_num += 1;
- return NULL;
- }
-
- ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx));
- ctx->instance = instance;
- ctx->promise = future_to_promise(f);
- promise_allow_many_successes(ctx->promise); //��λص��������ʱ����promise_finish
- ctx->method = method;
- ctx->get.state = GET_STATE_START;
- ctx->get.max_age = meta->get.max_age;
- ctx->get.min_fresh = meta->get.min_fresh;
-
- if(instance->param->hash_object_key)
- {
- caculate_sha256(meta->url, strlen(meta->url), sha256, 72);
- snprintf(ctx->object_key, 256, "%s/%c%c_%c%c_%s", instance->param->bucketname, sha256[0], sha256[1], sha256[2], sha256[3], sha256+4);
- }
- else
- {
- snprintf(ctx->object_key, 256, "%s/%s", instance->param->bucketname, meta->url);
- }
- if(wired_load_balancer_lookup(instance->param->cache.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
- {
- instance->error_code = CACHE_ERR_WIREDLB;
- instance->statistic.totaldrop_num += 1;
- free(ctx);
- return NULL;
- }
- return ctx;
-}
-
-int tango_cache_fetch_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get)
-{
- struct tango_cache_ctx *ctx;
-
- if(instance->param->object_store_way != CACHE_SMALL_REDIS)
- {
- where_to_get = OBJECT_IN_HOS;
- }
-
- ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_GET, f, meta, where_to_get);
- if(ctx == NULL)
- {
- return -1;
- }
- return do_tango_cache_fetch_object(ctx, where_to_get);
-}
-
-int tango_cache_head_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta)
-{
- struct tango_cache_ctx *ctx;
- enum OBJECT_LOCATION location;
-
- //���������Redis����Ԫ��Ϣ�洢��Redis��
- location = (instance->param->object_store_way != CACHE_ALL_HOS)?OBJECT_IN_REDIS:OBJECT_IN_HOS;
- ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_HEAD, f, meta, location);
- if(ctx == NULL)
- {
- return -1;
- }
- return do_tango_cache_head_object(ctx, location);
-}
-
-struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *instance, struct future* f, const char *objkey, const char *minio_addr, const char *bucket)
-{
- struct tango_cache_ctx *ctx;
- char sha256[72];
- const char *pbucket;
-
- if(sessions_exceeds_limit(instance, OBJECT_IN_HOS))
- {
- instance->error_code = CACHE_OUTOF_SESSION;
- instance->statistic.totaldrop_num += 1;
- return NULL;
- }
-
- ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx));
- ctx->instance = instance;
- ctx->promise = future_to_promise(f);
- ctx->method = CACHE_REQUEST_DELETE;
-
- pbucket = (bucket==NULL)?instance->param->bucketname:bucket;
- if(instance->param->hash_object_key)
- {
- caculate_sha256(objkey, strlen(objkey), sha256, 72);
- snprintf(ctx->object_key, 256, "%s/%c%c_%c%c_%s", pbucket, sha256[0], sha256[1], sha256[2], sha256[3], sha256+4);
- }
- else
- {
- snprintf(ctx->object_key, 256, "%s/%s", pbucket, objkey);
- }
- if(minio_addr != NULL)
- {
- snprintf(ctx->hostaddr, 48, "%s", minio_addr);
- }
- else if(wired_load_balancer_lookup(instance->param->cache.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
- {
- instance->error_code = CACHE_ERR_WIREDLB;
- instance->statistic.totaldrop_num += 1;
- free(ctx);
- return NULL;
- }
- return ctx;
-}
-
-int tango_cache_delete_object(struct tango_cache_instance *instance, struct future* f, const char *objkey, const char *minio_addr, const char *bucket)
-{
- struct tango_cache_ctx *ctx;
-
- ctx = tango_cache_delete_prepare(instance, f, objkey, minio_addr, bucket);
- if(ctx == NULL)
- {
- return -1;
- }
- return (cache_delete_minio_object(ctx)==1)?0:-1;
-}
-
-struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_instance *instance, struct future* f, char *objlist[], u_int32_t num)
-{
- struct tango_cache_ctx *ctx;
- char md5[48]={0}, content_md5[64];
-
- if(sessions_exceeds_limit(instance, OBJECT_IN_HOS))
- {
- instance->error_code = CACHE_OUTOF_SESSION;
- instance->statistic.totaldrop_num += 1;
- return NULL;
- }
-
- ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx));
- ctx->instance = instance;
- ctx->promise = future_to_promise(f);
- ctx->method = CACHE_REQUEST_DELETE_MUL;
- ctx->del.succ_num = num;
-
- if(wired_load_balancer_lookup(instance->param->cache.wiredlb, objlist[0], strlen(objlist[0]), ctx->hostaddr, 48))
- {
- instance->error_code = CACHE_ERR_WIREDLB;
- instance->statistic.totaldrop_num += num;
- free(ctx);
- return NULL;
- }
-
- construct_multiple_delete_xml(instance->param->bucketname, objlist, num, instance->param->hash_object_key, &ctx->response.buff, &ctx->response.size);
- caculate_base64_md5(ctx->response.buff, ctx->response.size, (unsigned char *)md5, 48);
- sprintf(content_md5, "Content-MD5: %s", md5);
- ctx->headers = curl_slist_append(ctx->headers, content_md5);
- ctx->headers = curl_slist_append(ctx->headers, "Content-Type: application/xml");
- ctx->headers = curl_slist_append(ctx->headers, "Expect:");
- return ctx;
-}
-
-//TODO: AccessDenied
-int tango_cache_multi_delete(struct tango_cache_instance *instance, struct future* f, char *objlist[], u_int32_t num)
-{
- struct tango_cache_ctx *ctx;
-
- ctx = tango_cache_multi_delete_prepare(instance, f, objlist, num);
- if(ctx == NULL)
- {
- return -1;
- }
- return do_tango_cache_multi_delete(ctx);
-}
-
-static void check_multi_info(CURLM *multi)
-{
- CURLMsg *msg;
- int msgs_left;
- struct tango_cache_ctx *ctx;
- CURL *easy;
- CURLcode res;
- long res_code;
-
- while((msg = curl_multi_info_read(multi, &msgs_left)))
- {
- if(msg->msg != CURLMSG_DONE)
- {
- continue;
- }
-
- easy = msg->easy_handle;
- res = msg->data.result;
- curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ctx);
- curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &res_code);
- curl_multi_remove_handle(multi, easy);
- curl_easy_cleanup(easy);
- ctx->curl = NULL;
- ctx->res_code = 0;
-
- switch(ctx->method)
- {
- case CACHE_REQUEST_GET:
- case CACHE_REQUEST_HEAD:
- tango_cache_curl_get_done(ctx, res, res_code);
- break;
- case CACHE_REQUEST_PUT:
- tango_cache_curl_put_done(ctx, res, res_code);
- break;
- case CACHE_REQUEST_DELETE:
- tango_cache_curl_del_done(ctx, res, res_code);
- break;
- case CACHE_REQUEST_DELETE_MUL:
- tango_cache_curl_muldel_done(ctx, res, res_code);
- break;
- default: break;
- }
- }
-}
-
-/* Called by libevent when we get action on a multi socket */
-static void libevent_socket_event_cb(int fd, short action, void *userp)
-{
- struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; //from event_assign
- UNUSED CURLMcode rc;
- int what, still_running;
-
- what = ((action&EV_READ)?CURL_CSELECT_IN:0) | ((action & EV_WRITE)?CURL_CSELECT_OUT:0);
-
- rc = curl_multi_socket_action(instance->multi_hd, fd, what, &still_running);
- instance->statistic.session_http = still_running;
- assert(rc==CURLM_OK);
-
- check_multi_info(instance->multi_hd);
- if(still_running<=0 && evtimer_pending(&instance->timer_event, NULL))
- {
- evtimer_del(&instance->timer_event);
- }
-}
-
-/* Called by libevent when our timeout expires */
-static void libevent_timer_event_cb(int fd, short kind, void *userp)
-{
- struct tango_cache_instance *instance = (struct tango_cache_instance *)userp;
- UNUSED CURLMcode rc;
- int still_running;
-
- rc = curl_multi_socket_action(instance->multi_hd, CURL_SOCKET_TIMEOUT, 0, &still_running);
- instance->statistic.session_http = still_running;
- assert(rc==CURLM_OK);
- check_multi_info(instance->multi_hd);
-}
-
-static int curl_socket_function_cb(CURL *curl, curl_socket_t sockfd, int what, void *userp, void *sockp)
-{
- struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; //from multi handle
- struct curl_socket_data *sockinfo = (struct curl_socket_data *)sockp; //curl_multi_assign, for socket
- int action;
-
- if(what == CURL_POLL_REMOVE)
- {
- if(sockinfo != NULL)
- {
- event_del(&sockinfo->sock_event);
- free(sockinfo);
- }
- }
- else
- {
- if(sockinfo == NULL)
- {
- sockinfo = (struct curl_socket_data *)calloc(1, sizeof(struct curl_socket_data));
- curl_multi_assign(instance->multi_hd, sockfd, sockinfo);
- }
- else
- {
- event_del(&sockinfo->sock_event);
- }
-
- action = (what&CURL_POLL_IN?EV_READ:0)|(what&CURL_POLL_OUT?EV_WRITE:0)|EV_PERSIST;
- event_assign(&sockinfo->sock_event, instance->evbase, sockfd, action, libevent_socket_event_cb, instance);
- event_add(&sockinfo->sock_event, NULL);
- }
-
- return 0;
-}
-
-static int curl_timer_function_cb(CURLM *multi, long timeout_ms, void *userp)
-{
- struct tango_cache_instance *instance = (struct tango_cache_instance *)userp;
- struct timeval timeout;
- UNUSED CURLMcode rc;
- int still_running;
-
- timeout.tv_sec = timeout_ms/1000;
- timeout.tv_usec = (timeout_ms%1000)*1000;
-
- if(timeout_ms == 0)
- {
- //timeout_ms is 0 means we should call curl_multi_socket_action/curl_multi_perform at once.
- //To initiate the whole process(inform CURLMOPT_SOCKETFUNCTION callback) or when timeout occurs.
- rc = curl_multi_socket_action(multi, CURL_SOCKET_TIMEOUT, 0, &still_running);
- instance->statistic.session_http = still_running;
- assert(rc==CURLM_OK);
- }
- else if(timeout_ms == -1) //timeout_ms is -1 means we should delete the timer.
- {
- evtimer_del(&instance->timer_event);
- }
- else //update the timer to the new value.
- {
- evtimer_add(&instance->timer_event, &timeout);
- }
-
- return 0; //0-success; -1-error
-}
-
-static void instance_statistic_timer_cb(int fd, short kind, void *userp)
-{
- struct tango_cache_instance *instance = (struct tango_cache_instance *)userp;
- struct timeval tv;
- struct cache_statistics incr_statistic;
- long long *plast_statistic = (long long*)&instance->statistic_last;
- long long *pnow_statistic = (long long*)&instance->statistic;
- long long *pinc_statistic = (long long*)&incr_statistic;
-
- for(u_int32_t i=0; i<sizeof(struct cache_statistics)/sizeof(long long); i++)
- {
- pinc_statistic[i] = pnow_statistic[i] - plast_statistic[i];
- }
- instance->statistic_last = instance->statistic;
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_RECV], 0, FS_OP_ADD, incr_statistic.get_recv_num);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_S_TOTAL], 0, FS_OP_ADD, incr_statistic.get_succ_http+incr_statistic.get_succ_redis);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_S_HTTP], 0, FS_OP_ADD, incr_statistic.get_succ_http);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_S_REDIS], 0, FS_OP_ADD, incr_statistic.get_succ_redis);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_MISS], 0, FS_OP_ADD, incr_statistic.get_miss_num);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_E_TOTAL], 0, FS_OP_ADD, incr_statistic.get_err_http+incr_statistic.get_err_redis);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_E_HTTP], 0, FS_OP_ADD, incr_statistic.get_err_http);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_E_REDIS], 0, FS_OP_ADD, incr_statistic.get_err_redis);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_RECV], 0, FS_OP_ADD, incr_statistic.put_recv_num);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_S_TOTAL], 0, FS_OP_ADD, incr_statistic.put_succ_http+incr_statistic.put_succ_redis);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_S_HTTP], 0, FS_OP_ADD, incr_statistic.put_succ_http);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_S_REDIS], 0, FS_OP_ADD, incr_statistic.put_succ_redis);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_E_TOTAL], 0, FS_OP_ADD, incr_statistic.put_err_http+incr_statistic.put_err_redis);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_E_HTTP], 0, FS_OP_ADD, incr_statistic.put_err_http);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_E_REDIS], 0, FS_OP_ADD, incr_statistic.put_err_redis);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_DEL_RECV], 0, FS_OP_ADD, incr_statistic.del_recv_num);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_DEL_SUCC], 0, FS_OP_ADD, incr_statistic.del_succ_num);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_DEL_ERROR], 0, FS_OP_ADD, incr_statistic.del_error_num);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_TOTAL_DROP], 0, FS_OP_ADD, incr_statistic.totaldrop_num);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_MEM_USED], 0, FS_OP_ADD, incr_statistic.memory_used);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_SESS_HTTP], 0, FS_OP_ADD, incr_statistic.session_http);
- FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_SESS_REDIS], 0, FS_OP_ADD, incr_statistic.session_redis);
- tv.tv_sec = instance->param->fsstat_period;
- tv.tv_usec = 0;
- event_add(&instance->timer_statistic, &tv);
-}
-
-static int _unfold_IP_range(char* ip_range, char***ip_list, int size)
-{
- int i=0,count=0, ret=0;
- int range_digits[5];
- memset(range_digits,0,sizeof(range_digits));
- ret=sscanf(ip_range,"%d.%d.%d.%d-%d",&range_digits[0],&range_digits[1],&range_digits[2],&range_digits[3],&range_digits[4]);
- if(ret!=4&&ret!=5)
- {
- return 0;
- }
- if(ret==4&&range_digits[4]==0)
- {
- range_digits[4]=range_digits[3];
- }
- for(i=0;i<5;i++)
- {
- if(range_digits[i]<0||range_digits[i]>255)
- {
- return 0;
- }
- }
- count=range_digits[4]-range_digits[3]+1;
- *ip_list=(char**)realloc(*ip_list, sizeof(char*)*(size+count));
- for(i=0;i<count;i++)
- {
- (*ip_list)[size+i]=(char*)malloc(64);
- snprintf((*ip_list)[size+i],64,"%d.%d.%d.%d",range_digits[0],range_digits[1],range_digits[2],range_digits[3]+i);
- }
- return count;
-}
-
-static int unfold_IP_range(const char* ip_range, char***ip_list)
-{
- char *token=NULL,*sub_token=NULL,*saveptr;
- char *buffer=(char*)calloc(sizeof(char),strlen(ip_range)+1);
- int count=0;
- strcpy(buffer,ip_range);
- for (token = buffer; ; token= NULL)
- {
- sub_token= strtok_r(token,";", &saveptr);
- if (sub_token == NULL)
- break;
- count+=_unfold_IP_range(sub_token, ip_list,count);
- }
- free(buffer);
- return count;
-}
-
-static int build_redis_cluster_addrs(const char *iplist, const char *ports, char *redisaddrs, size_t size, void *runtimelog)
-{
- u_int32_t redis_ip_num;
- u_int32_t redis_port_start, redis_port_end;
- char **redis_iplist=NULL;
- size_t addrlen;
- int ret;
-
- redis_ip_num = unfold_IP_range(iplist, &redis_iplist);
- if(redis_ip_num ==0 )
- {
- MESA_HANDLE_RUNTIME_LOGV2(runtimelog, RLOG_LV_FATAL, "decode REDIS_CLUSTER_IP_LIST %s failed.", iplist);
- return -1;
- }
- ret = sscanf(ports, "%u-%u", &redis_port_start, &redis_port_end);
- if(ret!=1 && ret!=2)
- {
- MESA_HANDLE_RUNTIME_LOGV2(runtimelog, RLOG_LV_FATAL, "decode REDIS_CLUSTER_PORT_RANGE %s failed.", iplist);
- return -2;
- }
-
- memset(redisaddrs, 0, size);
- for(u_int32_t i=0; i<redis_ip_num; i++)
- {
- addrlen = strlen(redisaddrs);
- snprintf(redisaddrs+addrlen, size-addrlen, "%s:%u,", redis_iplist[i], redis_port_start);
- }
- addrlen = strlen(redisaddrs);
- redisaddrs[addrlen-1] = '\0';
- return 0;
-}
-
-static int wired_load_balancer_init(struct wiredlb_parameter *wparam, void *runtime_log)
-{
- wparam->wiredlb = wiredLB_create(wparam->wiredlb_topic, wparam->wiredlb_group, WLB_PRODUCER);
- if(wparam->wiredlb == NULL)
- {
- MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n");
- return -1;
- }
- wiredLB_set_opt(wparam->wiredlb, WLB_OPT_HEALTH_CHECK_PORT, &wparam->wiredlb_ha_port, sizeof(wparam->wiredlb_ha_port));
- wiredLB_set_opt(wparam->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &wparam->wiredlb_override, sizeof(wparam->wiredlb_override));
- if(strlen(wparam->wiredlb_datacenter) > 0)
- {
- wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_DATACENTER, wparam->wiredlb_datacenter, strlen(wparam->wiredlb_datacenter)+1);
- }
- if(wparam->wiredlb_override)
- {
- wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, wparam->iplist, strlen(wparam->iplist)+1);
- wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_DATAPORT, &wparam->port, sizeof(wparam->port));
- }
- if(wiredLB_init(wparam->wiredlb) < 0)
- {
- MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "wiredLB_init group %s failed.\n", wparam->wiredlb_group);
- return -1;
- }
- return 0;
-}
-
-int register_field_stat(struct tango_cache_parameter *param, void *runtime_log)
-{
- int value;
- const char *field_names[FS_FILED_NUM]={"GET_RECV", "GET_S_TOTAL", "GET_S_HTTP", "GET_S_REDIS", "GET_MISS", "GET_E_TOTAL", "GET_E_HTTP", "GET_E_REDIS",
- "PUT_RECV", "PUT_S_TOTAL", "PUT_S_HTTP", "PUT_S_REDIS", "PUT_E_TOTAL", "PUT_E_HTTP", "PUT_E_REDIS",
- "DEL_RECV", "DEL_SUCC", "DEL_ERROR", "TOTAL_DROP", "MEM_USED", "SESSION_HTTP", "SESSION_REDIS"};
-
- param->fsstat_handle = FS_create_handle();
- FS_set_para(param->fsstat_handle, OUTPUT_DEVICE, param->fsstat_filepath, strlen(param->fsstat_filepath)+1);
- value = 1;
- FS_set_para(param->fsstat_handle, PRINT_MODE, &value, sizeof(value));
- value = 2;
- FS_set_para(param->fsstat_handle, STAT_CYCLE, &value, sizeof(value));
- value = 1;
- FS_set_para(param->fsstat_handle, CREATE_THREAD, &value, sizeof(value));
- FS_set_para(param->fsstat_handle, APP_NAME, param->fsstat_appname, strlen(param->fsstat_appname)+1);
- FS_set_para(param->fsstat_handle, STATS_SERVER_IP, param->fsstat_dst_ip, strlen(param->fsstat_dst_ip)+1);
- FS_set_para(param->fsstat_handle, STATS_SERVER_PORT, &param->fsstat_dst_port, sizeof(param->fsstat_dst_port));
- if(strlen(param->fsstat_histlen)>0 && FS_set_para(param->fsstat_handle, HISTOGRAM_GLOBAL_BINS, param->fsstat_histlen, strlen(param->fsstat_histlen)+1) < 0)
- {
- MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "FS_set_para %s failed.", param->fsstat_histlen);
- return -1;
- }
-
- for(int i=0; i<=FS_FILED_TOTAL_DROP; i++)
- {
- param->fsstat_field_ids[i] = FS_register(param->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]);
- }
- for(int i=FS_FILED_MEM_USED; i<=FS_FILED_SESS_REDIS; i++)
- {
- param->fsstat_field_ids[i] = FS_register(param->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, field_names[i]);
- }
- param->fsstat_histlen_id = FS_register_histogram(param->fsstat_handle, FS_CALC_CURRENT, "length(bytes)", 1L, 17179869184L, 3);
- if(param->fsstat_histlen_id < 0)
- {
- MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "FS_register_histogram failed.");
- return -1;
- }
- FS_start(param->fsstat_handle);
- return 0;
-}
-
-struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path, const char* section, void *runtime_log)
-{
- u_int32_t intval;
- u_int64_t longval;
- struct tango_cache_parameter *param;
- char redis_cluster_ip[512], redis_ports[256];
-
- param = (struct tango_cache_parameter *)calloc(1, sizeof(struct tango_cache_parameter));
-
- //multi curl
- MESA_load_profile_uint_def(profile_path, section, "MAX_CONNECTION_PER_HOST", &intval, 1);
- param->maximum_host_cnns = intval;
- MESA_load_profile_uint_def(profile_path, section, "MAX_CNNT_PIPELINE_NUM", &intval, 20);
- param->maximum_pipelines = intval;
- MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_TRANSFER_TIMEOUT_S", &intval, 0);
- param->transfer_timeout = intval;
-
- //instance
- MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_SESSION_NUM", &param->maximum_sessions, 100);
- MESA_load_profile_uint_def(profile_path, section, "MAX_USED_MEMORY_SIZE_MB", &intval, 5120);
- longval = intval;
- param->maximum_used_mem = longval * 1024 * 1024;
- MESA_load_profile_uint_def(profile_path, section, "CACHE_OBJECT_KEY_HASH_SWITCH", &param->hash_object_key, 1);
- MESA_load_profile_string_def(profile_path, section, "CACHE_TOKEN", param->cache_token, 256, "c21f969b5f03d33d43e04f8f136e7682");
- if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_BUCKET_NAME", param->bucketname, 256) < 0)
- {
- MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BUCKET_NAME not found.\n", profile_path, section);
- return NULL;
- }
- MESA_load_profile_uint_def(profile_path, section, "CACHE_UPLOAD_BLOCK_SIZE", &param->upload_block_size, 5242880);
- if(param->upload_block_size < 5242880)
- {
- MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_UPLOAD_BLOCK_SIZE too small, must bigger than 5242880(5MB).\n", profile_path, section);
- return NULL;
- }
- MESA_load_profile_uint_def(profile_path, section, "CACHE_DEFAULT_TTL_SECOND", &intval, 604800);
- if(intval < 60)
- {
- MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_DEFAULT_TTL_SECOND too small, must bigger than 60s.\n", profile_path, section);
- return NULL;
- }
- param->relative_ttl = intval;
-
- //wiredlb
- MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", param->cache.wiredlb_topic, 64, "TANGO_CACHE_PRODUCER");
- MESA_load_profile_string_nodef(profile_path, section, "WIREDLB_DATACENTER", param->cache.wiredlb_datacenter, 64);
- MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", &param->cache.wiredlb_override, 1);
- MESA_load_profile_uint_def(profile_path, section, "WIREDLB_HEALTH_PORT", &intval, 52100);
- param->cache.wiredlb_ha_port = intval;
- MESA_load_profile_string_def(profile_path, section, "WIREDLB_GROUP", param->cache.wiredlb_group, 64, "MINIO_GROUP");
- MESA_load_profile_uint_def(profile_path, section, "CACHE_LISTEN_PORT", &param->cache.port, 9000);
- if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_IP_LIST", param->cache.iplist, 4096) < 0)
- {
- MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BROKERS_LIST not found.", profile_path, section);
- return NULL;
- }
- if(wired_load_balancer_init(&param->cache, runtime_log))
- {
- return NULL;
- }
-
- MESA_load_profile_int_def(profile_path, section, "CACHE_STORE_OBJECT_WAY", &param->object_store_way, CACHE_ALL_HOS);
- if(param->object_store_way!=CACHE_ALL_HOS && param->object_store_way!=CACHE_META_REDIS && param->object_store_way!=CACHE_SMALL_REDIS)
- {
- MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "CACHE_STORE_OBJECT_WAY is not 1/2/3.", profile_path, section);
- return NULL;
- }
- if(param->object_store_way != CACHE_ALL_HOS)
- {
- if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_IP_LIST", redis_cluster_ip, 512) < 0)
- {
- MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CLUSTER_IP_LIST not found.", profile_path, section);
- return NULL;
- }
- if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_PORT_RANGE", redis_ports, 256) < 0)
- {
- MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CLUSTER_PORT_RANGE not found.", profile_path, section);
- return NULL;
- }
- if(build_redis_cluster_addrs(redis_cluster_ip, redis_ports, param->redisaddrs, 4096, runtime_log))
- {
- return NULL;
- }
- MESA_load_profile_uint_def(profile_path, section, "REDIS_CACHE_OBJECT_SIZE", &param->redis_object_maxsize, 10240);
- if(param->redis_object_maxsize >= param->upload_block_size)
- {
- MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CACHE_OBJECT_SIZE must be smaller than CACHE_UPLOAD_BLOCK_SIZE.", profile_path, section);
- return NULL;
- }
- }
-
- //FieldStat LOG
- MESA_load_profile_string_def(profile_path, section, "LOG_FSSTAT_APPNAME", param->fsstat_appname, 16, "TANGO_CACHE");
- MESA_load_profile_string_def(profile_path, section, "LOG_FSSTAT_FILEPATH", param->fsstat_filepath, 256, "./log/tangocache_fsstat.log");
- MESA_load_profile_uint_def(profile_path, section, "LOG_FSSTAT_INTERVAL", &param->fsstat_period, 10);
- MESA_load_profile_uint_def(profile_path, section, "LOG_FSSTAT_TRIG", &param->fsstatid_trig, 0);
- MESA_load_profile_string_def(profile_path, section, "LOG_FSSTAT_DST_IP", param->fsstat_dst_ip, 64, "10.172.128.2");
- MESA_load_profile_int_def(profile_path, section, "LOG_FSSTAT_DST_PORT", &param->fsstat_dst_port, 8125);
- MESA_load_profile_string_nodef(profile_path, section, "LOG_FSSTAT_HISTBINS", param->fsstat_histlen, 256);
- if(param->fsstatid_trig && register_field_stat(param, runtime_log))
- {
- return NULL;
- }
- return param;
-}
-
-struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_parameter *param, struct event_base* evbase, void *runtimelog)
-{
- struct tango_cache_instance *instance;
- char *redis_sep, *save_ptr=NULL;
- struct timeval tv;
- time_t now, remain;
-
- instance = (struct tango_cache_instance *)malloc(sizeof(struct tango_cache_instance));
- memset(instance, 0, sizeof(struct tango_cache_instance));
- instance->runtime_log = runtimelog;
- instance->evbase = evbase;
- instance->param = param;
-
- instance->multi_hd = curl_multi_init();
- curl_multi_setopt(instance->multi_hd, CURLMOPT_PIPELINING, CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX);
- curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_HOST_CONNECTIONS, param->maximum_host_cnns);
- curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_PIPELINE_LENGTH, param->maximum_pipelines);
- curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETFUNCTION, curl_socket_function_cb);
- curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETDATA, instance); //curl_socket_function_cb *userp
- curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERFUNCTION, curl_timer_function_cb);
- curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERDATA, instance);
-
- if(param->object_store_way != CACHE_ALL_HOS)
- {
- if(redis_asyn_connect_init(instance))
- {
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "redis_asyn_connect_init %s failed.", instance->param->redisaddrs);
- free(instance);
- return NULL;
- }
- redis_sep = strtok_r(param->redisaddrs, ",", &save_ptr);
- sprintf(instance->redisaddr, "%s", redis_sep);
- }
- evtimer_assign(&instance->timer_event, evbase, libevent_timer_event_cb, instance);
-
- if(param->fsstatid_trig)
- {
- evtimer_assign(&instance->timer_statistic, evbase, instance_statistic_timer_cb, instance);
- now = time(NULL);
- remain = instance->param->fsstat_period - (now % instance->param->fsstat_period);
- tv.tv_sec = remain;
- tv.tv_usec = 0;
- evtimer_add(&instance->timer_statistic, &tv);
- }
- return instance;
-}
-
-void tango_cache_global_init(void)
-{
- curl_global_init(CURL_GLOBAL_NOTHING);
-}
-