summaryrefslogtreecommitdiff
path: root/cache
diff options
context:
space:
mode:
author郑超 <[email protected]>2019-01-11 22:38:07 +0800
committer郑超 <[email protected]>2019-01-11 22:38:07 +0800
commitf5c153c59ec0c7c0e0a3d1d1b65f3e3ee171ec66 (patch)
tree0856e565714d2af266ed5237a288244f35f8cec0 /cache
parent6d33ec5891ba365d34a71a9947e00411d6029e3a (diff)
Feature cache clientv3.0.6-20190111
Diffstat (limited to 'cache')
-rw-r--r--cache/src/cache_evbase_client.cpp8
-rw-r--r--cache/src/tango_cache_client.cpp218
-rw-r--r--cache/src/tango_cache_client_in.h47
-rw-r--r--cache/src/tango_cache_transfer.cpp26
-rw-r--r--cache/test/cache_evbase_benchmark.cpp5
-rw-r--r--cache/test/lib/libtango_cache_client.abin0 -> 391562 bytes
-rw-r--r--cache/test/pangu_tg_cahce.conf47
-rw-r--r--cache/test/tango_cache_test.cpp2
8 files changed, 313 insertions, 40 deletions
diff --git a/cache/src/cache_evbase_client.cpp b/cache/src/cache_evbase_client.cpp
index bef351e..327b95d 100644
--- a/cache/src/cache_evbase_client.cpp
+++ b/cache/src/cache_evbase_client.cpp
@@ -400,7 +400,8 @@ struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *))
{
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
- tango_cache_ctx_destroy(ctx_asyn->ctx, false);
+ tango_cache_set_fail_state(ctx, CACHE_ERR_SOCKPAIR);
+ tango_cache_ctx_destroy(ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
return NULL;
@@ -444,6 +445,7 @@ int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct
free(buffer->data);
free(buffer);
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
+ tango_cache_set_fail_state(ctx, CACHE_ERR_SOCKPAIR);
tango_cache_ctx_destroy(ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
return -2;
@@ -478,6 +480,7 @@ int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struc
evbuffer_free(buffer->evbuf);
free(buffer);
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
+ tango_cache_set_fail_state(ctx, CACHE_ERR_SOCKPAIR);
tango_cache_ctx_destroy(ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
return -2;
@@ -511,6 +514,7 @@ int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct fut
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *))
{
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
+ tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR);
tango_cache_ctx_destroy(ctx_asyn->ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
@@ -546,6 +550,7 @@ int cache_evbase_head_object(struct cache_evbase_instance *instance, struct futu
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *))
{
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
+ tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR);
tango_cache_ctx_destroy(ctx_asyn->ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
@@ -576,6 +581,7 @@ int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct fu
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *))
{
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
+ tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR);
tango_cache_ctx_destroy(ctx_asyn->ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
diff --git a/cache/src/tango_cache_client.cpp b/cache/src/tango_cache_client.cpp
index 02d53ae..6653f23 100644
--- a/cache/src/tango_cache_client.cpp
+++ b/cache/src/tango_cache_client.cpp
@@ -215,6 +215,11 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback)
if(ctx->put.uploadID != NULL) free(ctx->put.uploadID);
if(ctx->put.combine_xml != NULL) free(ctx->put.combine_xml);
if(ctx->put.object_meta != NULL) cJSON_Delete(ctx->put.object_meta);
+ if(ctx->put.once_request.len > 0)
+ {
+ ctx->instance->statistic.memory_used -= ctx->put.once_request.size;
+ easy_string_destroy(&ctx->put.once_request);
+ }
if(ctx->put.evbuf!=NULL)
{
ctx->instance->statistic.memory_used -= evbuffer_get_length(ctx->put.evbuf);
@@ -267,6 +272,10 @@ bool sessions_exceeds_limit(struct tango_cache_instance *instance, enum OBJECT_L
//�����ϴ�API��ʹ��ctx��evbuffer�������޷�����ctx��ȡ����
enum OBJECT_LOCATION tango_cache_object_locate(struct tango_cache_instance *instance, size_t object_size)
{
+ if(instance->param->fsstatid_trig)
+ {
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_histlen_id, 0, FS_OP_SET, object_size);
+ }
if(instance->param->object_store_way!=CACHE_SMALL_REDIS || object_size > instance->param->redis_object_maxsize)
{
return OBJECT_IN_MINIO;
@@ -883,6 +892,127 @@ static int curl_timer_function_cb(CURLM *multi, long timeout_ms, void *userp)
return 0; //0-success; -1-error
}
+static void instance_statistic_timer_cb(int fd, short kind, void *userp)
+{
+ struct tango_cache_instance *instance = (struct tango_cache_instance *)userp;
+ struct timeval tv;
+ struct cache_statistics incr_statistic;
+ long long *plast_statistic = (long long*)&instance->statistic_last;
+ long long *pnow_statistic = (long long*)&instance->statistic;
+ long long *pinc_statistic = (long long*)&incr_statistic;
+
+ for(u_int32_t i=0; i<sizeof(struct cache_statistics)/sizeof(long long); i++)
+ {
+ pinc_statistic[i] = pnow_statistic[i] - plast_statistic[i];
+ }
+ instance->statistic_last = instance->statistic;
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_RECV], 0, FS_OP_ADD, incr_statistic.get_recv_num);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_S_TOTAL], 0, FS_OP_ADD, incr_statistic.get_succ_http+incr_statistic.get_succ_redis);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_S_HTTP], 0, FS_OP_ADD, incr_statistic.get_succ_http);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_S_REDIS], 0, FS_OP_ADD, incr_statistic.get_succ_redis);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_MISS], 0, FS_OP_ADD, incr_statistic.get_miss_num);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_E_TOTAL], 0, FS_OP_ADD, incr_statistic.get_err_http+incr_statistic.get_err_redis);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_E_HTTP], 0, FS_OP_ADD, incr_statistic.get_err_http);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_E_REDIS], 0, FS_OP_ADD, incr_statistic.get_err_redis);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_RECV], 0, FS_OP_ADD, incr_statistic.put_recv_num);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_S_TOTAL], 0, FS_OP_ADD, incr_statistic.put_succ_http+incr_statistic.put_succ_redis);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_S_HTTP], 0, FS_OP_ADD, incr_statistic.put_succ_http);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_S_REDIS], 0, FS_OP_ADD, incr_statistic.put_succ_redis);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_E_TOTAL], 0, FS_OP_ADD, incr_statistic.put_err_http+incr_statistic.put_err_redis);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_E_HTTP], 0, FS_OP_ADD, incr_statistic.put_err_http);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_E_REDIS], 0, FS_OP_ADD, incr_statistic.put_err_redis);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_DEL_RECV], 0, FS_OP_ADD, incr_statistic.del_recv_num);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_DEL_SUCC], 0, FS_OP_ADD, incr_statistic.del_succ_num);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_DEL_ERROR], 0, FS_OP_ADD, incr_statistic.del_error_num);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_TOTAL_DROP], 0, FS_OP_ADD, incr_statistic.totaldrop_num);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_MEM_USED], 0, FS_OP_ADD, incr_statistic.memory_used);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_SESS_HTTP], 0, FS_OP_ADD, incr_statistic.session_http);
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_SESS_REDIS], 0, FS_OP_ADD, incr_statistic.session_redis);
+ tv.tv_sec = instance->param->fsstat_period;
+ tv.tv_usec = 0;
+ event_add(&instance->timer_statistic, &tv);
+}
+
+static int _unfold_IP_range(char* ip_range, char***ip_list, int size)
+{
+ int i=0,count=0, ret=0;
+ int range_digits[5];
+ memset(range_digits,0,sizeof(range_digits));
+ ret=sscanf(ip_range,"%d.%d.%d.%d-%d",&range_digits[0],&range_digits[1],&range_digits[2],&range_digits[3],&range_digits[4]);
+ if(ret!=4&&ret!=5)
+ {
+ return 0;
+ }
+ if(ret==4&&range_digits[4]==0)
+ {
+ range_digits[4]=range_digits[3];
+ }
+ for(i=0;i<5;i++)
+ {
+ if(range_digits[i]<0||range_digits[i]>255)
+ {
+ return 0;
+ }
+ }
+ count=range_digits[4]-range_digits[3]+1;
+ *ip_list=(char**)realloc(*ip_list, sizeof(char*)*(size+count));
+ for(i=0;i<count;i++)
+ {
+ (*ip_list)[size+i]=(char*)malloc(64);
+ snprintf((*ip_list)[size+i],64,"%d.%d.%d.%d",range_digits[0],range_digits[1],range_digits[2],range_digits[3]+i);
+ }
+ return count;
+}
+
+static int unfold_IP_range(const char* ip_range, char***ip_list)
+{
+ char *token=NULL,*sub_token=NULL,*saveptr;
+ char *buffer=(char*)calloc(sizeof(char),strlen(ip_range)+1);
+ int count=0;
+ strcpy(buffer,ip_range);
+ for (token = buffer; ; token= NULL)
+ {
+ sub_token= strtok_r(token,";", &saveptr);
+ if (sub_token == NULL)
+ break;
+ count+=_unfold_IP_range(sub_token, ip_list,count);
+ }
+ free(buffer);
+ return count;
+}
+
+static int build_redis_cluster_addrs(const char *iplist, const char *ports, char *redisaddrs, size_t size, void *runtimelog)
+{
+ u_int32_t redis_ip_num;
+ u_int32_t redis_port_start, redis_port_end;
+ char **redis_iplist=NULL;
+ size_t addrlen;
+ int ret;
+
+ redis_ip_num = unfold_IP_range(iplist, &redis_iplist);
+ if(redis_ip_num ==0 )
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(runtimelog, RLOG_LV_FATAL, "decode REDIS_CLUSTER_IP_LIST %s failed.", iplist);
+ return -1;
+ }
+ ret = sscanf(ports, "%u-%u", &redis_port_start, &redis_port_end);
+ if(ret!=1 && ret!=2)
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(runtimelog, RLOG_LV_FATAL, "decode REDIS_CLUSTER_PORT_RANGE %s failed.", iplist);
+ return -2;
+ }
+
+ memset(redisaddrs, 0, size);
+ for(u_int32_t i=0; i<redis_ip_num; i++)
+ {
+ addrlen = strlen(redisaddrs);
+ snprintf(redisaddrs+addrlen, size-addrlen, "%s:%u,", redis_iplist[i], redis_port_start);
+ }
+ addrlen = strlen(redisaddrs);
+ redisaddrs[addrlen-1] = '\0';
+ return 0;
+}
+
static int wired_load_balancer_init(struct wiredlb_parameter *wparam, void *runtime_log)
{
wparam->wiredlb = wiredLB_create(wparam->wiredlb_topic, wparam->wiredlb_group, WLB_PRODUCER);
@@ -893,7 +1023,10 @@ static int wired_load_balancer_init(struct wiredlb_parameter *wparam, void *runt
}
wiredLB_set_opt(wparam->wiredlb, WLB_OPT_HEALTH_CHECK_PORT, &wparam->wiredlb_ha_port, sizeof(wparam->wiredlb_ha_port));
wiredLB_set_opt(wparam->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &wparam->wiredlb_override, sizeof(wparam->wiredlb_override));
- wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_DATACENTER, wparam->wiredlb_datacenter, strlen(wparam->wiredlb_datacenter)+1);
+ if(strlen(wparam->wiredlb_datacenter) > 0)
+ {
+ wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_DATACENTER, wparam->wiredlb_datacenter, strlen(wparam->wiredlb_datacenter)+1);
+ }
if(wparam->wiredlb_override)
{
wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, wparam->iplist, strlen(wparam->iplist)+1);
@@ -907,11 +1040,54 @@ static int wired_load_balancer_init(struct wiredlb_parameter *wparam, void *runt
return 0;
}
+int register_field_stat(struct tango_cache_parameter *param, void *runtime_log)
+{
+ int value;
+ const char *field_names[FS_FILED_NUM]={"GET_RECV", "GET_S_TOTAL", "GET_S_HTTP", "GET_S_REDIS", "GET_MISS", "GET_E_TOTAL", "GET_E_HTTP", "GET_E_REDIS",
+ "PUT_RECV", "PUT_S_TOTAL", "PUT_S_HTTP", "PUT_S_REDIS", "PUT_E_TOTAL", "PUT_E_HTTP", "PUT_E_REDIS",
+ "DEL_RECV", "DEL_SUCC", "DEL_ERROR", "TOTAL_DROP", "MEM_USED", "SESSION_HTTP", "SESSION_REDIS"};
+
+ param->fsstat_handle = FS_create_handle();
+ FS_set_para(param->fsstat_handle, OUTPUT_DEVICE, param->fsstat_filepath, strlen(param->fsstat_filepath)+1);
+ value = 1;
+ FS_set_para(param->fsstat_handle, PRINT_MODE, &value, sizeof(value));
+ value = 2;
+ FS_set_para(param->fsstat_handle, STAT_CYCLE, &value, sizeof(value));
+ value = 1;
+ FS_set_para(param->fsstat_handle, CREATE_THREAD, &value, sizeof(value));
+ FS_set_para(param->fsstat_handle, APP_NAME, param->fsstat_appname, strlen(param->fsstat_appname)+1);
+ FS_set_para(param->fsstat_handle, STATS_SERVER_IP, param->fsstat_dst_ip, strlen(param->fsstat_dst_ip)+1);
+ FS_set_para(param->fsstat_handle, STATS_SERVER_PORT, &param->fsstat_dst_port, sizeof(param->fsstat_dst_port));
+ if(strlen(param->fsstat_histlen)>0 && FS_set_para(param->fsstat_handle, HISTOGRAM_GLOBAL_BINS, param->fsstat_histlen, strlen(param->fsstat_histlen)+1) < 0)
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "FS_set_para %s failed.", param->fsstat_histlen);
+ return -1;
+ }
+
+ for(int i=0; i<=FS_FILED_TOTAL_DROP; i++)
+ {
+ param->fsstat_field_ids[i] = FS_register(param->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]);
+ }
+ for(int i=FS_FILED_MEM_USED; i<=FS_FILED_SESS_REDIS; i++)
+ {
+ param->fsstat_field_ids[i] = FS_register(param->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, field_names[i]);
+ }
+ param->fsstat_histlen_id = FS_register_histogram(param->fsstat_handle, FS_CALC_CURRENT, "length(bytes)", 1L, 17179869184L, 3);
+ if(param->fsstat_histlen_id < 0)
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "FS_register_histogram failed.");
+ return -1;
+ }
+ FS_start(param->fsstat_handle);
+ return 0;
+}
+
struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path, const char* section, void *runtime_log)
{
u_int32_t intval;
u_int64_t longval;
struct tango_cache_parameter *param;
+ char redis_cluster_ip[512], redis_ports[256];
param = (struct tango_cache_parameter *)calloc(1, sizeof(struct tango_cache_parameter));
@@ -950,7 +1126,7 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path
//wiredlb
MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", param->minio.wiredlb_topic, 64, "TANGO_CACHE_PRODUCER");
- MESA_load_profile_string_def(profile_path, section, "WIREDLB_DATACENTER", param->minio.wiredlb_datacenter, 64, "ASTANA");
+ MESA_load_profile_string_nodef(profile_path, section, "WIREDLB_DATACENTER", param->minio.wiredlb_datacenter, 64);
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", &param->minio.wiredlb_override, 1);
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_HEALTH_PORT", &intval, 52100);
param->minio.wiredlb_ha_port = intval;
@@ -974,9 +1150,18 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path
}
if(param->object_store_way != CACHE_ALL_MINIO)
{
- if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_ADDRS", param->redisaddrs, 4096) < 0)
+ if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_IP_LIST", redis_cluster_ip, 512) < 0)
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CLUSTER_IP_LIST not found.", profile_path, section);
+ return NULL;
+ }
+ if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_PORT_RANGE", redis_ports, 256) < 0)
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CLUSTER_PORT_RANGE not found.", profile_path, section);
+ return NULL;
+ }
+ if(build_redis_cluster_addrs(redis_cluster_ip, redis_ports, param->redisaddrs, 4096, runtime_log))
{
- MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CLUSTER_ADDRS not found.", profile_path, section);
return NULL;
}
MESA_load_profile_uint_def(profile_path, section, "REDIS_CACHE_OBJECT_SIZE", &param->redis_object_maxsize, 10240);
@@ -986,6 +1171,19 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path
return NULL;
}
}
+
+ //FieldStat LOG
+ MESA_load_profile_string_def(profile_path, section, "LOG_FSSTAT_APPNAME", param->fsstat_appname, 16, "TANGO_CACHE");
+ MESA_load_profile_string_def(profile_path, section, "LOG_FSSTAT_FILEPATH", param->fsstat_filepath, 256, "./log/tangocache_fsstat.log");
+ MESA_load_profile_uint_def(profile_path, section, "LOG_FSSTAT_INTERVAL", &param->fsstat_period, 10);
+ MESA_load_profile_uint_def(profile_path, section, "LOG_FSSTAT_TRIG", &param->fsstatid_trig, 0);
+ MESA_load_profile_string_def(profile_path, section, "LOG_FSSTAT_DST_IP", param->fsstat_dst_ip, 64, "10.172.128.2");
+ MESA_load_profile_int_def(profile_path, section, "LOG_FSSTAT_DST_PORT", &param->fsstat_dst_port, 8125);
+ MESA_load_profile_string_nodef(profile_path, section, "LOG_FSSTAT_HISTBINS", param->fsstat_histlen, 256);
+ if(param->fsstatid_trig && register_field_stat(param, runtime_log))
+ {
+ return NULL;
+ }
return param;
}
@@ -993,6 +1191,8 @@ struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_paramet
{
struct tango_cache_instance *instance;
char *redis_sep, *save_ptr=NULL;
+ struct timeval tv;
+ time_t now, remain;
instance = (struct tango_cache_instance *)malloc(sizeof(struct tango_cache_instance));
memset(instance, 0, sizeof(struct tango_cache_instance));
@@ -1021,6 +1221,16 @@ struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_paramet
sprintf(instance->redisaddr, "%s", redis_sep);
}
evtimer_assign(&instance->timer_event, evbase, libevent_timer_event_cb, instance);
+
+ if(param->fsstatid_trig)
+ {
+ evtimer_assign(&instance->timer_statistic, evbase, instance_statistic_timer_cb, instance);
+ now = time(NULL);
+ remain = instance->param->fsstat_period - (now % instance->param->fsstat_period);
+ tv.tv_sec = remain;
+ tv.tv_usec = 0;
+ evtimer_add(&instance->timer_statistic, &tv);
+ }
return instance;
}
diff --git a/cache/src/tango_cache_client_in.h b/cache/src/tango_cache_client_in.h
index fe679fb..f457fed 100644
--- a/cache/src/tango_cache_client_in.h
+++ b/cache/src/tango_cache_client_in.h
@@ -3,6 +3,7 @@
#include <curl/curl.h>
#include <sys/queue.h>
+#include <pthread.h>
#include <event2/event.h>
#include <event.h>
@@ -11,6 +12,7 @@
#include <cjson/cJSON.h>
#include <MESA/wiredLB.h>
+#include <MESA/field_stat2.h>
#include "tango_cache_client.h"
#define RESPONSE_HDR_EXPIRES 1
@@ -21,6 +23,36 @@
#define CACHE_META_REDIS 1 //Ԫ��Ϣ��REDIS������MINIO
#define CACHE_SMALL_REDIS 2 //Ԫ��Ϣ��С�ļ���REDIS�����ļ���MINIO
+enum FIELD_STAT_FILEDS
+{
+ FS_FILED_GET_RECV=0,
+ FS_FILED_GET_S_TOTAL,
+ FS_FILED_GET_S_HTTP,
+ FS_FILED_GET_S_REDIS,
+ FS_FILED_GET_MISS,
+ FS_FILED_GET_E_TOTAL,
+ FS_FILED_GET_E_HTTP,
+ FS_FILED_GET_E_REDIS,
+ FS_FILED_PUT_RECV,
+ FS_FILED_PUT_S_TOTAL,
+ FS_FILED_PUT_S_HTTP,
+ FS_FILED_PUT_S_REDIS,
+ FS_FILED_PUT_E_TOTAL,
+ FS_FILED_PUT_E_HTTP,
+ FS_FILED_PUT_E_REDIS,
+ FS_FILED_DEL_RECV,
+ FS_FILED_DEL_SUCC,
+ FS_FILED_DEL_ERROR,
+ FS_FILED_TOTAL_DROP,
+
+ //Next use Status
+ FS_FILED_MEM_USED,
+ FS_FILED_SESS_HTTP,
+ FS_FILED_SESS_REDIS,
+
+ FS_FILED_NUM,
+};
+
enum CACHE_REQUEST_METHOD
{
CACHE_REQUEST_GET=0,
@@ -88,12 +120,25 @@ struct tango_cache_parameter
struct wiredlb_parameter minio;
char redisaddrs[4096];
u_int32_t redis_object_maxsize;//С�ļ�����redisʱ�����������С
+
+ //FieldStatLog
+ int32_t fsstat_dst_port;
+ char fsstat_dst_ip[64];
+ char fsstat_appname[16];
+ char fsstat_filepath[256];
+ u_int32_t fsstat_period;
+ u_int32_t fsstatid_trig;
+ char fsstat_histlen[256];
+ screen_stat_handle_t fsstat_handle;
+ int32_t fsstat_histlen_id;
+ int32_t fsstat_field_ids[FS_FILED_NUM];
};
struct tango_cache_instance
{
struct event_base* evbase;
struct event timer_event;
+ struct event timer_statistic;
CURLM *multi_hd;
enum CACHE_ERR_CODE error_code;
@@ -104,6 +149,7 @@ struct tango_cache_instance
const struct tango_cache_parameter *param;
void *runtime_log;
struct cache_statistics statistic;
+ struct cache_statistics statistic_last; //���ڶ��instanceʹ��ͬһ��fieldstat�ۼ�
};
struct multipart_etag_list
@@ -137,6 +183,7 @@ struct cache_ctx_data_put
char *combine_xml;
TAILQ_HEAD(__etag_list_head, multipart_etag_list) etag_head;
cJSON *object_meta;
+ struct easy_string once_request; //һ����PUTʱ�洢�����ݣ�ʧ�ܵ�ʱ��������������ܸ��������ṹ
enum PUT_OBJECT_STATE state;
u_int32_t part_index; //��RESPONSE_HDR_
u_int32_t object_ttl;
diff --git a/cache/src/tango_cache_transfer.cpp b/cache/src/tango_cache_transfer.cpp
index 183eb2d..7afa18c 100644
--- a/cache/src/tango_cache_transfer.cpp
+++ b/cache/src/tango_cache_transfer.cpp
@@ -65,24 +65,24 @@ static size_t curl_put_once_send_cb(void *ptr, size_t size, size_t count, void *
size_t len;
struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
- if(size==0 || count==0 || ctx->response.len>=ctx->response.size)
+ if(size==0 || count==0 || ctx->put.once_request.len>=ctx->put.once_request.size)
{
return 0; //��һ������
}
- len = ctx->response.size - ctx->response.len; //ʣ����ϴ��ij���
+ len = ctx->put.once_request.size - ctx->put.once_request.len; //ʣ����ϴ��ij���
if(len > size * count)
{
len = size * count;
}
- memcpy(ptr, ctx->response.buff + ctx->response.len, len);
- ctx->response.len += len;
+ memcpy(ptr, ctx->put.once_request.buff + ctx->put.once_request.len, len);
+ ctx->put.once_request.len += len;
- if(ctx->response.len >= ctx->response.size)
+ if(ctx->put.once_request.len >= ctx->put.once_request.size)
{
- ctx->instance->statistic.memory_used -= ctx->response.size; //δʹ��cache buffer���Լ������ڴ�����
- easy_string_destroy(&ctx->response);
+ ctx->instance->statistic.memory_used -= ctx->put.once_request.size; //δʹ��cache buffer���Լ������ڴ�����
+ easy_string_destroy(&ctx->put.once_request);
}
return len;
}
@@ -532,17 +532,17 @@ int http_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COP
if(way == PUT_MEM_COPY)
{
- ctx->response.buff = (char *)malloc(size);
- memcpy(ctx->response.buff, data, size);
+ ctx->put.once_request.buff = (char *)malloc(size);
+ memcpy(ctx->put.once_request.buff, data, size);
}
else
{
- ctx->response.buff = (char *)data;
+ ctx->put.once_request.buff = (char *)data;
}
- ctx->response.size = size;
- ctx->response.len = 0;
+ ctx->put.once_request.size = size;
+ ctx->put.once_request.len = 0;
curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L);
- curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->response.size);
+ curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put.once_request.size);
curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_once_send_cb);
curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx);
diff --git a/cache/test/cache_evbase_benchmark.cpp b/cache/test/cache_evbase_benchmark.cpp
index e3744f6..c5ce437 100644
--- a/cache/test/cache_evbase_benchmark.cpp
+++ b/cache/test/cache_evbase_benchmark.cpp
@@ -394,6 +394,7 @@ int main(int argc, char **argv)
pthread_attr_t attr;
void *runtime_log;
struct filecontentcmd filecmd;
+ time_t now, remain;
struct event ev_timer;
struct timeval tv;
@@ -431,7 +432,9 @@ int main(int argc, char **argv)
}
ev_base = event_base_new();
- tv.tv_sec = 10;
+ now = time(NULL);
+ remain = 10 - (now % 10);
+ tv.tv_sec = remain;
tv.tv_usec = 0;
evtimer_assign(&ev_timer, ev_base, timer_cb, &ev_timer);
evtimer_add(&ev_timer, &tv);
diff --git a/cache/test/lib/libtango_cache_client.a b/cache/test/lib/libtango_cache_client.a
new file mode 100644
index 0000000..e395ce6
--- /dev/null
+++ b/cache/test/lib/libtango_cache_client.a
Binary files differ
diff --git a/cache/test/pangu_tg_cahce.conf b/cache/test/pangu_tg_cahce.conf
index e3ef66c..21ac1f8 100644
--- a/cache/test/pangu_tg_cahce.conf
+++ b/cache/test/pangu_tg_cahce.conf
@@ -1,35 +1,42 @@
[TANGO_CACHE]
#Addresses of minio. Format is defined by WiredLB.
-MINIO_IP_LIST=10.3.35.1;
-MINIO_LISTEN_PORT=9000
+minio_ip_list=10.3.35.60-61;
+minio_listen_port=9000
#Maximum number of connections opened by per host.
-#MAX_CONNECTION_PER_HOST=1
+#max_connection_per_host=1
#Maximum number of requests in a pipeline.
-#MAX_CNNT_PIPELINE_NUM=20
+#max_cnnt_pipeline_num=20
#Maximum parellel sessions(http and redis) is allowed to open.
-#MAX_CURL_SESSION_NUM=100
+#max_curl_session_num=100
#Maximum time the request is allowed to take(seconds).
-#MAX_CURL_TRANSFER_TIMEOUT_S=0
+#max_curl_transfer_timeout_s=0
#Bucket name in minio.
-CACHE_BUCKET_NAME=openbucket
+cache_bucket_name=openbucket
#Maximum size of memory used by tango_cache_client. Upload will fail if the current size of memory used exceeds this value.
-MAX_USED_MEMORY_SIZE_MB=5120
+max_used_memory_size_mb=5120
#Default TTL of objects, i.e. the time after which the object will expire(minumun 60s, i.e. 1 minute).
-CACHE_DEFAULT_TTL_SECOND=3600
+cache_default_ttl_second=3600
#Whether to hash the object key before cache actions. GET/PUT may be faster if you open it.
-CACHE_OBJECT_KEY_HASH_SWITCH=1
+cache_object_key_hash_switch=1
#Store way: 0-MINIO; 1-META in REDIS, object in minio; 2-META and small object in Redis, large object in minio;
-CACHE_STORE_OBJECT_WAY=2
-#If CACHE_STORE_OBJECT_WAY is 2 and the size of a object is not bigger than this value, object will be stored in redis.
-REDIS_CACHE_OBJECT_SIZE=20480
-#If CACHE_STORE_OBJECT_WAY is not 0, we will use redis to store meta and object.
-REDIS_CLUSTER_ADDRS=10.4.35.33:9001,10.4.35.34:9001
+cache_store_object_way=2
+#If cache_store_object_way is 2 and the size of a object is not bigger than this value, object will be stored in redis.
+redis_cache_object_size=20480
+#If cache_store_object_way is not 0, we will use redis to store meta and object.
+redis_cluster_ip_list=10.4.35.33-34;
+redis_cluster_port_range=9001-9016;
#Configs of WiredLB for Minios load balancer.
-#WIREDLB_OVERRIDE=1
-#WIREDLB_TOPIC=
-#WIREDLB_DATACENTER=
-WIREDLB_HEALTH_PORT=52101
-#WIREDLB_GROUP=
+#wiredlb_override=1
+#wiredlb_topic=
+#wiredlb_datacenter=
+wiredlb_health_port=52102
+#wiredlb_group=
+log_fsstat_appname=TANGO_CACHE
+log_fsstat_filepath=./field_stat.log
+log_fsstat_interval=10
+log_fsstat_trig=1
+log_fsstat_dst_ip=127.0.0.1
+log_fsstat_dst_port=8125
diff --git a/cache/test/tango_cache_test.cpp b/cache/test/tango_cache_test.cpp
index fb7f618..c00bd8c 100644
--- a/cache/test/tango_cache_test.cpp
+++ b/cache/test/tango_cache_test.cpp
@@ -329,7 +329,7 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg)
tango_cache_update_frag_data(ctx, buffer, n);
}
fclose(fp);
- if(tango_cache_update_end(ctx, pdata->filename, 256))
+ if(tango_cache_update_end(ctx, pdata->filename, 256) < 0)
{
put_future_failed(FUTURE_ERROR_CANCEL, "", pdata);
}