summaryrefslogtreecommitdiff
path: root/client
diff options
context:
space:
mode:
author[email protected] <[email protected]>2021-07-19 17:21:38 +0800
committer[email protected] <[email protected]>2021-07-19 17:21:38 +0800
commit16a47fc07f454dd90e0b86a469799869b62c2794 (patch)
treef6008d76d707890bb7e7405435aa7e50f7f8fd5f /client
parent26b1a0850061a6fad963772991abcd6303cd50f3 (diff)
支持https;适应版本跳跃;增加md5校验;
Diffstat (limited to 'client')
-rw-r--r--client/doris_client_fetch.cpp102
-rw-r--r--client/doris_client_fetch.h30
-rw-r--r--client/nirvana_conhash.cpp1
3 files changed, 96 insertions, 37 deletions
diff --git a/client/doris_client_fetch.cpp b/client/doris_client_fetch.cpp
index 50ed19b..28318c6 100644
--- a/client/doris_client_fetch.cpp
+++ b/client/doris_client_fetch.cpp
@@ -15,6 +15,26 @@
#include "doris_client_fetch.h"
+static int doris_md5_final_string(MD5_CTX *c, char *result, unsigned int size)
+{
+ unsigned char md5[17]={0};
+ int i;
+
+ if(MD5_Final(md5, c) != 1)
+ {
+ return -1;
+ }
+ if(size < 33)
+ return -1;
+
+ for(i=0; i<16; i++)
+ {
+ sprintf(result + i*2, "%02x", md5[i]);
+ }
+ result[32] = '\0';
+ return 0;
+}
+
void easy_string_destroy(struct easy_string *estr)
{
if(estr->buff != NULL)
@@ -55,7 +75,6 @@ void doris_confile_ctx_destry(struct doris_confile_ctx *ctx)
void doris_update_new_version(struct doris_instance *instance)
{
instance->cur_version = instance->new_version;
- instance->new_version += 1;
}
void doris_request_restart_timer(struct doris_instance *instance, time_t wait_s)
@@ -84,6 +103,16 @@ void doris_fetch_next_confile_meta(struct doris_instance *instance)
sub = cJSON_GetObjectItem(cur_a_item, "cfg_num");
instance->curmeta.cfg_num = sub->valueint;
+
+ if(NULL != (sub = cJSON_GetObjectItem(cur_a_item, "md5")))
+ {
+ instance->curmeta.validate_md5 = 1;
+ snprintf(instance->curmeta.md5str, 36, "%s", sub->valuestring);
+ }
+ else
+ {
+ instance->curmeta.validate_md5 = 0;
+ }
}
void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code, long res_code, void *userp)
@@ -102,7 +131,7 @@ void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code
if(res_code != 200 && res_code!=206)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s failed, req_version=%lu, curlcode = %d",
- instance->curmeta.table_name, instance->new_version, code);
+ instance->curmeta.table_name, instance->req_version, code);
return;
}
instance->retry_times = 0;
@@ -110,6 +139,7 @@ void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code
{
instance->param->cbs.cfgfile_start(instance, instance->curmeta.table_name,
instance->curmeta.size, instance->curmeta.cfg_num, instance->param->cbs.userdata);
+ MD5_Init(&instance->ctx.md5ctx);
}
}
@@ -151,6 +181,7 @@ void doris_http_confile_body_cb(const char *ptr, size_t bytes, CURLcode code, lo
}
instance->param->cbs.cfgfile_update(instance, ptr, bytes, instance->param->cbs.userdata);
+ MD5_Update(&instance->ctx.md5ctx, ptr, bytes);
instance->curmeta.curoffset += bytes;
instance->statistic.field[DRS_FS_FILED_RES_BYTES] += bytes;
}
@@ -159,34 +190,49 @@ void doris_http_fetch_confile(struct doris_instance *instance);
void doris_http_confile_done_cb(CURLcode res, long res_code, const char *err, void *userp)
{
struct doris_instance *instance = (struct doris_instance *)userp;
+ char md5buffer[64];
+ bool direct_fail=false;
- if(instance->ctx.res_code != 200 && instance->ctx.res_code!=206)
+ if(res!=CURLE_OK)
{
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Fetch confile %s failed, req_version=%lu, curlcode = %d, error: %s",
+ instance->curmeta.table_name, instance->req_version, res_code, err);
goto out_error;
}
- if(res!=CURLE_OK || (res_code!=200 && res_code!=206))
+ if((instance->ctx.res_code != 200 && instance->ctx.res_code!=206) || (res_code!=200 && res_code!=206))
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Fetch confile %s failed, req_version=%lu, curlcode = %d, error: %s",
- instance->curmeta.table_name, instance->new_version, res_code, err);
goto out_error;
}
+
if(instance->ctx.contl_total != 0)
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s success, req_version=%lu, Content-Range: %lu-%lu/%lu",
- instance->curmeta.table_name, instance->new_version, instance->ctx.contl_start, instance->ctx.contl_end, instance->ctx.contl_total);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s success, req_version=%lu, Content-Range: %lu-%lu/%lu",
+ instance->curmeta.table_name, instance->req_version, instance->ctx.contl_start, instance->ctx.contl_end, instance->ctx.contl_total);
}
else
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s success, req_version=%lu, Content-Length: %lu/%lu",
- instance->curmeta.table_name, instance->new_version, instance->ctx.contlength, instance->curmeta.size);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s success, req_version=%lu, Content-Length: %lu/%lu",
+ instance->curmeta.table_name, instance->req_version, instance->ctx.contlength, instance->curmeta.size);
}
instance->statistic.field[DRS_FS_FILED_RES_FRAGS] += 1;
if(instance->curmeta.curoffset >= instance->curmeta.size) //����������
{
+ doris_md5_final_string(&instance->ctx.md5ctx, md5buffer, 64);
+ if(instance->curmeta.validate_md5 && strcasecmp(instance->curmeta.md5str, md5buffer))
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Fetch confile %s over, version=%lu, md5 validate fail, real: %s, expect: %s",
+ instance->curmeta.table_name, instance->req_version, md5buffer, instance->curmeta.md5str);
+ direct_fail=true;goto out_md5;
+ }
+ else
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s.010%lu over, md5: %s",
+ instance->curmeta.table_name, instance->req_version, md5buffer);
+ }
instance->statistic.field[DRS_FS_FILED_RES_FILES] += 1;
- instance->param->cbs.cfgfile_finish(instance, instance->param->cbs.userdata);
+ instance->param->cbs.cfgfile_finish(instance, md5buffer, instance->param->cbs.userdata);
if(instance->array_index == instance->array_size)
{
instance->param->cbs.version_finish(instance, instance->param->cbs.userdata);
@@ -209,16 +255,17 @@ void doris_http_confile_done_cb(CURLcode res, long res_code, const char *err, vo
return;
out_error:
- instance->statistic.field[DRS_FS_FILED_RES_FRAGERR] += 1;
if(instance->ctx.res_code == 404) //404Ӧ������¿�ʼ����
{
- instance->retry_times = instance->param->fetch_max_tries;
+ direct_fail = true;
}
else
{
instance->retry_times++;
}
- if(instance->retry_times >= instance->param->fetch_max_tries)
+out_md5:
+ instance->statistic.field[DRS_FS_FILED_RES_FRAGERR] += 1;
+ if(instance->retry_times >= instance->param->fetch_max_tries || direct_fail)
{
instance->statistic.field[DRS_FS_FILED_RES_VERERR] += 1;
instance->param->cbs.version_error(instance, instance->param->cbs.userdata);
@@ -249,7 +296,7 @@ void doris_http_fetch_confile(struct doris_instance *instance)
doris_http_ctx_add_header(instance->ctx.httpctx, range);
}
- snprintf(metauri, 128, "configfile?tablename=%s&version=%lu&businessid=%u", instance->curmeta.table_name, instance->new_version, instance->param->args.businessid);
+ snprintf(metauri, 128, "configfile?tablename=%s&version=%lu&businessid=%u", instance->curmeta.table_name, instance->req_version, instance->param->args.businessid);
if(doris_http_launch_get_request(instance->ctx.httpctx, metauri))
{
instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1;
@@ -259,7 +306,7 @@ void doris_http_fetch_confile(struct doris_instance *instance)
{
instance->statistic.field[DRS_FS_FILED_REQ_FILES] += 1;
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Launch confile %s GET, req_version=%lu, %s",
- instance->curmeta.table_name, instance->new_version, range);
+ instance->curmeta.table_name, instance->req_version, range);
}
}
@@ -278,7 +325,7 @@ void doris_http_meta_header_cb(const char *ptr, size_t bytes, CURLcode code, lon
if(res_code != 200)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "No new meta found, cur_version=%lu, req_version=%lu, curlcode = %d",
- instance->cur_version, instance->new_version, code);
+ instance->cur_version, instance->req_version, code);
}
}
@@ -297,25 +344,29 @@ void doris_http_meta_body_cb(const char *ptr, size_t bytes, CURLcode code, long
void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void *userp)
{
struct doris_instance *instance = (struct doris_instance *)userp;
+ cJSON *sub;
- if(instance->ctx.res_code != 200)
+ if(res!=CURLE_OK)
{
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Request meta failed, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s",
+ instance->cur_version, instance->req_version, res_code, err);
goto out_error;
}
- if(res!=CURLE_OK || res_code!=200)
+ if(instance->ctx.res_code != 200 || res_code!=200)
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "No new meta found, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s",
- instance->cur_version, instance->new_version, res_code, err);
goto out_error;
}
instance->meta = cJSON_Parse(instance->estr.buff);
if(instance->meta == NULL)
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Parse meta failed, req_version=%lu, invalid json: %s", instance->new_version, instance->estr.buff);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Parse meta failed, req_version=%lu, invalid json: %s", instance->req_version, instance->estr.buff);
goto out_error;
}
+ sub = cJSON_GetObjectItem(instance->meta, "version");
+ instance->new_version = sub->valuedouble;
+ instance->req_version = instance->new_version;
instance->statistic.field[DRS_FS_FILED_RES_META] += 1;
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "NEW_META found, cur_version=%lu, newjson: %s",
instance->cur_version, instance->estr.buff);
@@ -366,9 +417,10 @@ static void doris_http_fetch_meta(struct doris_instance *instance)
instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed);
}
+ instance->req_version = instance->cur_version + 1; //ֻ�а汾���³ɹ���cur_version�Ż����
if(instance->ctx.httpctx != NULL)
{
- snprintf(metauri, 128, "configmeta?version=%lu&businessid=%u", instance->new_version, instance->param->args.businessid);
+ snprintf(metauri, 128, "configmeta?version=%lu&businessid=%u", instance->req_version, instance->param->args.businessid);
if(!doris_http_launch_get_request(instance->ctx.httpctx, metauri))
{
instance->status = FETCH_STATUS_META;
@@ -387,7 +439,7 @@ static void doris_http_fetch_meta(struct doris_instance *instance)
{
instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1;
doris_request_restart_timer(instance, instance->param->retry_interval);
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Launch meta GET failed: no active host found,req_version=%lu", instance->new_version);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Launch meta GET failed: no active host found,req_version=%lu", instance->req_version);
}
}
@@ -557,7 +609,7 @@ struct doris_instance *doris_instance_new(struct doris_parameter *param, struct
instance->worker_evbase = worker_evbase;
instance->runtime_log = runtimelog;
instance->cur_version = param->args.current_version;
- instance->new_version = instance->cur_version + 1; //TODO
+ instance->req_version = instance->cur_version + 1; //TODO
instance->httpins_master = doris_http_instance_new(param->param_master, worker_evbase, runtimelog);
if(instance->httpins_master == NULL)
diff --git a/client/doris_client_fetch.h b/client/doris_client_fetch.h
index 5b482eb..7125540 100644
--- a/client/doris_client_fetch.h
+++ b/client/doris_client_fetch.h
@@ -1,6 +1,7 @@
#ifndef __DORIS_CLIENT_FETCH_IN_H__
#define __DORIS_CLIENT_FETCH_IN_H__
+#include <openssl/md5.h>
#include <MESA/field_stat2.h>
#include "doris_client.h"
@@ -46,28 +47,32 @@ struct doris_parameter
int32_t fsstat_status[FSSTAT_DORIS_STATUS_NUM];
};
-struct md5_long
-{
- u_int64_t md5l;
- u_int64_t md5h;
-};
-
struct fetch_file_meta
{
const char *table_name;
size_t size;
size_t curoffset;
u_int32_t cfg_num;
- union {
- char md5[16];
- struct md5_long md5long;
- };
+ u_int32_t validate_md5;
+ char md5str[36];
+};
+
+struct md5_long
+{
+ u_int64_t md5l;
+ u_int64_t md5h;
+};
+union doris_md5
+{
+ char md5[16];
+ struct md5_long md5long;
};
struct doris_confile_ctx
{
struct doris_http_ctx *httpctx;
+ MD5_CTX md5ctx;
long res_code;
size_t contlength;
size_t contl_start;
@@ -81,8 +86,9 @@ struct doris_instance
u_int32_t retry_times;
struct doris_http_instance *cur_httpins;
- int64_t cur_version;
- int64_t new_version;
+ int64_t cur_version; //Ԫ��Ϣ
+ int64_t req_version; //�ļ�
+ int64_t new_version; //�µ�Ԫ��Ϣ
struct easy_string estr;
cJSON *meta, *array;
u_int32_t array_size;
diff --git a/client/nirvana_conhash.cpp b/client/nirvana_conhash.cpp
index 9a95403..99674fd 100644
--- a/client/nirvana_conhash.cpp
+++ b/client/nirvana_conhash.cpp
@@ -348,6 +348,7 @@ enum CONHASH_ERRCODE conhash_insert_bucket(struct consistent_hash *ch, const str
}
inner_bucket->bucket.bucket_id = bucket->bucket_id;
inner_bucket->bucket.tag = bucket->tag;
+ inner_bucket->bucket_index = bucket_index;
if(CONHASH_OK != (code=conhash_add_points(ch, inner_bucket, bucket->point_num)))
{