diff options
| author | [email protected] <[email protected]> | 2021-07-19 17:21:38 +0800 |
|---|---|---|
| committer | [email protected] <[email protected]> | 2021-07-19 17:21:38 +0800 |
| commit | 16a47fc07f454dd90e0b86a469799869b62c2794 (patch) | |
| tree | f6008d76d707890bb7e7405435aa7e50f7f8fd5f /client | |
| parent | 26b1a0850061a6fad963772991abcd6303cd50f3 (diff) | |
支持https;适应版本跳跃;增加md5校验;
Diffstat (limited to 'client')
| -rw-r--r-- | client/doris_client_fetch.cpp | 102 | ||||
| -rw-r--r-- | client/doris_client_fetch.h | 30 | ||||
| -rw-r--r-- | client/nirvana_conhash.cpp | 1 |
3 files changed, 96 insertions, 37 deletions
diff --git a/client/doris_client_fetch.cpp b/client/doris_client_fetch.cpp index 50ed19b..28318c6 100644 --- a/client/doris_client_fetch.cpp +++ b/client/doris_client_fetch.cpp @@ -15,6 +15,26 @@ #include "doris_client_fetch.h" +static int doris_md5_final_string(MD5_CTX *c, char *result, unsigned int size) +{ + unsigned char md5[17]={0}; + int i; + + if(MD5_Final(md5, c) != 1) + { + return -1; + } + if(size < 33) + return -1; + + for(i=0; i<16; i++) + { + sprintf(result + i*2, "%02x", md5[i]); + } + result[32] = '\0'; + return 0; +} + void easy_string_destroy(struct easy_string *estr) { if(estr->buff != NULL) @@ -55,7 +75,6 @@ void doris_confile_ctx_destry(struct doris_confile_ctx *ctx) void doris_update_new_version(struct doris_instance *instance) { instance->cur_version = instance->new_version; - instance->new_version += 1; } void doris_request_restart_timer(struct doris_instance *instance, time_t wait_s) @@ -84,6 +103,16 @@ void doris_fetch_next_confile_meta(struct doris_instance *instance) sub = cJSON_GetObjectItem(cur_a_item, "cfg_num"); instance->curmeta.cfg_num = sub->valueint; + + if(NULL != (sub = cJSON_GetObjectItem(cur_a_item, "md5"))) + { + instance->curmeta.validate_md5 = 1; + snprintf(instance->curmeta.md5str, 36, "%s", sub->valuestring); + } + else + { + instance->curmeta.validate_md5 = 0; + } } void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code, long res_code, void *userp) @@ -102,7 +131,7 @@ void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code if(res_code != 200 && res_code!=206) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s failed, req_version=%lu, curlcode = %d", - instance->curmeta.table_name, instance->new_version, code); + instance->curmeta.table_name, instance->req_version, code); return; } instance->retry_times = 0; @@ -110,6 +139,7 @@ void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code { instance->param->cbs.cfgfile_start(instance, instance->curmeta.table_name, instance->curmeta.size, instance->curmeta.cfg_num, instance->param->cbs.userdata); + MD5_Init(&instance->ctx.md5ctx); } } @@ -151,6 +181,7 @@ void doris_http_confile_body_cb(const char *ptr, size_t bytes, CURLcode code, lo } instance->param->cbs.cfgfile_update(instance, ptr, bytes, instance->param->cbs.userdata); + MD5_Update(&instance->ctx.md5ctx, ptr, bytes); instance->curmeta.curoffset += bytes; instance->statistic.field[DRS_FS_FILED_RES_BYTES] += bytes; } @@ -159,34 +190,49 @@ void doris_http_fetch_confile(struct doris_instance *instance); void doris_http_confile_done_cb(CURLcode res, long res_code, const char *err, void *userp) { struct doris_instance *instance = (struct doris_instance *)userp; + char md5buffer[64]; + bool direct_fail=false; - if(instance->ctx.res_code != 200 && instance->ctx.res_code!=206) + if(res!=CURLE_OK) { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Fetch confile %s failed, req_version=%lu, curlcode = %d, error: %s", + instance->curmeta.table_name, instance->req_version, res_code, err); goto out_error; } - if(res!=CURLE_OK || (res_code!=200 && res_code!=206)) + if((instance->ctx.res_code != 200 && instance->ctx.res_code!=206) || (res_code!=200 && res_code!=206)) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Fetch confile %s failed, req_version=%lu, curlcode = %d, error: %s", - instance->curmeta.table_name, instance->new_version, res_code, err); goto out_error; } + if(instance->ctx.contl_total != 0) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s success, req_version=%lu, Content-Range: %lu-%lu/%lu", - instance->curmeta.table_name, instance->new_version, instance->ctx.contl_start, instance->ctx.contl_end, instance->ctx.contl_total); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s success, req_version=%lu, Content-Range: %lu-%lu/%lu", + instance->curmeta.table_name, instance->req_version, instance->ctx.contl_start, instance->ctx.contl_end, instance->ctx.contl_total); } else { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s success, req_version=%lu, Content-Length: %lu/%lu", - instance->curmeta.table_name, instance->new_version, instance->ctx.contlength, instance->curmeta.size); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s success, req_version=%lu, Content-Length: %lu/%lu", + instance->curmeta.table_name, instance->req_version, instance->ctx.contlength, instance->curmeta.size); } instance->statistic.field[DRS_FS_FILED_RES_FRAGS] += 1; if(instance->curmeta.curoffset >= instance->curmeta.size) //���ļ�������� { + doris_md5_final_string(&instance->ctx.md5ctx, md5buffer, 64); + if(instance->curmeta.validate_md5 && strcasecmp(instance->curmeta.md5str, md5buffer)) + { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Fetch confile %s over, version=%lu, md5 validate fail, real: %s, expect: %s", + instance->curmeta.table_name, instance->req_version, md5buffer, instance->curmeta.md5str); + direct_fail=true;goto out_md5; + } + else + { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s.010%lu over, md5: %s", + instance->curmeta.table_name, instance->req_version, md5buffer); + } instance->statistic.field[DRS_FS_FILED_RES_FILES] += 1; - instance->param->cbs.cfgfile_finish(instance, instance->param->cbs.userdata); + instance->param->cbs.cfgfile_finish(instance, md5buffer, instance->param->cbs.userdata); if(instance->array_index == instance->array_size) { instance->param->cbs.version_finish(instance, instance->param->cbs.userdata); @@ -209,16 +255,17 @@ void doris_http_confile_done_cb(CURLcode res, long res_code, const char *err, vo return; out_error: - instance->statistic.field[DRS_FS_FILED_RES_FRAGERR] += 1; if(instance->ctx.res_code == 404) //404Ӧ������¿�ʼ���� { - instance->retry_times = instance->param->fetch_max_tries; + direct_fail = true; } else { instance->retry_times++; } - if(instance->retry_times >= instance->param->fetch_max_tries) +out_md5: + instance->statistic.field[DRS_FS_FILED_RES_FRAGERR] += 1; + if(instance->retry_times >= instance->param->fetch_max_tries || direct_fail) { instance->statistic.field[DRS_FS_FILED_RES_VERERR] += 1; instance->param->cbs.version_error(instance, instance->param->cbs.userdata); @@ -249,7 +296,7 @@ void doris_http_fetch_confile(struct doris_instance *instance) doris_http_ctx_add_header(instance->ctx.httpctx, range); } - snprintf(metauri, 128, "configfile?tablename=%s&version=%lu&businessid=%u", instance->curmeta.table_name, instance->new_version, instance->param->args.businessid); + snprintf(metauri, 128, "configfile?tablename=%s&version=%lu&businessid=%u", instance->curmeta.table_name, instance->req_version, instance->param->args.businessid); if(doris_http_launch_get_request(instance->ctx.httpctx, metauri)) { instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; @@ -259,7 +306,7 @@ void doris_http_fetch_confile(struct doris_instance *instance) { instance->statistic.field[DRS_FS_FILED_REQ_FILES] += 1; MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Launch confile %s GET, req_version=%lu, %s", - instance->curmeta.table_name, instance->new_version, range); + instance->curmeta.table_name, instance->req_version, range); } } @@ -278,7 +325,7 @@ void doris_http_meta_header_cb(const char *ptr, size_t bytes, CURLcode code, lon if(res_code != 200) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "No new meta found, cur_version=%lu, req_version=%lu, curlcode = %d", - instance->cur_version, instance->new_version, code); + instance->cur_version, instance->req_version, code); } } @@ -297,25 +344,29 @@ void doris_http_meta_body_cb(const char *ptr, size_t bytes, CURLcode code, long void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void *userp) { struct doris_instance *instance = (struct doris_instance *)userp; + cJSON *sub; - if(instance->ctx.res_code != 200) + if(res!=CURLE_OK) { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Request meta failed, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s", + instance->cur_version, instance->req_version, res_code, err); goto out_error; } - if(res!=CURLE_OK || res_code!=200) + if(instance->ctx.res_code != 200 || res_code!=200) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "No new meta found, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s", - instance->cur_version, instance->new_version, res_code, err); goto out_error; } instance->meta = cJSON_Parse(instance->estr.buff); if(instance->meta == NULL) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Parse meta failed, req_version=%lu, invalid json: %s", instance->new_version, instance->estr.buff); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Parse meta failed, req_version=%lu, invalid json: %s", instance->req_version, instance->estr.buff); goto out_error; } + sub = cJSON_GetObjectItem(instance->meta, "version"); + instance->new_version = sub->valuedouble; + instance->req_version = instance->new_version; instance->statistic.field[DRS_FS_FILED_RES_META] += 1; MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "NEW_META found, cur_version=%lu, newjson: %s", instance->cur_version, instance->estr.buff); @@ -366,9 +417,10 @@ static void doris_http_fetch_meta(struct doris_instance *instance) instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed); } + instance->req_version = instance->cur_version + 1; //ֻ�а汾���³ɹ���cur_version�Ż���� if(instance->ctx.httpctx != NULL) { - snprintf(metauri, 128, "configmeta?version=%lu&businessid=%u", instance->new_version, instance->param->args.businessid); + snprintf(metauri, 128, "configmeta?version=%lu&businessid=%u", instance->req_version, instance->param->args.businessid); if(!doris_http_launch_get_request(instance->ctx.httpctx, metauri)) { instance->status = FETCH_STATUS_META; @@ -387,7 +439,7 @@ static void doris_http_fetch_meta(struct doris_instance *instance) { instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; doris_request_restart_timer(instance, instance->param->retry_interval); - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Launch meta GET failed: no active host found,req_version=%lu", instance->new_version); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Launch meta GET failed: no active host found,req_version=%lu", instance->req_version); } } @@ -557,7 +609,7 @@ struct doris_instance *doris_instance_new(struct doris_parameter *param, struct instance->worker_evbase = worker_evbase; instance->runtime_log = runtimelog; instance->cur_version = param->args.current_version; - instance->new_version = instance->cur_version + 1; //TODO + instance->req_version = instance->cur_version + 1; //TODO instance->httpins_master = doris_http_instance_new(param->param_master, worker_evbase, runtimelog); if(instance->httpins_master == NULL) diff --git a/client/doris_client_fetch.h b/client/doris_client_fetch.h index 5b482eb..7125540 100644 --- a/client/doris_client_fetch.h +++ b/client/doris_client_fetch.h @@ -1,6 +1,7 @@ #ifndef __DORIS_CLIENT_FETCH_IN_H__ #define __DORIS_CLIENT_FETCH_IN_H__ +#include <openssl/md5.h> #include <MESA/field_stat2.h> #include "doris_client.h" @@ -46,28 +47,32 @@ struct doris_parameter int32_t fsstat_status[FSSTAT_DORIS_STATUS_NUM]; }; -struct md5_long -{ - u_int64_t md5l; - u_int64_t md5h; -}; - struct fetch_file_meta { const char *table_name; size_t size; size_t curoffset; u_int32_t cfg_num; - union { - char md5[16]; - struct md5_long md5long; - }; + u_int32_t validate_md5; + char md5str[36]; +}; + +struct md5_long +{ + u_int64_t md5l; + u_int64_t md5h; +}; +union doris_md5 +{ + char md5[16]; + struct md5_long md5long; }; struct doris_confile_ctx { struct doris_http_ctx *httpctx; + MD5_CTX md5ctx; long res_code; size_t contlength; size_t contl_start; @@ -81,8 +86,9 @@ struct doris_instance u_int32_t retry_times; struct doris_http_instance *cur_httpins; - int64_t cur_version; - int64_t new_version; + int64_t cur_version; //Ԫ��Ϣ + int64_t req_version; //�ļ� + int64_t new_version; //�µ�Ԫ��Ϣ struct easy_string estr; cJSON *meta, *array; u_int32_t array_size; diff --git a/client/nirvana_conhash.cpp b/client/nirvana_conhash.cpp index 9a95403..99674fd 100644 --- a/client/nirvana_conhash.cpp +++ b/client/nirvana_conhash.cpp @@ -348,6 +348,7 @@ enum CONHASH_ERRCODE conhash_insert_bucket(struct consistent_hash *ch, const str } inner_bucket->bucket.bucket_id = bucket->bucket_id; inner_bucket->bucket.tag = bucket->tag; + inner_bucket->bucket_index = bucket_index; if(CONHASH_OK != (code=conhash_add_points(ch, inner_bucket, bucket->point_num))) { |
