summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author[email protected] <[email protected]>2021-07-22 10:25:42 +0800
committer[email protected] <[email protected]>2021-07-22 10:25:42 +0800
commit5217e9188e7208b6451d9b06484c759eb3e27f15 (patch)
tree80e155a62fdda4279a3cabc21013653d6d673f57
parent8b236b38cbe9c5184f117a869545a50038c9640b (diff)
支持多业务系统的配置并发进行同步
-rw-r--r--client/doris_client_fetch.cpp32
-rw-r--r--client/doris_client_fetch.h6
-rw-r--r--include/doris_client.h8
-rw-r--r--server/bin/conf/doris_client.conf35
-rw-r--r--server/bin/conf/doris_main.conf58
-rw-r--r--server/doris_server_http.cpp115
-rw-r--r--server/doris_server_main.cpp208
-rw-r--r--server/doris_server_main.h45
-rw-r--r--server/doris_server_receive.cpp225
-rw-r--r--server/doris_server_receive.h46
10 files changed, 452 insertions, 326 deletions
diff --git a/client/doris_client_fetch.cpp b/client/doris_client_fetch.cpp
index 28318c6..85f6857 100644
--- a/client/doris_client_fetch.cpp
+++ b/client/doris_client_fetch.cpp
@@ -137,8 +137,8 @@ void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code
instance->retry_times = 0;
if(instance->curmeta.curoffset == 0)
{
- instance->param->cbs.cfgfile_start(instance, instance->curmeta.table_name,
- instance->curmeta.size, instance->curmeta.cfg_num, instance->param->cbs.userdata);
+ instance->cbs.cfgfile_start(instance, instance->curmeta.table_name,
+ instance->curmeta.size, instance->curmeta.cfg_num, instance->cbs.userdata);
MD5_Init(&instance->ctx.md5ctx);
}
}
@@ -180,7 +180,7 @@ void doris_http_confile_body_cb(const char *ptr, size_t bytes, CURLcode code, lo
return;
}
- instance->param->cbs.cfgfile_update(instance, ptr, bytes, instance->param->cbs.userdata);
+ instance->cbs.cfgfile_update(instance, ptr, bytes, instance->cbs.userdata);
MD5_Update(&instance->ctx.md5ctx, ptr, bytes);
instance->curmeta.curoffset += bytes;
instance->statistic.field[DRS_FS_FILED_RES_BYTES] += bytes;
@@ -228,14 +228,14 @@ void doris_http_confile_done_cb(CURLcode res, long res_code, const char *err, vo
}
else
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s.010%lu over, md5: %s",
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s.%010lu over, md5: %s",
instance->curmeta.table_name, instance->req_version, md5buffer);
}
instance->statistic.field[DRS_FS_FILED_RES_FILES] += 1;
- instance->param->cbs.cfgfile_finish(instance, md5buffer, instance->param->cbs.userdata);
+ instance->cbs.cfgfile_finish(instance, md5buffer, instance->cbs.userdata);
if(instance->array_index == instance->array_size)
{
- instance->param->cbs.version_finish(instance, instance->param->cbs.userdata);
+ instance->cbs.version_finish(instance, instance->cbs.userdata);
instance->status = FETCH_STATUS_META;
doris_update_new_version(instance);
cJSON_Delete(instance->meta);
@@ -268,7 +268,7 @@ out_md5:
if(instance->retry_times >= instance->param->fetch_max_tries || direct_fail)
{
instance->statistic.field[DRS_FS_FILED_RES_VERERR] += 1;
- instance->param->cbs.version_error(instance, instance->param->cbs.userdata);
+ instance->cbs.version_error(instance, instance->cbs.userdata);
instance->retry_times = 0;
instance->status = FETCH_STATUS_META;
cJSON_Delete(instance->meta);
@@ -296,7 +296,7 @@ void doris_http_fetch_confile(struct doris_instance *instance)
doris_http_ctx_add_header(instance->ctx.httpctx, range);
}
- snprintf(metauri, 128, "configfile?tablename=%s&version=%lu&businessid=%u", instance->curmeta.table_name, instance->req_version, instance->param->args.businessid);
+ snprintf(metauri, 128, "configfile?tablename=%s&version=%lu&business=%s", instance->curmeta.table_name, instance->req_version, instance->args.bizname);
if(doris_http_launch_get_request(instance->ctx.httpctx, metauri))
{
instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1;
@@ -371,7 +371,7 @@ void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "NEW_META found, cur_version=%lu, newjson: %s",
instance->cur_version, instance->estr.buff);
- instance->param->cbs.version_start(instance, instance->meta, instance->param->cbs.userdata);
+ instance->cbs.version_start(instance, instance->meta, instance->cbs.userdata);
instance->array = cJSON_GetObjectItem(instance->meta, "configs");
instance->array_size = cJSON_GetArraySize(instance->array);
assert(instance->array_size > 0);
@@ -420,7 +420,7 @@ static void doris_http_fetch_meta(struct doris_instance *instance)
instance->req_version = instance->cur_version + 1; //ֻ�а汾���³ɹ���cur_version�Ż����
if(instance->ctx.httpctx != NULL)
{
- snprintf(metauri, 128, "configmeta?version=%lu&businessid=%u", instance->req_version, instance->param->args.businessid);
+ snprintf(metauri, 128, "configmeta?version=%lu&business=%s", instance->req_version, instance->args.bizname);
if(!doris_http_launch_get_request(instance->ctx.httpctx, metauri))
{
instance->status = FETCH_STATUS_META;
@@ -530,15 +530,12 @@ static int doris_client_register_field_stat(struct doris_parameter *param, void
return 0;
}
-struct doris_parameter *doris_parameter_new(const char *confile, struct event_base *manage_evbase, struct doris_callbacks *cbs,
- struct doris_arguments *args, void *runtimelog)
+struct doris_parameter *doris_parameter_new(const char *confile, struct event_base *manage_evbase, void *runtimelog)
{
struct doris_parameter *param;
param = (struct doris_parameter *)calloc(1, sizeof(struct doris_parameter));
param->manage_evbase = manage_evbase;
- param->cbs = *cbs;
- param->args= *args;
MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fail_retry_interval", &param->retry_interval, 10);
MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fragmet_size", &param->fetch_frag_size, 5242880);
@@ -599,7 +596,8 @@ static void doris_instance_statistic_timer_cb(int fd, short kind, void *userp)
event_add(&instance->timer_statistic, &tv);
}
-struct doris_instance *doris_instance_new(struct doris_parameter *param, struct event_base *worker_evbase, void *runtimelog)
+struct doris_instance *doris_instance_new(struct doris_parameter *param, struct event_base *worker_evbase,
+ struct doris_callbacks *cbs, struct doris_arguments *args, void *runtimelog)
{
struct doris_instance *instance;
struct timeval tv;
@@ -608,7 +606,9 @@ struct doris_instance *doris_instance_new(struct doris_parameter *param, struct
instance->param = param;
instance->worker_evbase = worker_evbase;
instance->runtime_log = runtimelog;
- instance->cur_version = param->args.current_version;
+ instance->cbs = *cbs;
+ instance->args= *args;
+ instance->cur_version = args->current_version;
instance->req_version = instance->cur_version + 1; //TODO
instance->httpins_master = doris_http_instance_new(param->param_master, worker_evbase, runtimelog);
diff --git a/client/doris_client_fetch.h b/client/doris_client_fetch.h
index 7125540..66afbdf 100644
--- a/client/doris_client_fetch.h
+++ b/client/doris_client_fetch.h
@@ -23,9 +23,6 @@ enum FETCH_CFG_STATUS
struct doris_parameter
{
- struct doris_callbacks cbs;
- struct doris_arguments args;
-
u_int32_t retry_interval;
u_int32_t fetch_frag_size;
u_int32_t fetch_max_tries;
@@ -82,6 +79,9 @@ struct doris_confile_ctx
struct doris_instance
{
+ struct doris_callbacks cbs;
+ struct doris_arguments args;
+
enum FETCH_CFG_STATUS status;
u_int32_t retry_times;
diff --git a/include/doris_client.h b/include/doris_client.h
index b994e11..99b2a3e 100644
--- a/include/doris_client.h
+++ b/include/doris_client.h
@@ -45,8 +45,8 @@ struct doris_statistics
struct doris_arguments
{
+ char bizname[32];
int64_t current_version; //��ǰ�ѻ�ȡ��ϵ����°汾�ţ���������һ���汾ȡ����
- int32_t businessid;
int32_t judian_id;
};
@@ -62,9 +62,9 @@ struct doris_callbacks
void (*version_finish)(struct doris_instance *instance, void *userdata);
};
-struct doris_parameter *doris_parameter_new(const char *confile, struct event_base *manage_evbase, struct doris_callbacks *cbs,
- struct doris_arguments *args, void *runtimelog);
-struct doris_instance *doris_instance_new(struct doris_parameter *param, struct event_base *worker_evbase, void *runtimelog);
+struct doris_parameter *doris_parameter_new(const char *confile, struct event_base *manage_evbase, void *runtimelog);
+struct doris_instance *doris_instance_new(struct doris_parameter *param, struct event_base *worker_evbase,
+ struct doris_callbacks *cbs, struct doris_arguments *args, void *runtimelog);
#endif
diff --git a/server/bin/conf/doris_client.conf b/server/bin/conf/doris_client.conf
new file mode 100644
index 0000000..062aeeb
--- /dev/null
+++ b/server/bin/conf/doris_client.conf
@@ -0,0 +1,35 @@
+[DORIS_CLIENT]
+fetch_fail_retry_interval=5
+fetch_fragmet_size=5242880
+fetch_confile_max_tries=3
+
+fsstat_log_appname=DorisClient
+fsstat_log_filepath=./log/doris_client.fs
+fsstat_log_interval=2
+fsstat_log_print_mode=1
+fsstat_log_dst_ip=192.168.10.90
+fsstat_log_dst_port=8125
+
+[DORIS_CLIENT.master_server]
+max_connection_per_host=1
+max_cnnt_pipeline_num=10
+https_connection_on=0
+max_curl_session_num=10
+
+http_server_listen_port=9897
+http_server_manage_port=9897
+http_server_ip_list=192.168.10.8
+
+[DORIS_CLIENT.backup1_server]
+max_connection_per_host=1
+max_cnnt_pipeline_num=10
+https_connection_on=0
+max_curl_session_num=10
+
+http_server_listen_port=9897
+http_server_manage_port=9897
+http_server_ip_list=192.168.11.241
+
+[DORIS_CLIENT.backup2_server]
+
+
diff --git a/server/bin/conf/doris_main.conf b/server/bin/conf/doris_main.conf
index d0ea133..518d694 100644
--- a/server/bin/conf/doris_main.conf
+++ b/server/bin/conf/doris_main.conf
@@ -3,13 +3,9 @@ worker_thread_num=2
server_listen_port=9898
manage_listen_port=2233
https_connection_on=1
+cache_file_frag_size=100
-#1-Doris client; 2-local file
-receive_config_way=2
-cache_file_frag_size=67108864
-store_config_path=./doris_store_path
-receive_config_path_full=./doris_receive_path/full/index
-receive_config_path_inc=./doris_receive_path/inc/index
+business_system_list=T1_1;VoIP
run_log_dir=./log
run_log_lv=20
@@ -20,40 +16,20 @@ fsstat_log_print_mode=1
fsstat_log_dst_ip=192.168.10.90
fsstat_log_dst_port=8125
+[T1_1]
+#1-Doris client; 2-local file
+receive_config_way=2
+grafana_monitor_status_id=3
+store_config_path=./doris_store_t1
+receive_config_path_full=./doris_receive_t1/full/index
+receive_config_path_inc=./doris_receive_t1/inc/index
+#doris_client_confile=./conf/doris_client.conf
-
-[DORIS_CLIENT]
-fetch_fail_retry_interval=5
-fetch_fragmet_size=5242880
-fetch_confile_max_tries=3
-
-fsstat_log_appname=DorisClient
-fsstat_log_filepath=./log/doris_client.fs
-fsstat_log_interval=2
-fsstat_log_print_mode=1
-fsstat_log_dst_ip=192.168.10.90
-fsstat_log_dst_port=8125
-
-[DORIS_CLIENT.master_server]
-max_connection_per_host=1
-max_cnnt_pipeline_num=10
-https_connection_on=1
-max_curl_session_num=10
-
-http_server_listen_port=9897
-http_server_manage_port=9897
-http_server_ip_list=192.168.10.8
-
-[DORIS_CLIENT.backup1_server]
-max_connection_per_host=1
-max_cnnt_pipeline_num=10
-https_connection_on=0
-max_curl_session_num=10
-
-http_server_listen_port=9897
-http_server_manage_port=9897
-http_server_ip_list=192.168.11.241
-
-[DORIS_CLIENT.backup2_server]
-
+[VoIP]
+receive_config_way=2
+grafana_monitor_status_id=4
+store_config_path=./doris_store_voip
+receive_config_path_full=./doris_receive_voip/full/index
+receive_config_path_inc=./doris_receive_voip/inc/index
+#doris_client_confile=./conf/doris_client.conf
diff --git a/server/doris_server_http.cpp b/server/doris_server_http.cpp
index 1cff399..aa81c7a 100644
--- a/server/doris_server_http.cpp
+++ b/server/doris_server_http.cpp
@@ -73,47 +73,63 @@ int doris_create_listen_socket(int bind_port)
void doris_http_server_meta_cb(struct evhttp_request *req, void *arg)
{
- struct worker_statistic_info *statistic=(struct worker_statistic_info *)arg;
struct evkeyvalq params;
- const char *version;
+ const char *version, *bizname;
int64_t verlong;
char *endptr=NULL, length[64];
struct version_list_node *vernode;
struct evbuffer *evbuf;
+ struct doris_business *business;
+ map<string, struct doris_business*>::iterator iter;
- statistic->statistic.field[DRS_FSSTAT_CLIENT_META_REQ] += 1;
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_META_REQ], 0, FS_OP_ADD, 1);
if(evhttp_parse_query(evhttp_request_get_uri(req), &params))
{
- statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1;
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid");
return;
}
if(NULL == (version = evhttp_find_header(&params, "version")))
{
- statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1;
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_clear_headers(&params);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no version found");
return;
}
if(0==(verlong = strtol(version, &endptr, 10)) || *endptr!='\0')
{
- statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1;
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_clear_headers(&params);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameter version invalid");
return;
}
+ if(NULL == (bizname = evhttp_find_header(&params, "business")))
+ {
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
+ evhttp_clear_headers(&params);
+ evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no business found");
+ return;
+ }
+ if((iter = g_doris_server_info.name2business->find(string(bizname)))==g_doris_server_info.name2business->end())
+ {
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
+ evhttp_clear_headers(&params);
+ evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, business invalid");
+ return;
+ }
evhttp_clear_headers(&params);
-
- pthread_rwlock_rdlock(&g_doris_server_info.rwlock);
- if(verlong > g_doris_server_info.cfgver_head->latest_version)
+ business = iter->second;
+
+ pthread_rwlock_rdlock(&business->rwlock);
+ if(verlong > business->cfgver_head->latest_version)
{
- pthread_rwlock_unlock(&g_doris_server_info.rwlock);
- statistic->statistic.field[DRS_FSSTAT_SEND_META_NONEW] += 1;
+ pthread_rwlock_unlock(&business->rwlock);
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_META_NONEW], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_NOTMODIFIED, "No new configs found");
return;
}
- vernode = TAILQ_FIRST(&g_doris_server_info.cfgver_head->version_head);
+ vernode = TAILQ_FIRST(&business->cfgver_head->version_head);
while(vernode->version < verlong)
{
vernode = TAILQ_NEXT(vernode, version_node);
@@ -121,9 +137,9 @@ void doris_http_server_meta_cb(struct evhttp_request *req, void *arg)
evbuf = evbuffer_new();
evbuffer_add(evbuf, vernode->metacont, vernode->metalen);
sprintf(length, "%u", vernode->metalen);
- pthread_rwlock_unlock(&g_doris_server_info.rwlock);
+ pthread_rwlock_unlock(&business->rwlock);
- statistic->statistic.field[DRS_FSSTAT_SEND_META_RES] += 1;
+ FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_SEND_META_RES], FS_OP_ADD, 1);
evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Type", "application/json");
evhttp_add_header(evhttp_request_get_output_headers(req), "Connection", "keep-alive");
evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Length", length);
@@ -131,8 +147,8 @@ void doris_http_server_meta_cb(struct evhttp_request *req, void *arg)
evbuffer_free(evbuf);
}
-void doris_response_file_range(struct evhttp_request *req, const char *tablename,
- int64_t verlong, size_t start, size_t end, bool range, struct worker_statistic_info *statistic)
+void doris_response_file_range(struct evhttp_request *req, const char *bizname, const char *tablename,
+ int64_t verlong, size_t start, size_t end, bool range)
{
struct version_list_node *vernode;
struct table_list_node *tablenode;
@@ -140,16 +156,26 @@ void doris_response_file_range(struct evhttp_request *req, const char *tablename
struct evbuffer *evbuf;
char length[128];
size_t filesize, res_length=0, copy_len, offset=start;
+ struct doris_business *business;
+ map<string, struct doris_business*>::iterator iter;
- pthread_rwlock_rdlock(&g_doris_server_info.rwlock);
- if(verlong > g_doris_server_info.cfgver_head->latest_version)
+ if((iter = g_doris_server_info.name2business->find(string(bizname)))==g_doris_server_info.name2business->end())
{
- pthread_rwlock_unlock(&g_doris_server_info.rwlock);
- statistic->statistic.field[DRS_FSSTAT_SEND_FILE_RES_404] += 1;
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
+ evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, business invalid");
+ return;
+ }
+ business = iter->second;
+
+ pthread_rwlock_rdlock(&business->rwlock);
+ if(verlong > business->cfgver_head->latest_version)
+ {
+ pthread_rwlock_unlock(&business->rwlock);
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_FILE_RES_404], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_NOTFOUND, "Version too old");
return;
}
- vernode = TAILQ_FIRST(&g_doris_server_info.cfgver_head->version_head);
+ vernode = TAILQ_FIRST(&business->cfgver_head->version_head);
while(vernode->version < verlong)
{
vernode = TAILQ_NEXT(vernode, version_node);
@@ -161,8 +187,8 @@ void doris_response_file_range(struct evhttp_request *req, const char *tablename
}
if(tablenode==NULL || start>tablenode->filesize)
{
- pthread_rwlock_unlock(&g_doris_server_info.rwlock);
- statistic->statistic.field[DRS_FSSTAT_SEND_FILE_RES_404] += 1;
+ pthread_rwlock_unlock(&business->rwlock);
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_FILE_RES_404], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_NOTFOUND, "No valid content found");
return;
}
@@ -183,12 +209,12 @@ void doris_response_file_range(struct evhttp_request *req, const char *tablename
offset += copy_len;
res_length += copy_len;
}
- pthread_rwlock_unlock(&g_doris_server_info.rwlock);
+ pthread_rwlock_unlock(&business->rwlock);
assert(res_length == end + 1 - start);
sprintf(length, "%lu", res_length);
- statistic->statistic.field[DRS_FSSTAT_SEND_FILE_RES] += 1;
- statistic->statistic.field[DRS_FSSTAT_SEND_FILE_BYTES] += res_length;
+ FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_SEND_FILE_RES], FS_OP_ADD, 1);
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_FILE_BYTES], 0, FS_OP_ADD, res_length);
evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Length", length);
if(range)
{
@@ -203,31 +229,30 @@ void doris_response_file_range(struct evhttp_request *req, const char *tablename
void doris_http_server_file_cb(struct evhttp_request *req, void *arg)
{
- struct worker_statistic_info *statistic=(struct worker_statistic_info *)arg;
struct evkeyvalq params;
- const char *version, *tablename, *content_range;
+ const char *version, *tablename, *content_range, *bizname;
int64_t verlong;
char *endptr=NULL;
size_t req_start=0, req_end=0;
- statistic->statistic.field[DRS_FSSTAT_CLIENT_FILE_REQ] += 1;
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_FILE_REQ], 0, FS_OP_ADD, 1);
if(evhttp_parse_query(evhttp_request_get_uri(req), &params))
{
- statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1;
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid");
return;
}
if(NULL==(version=evhttp_find_header(&params, "version")) || NULL==(tablename=evhttp_find_header(&params, "tablename")))
{
evhttp_clear_headers(&params);
- statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1;
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no version/tablename found");
return;
}
if(0==(verlong = strtol(version, &endptr, 10)) || *endptr!='\0')
{
evhttp_clear_headers(&params);
- statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1;
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameter version invalid");
return;
}
@@ -235,12 +260,19 @@ void doris_http_server_file_cb(struct evhttp_request *req, void *arg)
sscanf(content_range, "%*[^0-9]%lu-%lu", &req_start, &req_end)<1)
{
evhttp_clear_headers(&params);
- statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1;
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_BADREQUEST, "Header Range invalid");
return;
}
+ if(NULL == (bizname = evhttp_find_header(&params, "business")))
+ {
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
+ evhttp_clear_headers(&params);
+ evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no business found");
+ return;
+ }
- doris_response_file_range(req, tablename, verlong, req_start, req_end, (content_range==NULL)?false:true, statistic);
+ doris_response_file_range(req, bizname, tablename, verlong, req_start, req_end, (content_range==NULL)?false:true);
evhttp_clear_headers(&params);
}
@@ -350,14 +382,10 @@ void* thread_doris_http_server(void *arg)
{
struct event_base *worker_evbase;
struct evhttp *worker_http;
- struct worker_statistic_info statistic;
- struct timeval tv;
prctl(PR_SET_NAME, "http_server");
- memset(&statistic, 0, sizeof(struct worker_statistic_info));
worker_evbase = event_base_new();
-
worker_http = evhttp_new(worker_evbase);
if(g_doris_server_info.ssl_conn_on)
@@ -370,9 +398,9 @@ void* thread_doris_http_server(void *arg)
evhttp_set_bevcb(worker_http, doris_https_bufferevent_cb, g_doris_server_info.ssl_instance);
}
- evhttp_set_cb(worker_http, "/configmeta", doris_http_server_meta_cb, &statistic);
- evhttp_set_cb(worker_http, "/configfile", doris_http_server_file_cb, &statistic);
- evhttp_set_gencb(worker_http, doris_http_server_generic_cb, &statistic);
+ evhttp_set_cb(worker_http, "/configmeta", doris_http_server_meta_cb, NULL);
+ evhttp_set_cb(worker_http, "/configfile", doris_http_server_file_cb, NULL);
+ evhttp_set_gencb(worker_http, doris_http_server_generic_cb, NULL);
evhttp_set_allowed_methods(worker_http, EVHTTP_REQ_GET|EVHTTP_REQ_HEAD);
if(evhttp_accept_socket(worker_http, g_doris_server_info.listener))
@@ -381,11 +409,6 @@ void* thread_doris_http_server(void *arg)
assert(0); return NULL;
}
- evtimer_assign(&statistic.timer_statistic, worker_evbase, doris_worker_statistic_timer_cb, &statistic);
- tv.tv_sec = g_doris_server_info.fsstat_period;
- tv.tv_usec = 0;
- evtimer_add(&statistic.timer_statistic, &tv);
-
event_base_dispatch(worker_evbase);
printf("Libevent dispath error, should not run here.\n");
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here.");
diff --git a/server/doris_server_main.cpp b/server/doris_server_main.cpp
index 6cb8462..64fa952 100644
--- a/server/doris_server_main.cpp
+++ b/server/doris_server_main.cpp
@@ -17,7 +17,7 @@
#include "doris_server_http.h"
struct doris_global_info g_doris_server_info;
-static unsigned long doris_vesion_20210719=20210719L;
+static unsigned long doris_vesion_20210722=20210722L;
int doris_mkdir_according_path(const char * path)
{
@@ -54,9 +54,24 @@ int doris_mkdir_according_path(const char * path)
return 0;
}
+static int doris_chech_name_valid(const char *name)
+{
+ size_t i, namelen=strlen(name);
+
+ for(i=0; i<namelen; i++)
+ {
+ if(!((name[i]>='a' && name[i]<='z')||(name[i]>='A' && name[i]<='Z') ||
+ (name[i]>='0' && name[i]<='9') || name[i]=='_' || name[i]==':'))
+ {
+ return false;
+ }
+ }
+ return true;
+}
+
int32_t doris_read_profile_configs(const char *config_file)
{
- char tmp_buf[4096], tmp_dir[512], tmp_dir2[512];
+ char tmp_buf[4096], tmp_dir[512];
MESA_load_profile_string_def(config_file, "DORIS_SERVER", "run_log_dir", g_doris_server_info.root_log_dir, sizeof(g_doris_server_info.root_log_dir), "./log");
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "run_log_lv", &g_doris_server_info.log_level, 10);
@@ -86,37 +101,8 @@ int32_t doris_read_profile_configs(const char *config_file)
}
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "cache_file_frag_size", &g_doris_server_info.cache_frag_size, 67108864);
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "doris_server_role_on", &g_doris_server_info.server_role_sw, 1);
- MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "doris_write_file_on", &g_doris_server_info.write_file_sw, 1);
-
- if(0>MESA_load_profile_string_nodef(config_file, "DORIS_SERVER", "store_config_path", g_doris_server_info.store_path_root, sizeof(g_doris_server_info.store_path_root)))
- {
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]store_config_path not found!", config_file);
- assert(0);return -1;
- }
- snprintf(tmp_dir, 512, "%s/full/index", g_doris_server_info.store_path_root);
- snprintf(tmp_dir2,512, "%s/inc/index", g_doris_server_info.store_path_root);
- if(doris_mkdir_according_path(tmp_dir) || doris_mkdir_according_path(tmp_dir2))
- {
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "mkdir %s failed: %s\n", tmp_dir, strerror(errno));
- return -1;
- }
-
- MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "receive_config_way", &g_doris_server_info.recv_way, RECV_WAY_DRS_CLIENT);
- if(g_doris_server_info.recv_way == RECV_WAY_IDX_FILE)
- {
- if(0>MESA_load_profile_string_nodef(config_file, "DORIS_SERVER", "receive_config_path_full", g_doris_server_info.recv_path_full, sizeof(g_doris_server_info.recv_path_full)))
- {
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]receive_config_path not found!", config_file);
- assert(0);return -1;
- }
- if(0>MESA_load_profile_string_nodef(config_file, "DORIS_SERVER", "receive_config_path_inc", g_doris_server_info.recv_path_inc, sizeof(g_doris_server_info.recv_path_inc)))
- {
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]receive_config_path not found!", config_file);
- assert(0);return -1;
- }
- MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "scan_index_file_interval", &g_doris_server_info.scan_idx_interval, 10);
- }
+ MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "scan_index_file_interval", &g_doris_server_info.scan_idx_interval, 10);
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "https_connection_on", &g_doris_server_info.ssl_conn_on, 0);
if(g_doris_server_info.ssl_conn_on)
{
@@ -143,9 +129,11 @@ int32_t doris_read_profile_configs(const char *config_file)
static int doris_server_register_field_stat(struct doris_global_info *param)
{
- const char *field_names[DRS_FSSTAT_FIELD_NUM]={"RecvFullVer", "RecvIncVer", "RecvErrVer", "FileStarts", "FileComplete",
- "ClientInvReq", "ClientMetaReq", "SendResMeta", "SendNoNewMeta", "ClientFileReq", "SendFiles", "SendBytes", "SendFile404"};
- const char *status_names[DRS_FSSTAT_STATUS_NUM]={"MemoryUsed", "CurFullVer", "CurIncVer", "TotalCfgNum"};
+ const char *field_names[DRS_FSSTAT_FIELD_NUM]={"RecvErrVer", "FileStarts", "FileComplete", "ClientInvReq",
+ "ClientMetaReq", "SendNoNewMeta", "ClientFileReq", "SendBytes", "SendFile404"};
+ const char *status_names[DRS_FSSTAT_STATUS_NUM]={"MemoryUsed"};
+ const char *column_names[DRS_FSSTAT_CLUMN_NUM]={"RecvFullVer", "RecvIncVer", "RecvFiles", "SendResMeta", "SendFiles",
+ "CurFullVer", "CurIncVer", "TotalCfgNum"};
int value;
param->fsstat_handle = FS_create_handle();
@@ -176,6 +164,10 @@ static int doris_server_register_field_stat(struct doris_global_info *param)
{
param->fsstat_status[i] = FS_register(param->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, status_names[i]);
}
+ for(int i=0; i<DRS_FSSTAT_CLUMN_NUM; i++)
+ {
+ param->fsstat_column[i] = FS_register(param->fsstat_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, column_names[i]);
+ }
FS_start(param->fsstat_handle);
return 0;
}
@@ -191,6 +183,96 @@ static void instance_fsstat_output_timer_cb(int fd, short kind, void *userp)
event_add((struct event*)userp, &tv);
}
+static int32_t doris_init_config_for_business(struct doris_global_info *info, struct event_base *manage_evbase, const char *config_file)
+{
+ char tmpbuffer[4096], tmp_dir[256], tmp_dir2[256], *bizname, *save=NULL;
+ struct doris_business *business;
+ map<string, struct doris_parameter *>::iterator iter;
+
+ if(0>=MESA_load_profile_string_nodef(config_file, "DORIS_SERVER", "business_system_list", tmpbuffer, sizeof(tmpbuffer)))
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]business_system_list not found!", config_file);
+ assert(0);return -1;
+ }
+ for(bizname=strtok_r(tmpbuffer, ";", &save); bizname!=NULL; bizname=strtok_r(NULL, ";", &save))
+ {
+ if(!doris_chech_name_valid(bizname))
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]business_system_list bizname %s invalid, name must match: [a-zA-Z_:][a-zA-Z0-9_:]", bizname, config_file);
+ assert(0);return -1;
+ }
+ business = &info->business[info->business_num++];
+ snprintf(business->bizname, sizeof(business->bizname), "%s", bizname);
+ pthread_rwlock_init(&business->rwlock, NULL);
+
+ business->cfgver_head = config_version_handle_new();
+ if(info->name2business->find(string(business->bizname)) != info->name2business->end())
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]business_system_list duplicate system name: %s!", bizname, config_file, business->bizname);
+ assert(0);return -1;
+ }
+ info->name2business->insert(make_pair(string(business->bizname), business));
+
+ MESA_load_profile_uint_def(config_file, business->bizname, "grafana_monitor_status_id", &business->mm_status_codeid, 3);
+ MESA_load_profile_uint_def(config_file, business->bizname, "doris_write_file_on", &business->write_file_sw, 1);
+ if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "store_config_path", business->store_path_root, sizeof(business->store_path_root)))
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]store_config_path not found!", bizname, config_file);
+ assert(0);return -1;
+ }
+ snprintf(tmp_dir, 512, "%s/full/index", business->store_path_root);
+ snprintf(tmp_dir2,512, "%s/inc/index", business->store_path_root);
+ if(doris_mkdir_according_path(tmp_dir) || doris_mkdir_according_path(tmp_dir2))
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "mkdir %s failed: %s\n", tmp_dir, strerror(errno));
+ return -1;
+ }
+
+ MESA_load_profile_uint_def(config_file, business->bizname, "receive_config_way", &business->recv_way, RECV_WAY_DRS_CLIENT);
+ if(business->recv_way == RECV_WAY_DRS_CLIENT)
+ {
+ if(0>=MESA_load_profile_string_nodef(config_file, business->bizname, "doris_client_confile", tmp_dir, sizeof(tmp_dir)))
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]doris_client_confile not found!", config_file);
+ assert(0);return -1;
+ }
+ if((iter=info->confile2param->find(string(tmp_dir))) != info->confile2param->end())
+ {
+ business->param = iter->second;
+ }
+ else
+ {
+ business->param = doris_parameter_new(tmp_dir, manage_evbase, info->log_runtime);
+ if(business->param == NULL)
+ {
+ assert(0);return -2;
+ }
+ info->confile2param->insert(make_pair(string(tmp_dir), business->param));
+ }
+ }
+ else
+ {
+ if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "receive_config_path_full", business->recv_path_full, sizeof(business->recv_path_full)))
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]receive_config_path not found!", bizname, config_file);
+ assert(0);return -1;
+ }
+ if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "receive_config_path_inc", business->recv_path_inc, sizeof(business->recv_path_inc)))
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]receive_config_path not found!", bizname, config_file);
+ assert(0);return -1;
+ }
+ }
+ business->fs_lineid = FS_register(info->fsstat_handle, FS_STYLE_LINE, FS_CALC_CURRENT, business->bizname);;
+
+ snprintf(tmp_dir, 512, "latest_cfg_version_%s", business->bizname);
+ business->mm_latest_ver = MESA_Monitor_register(info->monitor, tmp_dir, MONITOR_METRICS_GAUGE, "Latest doris config version.");
+ snprintf(tmp_dir, 512, "total_config_num_%s", business->bizname);
+ business->mm_total_cfgnum = MESA_Monitor_register(info->monitor, tmp_dir, MONITOR_METRICS_GAUGE, "Total config num from latest full version till now.");
+ }
+ return 0;
+}
+
static void manager_statistic_threads_requests_cb(struct evhttp_request *req, void *arg)
{
evhttp_send_error(req, HTTP_BADREQUEST, "Not Supported.");
@@ -224,30 +306,11 @@ int main(int argc, char **argv)
{
return -1;
}
- g_doris_server_info.cfgver_head = config_version_handle_new();
- pthread_rwlock_init(&g_doris_server_info.rwlock, NULL);
evthread_use_pthreads();
+ g_doris_server_info.name2business = new map<string, struct doris_business*>;
+ g_doris_server_info.confile2param = new map<string, struct doris_parameter *>;
manage_evbase = event_base_new();
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if(g_doris_server_info.recv_way == RECV_WAY_DRS_CLIENT)
- {
- if(pthread_create(&thread_desc, &attr, thread_doris_client_recv_cfg, manage_evbase))
- {
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
- assert(0);return -4;
- }
- }
- else
- {
- if(pthread_create(&thread_desc, &attr, thread_index_file_recv_cfg, NULL))
- {
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
- assert(0);return -4;
- }
- }
-
/*Doris manager server*/
g_doris_server_info.manager = doris_create_listen_socket(g_doris_server_info.manager_port);
if(g_doris_server_info.manager < 0)
@@ -263,10 +326,7 @@ int main(int argc, char **argv)
evhttp_set_cb(manager_http, "/doris/statistic/status", manager_statistic_status_requests_cb, NULL);
evhttp_set_cb(manager_http, "/doris/statistic/threads", manager_statistic_threads_requests_cb, NULL);
evhttp_set_gencb(manager_http, manager_generic_requests_cb, NULL);
- g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210719);
- g_doris_server_info.mm_latest_ver = MESA_Monitor_register(g_doris_server_info.monitor, "latest_cfg_version", MONITOR_METRICS_GAUGE, "Latest doris config version.");
- g_doris_server_info.mm_total_cfgnum = MESA_Monitor_register(g_doris_server_info.monitor, "total_config_num", MONITOR_METRICS_GAUGE, "Total config num from latest full version till now.");
-
+ g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210722);
if(evhttp_accept_socket(manager_http, g_doris_server_info.manager))
{
printf("evhttp_accept_socket %d error!\n", g_doris_server_info.manager);
@@ -274,6 +334,34 @@ int main(int argc, char **argv)
assert(0); return -7;
}
+ //Ϊÿ��ҵ��ϵͳ��ʼ����ȡ���õĽṹ
+ if(doris_init_config_for_business(&g_doris_server_info, manage_evbase, NIRVANA_CONFIG_FILE))
+ {
+ return -8;
+ }
+
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ for(u_int32_t i=0; i<g_doris_server_info.business_num; i++)
+ {
+ if(g_doris_server_info.business[i].recv_way == RECV_WAY_DRS_CLIENT)
+ {
+ if(pthread_create(&thread_desc, &attr, thread_doris_client_recv_cfg, &g_doris_server_info.business[i]))
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
+ assert(0);return -4;
+ }
+ }
+ else
+ {
+ if(pthread_create(&thread_desc, &attr, thread_index_file_recv_cfg, &g_doris_server_info.business[i]))
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
+ assert(0);return -4;
+ }
+ }
+ }
+
/*Doris http server*/
if(g_doris_server_info.server_role_sw)
{
diff --git a/server/doris_server_main.h b/server/doris_server_main.h
index f91694b..5678097 100644
--- a/server/doris_server_main.h
+++ b/server/doris_server_main.h
@@ -18,8 +18,14 @@
#include "MESA_Monitor.h"
+#include "doris_client.h"
#include "doris_server_receive.h"
+#include <map>
+#include <string>
+
+using namespace std;
+
#ifndef __FILENAME__
#define __FILENAME__ __FILE__
#endif
@@ -27,9 +33,30 @@
MESA_handle_runtime_log((handle), (lv), "DorisServer", "%s:%d, " fmt, __FILENAME__, __LINE__, ##args)
#define NIRVANA_CONFIG_FILE "./conf/doris_main.conf"
+#define MAX_BUSINESS_NUM 64
#define RECV_WAY_DRS_CLIENT 1
#define RECV_WAY_IDX_FILE 2
+#define RECV_WAY_HTTP_POST 3
+
+struct doris_business
+{
+ char bizname[32];
+ u_int32_t recv_way;
+ u_int32_t write_file_sw;
+ char recv_path_full[256];
+ char recv_path_inc[256];
+ char store_path_root[256];
+ struct version_list_handle *cfgver_head;
+ struct doris_parameter *param;
+
+ int64_t total_cfgnum;
+ int32_t mm_latest_ver;
+ int32_t mm_total_cfgnum;
+ u_int32_t mm_status_codeid; //MM�ڲ��쳣״̬id
+ u_int32_t fs_lineid;
+ pthread_rwlock_t rwlock;
+};
struct doris_global_info
{
@@ -38,15 +65,9 @@ struct doris_global_info
int32_t manager_port;
int32_t sock_recv_bufsize;
u_int32_t ssl_conn_on;
- u_int32_t recv_way;
- char recv_path_full[256];
- char recv_path_inc[256];
- char store_path_root[256];
- char store_path_inc[256];
u_int32_t scan_idx_interval;
u_int32_t cache_frag_size;
u_int32_t server_role_sw;
- u_int32_t write_file_sw;
char ssl_CA_path[256];
char ssl_cert_file[256];
@@ -55,15 +76,16 @@ struct doris_global_info
pthread_mutex_t *lock_cs;
SSL_CTX *ssl_instance;
- struct version_list_handle *cfgver_head;
evutil_socket_t listener;
evutil_socket_t manager;
- pthread_rwlock_t rwlock;
+
+ struct doris_business business[MAX_BUSINESS_NUM];
+ u_int32_t business_num;
+ map<string, struct doris_business*> *name2business;
+ map<string, struct doris_parameter *> *confile2param;
struct MESA_MonitorHandler *monitor;
- int32_t mm_latest_ver;
- int32_t mm_total_cfgnum;
-
+
/*logs*/
u_int32_t log_level;
u_int32_t statistic_period;
@@ -79,6 +101,7 @@ struct doris_global_info
int32_t fsstat_dst_port;
int32_t fsstat_field[DRS_FSSTAT_FIELD_NUM];
int32_t fsstat_status[DRS_FSSTAT_STATUS_NUM];
+ int32_t fsstat_column[DRS_FSSTAT_CLUMN_NUM];
};
int doris_mkdir_according_path(const char * path);
diff --git a/server/doris_server_receive.cpp b/server/doris_server_receive.cpp
index 3538f4f..594a6a1 100644
--- a/server/doris_server_receive.cpp
+++ b/server/doris_server_receive.cpp
@@ -16,6 +16,7 @@
struct scanner_timer_priv
{
+ struct doris_business *business;
struct doris_callbacks doris_cbs;
struct doris_arguments doris_args;
struct doris_idxfile_scanner *scanner;
@@ -24,45 +25,16 @@ struct scanner_timer_priv
extern struct doris_global_info g_doris_server_info;
-
-void doris_worker_statistic_timer_cb(int fd, short kind, void *userp)
-{
- struct worker_statistic_info *statistic = (struct worker_statistic_info *)userp;
- struct timeval tv;
- struct doris_srv_statistics incr_statistic;
- long long *plast_statistic = (long long*)&statistic->statistic_last;
- long long *pnow_statistic = (long long*)&statistic->statistic;
- long long *pinc_statistic = (long long*)&incr_statistic;
-
- for(u_int32_t i=0; i<sizeof(struct doris_srv_statistics)/sizeof(long long); i++)
- {
- pinc_statistic[i] = pnow_statistic[i] - plast_statistic[i];
- }
- statistic->statistic_last = statistic->statistic;
-
- for(u_int32_t i=0; i<DRS_FSSTAT_FIELD_NUM; i++)
- {
- FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[i], 0, FS_OP_ADD, incr_statistic.field[i]);
- }
- for(u_int32_t i=0; i<DRS_FSSTAT_STATUS_NUM; i++)
- {
- FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_status[i], 0, FS_OP_ADD, incr_statistic.status[i]);
- }
- tv.tv_sec = g_doris_server_info.fsstat_period;
- tv.tv_usec = 0;
- event_add(&statistic->timer_statistic, &tv);
-}
-
-void config_frag_node_cleanup(struct confile_save *save, struct cont_frag_node *fragnode)
+void config_frag_node_cleanup(struct cont_frag_node *fragnode)
{
if(fragnode == NULL) return;
- save->statistic.statistic.status[DRS_FSSTAT_MEMORY_USED] -= fragnode->totalsize;
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_status[DRS_FSSTAT_MEMORY_USED], 0, FS_OP_SUB, fragnode->totalsize);
free(fragnode->content);
free(fragnode);
}
-void config_table_node_cleanup(struct confile_save *save, struct table_list_node *table_node)
+void config_table_node_cleanup(struct table_list_node *table_node)
{
struct cont_frag_node *fragnode;
@@ -71,12 +43,12 @@ void config_table_node_cleanup(struct confile_save *save, struct table_list_node
while(NULL != (fragnode = TAILQ_FIRST(&table_node->frag_head)))
{
TAILQ_REMOVE(&table_node->frag_head, fragnode, frag_node);
- config_frag_node_cleanup(save, fragnode);
+ config_frag_node_cleanup(fragnode);
}
free(table_node);
}
-void config_version_node_cleanup(struct confile_save *save, struct version_list_node *vernode)
+void config_version_node_cleanup(struct version_list_node *vernode)
{
struct table_list_node *tablenode;
@@ -85,7 +57,7 @@ void config_version_node_cleanup(struct confile_save *save, struct version_list_
while(NULL != (tablenode = TAILQ_FIRST(&vernode->table_head)))
{
TAILQ_REMOVE(&vernode->table_head, tablenode, table_node);
- config_table_node_cleanup(save, tablenode);
+ config_table_node_cleanup(tablenode);
}
free(vernode->metacont);
cJSON_Delete(vernode->metajson);
@@ -94,14 +66,14 @@ void config_version_node_cleanup(struct confile_save *save, struct version_list_
free(vernode);
}
-void config_version_handle_cleanup(struct confile_save *save, struct version_list_handle *version)
+void config_version_handle_cleanup(struct version_list_handle *version)
{
struct version_list_node *vernode;
while(NULL != (vernode = TAILQ_FIRST(&version->version_head)))
{
TAILQ_REMOVE(&version->version_head, vernode, version_node);
- config_version_node_cleanup(save, vernode);
+ config_version_node_cleanup(vernode);
}
free(version);
}
@@ -134,7 +106,7 @@ static void cfgver_delay_destroy_timer_cb(int fd, short kind, void *userp)
doris_common_timer_start(&delay_event->timer_event);
return;
}
- config_version_handle_cleanup(delay_event->save, handle);
+ config_version_handle_cleanup(handle);
free(delay_event);
}
@@ -144,7 +116,6 @@ static void cfgver_handle_delay_destroy(struct confile_save *save, struct event_
delay_event = (struct common_timer_event *)malloc(sizeof(struct common_timer_event));
delay_event->data = version;
- delay_event->save = save;
evtimer_assign(&delay_event->timer_event, evbase, cfgver_delay_destroy_timer_cb, delay_event);
doris_common_timer_start(&delay_event->timer_event);
}
@@ -156,16 +127,20 @@ void doris_config_file_version_start(struct doris_instance *instance, cJSON *met
if(save->type == CFG_UPDATE_TYPE_FULL)
{
- snprintf(save->inc_index_path, 512, "%s/inc/index/full_config_index.%010lu", g_doris_server_info.store_path_root, save->version);
- snprintf(save->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", g_doris_server_info.store_path_root, save->version);
- snprintf(save->full_index_path, 512, "%s/full/index/full_config_index.%010lu", g_doris_server_info.store_path_root, save->version);
+ snprintf(save->inc_index_path, 512, "%s/inc/index/full_config_index.%010lu", save->business->store_path_root, save->version);
+ snprintf(save->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", save->business->store_path_root, save->version);
+ snprintf(save->full_index_path, 512, "%s/full/index/full_config_index.%010lu", save->business->store_path_root, save->version);
}
else
{
- snprintf(save->inc_index_path, 512, "%s/inc/index/inc_config_index.%010lu", g_doris_server_info.store_path_root, save->version);
- snprintf(save->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", g_doris_server_info.store_path_root, save->version);
+ snprintf(save->inc_index_path, 512, "%s/inc/index/inc_config_index.%010lu", save->business->store_path_root, save->version);
+ snprintf(save->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", save->business->store_path_root, save->version);
+ }
+ if(NULL==(save->fp_idx_file = fopen(save->tmp_index_path, "w+")))
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", save->business->bizname, save->tmp_index_path, strerror(errno));
+ assert(0);
}
- save->fp_idx_file = fopen(save->tmp_index_path, "w+");
}
void doris_config_file_version_finish(struct doris_instance *instance, void *userdata)
@@ -175,18 +150,18 @@ void doris_config_file_version_finish(struct doris_instance *instance, void *use
fclose(save->fp_idx_file);
if(rename(save->tmp_index_path, save->inc_index_path))
{
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, rename %s to %s failed: %s", save->business->bizname, save->tmp_index_path, save->inc_index_path, strerror(errno));
assert(0);
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "rename %s to %s failed: %s", save->tmp_index_path, save->inc_index_path, strerror(errno));
}
if(save->type == CFG_UPDATE_TYPE_FULL)
{
if(link(save->inc_index_path, save->full_index_path) && errno!=EEXIST) //����Ӳ����
{
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, rename %s to %s failed: %s", save->business->bizname, save->tmp_index_path, save->inc_index_path, strerror(errno));
assert(0);
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "rename %s to %s failed: %s", save->tmp_index_path, save->inc_index_path, strerror(errno));
}
}
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "Version %lu write finished, index file: %s", save->version, save->inc_index_path);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu write finished, index file: %s", save->business->bizname, save->version, save->inc_index_path);
}
void doris_config_file_version_error(struct doris_instance *instance, void *userdata)
@@ -203,7 +178,7 @@ void doris_config_file_version_error(struct doris_instance *instance, void *user
fclose(save->fp_cfg_file);
remove(save->cfg_file_path);
}
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Version %llu error, rolling back...", save->version);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %llu error, rolling back...", save->business->bizname, save->version);
}
void doris_config_file_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata)
@@ -217,7 +192,7 @@ void doris_config_file_cfgfile_start(struct doris_instance *instance, const char
type = (save->type == CFG_UPDATE_TYPE_FULL)?"full":"inc";
now = time(NULL);
localtm = localtime_r(&now, &savetime);
- snprintf(dir, 256, "%s/%s/%04d-%02d-%02d", g_doris_server_info.store_path_root, type, localtm->tm_year+1900, localtm->tm_mon+1, localtm->tm_mday);
+ snprintf(dir, 256, "%s/%s/%04d-%02d-%02d", save->business->store_path_root, type, localtm->tm_year+1900, localtm->tm_mon+1, localtm->tm_mday);
if(access(dir, F_OK))
{
doris_mkdir_according_path(dir);
@@ -225,8 +200,15 @@ void doris_config_file_cfgfile_start(struct doris_instance *instance, const char
snprintf(save->cfg_file_path, 256, "%s/%s.%010lu", dir, tablename, save->version);
fprintf(save->fp_idx_file, "%s\t%u\t%s\n", tablename, cfgnum, save->cfg_file_path);
- save->fp_cfg_file = fopen(save->cfg_file_path, "w+");
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "File %s start writing...", save->cfg_file_path);
+ if(NULL == (save->fp_cfg_file = fopen(save->cfg_file_path, "w+")))
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", save->business->bizname, save->cfg_file_path, strerror(errno));
+ assert(0);
+ }
+ else
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, File %s start writing...", save->business->bizname, save->cfg_file_path);
+ }
}
@@ -236,7 +218,11 @@ void doris_config_file_cfgfile_update(struct doris_instance *instance, const cha
size_t writen_len;
writen_len = fwrite(data, 1, len, save->fp_cfg_file);
- assert(writen_len==len);
+ if(writen_len != len)
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fwrite %s failed: %s", save->business->bizname, save->cfg_file_path, strerror(errno));
+ assert(0);
+ }
}
void doris_config_file_cfgfile_finish(struct doris_instance *instance, void *userdata)
@@ -244,7 +230,7 @@ void doris_config_file_cfgfile_finish(struct doris_instance *instance, void *use
struct confile_save *save=(struct confile_save *)userdata;
fclose(save->fp_cfg_file);
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "File %s write finished", save->cfg_file_path);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, File %s write finished", save->business->bizname, save->cfg_file_path);
}
/*memϵ�к������������ڴ�*/
@@ -277,24 +263,26 @@ void doris_config_mem_version_finish(struct doris_instance *instance, void *user
cJSON_Delete(save->cur_vernode->metajson);
save->cur_vernode->metajson = NULL;
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Version %lu mem finished, info: %s", save->version, save->cur_vernode->metacont);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %lu mem finished, info: %s", save->business->bizname, save->version, save->cur_vernode->metacont);
- if(save->cur_vernode->cfg_type==CFG_UPDATE_TYPE_FULL && g_doris_server_info.cfgver_head->latest_version!=0)
+ if(save->cur_vernode->cfg_type==CFG_UPDATE_TYPE_FULL && save->business->cfgver_head->latest_version!=0)
{
cur_version = config_version_handle_new();
cur_version->latest_version = save->cur_vernode->version;
TAILQ_INSERT_TAIL(&cur_version->version_head, save->cur_vernode, version_node);
- pthread_rwlock_wrlock(&g_doris_server_info.rwlock);
- tmplist = g_doris_server_info.cfgver_head;
- g_doris_server_info.cfgver_head = cur_version;
- pthread_rwlock_unlock(&g_doris_server_info.rwlock);
+ pthread_rwlock_wrlock(&save->business->rwlock);
+ tmplist = save->business->cfgver_head;
+ save->business->cfgver_head = cur_version;
+ pthread_rwlock_unlock(&save->business->rwlock);
cfgver_handle_delay_destroy(save, save->evbase, tmplist);
}
else
{
- TAILQ_INSERT_TAIL(&g_doris_server_info.cfgver_head->version_head, save->cur_vernode, version_node);
- g_doris_server_info.cfgver_head->latest_version = save->cur_vernode->version;
+ pthread_rwlock_wrlock(&save->business->rwlock);
+ TAILQ_INSERT_TAIL(&save->business->cfgver_head->version_head, save->cur_vernode, version_node);
+ save->business->cfgver_head->latest_version = save->cur_vernode->version;
+ pthread_rwlock_unlock(&save->business->rwlock);
}
save->cur_vernode = NULL;
}
@@ -303,9 +291,9 @@ void doris_config_mem_version_error(struct doris_instance *instance, void *userd
{
struct confile_save *save=(struct confile_save *)userdata;
- config_frag_node_cleanup(save, save->cur_frag);
- config_table_node_cleanup(save, save->cur_table);
- config_version_node_cleanup(save, save->cur_vernode);
+ config_frag_node_cleanup(save->cur_frag);
+ config_table_node_cleanup(save->cur_table);
+ config_version_node_cleanup(save->cur_vernode);
save->cur_frag = NULL;
save->cur_table = NULL;
save->cur_vernode = NULL;
@@ -325,7 +313,7 @@ void doris_config_mem_cfgfile_start(struct doris_instance *instance, const char
save->cur_table->filesize = size;
TAILQ_INIT(&save->cur_table->frag_head);
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "table %s.%010llu start loading to memory...", tablename, save->version);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s.%010llu start loading to memory...", save->business->bizname, tablename, save->version);
}
void doris_config_mem_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata)
@@ -333,7 +321,7 @@ void doris_config_mem_cfgfile_update(struct doris_instance *instance, const char
struct confile_save *save=(struct confile_save *)userdata;
size_t cache_len, offset=0;
- save->statistic.statistic.status[DRS_FSSTAT_MEMORY_USED] += len;
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_status[DRS_FSSTAT_MEMORY_USED], 0, FS_OP_ADD, len);
while(len > 0)
{
if(save->cur_frag == NULL)
@@ -389,7 +377,7 @@ void doris_config_mem_cfgfile_finish(struct doris_instance *instance, const char
}
assert(save->cur_table->cur_totallen == save->cur_table->filesize);
TAILQ_INSERT_TAIL(&save->cur_vernode->table_head, save->cur_table, table_node);
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "table %s.%010llu load to memory finished", save->cur_table->tablename, save->version);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s.%010llu load to memory finished", save->business->bizname, save->cur_table->tablename, save->version);
save->cur_table = NULL;
}
@@ -404,47 +392,49 @@ void doris_config_common_version_start(struct confile_save *save, cJSON *meta)
save->type = sub->valueint;
assert(save->type==CFG_UPDATE_TYPE_FULL || save->type==CFG_UPDATE_TYPE_INC);
save->version_cfgnum = 0;
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "Version %lu start updating...", save->version);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu start updating...", save->business->bizname, save->version);
}
void doris_config_common_version_finish(struct confile_save *save)
{
if(save->type == CFG_UPDATE_TYPE_FULL)
{
- save->statistic.statistic.status[DRS_FSSTAT_CUR_FULL_VERSION] = save->version;
- save->statistic.statistic.status[DRS_FSSTAT_CONFIG_TOTAL_NUM] = save->version_cfgnum;
- save->statistic.statistic.field[DRS_FSSTAT_RECV_FULL_VER] += 1;
+ save->business->total_cfgnum = save->version_cfgnum;
+ FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CUR_FULL_VERSION], FS_OP_SET, save->version);
+ FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CONFIG_TOTAL_NUM], FS_OP_SET, save->version_cfgnum);
+ FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_FULL_VER], FS_OP_ADD, 1);
}
else
{
- save->statistic.statistic.status[DRS_FSSTAT_CUR_INC_VERSION] = save->version;
- save->statistic.statistic.status[DRS_FSSTAT_CONFIG_TOTAL_NUM] += save->version_cfgnum;
- save->statistic.statistic.field[DRS_FSSTAT_RECV_INC_VER] += 1;
+ save->business->total_cfgnum += save->version_cfgnum;
+ FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CUR_INC_VERSION], FS_OP_SET, save->version);
+ FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CONFIG_TOTAL_NUM], FS_OP_ADD, save->version_cfgnum);
+ FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_INC_VER], FS_OP_ADD, 1);
}
- MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mm_latest_ver, MONITOR_VALUE_SET, save->version);
- MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mm_total_cfgnum, MONITOR_VALUE_SET,
- save->statistic.statistic.status[DRS_FSSTAT_CONFIG_TOTAL_NUM]);
- MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_CLEAR, MONITOR_STATUS_VERSION_ERR, NULL, NULL);
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "Version %lu update finished", save->version);
+ MESA_Monitor_operation(g_doris_server_info.monitor, save->business->mm_latest_ver, MONITOR_VALUE_SET, save->version);
+ MESA_Monitor_operation(g_doris_server_info.monitor, save->business->mm_total_cfgnum, MONITOR_VALUE_SET, save->business->total_cfgnum);
+ MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_CLEAR, save->business->mm_status_codeid, NULL, NULL);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu update finished", save->business->bizname, save->version);
}
void doris_config_common_version_error(struct confile_save *save)
{
- save->statistic.statistic.field[DRS_FSSTAT_RECV_ERR_VER] += 1;
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_ERR_VER], 0, FS_OP_ADD, 1);
//Grafana+Promethues��չʾ�ڲ��쳣״̬
- MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_SET, MONITOR_STATUS_VERSION_ERR,
+ MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_SET, save->business->mm_status_codeid,
"Version receive error", "Receive config file error, please check producer");
}
void doris_config_common_cfgfile_start(struct confile_save *save, u_int32_t cfgnum)
{
save->version_cfgnum += cfgnum;
- save->statistic.statistic.field[DRS_FSSTAT_RECV_START_FILES] += 1;
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_START_FILES], 0, FS_OP_ADD, 1);
}
void doris_config_common_cfgfile_finish(struct confile_save *save)
{
- save->statistic.statistic.field[DRS_FSSTAT_RECV_CMPLT_FILES] += 1;
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_CMPLT_FILES], 0, FS_OP_ADD, 1);
+ FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_FILES], FS_OP_ADD, 1);
}
/*localmemϵ�к���������ʱ�ӱ��ػ�����Ļص�*/
@@ -504,8 +494,10 @@ void doris_config_localmem_cfgfile_finish(struct doris_instance *instance, const
/*�ޱ��ϵ�к�������������ʱ�ص�*/
void doris_config_version_start(struct doris_instance *instance, cJSON *meta, void *userdata)
{
+ struct confile_save *save=(struct confile_save *)userdata;
+
doris_config_common_version_start((struct confile_save *)userdata, meta);
- if(g_doris_server_info.write_file_sw)
+ if(save->business->write_file_sw)
{
doris_config_file_version_start(instance, meta, userdata);
}
@@ -517,7 +509,9 @@ void doris_config_version_start(struct doris_instance *instance, cJSON *meta, vo
void doris_config_version_finish(struct doris_instance *instance, void *userdata)
{
- if(g_doris_server_info.write_file_sw)
+ struct confile_save *save=(struct confile_save *)userdata;
+
+ if(save->business->write_file_sw)
{
doris_config_file_version_finish(instance, userdata);
}
@@ -530,8 +524,10 @@ void doris_config_version_finish(struct doris_instance *instance, void *userdata
void doris_config_version_error(struct doris_instance *instance, void *userdata)
{
+ struct confile_save *save=(struct confile_save *)userdata;
+
doris_config_common_version_error((struct confile_save *)userdata);
- if(g_doris_server_info.write_file_sw)
+ if(save->business->write_file_sw)
{
doris_config_file_version_error(instance, userdata);
}
@@ -543,8 +539,10 @@ void doris_config_version_error(struct doris_instance *instance, void *userdata)
void doris_config_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata)
{
+ struct confile_save *save=(struct confile_save *)userdata;
+
doris_config_common_cfgfile_start((struct confile_save *)userdata, cfgnum);
- if(g_doris_server_info.write_file_sw)
+ if(save->business->write_file_sw)
{
doris_config_file_cfgfile_start(instance, tablename, size, cfgnum, userdata);
}
@@ -556,7 +554,9 @@ void doris_config_cfgfile_start(struct doris_instance *instance, const char *tab
void doris_config_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata)
{
- if(g_doris_server_info.write_file_sw)
+ struct confile_save *save=(struct confile_save *)userdata;
+
+ if(save->business->write_file_sw)
{
doris_config_file_cfgfile_update(instance, data, len, userdata);
}
@@ -568,8 +568,10 @@ void doris_config_cfgfile_update(struct doris_instance *instance, const char *da
void doris_config_cfgfile_finish(struct doris_instance *instance, const char *md5, void *userdata)
{
+ struct confile_save *save=(struct confile_save *)userdata;
+
doris_config_common_cfgfile_finish((struct confile_save *)userdata);
- if(g_doris_server_info.write_file_sw)
+ if(save->business->write_file_sw)
{
doris_config_file_cfgfile_finish(instance, userdata);
}
@@ -581,16 +583,15 @@ void doris_config_cfgfile_finish(struct doris_instance *instance, const char *md
void* thread_doris_client_recv_cfg(void *arg)
{
- struct event_base *manage_evbase=(struct event_base *)arg, *client_evbase;
- struct doris_parameter *param;
+ struct doris_business *business=(struct doris_business *)arg;
+ struct event_base *client_evbase;
struct doris_instance *instance;
struct doris_callbacks doris_cbs;
- struct doris_arguments doris_args={0, 0, 0};
+ struct doris_arguments doris_args;
struct doris_idxfile_scanner *scanner;
enum DORIS_UPDATE_TYPE update_type;
struct confile_save save;
char stored_path[512];
- struct timeval tv;
prctl(PR_SET_NAME, "client_recv");
@@ -599,6 +600,7 @@ void* thread_doris_client_recv_cfg(void *arg)
memset(&save, 0, sizeof(struct confile_save));
save.source_from = RECV_WAY_IDX_FILE;
save.evbase = client_evbase;
+ save.business = business;
scanner = doris_index_file_scanner(0);
@@ -611,10 +613,10 @@ void* thread_doris_client_recv_cfg(void *arg)
doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish;
doris_cbs.userdata = &save;
- snprintf(stored_path, 512, "%s/full/index", g_doris_server_info.store_path_root);
+ snprintf(stored_path, 512, "%s/full/index", business->store_path_root);
update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime);
assert(update_type!=CFG_UPDATE_TYPE_ERR);
- snprintf(stored_path, 512, "%s/inc/index", g_doris_server_info.store_path_root);
+ snprintf(stored_path, 512, "%s/inc/index", business->store_path_root);
do {
update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime);
assert(update_type!=CFG_UPDATE_TYPE_ERR);
@@ -630,24 +632,15 @@ void* thread_doris_client_recv_cfg(void *arg)
doris_cbs.cfgfile_finish = doris_config_cfgfile_finish;
save.source_from = RECV_WAY_DRS_CLIENT;
+ memset(&doris_args, 0, sizeof(struct doris_arguments));
doris_args.current_version = scanner->cur_version;
- param = doris_parameter_new(NIRVANA_CONFIG_FILE, manage_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime);
- if(param == NULL)
- {
- assert(0);return NULL;
- }
-
- instance = doris_instance_new(param, client_evbase, g_doris_server_info.log_runtime);
+ sprintf(doris_args.bizname, "%s", business->bizname);
+ instance = doris_instance_new(business->param, client_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime);
if(instance == NULL)
{
assert(0);return NULL;
}
- evtimer_assign(&save.statistic.timer_statistic, client_evbase, doris_worker_statistic_timer_cb, &save.statistic);
- tv.tv_sec = g_doris_server_info.fsstat_period;
- tv.tv_usec = 0;
- evtimer_add(&save.statistic.timer_statistic, &tv);
-
event_base_dispatch(client_evbase);
printf("Libevent dispath error, should not run here.\n");
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here.");
@@ -661,7 +654,7 @@ static void doris_scanner_timer_cb(int fd, short kind, void *userp)
struct timeval tv;
do {
- update_type = doris_index_file_traverse(timer_priv->scanner, g_doris_server_info.recv_path_inc,
+ update_type = doris_index_file_traverse(timer_priv->scanner, timer_priv->business->recv_path_inc,
&timer_priv->doris_cbs, NULL, g_doris_server_info.log_runtime);
}while(update_type != CFG_UPDATE_TYPE_NONE);
@@ -672,6 +665,7 @@ static void doris_scanner_timer_cb(int fd, short kind, void *userp)
void* thread_index_file_recv_cfg(void *arg)
{
+ struct doris_business *business=(struct doris_business *)arg;
struct event_base *client_evbase;
struct confile_save save;
struct timeval tv;
@@ -688,8 +682,10 @@ void* thread_index_file_recv_cfg(void *arg)
save.source_from = RECV_WAY_IDX_FILE;
save.evbase = client_evbase;
+ save.business = business;
timer_priv.scanner = doris_index_file_scanner(0);
+ timer_priv.business = business;
/*Retaive latest config to memory from Stored configs*/
timer_priv.doris_cbs.version_start = doris_config_localmem_version_start;
@@ -700,10 +696,10 @@ void* thread_index_file_recv_cfg(void *arg)
timer_priv.doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish;
timer_priv.doris_cbs.userdata = &save;
- snprintf(stored_path, 512, "%s/full/index", g_doris_server_info.store_path_root);
+ snprintf(stored_path, 512, "%s/full/index", business->store_path_root);
update_type = doris_index_file_traverse(timer_priv.scanner, stored_path, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime);
assert(update_type!=CFG_UPDATE_TYPE_ERR);
- snprintf(stored_path, 512, "%s/inc/index", g_doris_server_info.store_path_root);
+ snprintf(stored_path, 512, "%s/inc/index", business->store_path_root);
do{
update_type = doris_index_file_traverse(timer_priv.scanner, stored_path, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime);
assert(update_type!=CFG_UPDATE_TYPE_ERR);
@@ -718,7 +714,7 @@ void* thread_index_file_recv_cfg(void *arg)
timer_priv.doris_cbs.cfgfile_update = doris_config_cfgfile_update;
timer_priv.doris_cbs.cfgfile_finish = doris_config_cfgfile_finish;
- update_type = doris_index_file_traverse(timer_priv.scanner, g_doris_server_info.recv_path_full,
+ update_type = doris_index_file_traverse(timer_priv.scanner, business->recv_path_full,
&timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime);
assert(update_type!=CFG_UPDATE_TYPE_ERR);
if(update_type!=CFG_UPDATE_TYPE_NONE && update_type!=CFG_UPDATE_TYPE_ERR)
@@ -733,11 +729,6 @@ void* thread_index_file_recv_cfg(void *arg)
evtimer_assign(&timer_priv.timer_scanner, client_evbase, doris_scanner_timer_cb, &timer_priv);
evtimer_add(&timer_priv.timer_scanner, &tv);
- evtimer_assign(&save.statistic.timer_statistic, client_evbase, doris_worker_statistic_timer_cb, &save.statistic);
- tv.tv_sec = g_doris_server_info.fsstat_period;
- tv.tv_usec = 0;
- evtimer_add(&save.statistic.timer_statistic, &tv);
-
event_base_dispatch(client_evbase);
printf("Libevent dispath error, should not run here.\n");
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here.");
diff --git a/server/doris_server_receive.h b/server/doris_server_receive.h
index 73ec493..457d708 100644
--- a/server/doris_server_receive.h
+++ b/server/doris_server_receive.h
@@ -7,34 +7,39 @@
#include <cjson/cJSON.h>
-#define MONITOR_STATUS_VERSION_ERR 3
-
enum DORIS_SERVER_FS_FILED
{
- DRS_FSSTAT_RECV_FULL_VER=0,
- DRS_FSSTAT_RECV_INC_VER,
- DRS_FSSTAT_RECV_ERR_VER,
+ DRS_FSSTAT_RECV_ERR_VER=0,
DRS_FSSTAT_RECV_START_FILES,
DRS_FSSTAT_RECV_CMPLT_FILES,
-
+
DRS_FSSTAT_CLIENT_INVALID_REQ,
DRS_FSSTAT_CLIENT_META_REQ,
- DRS_FSSTAT_SEND_META_RES,
DRS_FSSTAT_SEND_META_NONEW,
DRS_FSSTAT_CLIENT_FILE_REQ,
- DRS_FSSTAT_SEND_FILE_RES,
DRS_FSSTAT_SEND_FILE_BYTES,
DRS_FSSTAT_SEND_FILE_RES_404,
DRS_FSSTAT_FIELD_NUM,
};
+enum DORIS_SERVER_FS_COLUMN
+{
+ DRS_FSCLM_RECV_FULL_VER=0,
+ DRS_FSCLM_RECV_INC_VER,
+ DRS_FSCLM_RECV_FILES,
+ DRS_FSCLM_SEND_META_RES,
+ DRS_FSCLM_SEND_FILE_RES,
+ DRS_FSCLM_CUR_FULL_VERSION,
+ DRS_FSCLM_CUR_INC_VERSION,
+ DRS_FSCLM_CONFIG_TOTAL_NUM,
+
+ DRS_FSSTAT_CLUMN_NUM,
+};
+
enum DORIS_SERVER_FS_STATUS
{
DRS_FSSTAT_MEMORY_USED=0,
- DRS_FSSTAT_CUR_FULL_VERSION,
- DRS_FSSTAT_CUR_INC_VERSION,
- DRS_FSSTAT_CONFIG_TOTAL_NUM,
DRS_FSSTAT_STATUS_NUM,
};
@@ -82,21 +87,11 @@ struct version_list_handle
struct version_list_handle *config_version_handle_new(void);
-struct doris_srv_statistics
-{
- long long field[DRS_FSSTAT_FIELD_NUM];
- long long status[DRS_FSSTAT_STATUS_NUM];
-};
-
-struct worker_statistic_info
-{
- struct event timer_statistic;
- struct doris_srv_statistics statistic, statistic_last;
-};
-
+struct doris_business;
struct confile_save
{
struct event_base *evbase;
+ struct doris_business *business;
int64_t version;
int32_t source_from;
int32_t type;
@@ -108,8 +103,6 @@ struct confile_save
FILE *fp_cfg_file;
FILE *fp_idx_file;
- struct worker_statistic_info statistic;
-
struct version_list_node *cur_vernode;
struct table_list_node *cur_table;
struct cont_frag_node *cur_frag;
@@ -118,12 +111,9 @@ struct confile_save
struct common_timer_event
{
struct event timer_event;
- struct confile_save *save;
void *data;
};
-void doris_worker_statistic_timer_cb(int fd, short kind, void *userp);
-
void* thread_doris_client_recv_cfg(void *arg);
void* thread_index_file_recv_cfg(void *arg);