diff options
| author | [email protected] <[email protected]> | 2021-07-19 17:21:38 +0800 |
|---|---|---|
| committer | [email protected] <[email protected]> | 2021-07-19 17:21:38 +0800 |
| commit | 16a47fc07f454dd90e0b86a469799869b62c2794 (patch) | |
| tree | f6008d76d707890bb7e7405435aa7e50f7f8fd5f | |
| parent | 26b1a0850061a6fad963772991abcd6303cd50f3 (diff) | |
支持https;适应版本跳跃;增加md5校验;
| -rw-r--r-- | client/doris_client_fetch.cpp | 102 | ||||
| -rw-r--r-- | client/doris_client_fetch.h | 30 | ||||
| -rw-r--r-- | client/nirvana_conhash.cpp | 1 | ||||
| -rw-r--r-- | include/doris_client.h | 2 | ||||
| -rw-r--r-- | monitor/配置分发网络运行状态-1626422695467.json | 22 | ||||
| -rw-r--r-- | server/bin/conf/doris_main.conf | 4 | ||||
| -rw-r--r-- | server/doris_server_http.cpp | 121 | ||||
| -rw-r--r-- | server/doris_server_main.cpp | 16 | ||||
| -rw-r--r-- | server/doris_server_main.h | 2 | ||||
| -rw-r--r-- | server/doris_server_receive.cpp | 31 | ||||
| -rw-r--r-- | server/doris_server_receive.h | 2 | ||||
| -rw-r--r-- | server/doris_server_scandir.cpp | 28 |
12 files changed, 287 insertions, 74 deletions
diff --git a/client/doris_client_fetch.cpp b/client/doris_client_fetch.cpp index 50ed19b..28318c6 100644 --- a/client/doris_client_fetch.cpp +++ b/client/doris_client_fetch.cpp @@ -15,6 +15,26 @@ #include "doris_client_fetch.h" +static int doris_md5_final_string(MD5_CTX *c, char *result, unsigned int size) +{ + unsigned char md5[17]={0}; + int i; + + if(MD5_Final(md5, c) != 1) + { + return -1; + } + if(size < 33) + return -1; + + for(i=0; i<16; i++) + { + sprintf(result + i*2, "%02x", md5[i]); + } + result[32] = '\0'; + return 0; +} + void easy_string_destroy(struct easy_string *estr) { if(estr->buff != NULL) @@ -55,7 +75,6 @@ void doris_confile_ctx_destry(struct doris_confile_ctx *ctx) void doris_update_new_version(struct doris_instance *instance) { instance->cur_version = instance->new_version; - instance->new_version += 1; } void doris_request_restart_timer(struct doris_instance *instance, time_t wait_s) @@ -84,6 +103,16 @@ void doris_fetch_next_confile_meta(struct doris_instance *instance) sub = cJSON_GetObjectItem(cur_a_item, "cfg_num"); instance->curmeta.cfg_num = sub->valueint; + + if(NULL != (sub = cJSON_GetObjectItem(cur_a_item, "md5"))) + { + instance->curmeta.validate_md5 = 1; + snprintf(instance->curmeta.md5str, 36, "%s", sub->valuestring); + } + else + { + instance->curmeta.validate_md5 = 0; + } } void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code, long res_code, void *userp) @@ -102,7 +131,7 @@ void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code if(res_code != 200 && res_code!=206) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s failed, req_version=%lu, curlcode = %d", - instance->curmeta.table_name, instance->new_version, code); + instance->curmeta.table_name, instance->req_version, code); return; } instance->retry_times = 0; @@ -110,6 +139,7 @@ void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code { instance->param->cbs.cfgfile_start(instance, instance->curmeta.table_name, instance->curmeta.size, instance->curmeta.cfg_num, instance->param->cbs.userdata); + MD5_Init(&instance->ctx.md5ctx); } } @@ -151,6 +181,7 @@ void doris_http_confile_body_cb(const char *ptr, size_t bytes, CURLcode code, lo } instance->param->cbs.cfgfile_update(instance, ptr, bytes, instance->param->cbs.userdata); + MD5_Update(&instance->ctx.md5ctx, ptr, bytes); instance->curmeta.curoffset += bytes; instance->statistic.field[DRS_FS_FILED_RES_BYTES] += bytes; } @@ -159,34 +190,49 @@ void doris_http_fetch_confile(struct doris_instance *instance); void doris_http_confile_done_cb(CURLcode res, long res_code, const char *err, void *userp) { struct doris_instance *instance = (struct doris_instance *)userp; + char md5buffer[64]; + bool direct_fail=false; - if(instance->ctx.res_code != 200 && instance->ctx.res_code!=206) + if(res!=CURLE_OK) { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Fetch confile %s failed, req_version=%lu, curlcode = %d, error: %s", + instance->curmeta.table_name, instance->req_version, res_code, err); goto out_error; } - if(res!=CURLE_OK || (res_code!=200 && res_code!=206)) + if((instance->ctx.res_code != 200 && instance->ctx.res_code!=206) || (res_code!=200 && res_code!=206)) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Fetch confile %s failed, req_version=%lu, curlcode = %d, error: %s", - instance->curmeta.table_name, instance->new_version, res_code, err); goto out_error; } + if(instance->ctx.contl_total != 0) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s success, req_version=%lu, Content-Range: %lu-%lu/%lu", - instance->curmeta.table_name, instance->new_version, instance->ctx.contl_start, instance->ctx.contl_end, instance->ctx.contl_total); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s success, req_version=%lu, Content-Range: %lu-%lu/%lu", + instance->curmeta.table_name, instance->req_version, instance->ctx.contl_start, instance->ctx.contl_end, instance->ctx.contl_total); } else { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s success, req_version=%lu, Content-Length: %lu/%lu", - instance->curmeta.table_name, instance->new_version, instance->ctx.contlength, instance->curmeta.size); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s success, req_version=%lu, Content-Length: %lu/%lu", + instance->curmeta.table_name, instance->req_version, instance->ctx.contlength, instance->curmeta.size); } instance->statistic.field[DRS_FS_FILED_RES_FRAGS] += 1; if(instance->curmeta.curoffset >= instance->curmeta.size) //���ļ�������� { + doris_md5_final_string(&instance->ctx.md5ctx, md5buffer, 64); + if(instance->curmeta.validate_md5 && strcasecmp(instance->curmeta.md5str, md5buffer)) + { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Fetch confile %s over, version=%lu, md5 validate fail, real: %s, expect: %s", + instance->curmeta.table_name, instance->req_version, md5buffer, instance->curmeta.md5str); + direct_fail=true;goto out_md5; + } + else + { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s.010%lu over, md5: %s", + instance->curmeta.table_name, instance->req_version, md5buffer); + } instance->statistic.field[DRS_FS_FILED_RES_FILES] += 1; - instance->param->cbs.cfgfile_finish(instance, instance->param->cbs.userdata); + instance->param->cbs.cfgfile_finish(instance, md5buffer, instance->param->cbs.userdata); if(instance->array_index == instance->array_size) { instance->param->cbs.version_finish(instance, instance->param->cbs.userdata); @@ -209,16 +255,17 @@ void doris_http_confile_done_cb(CURLcode res, long res_code, const char *err, vo return; out_error: - instance->statistic.field[DRS_FS_FILED_RES_FRAGERR] += 1; if(instance->ctx.res_code == 404) //404Ӧ������¿�ʼ���� { - instance->retry_times = instance->param->fetch_max_tries; + direct_fail = true; } else { instance->retry_times++; } - if(instance->retry_times >= instance->param->fetch_max_tries) +out_md5: + instance->statistic.field[DRS_FS_FILED_RES_FRAGERR] += 1; + if(instance->retry_times >= instance->param->fetch_max_tries || direct_fail) { instance->statistic.field[DRS_FS_FILED_RES_VERERR] += 1; instance->param->cbs.version_error(instance, instance->param->cbs.userdata); @@ -249,7 +296,7 @@ void doris_http_fetch_confile(struct doris_instance *instance) doris_http_ctx_add_header(instance->ctx.httpctx, range); } - snprintf(metauri, 128, "configfile?tablename=%s&version=%lu&businessid=%u", instance->curmeta.table_name, instance->new_version, instance->param->args.businessid); + snprintf(metauri, 128, "configfile?tablename=%s&version=%lu&businessid=%u", instance->curmeta.table_name, instance->req_version, instance->param->args.businessid); if(doris_http_launch_get_request(instance->ctx.httpctx, metauri)) { instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; @@ -259,7 +306,7 @@ void doris_http_fetch_confile(struct doris_instance *instance) { instance->statistic.field[DRS_FS_FILED_REQ_FILES] += 1; MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Launch confile %s GET, req_version=%lu, %s", - instance->curmeta.table_name, instance->new_version, range); + instance->curmeta.table_name, instance->req_version, range); } } @@ -278,7 +325,7 @@ void doris_http_meta_header_cb(const char *ptr, size_t bytes, CURLcode code, lon if(res_code != 200) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "No new meta found, cur_version=%lu, req_version=%lu, curlcode = %d", - instance->cur_version, instance->new_version, code); + instance->cur_version, instance->req_version, code); } } @@ -297,25 +344,29 @@ void doris_http_meta_body_cb(const char *ptr, size_t bytes, CURLcode code, long void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void *userp) { struct doris_instance *instance = (struct doris_instance *)userp; + cJSON *sub; - if(instance->ctx.res_code != 200) + if(res!=CURLE_OK) { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Request meta failed, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s", + instance->cur_version, instance->req_version, res_code, err); goto out_error; } - if(res!=CURLE_OK || res_code!=200) + if(instance->ctx.res_code != 200 || res_code!=200) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "No new meta found, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s", - instance->cur_version, instance->new_version, res_code, err); goto out_error; } instance->meta = cJSON_Parse(instance->estr.buff); if(instance->meta == NULL) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Parse meta failed, req_version=%lu, invalid json: %s", instance->new_version, instance->estr.buff); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Parse meta failed, req_version=%lu, invalid json: %s", instance->req_version, instance->estr.buff); goto out_error; } + sub = cJSON_GetObjectItem(instance->meta, "version"); + instance->new_version = sub->valuedouble; + instance->req_version = instance->new_version; instance->statistic.field[DRS_FS_FILED_RES_META] += 1; MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "NEW_META found, cur_version=%lu, newjson: %s", instance->cur_version, instance->estr.buff); @@ -366,9 +417,10 @@ static void doris_http_fetch_meta(struct doris_instance *instance) instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed); } + instance->req_version = instance->cur_version + 1; //ֻ�а汾���³ɹ���cur_version�Ż���� if(instance->ctx.httpctx != NULL) { - snprintf(metauri, 128, "configmeta?version=%lu&businessid=%u", instance->new_version, instance->param->args.businessid); + snprintf(metauri, 128, "configmeta?version=%lu&businessid=%u", instance->req_version, instance->param->args.businessid); if(!doris_http_launch_get_request(instance->ctx.httpctx, metauri)) { instance->status = FETCH_STATUS_META; @@ -387,7 +439,7 @@ static void doris_http_fetch_meta(struct doris_instance *instance) { instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; doris_request_restart_timer(instance, instance->param->retry_interval); - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Launch meta GET failed: no active host found,req_version=%lu", instance->new_version); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Launch meta GET failed: no active host found,req_version=%lu", instance->req_version); } } @@ -557,7 +609,7 @@ struct doris_instance *doris_instance_new(struct doris_parameter *param, struct instance->worker_evbase = worker_evbase; instance->runtime_log = runtimelog; instance->cur_version = param->args.current_version; - instance->new_version = instance->cur_version + 1; //TODO + instance->req_version = instance->cur_version + 1; //TODO instance->httpins_master = doris_http_instance_new(param->param_master, worker_evbase, runtimelog); if(instance->httpins_master == NULL) diff --git a/client/doris_client_fetch.h b/client/doris_client_fetch.h index 5b482eb..7125540 100644 --- a/client/doris_client_fetch.h +++ b/client/doris_client_fetch.h @@ -1,6 +1,7 @@ #ifndef __DORIS_CLIENT_FETCH_IN_H__ #define __DORIS_CLIENT_FETCH_IN_H__ +#include <openssl/md5.h> #include <MESA/field_stat2.h> #include "doris_client.h" @@ -46,28 +47,32 @@ struct doris_parameter int32_t fsstat_status[FSSTAT_DORIS_STATUS_NUM]; }; -struct md5_long -{ - u_int64_t md5l; - u_int64_t md5h; -}; - struct fetch_file_meta { const char *table_name; size_t size; size_t curoffset; u_int32_t cfg_num; - union { - char md5[16]; - struct md5_long md5long; - }; + u_int32_t validate_md5; + char md5str[36]; +}; + +struct md5_long +{ + u_int64_t md5l; + u_int64_t md5h; +}; +union doris_md5 +{ + char md5[16]; + struct md5_long md5long; }; struct doris_confile_ctx { struct doris_http_ctx *httpctx; + MD5_CTX md5ctx; long res_code; size_t contlength; size_t contl_start; @@ -81,8 +86,9 @@ struct doris_instance u_int32_t retry_times; struct doris_http_instance *cur_httpins; - int64_t cur_version; - int64_t new_version; + int64_t cur_version; //Ԫ��Ϣ + int64_t req_version; //�ļ� + int64_t new_version; //�µ�Ԫ��Ϣ struct easy_string estr; cJSON *meta, *array; u_int32_t array_size; diff --git a/client/nirvana_conhash.cpp b/client/nirvana_conhash.cpp index 9a95403..99674fd 100644 --- a/client/nirvana_conhash.cpp +++ b/client/nirvana_conhash.cpp @@ -348,6 +348,7 @@ enum CONHASH_ERRCODE conhash_insert_bucket(struct consistent_hash *ch, const str } inner_bucket->bucket.bucket_id = bucket->bucket_id; inner_bucket->bucket.tag = bucket->tag; + inner_bucket->bucket_index = bucket_index; if(CONHASH_OK != (code=conhash_add_points(ch, inner_bucket, bucket->point_num))) { diff --git a/include/doris_client.h b/include/doris_client.h index fea6826..b994e11 100644 --- a/include/doris_client.h +++ b/include/doris_client.h @@ -57,7 +57,7 @@ struct doris_callbacks void (*version_start)(struct doris_instance *instance, cJSON *meta, void *userdata); //meta�������汾���������ڶ���Ч void (*cfgfile_start)(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata); void (*cfgfile_update)(struct doris_instance *instance, const char *data, size_t len, void *userdata); - void (*cfgfile_finish)(struct doris_instance *instance, void *userdata); + void (*cfgfile_finish)(struct doris_instance *instance, const char *md5, void *userdata); void (*version_error)(struct doris_instance *instance, void *userdata); //�����ļ�ʧ�ܣ��ð汾��Ҫ�ع� void (*version_finish)(struct doris_instance *instance, void *userdata); }; diff --git a/monitor/配置分发网络运行状态-1626422695467.json b/monitor/配置分发网络运行状态-1626422695467.json index 8cfb5c9..f8777aa 100644 --- a/monitor/配置分发网络运行状态-1626422695467.json +++ b/monitor/配置分发网络运行状态-1626422695467.json @@ -65,7 +65,7 @@ "scroll": true, "showHeader": true, "sort": { - "col": 2, + "col": 13, "desc": false }, "styles": [ @@ -288,7 +288,7 @@ "unit": "none" }, { - "alias": "UserState", + "alias": "RecvState", "colorMode": "cell", "colors": [ "rgba(245, 54, 54, 0.9)", @@ -315,8 +315,8 @@ "value": "0" }, { - "text": "plug HTTP down", - "value": "10" + "text": "VersionError", + "value": "3" } ] }, @@ -449,25 +449,25 @@ "refId": "D" }, { - "expr": "0+user_defined_status{job=~\"doris\"}", + "expr": "0+latest_cfg_version{job=~\"doris\"}", "format": "table", "instant": true, "intervalFactor": 1, - "refId": "I" + "refId": "K" }, { - "expr": "0+latest_cfg_version{job=~\"doris\"}", + "expr": "0+total_config_num{job=~\"doris\"}", "format": "table", "instant": true, "intervalFactor": 1, - "refId": "K" + "refId": "L" }, { - "expr": "0+total_config_num{job=~\"doris\"}", + "expr": "0+user_defined_status{job=~\"doris\"}", "format": "table", "instant": true, "intervalFactor": 1, - "refId": "L" + "refId": "I" } ], "title": "Doris运行状态", @@ -514,5 +514,5 @@ "timezone": "", "title": "配置分发网络运行状态", "uid": "HZfW8wi7k", - "version": 5 + "version": 7 }
\ No newline at end of file diff --git a/server/bin/conf/doris_main.conf b/server/bin/conf/doris_main.conf index 5ad1007..d0ea133 100644 --- a/server/bin/conf/doris_main.conf +++ b/server/bin/conf/doris_main.conf @@ -2,7 +2,9 @@ worker_thread_num=2 server_listen_port=9898 manage_listen_port=2233 +https_connection_on=1 +#1-Doris client; 2-local file receive_config_way=2 cache_file_frag_size=67108864 store_config_path=./doris_store_path @@ -35,7 +37,7 @@ fsstat_log_dst_port=8125 [DORIS_CLIENT.master_server] max_connection_per_host=1 max_cnnt_pipeline_num=10 -https_connection_on=0 +https_connection_on=1 max_curl_session_num=10 http_server_listen_port=9897 diff --git a/server/doris_server_http.cpp b/server/doris_server_http.cpp index 69bbe29..1cff399 100644 --- a/server/doris_server_http.cpp +++ b/server/doris_server_http.cpp @@ -13,11 +13,12 @@ #include <sys/prctl.h> #include <poll.h> +#include <event2/bufferevent_ssl.h> + #include "doris_server_main.h" #include "doris_server_http.h" - -extern struct nirvana_global_info g_doris_server_info; +extern struct doris_global_info g_doris_server_info; static inline void set_sockopt_keepalive(int sd, int keepidle, int keepintvl, int keepcnt) { @@ -90,15 +91,18 @@ void doris_http_server_meta_cb(struct evhttp_request *req, void *arg) if(NULL == (version = evhttp_find_header(¶ms, "version"))) { statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; + evhttp_clear_headers(¶ms); evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no version found"); return; } if(0==(verlong = strtol(version, &endptr, 10)) || *endptr!='\0') { statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; + evhttp_clear_headers(¶ms); evhttp_send_error(req, HTTP_BADREQUEST, "Parameter version invalid"); return; } + evhttp_clear_headers(¶ms); pthread_rwlock_rdlock(&g_doris_server_info.rwlock); if(verlong > g_doris_server_info.cfgver_head->latest_version) @@ -108,6 +112,7 @@ void doris_http_server_meta_cb(struct evhttp_request *req, void *arg) evhttp_send_error(req, HTTP_NOTMODIFIED, "No new configs found"); return; } + vernode = TAILQ_FIRST(&g_doris_server_info.cfgver_head->version_head); while(vernode->version < verlong) { @@ -193,6 +198,7 @@ void doris_response_file_range(struct evhttp_request *req, const char *tablename evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Type", "application/stream"); evhttp_add_header(evhttp_request_get_output_headers(req), "Connection", "keep-alive"); evhttp_send_reply(req, HTTP_OK, "OK", evbuf); + evbuffer_free(evbuf); } void doris_http_server_file_cb(struct evhttp_request *req, void *arg) @@ -213,12 +219,14 @@ void doris_http_server_file_cb(struct evhttp_request *req, void *arg) } if(NULL==(version=evhttp_find_header(¶ms, "version")) || NULL==(tablename=evhttp_find_header(¶ms, "tablename"))) { + evhttp_clear_headers(¶ms); statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no version/tablename found"); return; } if(0==(verlong = strtol(version, &endptr, 10)) || *endptr!='\0') { + evhttp_clear_headers(¶ms); statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; evhttp_send_error(req, HTTP_BADREQUEST, "Parameter version invalid"); return; @@ -226,12 +234,14 @@ void doris_http_server_file_cb(struct evhttp_request *req, void *arg) if(NULL!=(content_range = evhttp_find_header(evhttp_request_get_input_headers(req), "Range")) && sscanf(content_range, "%*[^0-9]%lu-%lu", &req_start, &req_end)<1) { + evhttp_clear_headers(¶ms); statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; evhttp_send_error(req, HTTP_BADREQUEST, "Header Range invalid"); return; } doris_response_file_range(req, tablename, verlong, req_start, req_end, (content_range==NULL)?false:true, statistic); + evhttp_clear_headers(¶ms); } void doris_http_server_generic_cb(struct evhttp_request *req, void *arg) @@ -239,6 +249,103 @@ void doris_http_server_generic_cb(struct evhttp_request *req, void *arg) evhttp_send_error(req, HTTP_BADREQUEST, "Not Supported."); } +pthread_t nirvana_pthreads_thread_id(void) +{ + return pthread_self(); +} + +void nirvana_pthreads_locking_callback(int mode, int type, const char *file, int line) +{ + if(mode & CRYPTO_LOCK) + { + pthread_mutex_lock(&g_doris_server_info.lock_cs[type]); + } + else + { + pthread_mutex_unlock(&g_doris_server_info.lock_cs[type]); + } +} + +int server_verify_callback(int ok, X509_STORE_CTX *ctx) +{ + X509 *client_cert; + char *subject, *issuer; + + client_cert = X509_STORE_CTX_get_current_cert(ctx); + + subject = X509_NAME_oneline(X509_get_subject_name(client_cert), 0, 0); + issuer = X509_NAME_oneline(X509_get_issuer_name(client_cert), 0, 0); + + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_DEBUG, "ClientCert suject: %s, issuer: %s, state: %d.", subject, issuer, ok); + OPENSSL_free(subject); + OPENSSL_free(issuer); + return ok; +} + +SSL_CTX *doris_connections_create_ssl_ctx(void) +{ + int crypto_num; + SSL_CTX *ssl_ctx; + char session_id_appname[] = "DorisServer"; + + SSL_library_init(); + SSLeay_add_ssl_algorithms(); + OpenSSL_add_all_algorithms(); + SSL_load_error_strings(); + ERR_load_BIO_strings(); + + crypto_num = CRYPTO_num_locks(); + g_doris_server_info.lock_cs = (pthread_mutex_t *)OPENSSL_malloc(crypto_num * sizeof(pthread_mutex_t)); + for(int i=0; i<crypto_num; i++) + { + pthread_mutex_init(&g_doris_server_info.lock_cs[i], NULL); + } + CRYPTO_set_id_callback(nirvana_pthreads_thread_id); + CRYPTO_set_locking_callback(nirvana_pthreads_locking_callback); + + ssl_ctx = SSL_CTX_new(SSLv23_server_method()); + //SSL_CTX_set_verify(ssl_ctx, SSL_VERIFY_PEER|SSL_VERIFY_FAIL_IF_NO_PEER_CERT|SSL_VERIFY_CLIENT_ONCE, server_verify_callback); + //����SESSION Resumption��˫�����ã���Ϊ��֤��˫��ģ� + SSL_CTX_set_session_cache_mode(ssl_ctx, SSL_SESS_CACHE_BOTH); + //�������HoldĬ��SSL_SESSION_CACHE_MAX_SIZE_DEFAULT(1024*20)��SESSION��0-�������� + SSL_CTX_sess_set_cache_size(ssl_ctx, SSL_SESSION_CACHE_MAX_SIZE_DEFAULT); + SSL_CTX_set_session_id_context(ssl_ctx, (unsigned char*)session_id_appname, strlen(session_id_appname)); + SSL_CTX_set_default_passwd_cb_userdata(ssl_ctx, g_doris_server_info.ssl_key_passwd); + + if(!SSL_CTX_load_verify_locations(ssl_ctx, NULL, g_doris_server_info.ssl_CA_path)) + { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "SSL_CTX_load_verify_locations error: %s.", ERR_reason_error_string(ERR_get_error())); + SSL_CTX_free(ssl_ctx); + return NULL; + } + if(!SSL_CTX_use_certificate_file(ssl_ctx, g_doris_server_info.ssl_cert_file, SSL_FILETYPE_PEM)) + { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "SSL_CTX_use_certificate_file error: %s.", ERR_reason_error_string(ERR_get_error())); + SSL_CTX_free(ssl_ctx); + return NULL; + } + if(SSL_CTX_use_PrivateKey_file(ssl_ctx, g_doris_server_info.ssl_key_file, SSL_FILETYPE_PEM) < 0) + { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "SSL_CTX_use_PrivateKey_file_pass error: %s.", ERR_reason_error_string(ERR_get_error())); + SSL_CTX_free(ssl_ctx); + return NULL; + } + if(!SSL_CTX_check_private_key(ssl_ctx)) + { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "SSL_CTX_check_private_key error: %s.", ERR_reason_error_string(ERR_get_error())); + SSL_CTX_free(ssl_ctx); + return NULL; + } + return ssl_ctx; +} + +struct bufferevent *doris_https_bufferevent_cb(struct event_base *evabse, void *arg) +{ + SSL_CTX *ssl_instance = (SSL_CTX *)arg; + + return bufferevent_openssl_socket_new(evabse, -1, SSL_new(ssl_instance), BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_CLOSE_ON_FREE); +} + void* thread_doris_http_server(void *arg) { struct event_base *worker_evbase; @@ -253,6 +360,16 @@ void* thread_doris_http_server(void *arg) worker_http = evhttp_new(worker_evbase); + if(g_doris_server_info.ssl_conn_on) + { + g_doris_server_info.ssl_instance = doris_connections_create_ssl_ctx(); + if(g_doris_server_info.ssl_instance == NULL) + { + assert(0);return NULL; + } + evhttp_set_bevcb(worker_http, doris_https_bufferevent_cb, g_doris_server_info.ssl_instance); + } + evhttp_set_cb(worker_http, "/configmeta", doris_http_server_meta_cb, &statistic); evhttp_set_cb(worker_http, "/configfile", doris_http_server_file_cb, &statistic); evhttp_set_gencb(worker_http, doris_http_server_generic_cb, &statistic); diff --git a/server/doris_server_main.cpp b/server/doris_server_main.cpp index b8a1301..6cb8462 100644 --- a/server/doris_server_main.cpp +++ b/server/doris_server_main.cpp @@ -16,8 +16,8 @@ #include "doris_server_main.h" #include "doris_server_http.h" -struct nirvana_global_info g_doris_server_info; -static unsigned long doris_vesion_20210716=20210716L; +struct doris_global_info g_doris_server_info; +static unsigned long doris_vesion_20210719=20210719L; int doris_mkdir_according_path(const char * path) { @@ -65,10 +65,10 @@ int32_t doris_read_profile_configs(const char *config_file) snprintf(tmp_dir, 256, "%s/runtime_log", g_doris_server_info.root_log_dir); if(doris_mkdir_according_path(tmp_dir)) { - printf("mkdir %s for duran runtimelog failed: %s\n", tmp_dir, strerror(errno)); + printf("mkdir %s for runtimelog failed: %s\n", tmp_dir, strerror(errno)); return -1; } - snprintf(tmp_buf, 256, "%s/nirvana_runtime.log", tmp_dir); + snprintf(tmp_buf, 256, "%s/doris_runtime.log", tmp_dir); g_doris_server_info.log_runtime = MESA_create_runtime_log_handle(tmp_buf, g_doris_server_info.log_level); if(NULL==g_doris_server_info.log_runtime) { @@ -117,7 +117,7 @@ int32_t doris_read_profile_configs(const char *config_file) MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "scan_index_file_interval", &g_doris_server_info.scan_idx_interval, 10); } - MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "ssl_tcp_connection_on", &g_doris_server_info.ssl_conn_on, 0); + MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "https_connection_on", &g_doris_server_info.ssl_conn_on, 0); if(g_doris_server_info.ssl_conn_on) { MESA_load_profile_string_def(config_file, "DORIS_SERVER", "ssl_trusted_ca_path", g_doris_server_info.ssl_CA_path, 256, "./conf/ssl_CA_path/"); @@ -128,7 +128,7 @@ int32_t doris_read_profile_configs(const char *config_file) /*FiledStat*/ MESA_load_profile_string_def(config_file, "DORIS_SERVER", "fsstat_log_appname", g_doris_server_info.fsstat_appname, 16, "DORIS_SERVER_S"); - MESA_load_profile_string_def(config_file, "DORIS_SERVER", "fsstat_log_filepath", g_doris_server_info.fsstat_filepath, 256, "./log/nirvana_server.fs"); + MESA_load_profile_string_def(config_file, "DORIS_SERVER", "fsstat_log_filepath", g_doris_server_info.fsstat_filepath, 256, "./log/doris_server.fs"); MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "fsstat_log_interval", &g_doris_server_info.fsstat_period, 10); MESA_load_profile_int_def(config_file, "DORIS_SERVER", "fsstat_log_print_mode", &g_doris_server_info.fsstat_print_mode, 1); MESA_load_profile_string_def(config_file, "DORIS_SERVER", "fsstat_log_dst_ip", g_doris_server_info.fsstat_dst_ip, 64, "127.0.0.1"); @@ -141,7 +141,7 @@ int32_t doris_read_profile_configs(const char *config_file) return 0; } -static int doris_server_register_field_stat(struct nirvana_global_info *param) +static int doris_server_register_field_stat(struct doris_global_info *param) { const char *field_names[DRS_FSSTAT_FIELD_NUM]={"RecvFullVer", "RecvIncVer", "RecvErrVer", "FileStarts", "FileComplete", "ClientInvReq", "ClientMetaReq", "SendResMeta", "SendNoNewMeta", "ClientFileReq", "SendFiles", "SendBytes", "SendFile404"}; @@ -263,7 +263,7 @@ int main(int argc, char **argv) evhttp_set_cb(manager_http, "/doris/statistic/status", manager_statistic_status_requests_cb, NULL); evhttp_set_cb(manager_http, "/doris/statistic/threads", manager_statistic_threads_requests_cb, NULL); evhttp_set_gencb(manager_http, manager_generic_requests_cb, NULL); - g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210716); + g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210719); g_doris_server_info.mm_latest_ver = MESA_Monitor_register(g_doris_server_info.monitor, "latest_cfg_version", MONITOR_METRICS_GAUGE, "Latest doris config version."); g_doris_server_info.mm_total_cfgnum = MESA_Monitor_register(g_doris_server_info.monitor, "total_config_num", MONITOR_METRICS_GAUGE, "Total config num from latest full version till now."); diff --git a/server/doris_server_main.h b/server/doris_server_main.h index 75b5f56..f91694b 100644 --- a/server/doris_server_main.h +++ b/server/doris_server_main.h @@ -31,7 +31,7 @@ #define RECV_WAY_DRS_CLIENT 1 #define RECV_WAY_IDX_FILE 2 -struct nirvana_global_info +struct doris_global_info { u_int32_t iothreads; int32_t server_port; diff --git a/server/doris_server_receive.cpp b/server/doris_server_receive.cpp index 113c0e0..3538f4f 100644 --- a/server/doris_server_receive.cpp +++ b/server/doris_server_receive.cpp @@ -22,7 +22,7 @@ struct scanner_timer_priv struct event timer_scanner; }; -extern struct nirvana_global_info g_doris_server_info; +extern struct doris_global_info g_doris_server_info; void doris_worker_statistic_timer_cb(int fd, short kind, void *userp) @@ -90,6 +90,7 @@ void config_version_node_cleanup(struct confile_save *save, struct version_list_ free(vernode->metacont); cJSON_Delete(vernode->metajson); cJSON_Delete(vernode->arrayjson); + cJSON_Delete(vernode->table_meta); free(vernode); } @@ -313,13 +314,11 @@ void doris_config_mem_version_error(struct doris_instance *instance, void *userd void doris_config_mem_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata) { struct confile_save *save=(struct confile_save *)userdata; - cJSON *table_meta; - table_meta = cJSON_CreateObject(); - cJSON_AddStringToObject(table_meta, "tablename", tablename); - cJSON_AddNumberToObject(table_meta, "cfg_num", cfgnum); - cJSON_AddNumberToObject(table_meta, "size", size); - cJSON_AddItemToArray(save->cur_vernode->arrayjson, table_meta); + save->cur_vernode->table_meta = cJSON_CreateObject(); + cJSON_AddStringToObject(save->cur_vernode->table_meta, "tablename", tablename); + cJSON_AddNumberToObject(save->cur_vernode->table_meta, "cfg_num", cfgnum); + cJSON_AddNumberToObject(save->cur_vernode->table_meta, "size", size); save->cur_table = (struct table_list_node *)calloc(1, sizeof(struct table_list_node)); snprintf(save->cur_table->tablename, 64, "%s", tablename); @@ -374,10 +373,14 @@ void doris_config_mem_cfgfile_update(struct doris_instance *instance, const char assert(save->cur_table->cur_totallen <= save->cur_table->filesize); } -void doris_config_mem_cfgfile_finish(struct doris_instance *instance, void *userdata) +void doris_config_mem_cfgfile_finish(struct doris_instance *instance, const char *md5, void *userdata) { struct confile_save *save=(struct confile_save *)userdata; + cJSON_AddStringToObject(save->cur_vernode->table_meta, "md5", md5); + cJSON_AddItemToArray(save->cur_vernode->arrayjson, save->cur_vernode->table_meta); + save->cur_vernode->table_meta = NULL; + if(save->cur_frag != NULL) { TAILQ_INSERT_TAIL(&save->cur_table->frag_head, save->cur_frag, frag_node); @@ -421,12 +424,16 @@ void doris_config_common_version_finish(struct confile_save *save) MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mm_latest_ver, MONITOR_VALUE_SET, save->version); MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mm_total_cfgnum, MONITOR_VALUE_SET, save->statistic.statistic.status[DRS_FSSTAT_CONFIG_TOTAL_NUM]); + MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_CLEAR, MONITOR_STATUS_VERSION_ERR, NULL, NULL); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "Version %lu update finished", save->version);
} void doris_config_common_version_error(struct confile_save *save) { save->statistic.statistic.field[DRS_FSSTAT_RECV_ERR_VER] += 1; + //Grafana+Promethues��չʾ�ڲ��쳣״̬ + MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_SET, MONITOR_STATUS_VERSION_ERR, + "Version receive error", "Receive config file error, please check producer"); } void doris_config_common_cfgfile_start(struct confile_save *save, u_int32_t cfgnum) @@ -485,12 +492,12 @@ void doris_config_localmem_cfgfile_update(struct doris_instance *instance, const } } -void doris_config_localmem_cfgfile_finish(struct doris_instance *instance, void *userdata) +void doris_config_localmem_cfgfile_finish(struct doris_instance *instance, const char *md5, void *userdata) { doris_config_common_cfgfile_finish((struct confile_save *)userdata); if(g_doris_server_info.server_role_sw) { - doris_config_mem_cfgfile_finish(instance, userdata); + doris_config_mem_cfgfile_finish(instance, md5, userdata); } } @@ -559,7 +566,7 @@ void doris_config_cfgfile_update(struct doris_instance *instance, const char *da } } -void doris_config_cfgfile_finish(struct doris_instance *instance, void *userdata) +void doris_config_cfgfile_finish(struct doris_instance *instance, const char *md5, void *userdata) { doris_config_common_cfgfile_finish((struct confile_save *)userdata); if(g_doris_server_info.write_file_sw) @@ -568,7 +575,7 @@ void doris_config_cfgfile_finish(struct doris_instance *instance, void *userdata } if(g_doris_server_info.server_role_sw) { - doris_config_mem_cfgfile_finish(instance, userdata); + doris_config_mem_cfgfile_finish(instance, md5, userdata); } } diff --git a/server/doris_server_receive.h b/server/doris_server_receive.h index 4de505b..73ec493 100644 --- a/server/doris_server_receive.h +++ b/server/doris_server_receive.h @@ -7,6 +7,7 @@ #include <cjson/cJSON.h> +#define MONITOR_STATUS_VERSION_ERR 3 enum DORIS_SERVER_FS_FILED { @@ -66,6 +67,7 @@ struct version_list_node int32_t metalen; int32_t cfg_type; //1-full, 2-inc cJSON *metajson, *arrayjson; + cJSON *table_meta; TAILQ_HEAD(__table_list_node, table_list_node) table_head; TAILQ_ENTRY(version_list_node) version_node; diff --git a/server/doris_server_scandir.cpp b/server/doris_server_scandir.cpp index de55e4a..2ac8696 100644 --- a/server/doris_server_scandir.cpp +++ b/server/doris_server_scandir.cpp @@ -4,6 +4,7 @@ #include <string.h> #include <sys/stat.h> #include <openssl/evp.h> +#include <openssl/md5.h> #include <assert.h> #include <MESA/MESA_handle_logger.h> @@ -16,6 +17,26 @@ #define MESA_RUNTIME_LOGV4(handle, lv, fmt, args...) \ MESA_handle_runtime_log((handle), (lv), "DorisServer", "%s:%d, " fmt, __FILENAME__, __LINE__, ##args) +static int scandir_md5_final_string(MD5_CTX *c, char *result, unsigned int size) +{ + unsigned char md5[17]={0}; + int i; + + if(MD5_Final(md5, c) != 1) + { + return -1; + } + if(size < 33) + return -1; + + for(i=0; i<16; i++) + { + sprintf(result + i*2, "%02x", md5[i]); + } + result[32] = '\0'; + return 0; +} + //replacement of glibc scandir, to adapt dictator malloc wrap #define ENLARGE_STEP 1024 int my_scandir(const char *dir, struct dirent ***namelist, @@ -209,12 +230,15 @@ bool doris_read_table_file(struct doris_idxfile_scanner *scanner, struct cfg_tab { FILE *fp; size_t readlen, remainlen, oncesize; + MD5_CTX md5ctx; + char md5buffer[64]; if((fp = fopen(table->cfg_path, "r")) == NULL) { MESA_RUNTIME_LOGV4(logger,RLOG_LV_FATAL, "fopen table file %s failed: %s", table->cfg_path, strerror(errno)); return false; } + MD5_Init(&md5ctx); doris_cbs->cfgfile_start(NULL, table->table_name, table->filesize, table->cfg_num, doris_cbs->userdata); remainlen = table->filesize; @@ -225,8 +249,10 @@ bool doris_read_table_file(struct doris_idxfile_scanner *scanner, struct cfg_tab assert(readlen == oncesize); remainlen -= readlen; doris_cbs->cfgfile_update(NULL, scanner->oncebuf, readlen, doris_cbs->userdata); + MD5_Update(&md5ctx, scanner->oncebuf, readlen); } - doris_cbs->cfgfile_finish(NULL, doris_cbs->userdata); + scandir_md5_final_string(&md5ctx, md5buffer, 64); + doris_cbs->cfgfile_finish(NULL, md5buffer, doris_cbs->userdata); fclose(fp); return true; } |
