summaryrefslogtreecommitdiff
path: root/cache
diff options
context:
space:
mode:
authorzhangchengwei <[email protected]>2018-11-07 15:05:55 +0800
committerzhengchao <[email protected]>2018-11-13 10:48:18 +0800
commitf1822e04c535971f5b7695af52b7e4f4a35b7e6f (patch)
tree692831112e5384a400e39368084c56f69f5b2181 /cache
parent1fbaee37a5b45b45a02fac99d3638aac25004cf8 (diff)
将读取配置单独抽出形成parameter API;支持Redis多机备份和故障切换。
Diffstat (limited to 'cache')
-rw-r--r--cache/include/cache_evbase_client.h5
-rw-r--r--cache/include/tango_cache_client.h5
-rw-r--r--cache/src/cache_evbase_client.cpp15
-rw-r--r--cache/src/tango_cache_client.cpp204
-rw-r--r--cache/src/tango_cache_client_in.h58
-rw-r--r--cache/src/tango_cache_redis.cpp119
-rw-r--r--cache/src/tango_cache_redis.h3
-rw-r--r--cache/src/tango_cache_transfer.cpp38
-rw-r--r--cache/test/cache_evbase_test.cpp5
-rw-r--r--cache/test/cache_evbase_test_threads.cpp5
-rw-r--r--cache/test/pangu_tg_cahce.conf22
-rw-r--r--cache/test/tango_cache_test.c5
12 files changed, 318 insertions, 166 deletions
diff --git a/cache/include/cache_evbase_client.h b/cache/include/cache_evbase_client.h
index 529a805..8b910c9 100644
--- a/cache/include/cache_evbase_client.h
+++ b/cache/include/cache_evbase_client.h
@@ -28,8 +28,11 @@ void cache_evbase_get_statistics(const struct cache_evbase_instance *instance, s
void cache_evbase_global_init(void);
+//ÿ��minio��Ⱥ��bucket����һ��parameter
+struct tango_cache_parameter *cache_evbase_parameter_new(const char* profile_path, const char* section, void *runtimelog);
+
/*����ʵ����ÿ�߳�һ������ʹ��ʱ����*/
-struct cache_evbase_instance *cache_evbase_instance_new(const char* profile_path, const char* section, void *runtimelog);
+struct cache_evbase_instance *cache_evbase_instance_new(struct tango_cache_parameter *param, void *runtimelog);
//GET�ӿڣ��ɹ�����0��ʧ�ܷ���-1��future�ص���������������߳���ִ�У���ͬ
diff --git a/cache/include/tango_cache_client.h b/cache/include/tango_cache_client.h
index df60254..2473916 100644
--- a/cache/include/tango_cache_client.h
+++ b/cache/include/tango_cache_client.h
@@ -95,6 +95,7 @@ struct tango_cache_meta_put
struct response_freshness put;
};
+struct tango_cache_parameter;
struct tango_cache_instance;
struct tango_cache_ctx;
@@ -105,9 +106,11 @@ void tango_cache_get_statistics(const struct tango_cache_instance *instance, str
/*ÿ������ִ��һ�γ�ʼ��*/
void tango_cache_global_init(void);
+//ÿ��minio��Ⱥ��bucket����һ��parameter
+struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path, const char* section, void *runtimelog);
/*��������API�̲߳���ȫ*/
//ÿ�������̴߳���һ��instance
-struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,const char* profile_path, const char* section, void *runtimelog);
+struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_parameter *param, struct event_base* evbase, void *runtimelog);
/* GET�ӿڵ�API*/
diff --git a/cache/src/cache_evbase_client.cpp b/cache/src/cache_evbase_client.cpp
index 5383384..a173832 100644
--- a/cache/src/cache_evbase_client.cpp
+++ b/cache/src/cache_evbase_client.cpp
@@ -168,7 +168,7 @@ static void cache_asyn_ioevent_dispatch(struct databuffer *buffer)
break;
case CACHE_ASYN_HEAD:
f = ctx_asyn->ctx->future;
- if(ctx_asyn->instance_asyn->instance->head_meta_source == HEAD_META_FROM_REDIS)
+ if(ctx_asyn->instance_asyn->instance->param->head_meta_source == HEAD_META_FROM_REDIS)
{
ret = tango_cache_head_redis(ctx_asyn->ctx);
}
@@ -395,7 +395,7 @@ int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct
}
if(path != NULL)
{
- snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->instance->bucketname, ctx->object_key);
+ snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->instance->param->bucketname, ctx->object_key);
}
ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
@@ -442,7 +442,7 @@ int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struc
}
if(path != NULL)
{
- snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->instance->bucketname, ctx->object_key);
+ snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->instance->param->bucketname, ctx->object_key);
}
ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
@@ -555,7 +555,12 @@ int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct fu
return 0;
}
-struct cache_evbase_instance *cache_evbase_instance_new(const char* profile_path, const char* section, void *runtimelog)
+struct tango_cache_parameter *cache_evbase_parameter_new(const char* profile_path, const char* section, void *runtimelog)
+{
+ return tango_cache_parameter_new(profile_path, section, runtimelog);
+}
+
+struct cache_evbase_instance *cache_evbase_instance_new(struct tango_cache_parameter *param, void *runtimelog)
{
evutil_socket_t notification_fd[2];
struct cache_evbase_instance *instance_asyn;
@@ -576,7 +581,7 @@ struct cache_evbase_instance *cache_evbase_instance_new(const char* profile_path
instance_asyn->evbase = evbase;
instance_asyn->notify_readfd = notification_fd[0];
instance_asyn->notify_sendfd = notification_fd[1];
- instance_asyn->instance = tango_cache_instance_new(evbase, profile_path, section, runtimelog);
+ instance_asyn->instance = tango_cache_instance_new(param, evbase, runtimelog);
if(instance_asyn->instance == NULL)
{
free(instance_asyn);
diff --git a/cache/src/tango_cache_client.cpp b/cache/src/tango_cache_client.cpp
index c87e885..7f67c6c 100644
--- a/cache/src/tango_cache_client.cpp
+++ b/cache/src/tango_cache_client.cpp
@@ -123,7 +123,7 @@ struct tango_cache_result *tango_cache_read_result(future_result_t *promise_resu
void tango_cache_get_object_path(const struct tango_cache_ctx *ctx, char *path, size_t pathsize)
{
- snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
+ snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key);
}
static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statistics *statistic)
@@ -269,9 +269,9 @@ int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data,
return 0;
}
ctx->instance->statistic.memory_used += size;
- if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->upload_block_size)
+ if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->param->upload_block_size)
{
- cache_kick_upload_minio_multipart(ctx, ctx->instance->upload_block_size);
+ cache_kick_upload_minio_multipart(ctx, ctx->instance->param->upload_block_size);
}
return 0;
}
@@ -303,9 +303,9 @@ int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COP
}
}
ctx->instance->statistic.memory_used += size;
- if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->upload_block_size)
+ if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->param->upload_block_size)
{
- cache_kick_upload_minio_multipart(ctx, ctx->instance->upload_block_size);
+ cache_kick_upload_minio_multipart(ctx, ctx->instance->param->upload_block_size);
}
return 0;
}
@@ -316,7 +316,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
char buffer[2064];
time_t expires, now, last_modify;
- if((u_int64_t)instance->statistic.memory_used>=instance->cache_limit_size || instance->statistic.session_num>=instance->max_session_num)
+ if((u_int64_t)instance->statistic.memory_used>=instance->param->cache_limit_size || instance->statistic.session_num>=instance->param->max_session_num)
{
instance->error_code = CACHE_OUTOF_MEMORY;
instance->statistic.totaldrop_num += 1;
@@ -328,7 +328,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
ctx->future = f;
ctx->method = CACHE_REQUEST_PUT;
- if(instance->hash_object_key)
+ if(instance->param->hash_object_key)
{
caculate_sha256(meta->url, strlen(meta->url), buffer, 72);
snprintf(ctx->object_key, 256, "%c%c/%c%c/%s", buffer[0], buffer[1], buffer[2], buffer[3], buffer+4);
@@ -340,7 +340,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
{
snprintf(ctx->object_key, 256, "%s", meta->url);
}
- if(wired_load_balancer_lookup(instance->wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
+ if(wired_load_balancer_lookup(instance->param->minio.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
{
instance->error_code = CACHE_ERR_WIREDLB;
instance->statistic.totaldrop_num += 1;
@@ -350,7 +350,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
//Expires�ֶΣ����ڻ����ڲ��ж������Ƿ�ʱ
now = time(NULL);
- expires = (meta->put.timeout==0||meta->put.timeout>instance->relative_ttl)?instance->relative_ttl:meta->put.timeout;
+ expires = (meta->put.timeout==0||meta->put.timeout>instance->param->relative_ttl)?instance->param->relative_ttl:meta->put.timeout;
if(expires_timestamp2hdr_str(now + expires, buffer, 256))
{
ctx->headers = curl_slist_append(ctx->headers, buffer);
@@ -417,7 +417,7 @@ int tango_cache_upload_once_data(struct tango_cache_instance *instance, struct f
}
if(path != NULL)
{
- snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->bucketname, ctx->object_key);
+ snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->param->bucketname, ctx->object_key);
}
return tango_cache_upload_once_start_data(ctx, way, data, size);
@@ -435,7 +435,7 @@ int tango_cache_upload_once_evbuf(struct tango_cache_instance *instance, struct
}
if(path != NULL)
{
- snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->bucketname, ctx->object_key);
+ snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->param->bucketname, ctx->object_key);
}
return tango_cache_upload_once_start_evbuf(ctx, way, evbuf);
@@ -446,7 +446,7 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i
struct tango_cache_ctx *ctx;
char sha256[72];
- if(instance->head_meta_source!=HEAD_META_FROM_REDIS && instance->statistic.session_num>=instance->max_session_num)
+ if(instance->param->head_meta_source!=HEAD_META_FROM_REDIS && instance->statistic.session_num>=instance->param->max_session_num)
{
instance->error_code = CACHE_OUTOF_SESSION;
instance->statistic.totaldrop_num += 1;
@@ -461,7 +461,7 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i
ctx->get.max_age = meta->get.max_age;
ctx->get.min_fresh = meta->get.min_fresh;
- if(instance->hash_object_key)
+ if(instance->param->hash_object_key)
{
caculate_sha256(meta->url, strlen(meta->url), sha256, 72);
snprintf(ctx->object_key, 256, "%c%c/%c%c/%s", sha256[0], sha256[1], sha256[2], sha256[3], sha256+4);
@@ -470,7 +470,7 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i
{
snprintf(ctx->object_key, 256, "%s", meta->url);
}
- if(wired_load_balancer_lookup(instance->wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
+ if(wired_load_balancer_lookup(instance->param->minio.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
{
instance->error_code = CACHE_ERR_WIREDLB;
instance->statistic.totaldrop_num += 1;
@@ -502,7 +502,7 @@ int tango_cache_head_object(struct tango_cache_instance *instance, struct future
return -1;
}
- if(ctx->instance->head_meta_source == HEAD_META_FROM_REDIS)
+ if(instance->param->head_meta_source == HEAD_META_FROM_REDIS)
{
return tango_cache_head_redis(ctx);
}
@@ -517,7 +517,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *
struct tango_cache_ctx *ctx;
char sha256[72];
- if(instance->statistic.session_num >= instance->max_session_num)
+ if(instance->statistic.session_num >= instance->param->max_session_num)
{
instance->error_code = CACHE_OUTOF_SESSION;
instance->statistic.totaldrop_num += 1;
@@ -529,7 +529,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *
ctx->future = f;
ctx->method = CACHE_REQUEST_DELETE;
- if(instance->hash_object_key)
+ if(instance->param->hash_object_key)
{
caculate_sha256(objkey, strlen(objkey), sha256, 72);
snprintf(ctx->object_key, 256, "%c%c/%c%c/%s", sha256[0], sha256[1], sha256[2], sha256[3], sha256+4);
@@ -538,7 +538,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *
{
snprintf(ctx->object_key, 256, "%s", objkey);
}
- if(wired_load_balancer_lookup(instance->wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
+ if(wired_load_balancer_lookup(instance->param->minio.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
{
instance->error_code = CACHE_ERR_WIREDLB;
instance->statistic.totaldrop_num += 1;
@@ -565,7 +565,7 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst
struct tango_cache_ctx *ctx;
char md5[48]={0}, content_md5[48];
- if(instance->statistic.session_num >= instance->max_session_num)
+ if(instance->statistic.session_num >= instance->param->max_session_num)
{
instance->error_code = CACHE_OUTOF_SESSION;
instance->statistic.totaldrop_num += 1;
@@ -578,7 +578,7 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst
ctx->method = CACHE_REQUEST_DELETE_MUL;
ctx->del.succ_num = num;
- if(wired_load_balancer_lookup(instance->wiredlb, objlist[0], strlen(objlist[0]), ctx->hostaddr, 48))
+ if(wired_load_balancer_lookup(instance->param->minio.wiredlb, objlist[0], strlen(objlist[0]), ctx->hostaddr, 48))
{
instance->error_code = CACHE_ERR_WIREDLB;
instance->statistic.totaldrop_num += num;
@@ -586,7 +586,7 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst
return NULL;
}
- construct_multiple_delete_xml(ctx->instance->bucketname, objlist, num, instance->hash_object_key, &ctx->response.buff, &ctx->response.size);
+ construct_multiple_delete_xml(instance->param->bucketname, objlist, num, instance->param->hash_object_key, &ctx->response.buff, &ctx->response.size);
caculate_base64_md5(ctx->response.buff, ctx->response.size, (unsigned char *)md5, 48);
sprintf(content_md5, "Content-MD5: %s", md5);
ctx->headers = curl_slist_append(ctx->headers, content_md5);
@@ -750,94 +750,119 @@ static int curl_timer_function_cb(CURLM *multi, long timeout_ms, void *userp)
return 0; //0-success; -1-error
}
-static int wired_load_balancer_init(struct tango_cache_instance *instance)
+static int wired_load_balancer_init(const char *topic, const char *datacenter, int override, struct wiredlb_parameter *wparam, void *runtime_log)
{
- instance->wiredlb = wiredLB_create(instance->wiredlb_topic, instance->wiredlb_group, WLB_PRODUCER);
- if(instance->wiredlb == NULL)
+ wparam->wiredlb = wiredLB_create(topic, wparam->wiredlb_group, WLB_PRODUCER);
+ if(wparam->wiredlb == NULL)
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n");
+ MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n");
return -1;
}
- wiredLB_set_opt(instance->wiredlb, WLB_OPT_HEALTH_CHECK_PORT, &instance->wiredlb_ha_port, sizeof(instance->wiredlb_ha_port));
- wiredLB_set_opt(instance->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &instance->wiredlb_override, sizeof(instance->wiredlb_override));
- wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_DATACENTER, instance->wiredlb_datacenter, strlen(instance->wiredlb_datacenter)+1);
- if(instance->wiredlb_override)
+ wiredLB_set_opt(wparam->wiredlb, WLB_OPT_HEALTH_CHECK_PORT, &wparam->wiredlb_ha_port, sizeof(wparam->wiredlb_ha_port));
+ wiredLB_set_opt(wparam->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &override, sizeof(override));
+ wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_DATACENTER, datacenter, strlen(datacenter)+1);
+ if(override)
{
- wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, instance->minio_iplist, strlen(instance->minio_iplist)+1);
- wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_OVERRIDE_DATAPORT, &instance->minio_port, sizeof(instance->minio_port));
+ wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, wparam->iplist, strlen(wparam->iplist)+1);
+ wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_DATAPORT, &wparam->port, sizeof(wparam->port));
}
- if(wiredLB_init(instance->wiredlb) < 0)
+ if(wiredLB_init(wparam->wiredlb) < 0)
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_init failed.\n");
+ MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "wiredLB_init group %s failed.\n", wparam->wiredlb_group);
return -1;
}
return 0;
}
-static int load_local_configure(struct tango_cache_instance *instance, const char* profile_path, const char* section)
+struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path, const char* section, void *runtime_log)
{
u_int32_t intval;
u_int64_t longval;
+ struct tango_cache_parameter *param;
+ param = (struct tango_cache_parameter *)calloc(1, sizeof(struct tango_cache_parameter));
+
+ //multi curl
MESA_load_profile_uint_def(profile_path, section, "MAX_CONNECTION_PER_HOST", &intval, 1);
- instance->max_cnn_host = intval;
+ param->max_cnn_host = intval;
MESA_load_profile_uint_def(profile_path, section, "MAX_CNNT_PIPELINE_NUM", &intval, 20);
- instance->max_pipeline_num = intval;
- MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_SESSION_NUM", &instance->max_session_num, 200);
+ param->max_pipeline_num = intval;
+ MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_TRANSFER_TIMEOUT_S", &intval, 15);
+ param->transfer_timeout = intval;
+
+ //instance
+ MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_SESSION_NUM", &param->max_session_num, 200);
MESA_load_profile_uint_def(profile_path, section, "MAX_USED_MEMORY_SIZE_MB", &intval, 5120);
longval = intval;
- instance->cache_limit_size = longval * 1024 * 1024;
- MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_TRANSFER_TIMEOUT_S", &intval, 15);
- instance->transfer_timeout = intval;
- if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_BUCKET_NAME", instance->bucketname, 256) < 0)
+ param->cache_limit_size = longval * 1024 * 1024;
+ MESA_load_profile_uint_def(profile_path, section, "CACHE_OBJECT_KEY_HASH_SWITCH", &param->hash_object_key, 1);
+ if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_BUCKET_NAME", param->bucketname, 256) < 0)
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BUCKET_NAME not found.\n", profile_path, section);
- return -1;
+ MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BUCKET_NAME not found.\n", profile_path, section);
+ return NULL;
}
-
- MESA_load_profile_int_def(profile_path, section, "CACHE_HEAD_FROM_SOURCE", &instance->head_meta_source, HEAD_META_FROM_MINIO);
- if(instance->head_meta_source == HEAD_META_FROM_REDIS)
+ MESA_load_profile_uint_def(profile_path, section, "CACHE_UPLOAD_BLOCK_SIZE", &param->upload_block_size, 5242880);
+ if(param->upload_block_size < 5242880)
{
- MESA_load_profile_string_def(profile_path, section, "CACHE_HEAD_REDIS_KEY", instance->redis_key, 256, instance->bucketname);
- if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_HEAD_REDIS_IP", instance->redis_ip, 256) < 0)
- {
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_HEAD_REDIS_IP not found.\n", profile_path, section);
- return -1;
- }
- MESA_load_profile_int_def(profile_path, section, "CACHE_HEAD_REDIS_PORT", &instance->redis_port, 6379);
+ MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_UPLOAD_BLOCK_SIZE too small, must bigger than 5242880(5MB).\n", profile_path, section);
+ return NULL;
}
- MESA_load_profile_uint_def(profile_path, section, "CACHE_OBJECT_KEY_HASH_SWITCH", &instance->hash_object_key, 1);
- MESA_load_profile_uint_def(profile_path, section, "MINIO_LISTEN_PORT", &instance->minio_port, 9000);
- if(MESA_load_profile_string_nodef(profile_path, section, "MINIO_IP_LIST", instance->minio_iplist, 4096) < 0)
+ MESA_load_profile_uint_def(profile_path, section, "CACHE_DEFAULT_TTL_SECOND", &intval, 999999999);
+ if(intval < 60)
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] MINIO_BROKERS_LIST not found.\n", profile_path, section);
- return -1;
+ MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_DEFAULT_TTL_SECOND too small, must bigger than 60s.\n", profile_path, section);
+ return NULL;
}
- MESA_load_profile_uint_def(profile_path, section, "CACHE_UPLOAD_BLOCK_SIZE", &instance->upload_block_size, 5242880);
- if(instance->upload_block_size < 5242880)
+ param->relative_ttl = intval;
+
+ //wiredlb common
+ MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", param->wiredlb_topic, 64, "TANGO_CACHE_PRODUCER");
+ MESA_load_profile_string_def(profile_path, section, "WIREDLB_DATACENTER", param->wiredlb_datacenter, 64, "ASTANA");
+ MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", &param->wiredlb_override, 1);
+
+ //wiredlb minio
+ MESA_load_profile_uint_def(profile_path, section, "WIREDLB_MINIO_HEALTH_PORT", &intval, 52100);
+ param->minio.wiredlb_ha_port = intval;
+ MESA_load_profile_string_def(profile_path, section, "WIREDLB_MINIO_GROUP", param->minio.wiredlb_group, 64, "MINIO_GROUP");
+ MESA_load_profile_uint_def(profile_path, section, "MINIO_LISTEN_PORT", &param->minio.port, 9000);
+ if(MESA_load_profile_string_nodef(profile_path, section, "MINIO_IP_LIST", param->minio.iplist, 4096) < 0)
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_UPLOAD_BLOCK_SIZE too small, must bigger than 5242880(5MB).\n", profile_path, section);
- return -1;
+ MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] MINIO_BROKERS_LIST not found.\n", profile_path, section);
+ return NULL;
}
- MESA_load_profile_uint_def(profile_path, section, "CACHE_DEFAULT_TTL_SECOND", &intval, 999999999);
- if(intval < 60)
+ if(wired_load_balancer_init(param->wiredlb_topic, param->wiredlb_datacenter, param->wiredlb_override, &param->minio, runtime_log))
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_DEFAULT_TTL_SECOND too small, must bigger than 60s.\n", profile_path, section);
- return -1;
+ return NULL;
}
- instance->relative_ttl = intval;
- //Wired_LB����
- MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", instance->wiredlb_topic, 64, "TANGO_CACHE_PRODUCER");
- MESA_load_profile_string_def(profile_path, section, "WIREDLB_GROUP", instance->wiredlb_group, 64, "KAZAKHSTAN");
- MESA_load_profile_string_def(profile_path, section, "WIREDLB_DATACENTER", instance->wiredlb_datacenter, 64, "ASTANA");
- MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", &instance->wiredlb_override, 1);
- MESA_load_profile_uint_def(profile_path, section, "WIREDLB_HEALTH_PORT", &intval, 52100);
- instance->wiredlb_ha_port = (u_int16_t)intval;
- return 0;
+ //wiredlb redis
+ MESA_load_profile_int_def(profile_path, section, "CACHE_HEAD_FROM_SOURCE", &param->head_meta_source, HEAD_META_FROM_MINIO);
+ if(param->head_meta_source == HEAD_META_FROM_REDIS)
+ {
+ MESA_load_profile_uint_def(profile_path, section, "WIREDLB_REDIS_HEALTH_PORT", &intval, 0);
+ param->redis.wiredlb_ha_port = intval;
+ MESA_load_profile_string_def(profile_path, section, "WIREDLB_REDIS_GROUP", param->redis.wiredlb_group, 64, "REDIS_GROUP");
+ MESA_load_profile_string_def(profile_path, section, "CACHE_HEAD_REDIS_KEY", param->redis_key, 256, param->bucketname);
+ MESA_load_profile_uint_def(profile_path, section, "CACHE_HEAD_REDIS_PORT", &param->redis.port, 6379);
+ if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_HEAD_REDIS_IPLIST", param->redis.iplist, 256) < 0)
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_HEAD_REDIS_IPLIST not found.\n", profile_path, section);
+ return NULL;
+ }
+ if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_HEAD_MAIN_REDIS_IP", param->redis.mainip, 64) < 0)
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_HEAD_MAIN_REDIS_IP not found.\n", profile_path, section);
+ return NULL;
+ }
+ if(wired_load_balancer_init(param->wiredlb_topic, param->wiredlb_datacenter, param->wiredlb_override, &param->redis, runtime_log))
+ {
+ return NULL;
+ }
+ }
+ return param;
}
-struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,const char* profile_path, const char* section, void *runtimelog)
+struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_parameter *param, struct event_base* evbase, void *runtimelog)
{
struct tango_cache_instance *instance;
@@ -845,39 +870,26 @@ struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,
memset(instance, 0, sizeof(struct tango_cache_instance));
instance->runtime_log = runtimelog;
instance->evbase = evbase;
+ instance->param = param;
- if(load_local_configure(instance, profile_path, section))
- {
- free(instance);
- return NULL;
- }
- if(wired_load_balancer_init(instance))
- {
- free(instance);
- return NULL;
- }
-
instance->multi_hd = curl_multi_init();
curl_multi_setopt(instance->multi_hd, CURLMOPT_PIPELINING, CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX);
- curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_HOST_CONNECTIONS, instance->max_cnn_host);
- curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_PIPELINE_LENGTH, instance->max_pipeline_num);
+ curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_HOST_CONNECTIONS, param->max_cnn_host);
+ curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_PIPELINE_LENGTH, param->max_pipeline_num);
curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETFUNCTION, curl_socket_function_cb);
curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETDATA, instance); //curl_socket_function_cb *userp
curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERFUNCTION, curl_timer_function_cb);
curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERDATA, instance);
- if(instance->head_meta_source == HEAD_META_FROM_REDIS)
+ if(param->head_meta_source == HEAD_META_FROM_REDIS)
{
- if(redis_asyn_connect_init(instance))
+ if(redis_asyn_connect_init(instance, instance->param->redis.mainip))
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "redis_asyn_connect_init %s:%u failed.", instance->redis_ip, instance->redis_port);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "redis_asyn_connect_init %s:%u failed.",
+ instance->current_redisip, instance->param->redis.port);
free(instance);
return NULL;
}
- else
- {
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "redis_asyn_connect_init %s:%u success.", instance->redis_ip, instance->redis_port);
- }
}
evtimer_assign(&instance->timer_event, evbase, libevent_timer_event_cb, instance);
return instance;
diff --git a/cache/src/tango_cache_client_in.h b/cache/src/tango_cache_client_in.h
index d795b15..ec8b4f5 100644
--- a/cache/src/tango_cache_client_in.h
+++ b/cache/src/tango_cache_client_in.h
@@ -50,39 +50,55 @@ struct easy_string
size_t size;
};
-struct tango_cache_instance
+struct wiredlb_parameter
{
- char minio_iplist[4096];
- char bucketname[256];
- char wiredlb_topic[64];
char wiredlb_group[64];
- char wiredlb_datacenter[64];
- u_int32_t minio_port;
- u_int32_t wiredlb_override;
- u_int16_t wiredlb_ha_port;
- u_int32_t hash_object_key;
- struct event_base* evbase;
- struct event timer_event;
- struct cache_statistics statistic;
- CURLM *multi_hd;
- void *runtime_log;
+ char mainip[64]; //Ĭ�Ϸ��ʵ�redis��ַ
+ char iplist[4096];//minio: minio�б���redis: mainip���˺󣬿�ѡ���б�������mainip
+ u_int32_t port;
+ short wiredlb_ha_port;
WLB_handle_t wiredlb;
- time_t relative_ttl; //������������
- u_int64_t cache_limit_size;
+};
+
+struct tango_cache_parameter
+{
+ char bucketname[256];
+ char redis_key[256];
long max_cnn_host;
long transfer_timeout;//������ʱ������
long max_pipeline_num;
+ u_int64_t cache_limit_size;
u_int32_t max_session_num;
u_int32_t upload_block_size; //minio�ֶ��ϴ������С����
- enum CACHE_ERR_CODE error_code;
+ time_t relative_ttl; //������������
+ u_int32_t hash_object_key;
+ //wiredlb
int head_meta_source; //���Դ�MINIO��REDIS��ȡԪ��Ϣ
+ u_int32_t wiredlb_override;
+ char wiredlb_topic[64];
+ char wiredlb_datacenter[64];
+
+ struct wiredlb_parameter minio;
+ struct wiredlb_parameter redis;
+};
+
+struct tango_cache_instance
+{
+ struct event_base* evbase;
+ struct event timer_event;
+ CURLM *multi_hd;
+ enum CACHE_ERR_CODE error_code;
+
//Ԫ��Ϣ��ȡ��ʽRedis
- redisAsyncContext *redis_ac;
- char redis_key[256];
- char redis_ip[128];
- int redis_port;
int redis_connecting;
+ redisAsyncContext *redis_ac;
+ char current_redisip[64];
+ struct event timer_redis;
+
+ const struct tango_cache_parameter *param;
+ void *runtime_log;
+ struct cache_statistics statistic;
};
struct multipart_etag_list
diff --git a/cache/src/tango_cache_redis.cpp b/cache/src/tango_cache_redis.cpp
index 1d0b583..5c771fc 100644
--- a/cache/src/tango_cache_redis.cpp
+++ b/cache/src/tango_cache_redis.cpp
@@ -39,19 +39,82 @@ struct http_hdr_name g_http_hdr_name[HDR_CONTENT_NUM]=
{"content-md5", "Content-MD5: "}
};
+//һ��mainip���ӳɹ������л�����
+static void main_redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status)
+{
+ struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac);
+
+ if(status == REDIS_OK)
+ {
+ evtimer_del(&instance->timer_redis);
+ if(instance->redis_connecting == CACHE_REDIS_CONNECTED)
+ {
+ redisAsyncDisconnect(instance->redis_ac);
+ }
+ sprintf(instance->current_redisip, "%s", instance->param->redis.mainip);
+ instance->redis_ac = (struct redisAsyncContext *)ac;
+ instance->redis_connecting = CACHE_REDIS_CONNECTED;
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u success.",
+ instance->param->redis.mainip, instance->param->redis.port);
+ }
+ else
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u failed: %s.",
+ instance->param->redis.mainip, instance->param->redis.port, ac->errstr);
+ }
+}
+
static void redis_asyn_disconnect_cb(const struct redisAsyncContext *ac, int status)
{
struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac);
if(status == REDIS_OK)
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis disconnect %s:%u success.", instance->redis_ip, instance->redis_port);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis disconnect %s:%u success.",
+ instance->current_redisip, instance->param->redis.port);
}
else
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis disconnect %s:%u failed: %s.", instance->redis_ip, instance->redis_port, ac->errstr);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis disconnect %s:%u failed: %s.",
+ instance->current_redisip, instance->param->redis.port, ac->errstr);
}
instance->redis_connecting = CACHE_REDIS_DISCONNECTED;
+
+ if(!strcmp(instance->current_redisip, instance->param->redis.mainip))
+ {
+ main_redis_check_timer_start(instance);
+ }
+}
+
+void main_redis_check_timer_cb(evutil_socket_t fd, short what, void *arg)
+{
+ struct tango_cache_instance *instance = (struct tango_cache_instance *)arg;
+ redisAsyncContext *redis_ac;
+ struct timeval tv;
+
+ redis_ac = redisAsyncConnect(instance->param->redis.mainip, instance->param->redis.port);
+ if(redis_ac == NULL)
+ {
+ return ;
+ }
+ redisLibeventAttach(redis_ac, instance->evbase);
+ redisAsyncSetConnectionData(redis_ac, instance);
+ redisAsyncSetConnectCallback(redis_ac, main_redis_asyn_connect_cb);
+ redisAsyncSetDisconnectCallback(redis_ac, redis_asyn_disconnect_cb);
+
+ tv.tv_sec = 60;
+ tv.tv_usec = 0;
+ evtimer_add(&instance->timer_redis, &tv);
+}
+
+void main_redis_check_timer_start(struct tango_cache_instance *instance)
+{
+ struct timeval tv;
+
+ tv.tv_sec = 60;
+ tv.tv_usec = 0;
+ evtimer_assign(&instance->timer_redis, instance->evbase, main_redis_check_timer_cb, instance);
+ evtimer_add(&instance->timer_redis, &tv);
}
static void redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status)
@@ -60,19 +123,27 @@ static void redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status
if(status == REDIS_OK)
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u success.", instance->redis_ip, instance->redis_port);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u success.",
+ instance->current_redisip, instance->param->redis.port);
instance->redis_connecting = CACHE_REDIS_CONNECTED;
}
else
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u failed: %s.", instance->redis_ip, instance->redis_port, ac->errstr);
instance->redis_connecting = CACHE_REDIS_CONNECT_IDLE;
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u failed: %s.",
+ instance->current_redisip, instance->param->redis.port, ac->errstr);
+ if(!strcmp(instance->current_redisip, instance->param->redis.mainip))
+ {
+ main_redis_check_timer_start(instance);
+ }
}
}
-int redis_asyn_connect_init(struct tango_cache_instance *instance)
+int redis_asyn_connect_init(struct tango_cache_instance *instance, const char *redisip)
{
- instance->redis_ac = redisAsyncConnect(instance->redis_ip, instance->redis_port);
+ sprintf(instance->current_redisip, "%s", redisip); //mainip�õ�ʱ��ʹ��mainip
+
+ instance->redis_ac = redisAsyncConnect(instance->current_redisip, instance->param->redis.port);
if(instance->redis_ac == NULL)
{
return -1;
@@ -85,6 +156,29 @@ int redis_asyn_connect_init(struct tango_cache_instance *instance)
return 0;
}
+int wiredlb_redis_asyn_connect(struct tango_cache_instance *instance)
+{
+ struct WLB_consumer_t cons_array[64];
+ int i, cons_num;
+
+ cons_num = wiredLB_list(instance->param->redis.wiredlb, 64, cons_array);
+ for(i=0; i<cons_num; i++)
+ {
+ if(strcmp(instance->param->redis.mainip, cons_array[i].ip_addr))
+ {
+ if(0==redis_asyn_connect_init(instance, cons_array[i].ip_addr))
+ {
+ break;
+ }
+ }
+ }
+ if(i == cons_num)
+ {
+ return -1;
+ }
+ return 0;
+}
+
static int parse_minio_events_json(struct tango_cache_ctx *ctx, const char *jcontent)
{
cJSON *root, *pobject = NULL, *ptarget, *plastMod, *pexpires;
@@ -225,22 +319,23 @@ int tango_cache_head_redis(struct tango_cache_ctx *ctx)
{
case CACHE_REDIS_CONNECTED:
ret = redisAsyncCommand(ctx->instance->redis_ac, redis_hget_command_cb, ctx, "HGET %s %s/%s",
- ctx->instance->redis_key, ctx->instance->bucketname, ctx->object_key);
+ ctx->instance->param->redis_key, ctx->instance->param->bucketname, ctx->object_key);
if(ret != REDIS_OK)
{
- //redisAsyncDisconnect(ctx->instance->redis_ac);
- redis_asyn_connect_init(ctx->instance);
+ ctx->instance->redis_connecting = CACHE_REDIS_CONNECT_IDLE;
+ if(!strcmp(ctx->instance->current_redisip, ctx->instance->param->redis.mainip))
+ {
+ main_redis_check_timer_start(ctx->instance);
+ }
tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT);
- promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
tango_cache_ctx_destroy(ctx);
}
break;
case CACHE_REDIS_DISCONNECTED:
case CACHE_REDIS_CONNECT_IDLE:
- redis_asyn_connect_init(ctx->instance);
+ wiredlb_redis_asyn_connect(ctx->instance);
case CACHE_REDIS_CONNECTING:
tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT);
- promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
tango_cache_ctx_destroy(ctx);
break;
default: assert(0);break;
diff --git a/cache/src/tango_cache_redis.h b/cache/src/tango_cache_redis.h
index 8126025..7edf119 100644
--- a/cache/src/tango_cache_redis.h
+++ b/cache/src/tango_cache_redis.h
@@ -7,7 +7,8 @@
#include "tango_cache_client_in.h"
int tango_cache_head_redis(struct tango_cache_ctx *ctx);
-int redis_asyn_connect_init(struct tango_cache_instance *instance);
+int redis_asyn_connect_init(struct tango_cache_instance *instance, const char *redisip);
+void main_redis_check_timer_start(struct tango_cache_instance *instance);
#endif
diff --git a/cache/src/tango_cache_transfer.cpp b/cache/src/tango_cache_transfer.cpp
index c404e77..ad97087 100644
--- a/cache/src/tango_cache_transfer.cpp
+++ b/cache/src/tango_cache_transfer.cpp
@@ -123,11 +123,11 @@ static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool ful
ctx->put.upload_offset = 0;
if(full)
{
- snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
+ snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key);
}
else
{
- snprintf(minio_url, 256, "http://%s/%s/%s?partNumber=%d&uploadId=%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key, ++ctx->put.part_index, ctx->put.uploadID);
+ snprintf(minio_url, 256, "http://%s/%s/%s?partNumber=%d&uploadId=%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key, ++ctx->put.part_index, ctx->put.uploadID);
curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_put_multipart_header_cb);
curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx);
}
@@ -141,7 +141,7 @@ static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool ful
curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put.upload_length);
curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_multipart_send_cb);
curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx);
- curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error);
+ curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
@@ -184,7 +184,7 @@ int curl_get_minio_uploadID(struct tango_cache_ctx *ctx)
return -1;
}
- snprintf(minio_url, 256, "http://%s/%s/%s?uploads", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
+ snprintf(minio_url, 256, "http://%s/%s/%s?uploads", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key);
curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, 0); //Ĭ��ʹ�ûص���������fread�����Է��ֹر�Expectʱ�ᵼ�¿���curl_multi_socket_action
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
@@ -193,7 +193,7 @@ int curl_get_minio_uploadID(struct tango_cache_ctx *ctx)
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
- curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error);
+ curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
@@ -214,13 +214,13 @@ int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back)
return -1;
}
- snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
+ snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key);
curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE");
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
- curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error);
+ curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
@@ -238,13 +238,13 @@ bool cache_cancel_upload_minio(struct tango_cache_ctx *ctx)
return false;
}
- snprintf(minio_url, 256, "http://%s/%s/%s?uploadId=%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key, ctx->put.uploadID);
+ snprintf(minio_url, 256, "http://%s/%s/%s?uploadId=%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key, ctx->put.uploadID);
curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE");
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
- curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error);
+ curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
@@ -264,7 +264,7 @@ bool cache_kick_combine_minio(struct tango_cache_ctx *ctx)
}
construct_complete_xml(ctx, &ctx->put.combine_xml, &len);
- snprintf(minio_url, 256, "http://%s/%s/%s?uploadId=%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key, ctx->put.uploadID);
+ snprintf(minio_url, 256, "http://%s/%s/%s?uploadId=%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key, ctx->put.uploadID);
curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
@@ -273,7 +273,7 @@ bool cache_kick_combine_minio(struct tango_cache_ctx *ctx)
curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, len); //���Content-Length
curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDS, ctx->put.combine_xml);
- curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error);
+ curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
if(ctx->headers != NULL)
{
@@ -422,7 +422,7 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r
else
{
size_t upload_length = evbuffer_get_length(ctx->put.evbuf);
- if(upload_length >= ctx->instance->upload_block_size)
+ if(upload_length >= ctx->instance->param->upload_block_size)
{
cache_kick_upload_minio_multipart(ctx, upload_length);
}
@@ -453,7 +453,7 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r
else
{
size_t upload_length = evbuffer_get_length(ctx->put.evbuf);
- if(upload_length >= ctx->instance->upload_block_size)
+ if(upload_length >= ctx->instance->param->upload_block_size)
{
cache_kick_upload_minio_multipart(ctx, upload_length);
}
@@ -494,13 +494,13 @@ int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEM
}
ctx->put.state = PUT_STATE_END;
- snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
+ snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key);
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
- curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error);
+ curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
if(way == PUT_MEM_COPY)
{
@@ -608,7 +608,7 @@ int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx, bool callback)
return -1;
}
- snprintf(minio_url, 256, "http://%s/%s/?delete", ctx->hostaddr, ctx->instance->bucketname);
+ snprintf(minio_url, 256, "http://%s/%s/?delete", ctx->hostaddr, ctx->instance->param->bucketname);
curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, ctx->response.size); //���Content-Length����CURLOPT_COPYPOSTFIELDS֮ǰ����
curl_easy_setopt(ctx->curl, CURLOPT_COPYPOSTFIELDS, ctx->response.buff);
@@ -617,7 +617,7 @@ int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx, bool callback)
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
- curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error);
+ curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
@@ -847,7 +847,7 @@ int tango_cache_fetch_start(struct tango_cache_ctx *ctx)
return -1;
}
- snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
+ snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key);
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
if(ctx->method == CACHE_REQUEST_HEAD)
{
@@ -858,7 +858,7 @@ int tango_cache_fetch_start(struct tango_cache_ctx *ctx)
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_get_response_header_cb);
curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx);
- curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error);
+ curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
diff --git a/cache/test/cache_evbase_test.cpp b/cache/test/cache_evbase_test.cpp
index 9077f19..b3f0350 100644
--- a/cache/test/cache_evbase_test.cpp
+++ b/cache/test/cache_evbase_test.cpp
@@ -191,6 +191,7 @@ int main(int argc, char **argv)
struct future_pdata *pdata;
struct cache_evbase_ctx *ctx;
void *runtime_log;
+ struct tango_cache_parameter *parameter;
if(argc != 2 && argc!=3)
{
@@ -209,7 +210,9 @@ int main(int argc, char **argv)
}
cache_evbase_global_init();
- instance_asyn = cache_evbase_instance_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log);
+ parameter = cache_evbase_parameter_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log);
+ assert(parameter != NULL);
+ instance_asyn = cache_evbase_instance_new(parameter, runtime_log);
assert(instance_asyn!=NULL);
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
diff --git a/cache/test/cache_evbase_test_threads.cpp b/cache/test/cache_evbase_test_threads.cpp
index b792bea..f2924b7 100644
--- a/cache/test/cache_evbase_test_threads.cpp
+++ b/cache/test/cache_evbase_test_threads.cpp
@@ -280,6 +280,7 @@ int main(int argc, char **argv)
pthread_t thread_tid;
pthread_attr_t attr;
struct pthread_data pdata[20];
+ struct tango_cache_parameter *parameter;
if(argc!=3)
{
@@ -294,7 +295,9 @@ int main(int argc, char **argv)
}
cache_evbase_global_init();
- instance_asyn = cache_evbase_instance_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log);
+ parameter = cache_evbase_parameter_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log);
+ assert(parameter != NULL);
+ instance_asyn = cache_evbase_instance_new(parameter, runtime_log);
assert(instance_asyn!=NULL);
pthread_attr_init(&attr);
diff --git a/cache/test/pangu_tg_cahce.conf b/cache/test/pangu_tg_cahce.conf
index 53cca1d..a1e663c 100644
--- a/cache/test/pangu_tg_cahce.conf
+++ b/cache/test/pangu_tg_cahce.conf
@@ -1,12 +1,15 @@
[TANGO_CACHE]
-#MINIO IP地址,目前只支持一个
+#MINIO IP地址列表,WiredLB格式
MINIO_IP_LIST=192.168.10.61-64;
MINIO_LISTEN_PORT=9000
#每个域名最多开启的链接数
-MAX_CONNECTION_PER_HOST=10
+#MAX_CONNECTION_PER_HOST=1
+MAX_CNNT_PIPELINE_NUM=20
+#MAX_CURL_SESSION_NUM=100
+MAX_CURL_TRANSFER_TIMEOUT_S=15
#bucket的名称
-CACHE_BUCKET_NAME=images
+CACHE_BUCKET_NAME=openbucket
#缓存最大占用的内存空间大小,超出空间时上传失败
MAX_USED_MEMORY_SIZE_MB=5120
#上传时Expires头部的过期时间,单位秒,最小60(1分钟)
@@ -15,16 +18,21 @@ CACHE_DEFAULT_TTL_SECOND=3600
CACHE_OBJECT_KEY_HASH_SWITCH=1
#HEAD元信息的来源,1-MINIO,2-REDIS
-CACHE_HEAD_FROM_SOURCE=1
+CACHE_HEAD_FROM_SOURCE=2
#使用Redis作为元信息获取源
CACHE_HEAD_REDIS_KEY=MINIO_EVENTS_INFO
-CACHE_HEAD_REDIS_IP=192.168.10.63
+#主要的Redis IP地址,优先使用
+CACHE_HEAD_MAIN_REDIS_IP=192.168.10.63
+#只有在主Redis挂掉时,从下述列表选择一个连接,WiredLB格式。
+CACHE_HEAD_REDIS_IPLIST=192.168.10.62-63;
CACHE_HEAD_REDIS_PORT=6379
#WIRED LOAD BALANCER配置
#WIREDLB_OVERRIDE=1
#WIREDLB_TOPIC=
-#WIREDLB_GROUP=
#WIREDLB_DATACENTER=
-#WIREDLB_HEALTH_PORT=52100
+WIREDLB_MINIO_HEALTH_PORT=52100
+#WIREDLB_MINIO_GROUP=
+WIREDLB_REDIS_HEALTH_PORT=52101
+#WIREDLB_REDIS_GROUP=
diff --git a/cache/test/tango_cache_test.c b/cache/test/tango_cache_test.c
index 19c6b19..b449a11 100644
--- a/cache/test/tango_cache_test.c
+++ b/cache/test/tango_cache_test.c
@@ -398,6 +398,7 @@ int main(int crgc, char **arg)
struct event ev_timer;
struct timeval tv;
void *runtime_log;
+ struct tango_cache_parameter *parameter;
runtime_log = MESA_create_runtime_log_handle("./runtime.log", 10);
if(NULL==runtime_log)
@@ -414,7 +415,9 @@ int main(int crgc, char **arg)
init_fifo();
tango_cache_global_init();
- tango_instance = tango_cache_instance_new(ev_base, "./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log);
+ parameter = tango_cache_parameter_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log);
+ assert(parameter != NULL);
+ tango_instance = tango_cache_instance_new(parameter, ev_base, runtime_log);
tv.tv_sec = 10;
tv.tv_usec = 0;